X-Git-Url: http://git.eversberg.eu/gitweb.cgi?p=lcr.git;a=blobdiff_plain;f=message.c;h=cfda36be28f839b268e4c79f67fb3119c0b490a4;hp=97c032cbbaca6c4f9c4edbed225862bb0c90282a;hb=034d3a91404addedc1c7a3494862c79532b0b878;hpb=2ed0fee489c37a6e2d4473f6185ebbe3e746ac11 diff --git a/message.c b/message.c index 97c032c..cfda36b 100644 --- a/message.c +++ b/message.c @@ -1,6 +1,6 @@ /*****************************************************************************\ ** ** -** PBX4Linux ** +** Linux Call Route ** ** ** **---------------------------------------------------------------------------** ** Copyright: Andreas Eversberg ** @@ -9,46 +9,46 @@ ** ** \*****************************************************************************/ -#include -#include -#include #include "main.h" MESSAGES -struct message *message_first = NULL; -struct message **messagepointer_end = &message_first; +struct lcr_msg *message_first = NULL; +struct lcr_msg **messagepointer_end = &message_first; +struct lcr_work message_work; -//#ifdef H323 -//PMutex mutex_message; -//#endif +static int work_message(struct lcr_work *work, void *instance, int index); + +void init_message(void) +{ + memset(&message_work, 0, sizeof(message_work)); + add_work(&message_work, work_message, NULL, 0); +} + +void cleanup_message(void) +{ + del_work(&message_work); +} + +unsigned int lcr_random = 0; /* creates a new message with the given attributes. the message must be filled then. after filling, the message_put must be called */ -struct message *message_create(int id_from, int id_to, int flow, int type) +struct lcr_msg *message_create(int id_from, int id_to, int flow, int type) { - struct message *message; - int i = 0; + struct lcr_msg *message; + struct timeval now_tv; + struct timezone now_tz; - while(i < 10) - { - message = (struct message *)calloc(1, sizeof(struct message)); - if (message) - break; + gettimeofday(&now_tv, &now_tz); + lcr_random = (lcr_random << 1) | (lcr_random >> 31); + lcr_random ^= now_tv.tv_sec; + lcr_random ^= now_tv.tv_usec; - if (!i) - PERROR("no mem for message, retrying...\n"); - i++; - usleep(300000); - } + message = (struct lcr_msg *)MALLOC(sizeof(struct lcr_msg)); if (!message) - { - PERROR("***Fatal error: no mem for message!!! exitting.\n"); - exit(-1); - } + FATAL("No memory for message.\n"); mmemuse++; - memset(message, 0, sizeof(struct message)); - message->id_from = id_from; message->id_to = id_to; message->flow = flow; @@ -58,71 +58,135 @@ struct message *message_create(int id_from, int id_to, int flow, int type) } /* attaches a message to the end of the message chain */ -void message_put(struct message *message) +void _message_put(struct lcr_msg *message, const char *file, int line) { - /* the mutex prevents from creating two messages at a time (h323 thread and main thread). */ -//#ifdef H323 -// mutex_message.Wait(); -//#endif - - if (message->id_to == 0) - { + if (message->id_to == 0) { PDEBUG(DEBUG_MSG, "message %s not written, because destination is 0.\n", messages_txt[message->type]); message_free(message); return; } - if ((options.deb&DEBUG_MSG) && message->type != MESSAGE_DATA) - PDEBUG(DEBUG_MSG, "message %s written from %ld to %ld (memory %x)\n", messages_txt[message->type], message->id_from, message->id_to, message); + if ((options.deb & DEBUG_MSG)) + PDEBUG(DEBUG_MSG, "message %s written from %ld to %ld (memory %x at file %s, line %d)\n", messages_txt[message->type], message->id_from, message->id_to, message, file, line); *messagepointer_end = message; messagepointer_end = &(message->next); + /* Nullify next pointer if recycled messages */ + *messagepointer_end=NULL; -//#ifdef H323 -// mutex_message.Signal(); -//#endif + /* trigger work */ + trigger_work(&message_work); } +struct lcr_msg *message_forward(int id_from, int id_to, int flow, union parameter *param) +{ + struct lcr_msg *message; + + /* get point to message */ + message = (struct lcr_msg *)((unsigned long)param - ((unsigned long)(&message->param) - (unsigned long)message)); + + /* protect, so forwarded messages are not freed after handling */ + message->keep = 1; + + message->id_from = id_from; + message->id_to = id_to; + message->flow = flow; + message_put(message); + + return(message); +} /* detaches the first messages from the message chain */ -struct message *message_get(void) +struct lcr_msg *message_get(void) { - struct message *message; - - /* the mutex prevents from getting a message while creating a messages at a time (h323 thread and main thread). */ -//#ifdef H323 -// mutex_message.Wait(); -//#endif + struct lcr_msg *message; if (!message_first) - { -//#ifdef H323 -// mutex_message.Signal(); -//#endif return(0); - } message = message_first; message_first = message->next; if (!message_first) messagepointer_end = &message_first; -//#ifdef H323 -// mutex_message.Signal(); -//#endif + message->keep = 0; - if ((options.deb&DEBUG_MSG) && message->type != MESSAGE_DATA) + if ((options.deb & DEBUG_MSG)) PDEBUG(DEBUG_MSG, "message %s reading from %ld to %ld (memory %x)\n", messages_txt[message->type], message->id_from, message->id_to, message); return(message); } /* free a message */ -void message_free(struct message *message) +void message_free(struct lcr_msg *message) { - memset(message, 0, sizeof(struct message)); - free(message); + if (message->keep) + return; + FREE(message, sizeof(struct lcr_msg)); mmemuse--; } +static int work_message(struct lcr_work *work, void *instance, int index) +{ + struct lcr_msg *message; + class Port *port; + class Endpoint *epoint; + class Join *join; + + while ((message = message_get())) { + switch(message->flow) { + case PORT_TO_EPOINT: + epoint = find_epoint_id(message->id_to); + if (epoint) { + if (epoint->ep_app) { + epoint->ep_app->ea_message_port(message->id_from, message->type, &message->param); + } else { + PDEBUG(DEBUG_MSG, "Warning: message %s from port %d to endpoint %d. endpoint doesn't have an application.\n", messages_txt[message->type], message->id_from, message->id_to); + } + } else { + PDEBUG(DEBUG_MSG, "Warning: message %s from port %d to endpoint %d. endpoint doesn't exist anymore.\n", messages_txt[message->type], message->id_from, message->id_to); + } + break; + + case EPOINT_TO_JOIN: + join = find_join_id(message->id_to); + if (join) { + join->message_epoint(message->id_from, message->type, &message->param); + } else { + PDEBUG(DEBUG_MSG, "Warning: message %s from endpoint %d to join %d. join doesn't exist anymore\n", messages_txt[message->type], message->id_from, message->id_to); + } + break; + + case JOIN_TO_EPOINT: + epoint = find_epoint_id(message->id_to); + if (epoint) { + if (epoint->ep_app) { + epoint->ep_app->ea_message_join(message->id_from, message->type, &message->param); + } else { + PDEBUG(DEBUG_MSG, "Warning: message %s from join %d to endpoint %d. endpoint doesn't have an application.\n", messages_txt[message->type], message->id_from, message->id_to); + } + } else { + PDEBUG(DEBUG_MSG, "Warning: message %s from join %d to endpoint %d. endpoint doesn't exist anymore.\n", messages_txt[message->type], message->id_from, message->id_to); + } + break; + + case EPOINT_TO_PORT: + port = find_port_id(message->id_to); + if (port) { + port->message_epoint(message->id_from, message->type, &message->param); +BUDETECT + } else { + PDEBUG(DEBUG_MSG, "Warning: message %s from endpoint %d to port %d. port doesn't exist anymore\n", messages_txt[message->type], message->id_from, message->id_to); + } + break; + + default: + PERROR("Message flow %d unknown.\n", message->flow); + } + message_free(message); + } + + return 0; +} +