From c8770b66f34850c0cf6f2d8fb6f9e1781ba9a898 Mon Sep 17 00:00:00 2001 From: Maksym Sobolyev Date: Sat, 21 Jun 2025 03:08:17 -0700 Subject: [PATCH 1/6] core: split DEP_REVERSE_INIT into _MINIT and _CINIT parts Add DEP_REVERSE_CINIT and DEP_REVERSE_MINIT dependency flags allowing to reverse mod_init() and child_init() init order independently. Make DEP_REVERSE_INIT be (DEP_REVERSE_CINIT|DEP_REVERSE_MINIT). --- sr_module.c | 2 +- sr_module.h | 3 + sr_module_deps.c | 172 ++++++++++++++++++++++------------------------- sr_module_deps.h | 6 +- 4 files changed, 87 insertions(+), 96 deletions(-) diff --git a/sr_module.c b/sr_module.c index 67cdc073e1a..60181d3bb52 100644 --- a/sr_module.c +++ b/sr_module.c @@ -675,7 +675,7 @@ static int init_mod_child( struct sr_module* m, int rank, char *type, if (!skip_others && init_mod_child(m->next, rank, type, 0) != 0) return -1; - for (dep = m->sr_deps_init; dep; dep = dep->next) + for (dep = m->sr_deps_cinit; dep; dep = dep->next) if (!dep->mod->init_child_done) if (init_mod_child(dep->mod, rank, type, 1) != 0) return -1; diff --git a/sr_module.h b/sr_module.h index db95f268feb..600ad4aa90a 100644 --- a/sr_module.h +++ b/sr_module.h @@ -145,6 +145,9 @@ struct sr_module{ /* modules which must be initialized before this module */ struct sr_module_dep *sr_deps_init; + /* modules which childs must be initialized before this module */ + struct sr_module_dep *sr_deps_cinit; + /* modules which must be destroyed before this module */ struct sr_module_dep *sr_deps_destroy; diff --git a/sr_module_deps.c b/sr_module_deps.c index 18fdda0cd7d..82d76f46f11 100644 --- a/sr_module_deps.c +++ b/sr_module_deps.c @@ -229,6 +229,50 @@ int add_module_dependencies(struct sr_module *mod) return 0; } +static struct sr_module_dep *make_dep(struct sr_module_dep *md, + unsigned int dep_type, unsigned int df, struct sr_module *mod_a, + struct sr_module *mod_b) +{ + struct sr_module_dep **dip_a, **dip_b; + + switch (df) { + case DEP_REVERSE_MINIT: + dip_a = &mod_a->sr_deps_init; + dip_b = &mod_b->sr_deps_init; + break; + case DEP_REVERSE_CINIT: + dip_a = &mod_a->sr_deps_cinit; + dip_b = &mod_b->sr_deps_cinit; + break; + case DEP_REVERSE_DESTROY: + dip_a = &mod_a->sr_deps_destroy; + dip_b = &mod_b->sr_deps_destroy; + break; + default: + LM_ERR("BUG, unhandled dep_type: %d\n", df); + abort(); + } + + if (md == NULL) { + md = pkg_malloc(sizeof *md); + if (!md) { + LM_ERR("no more pkg\n"); + return NULL; + } + memset(md, 0, sizeof *md); + } + + if (dep_type & df) { + md->mod = mod_a; + md->next = *dip_b; + *dip_b = md; + } else { + md->mod = mod_b; + md->next = *dip_a; + *dip_a = md; + } + return md; +} int solve_module_dependencies(struct sr_module *modules) { @@ -251,111 +295,53 @@ int solve_module_dependencies(struct sr_module *modules) this = md->mod; dep_type = md->type; + int byname = (md->dep.s != NULL) ? 1 : 0; + mod_type = md->mod_type; /* * for generic dependencies (e.g. dialog depends on MOD_TYPE_SQLDB), * first load all modules of given type + * + * re-purpose this @md structure by linking it into a module's + * list of dependencies (will be used at init time) + * + * md->mod used to point to (highlighted with []): + * [sr_module A] ---> "mod_name" + * + * now, the dependency is solved. md->mod will point to: + * sr_module A ---> [sr_module B] */ - if (!md->dep.s) { - /* - * re-purpose this @md structure by linking it into a module's - * list of dependencies (will be used at init time) - * - * md->mod used to point to (highlighted with []): - * [sr_module A] ---> "mod_name" - * - * now, the dependency is solved. md->mod will point to: - * sr_module A ---> [sr_module B] - */ - mod_type = md->mod_type; - - for (dep_solved = 0, mod = modules; mod; mod = mod->next) { - if (mod != this && mod->exports->type == mod_type) { - if (!md) { - md = pkg_malloc(sizeof *md); - if (!md) { - LM_ERR("no more pkg\n"); - return -1; - } - memset(md, 0, sizeof *md); - } - - if (dep_type & DEP_REVERSE_INIT) { - md->mod = this; - md->next = mod->sr_deps_init; - mod->sr_deps_init = md; - } else { - md->mod = mod; - md->next = this->sr_deps_init; - this->sr_deps_init = md; - } - - md = pkg_malloc(sizeof *md); - if (!md) { - LM_ERR("no more pkg\n"); - return -1; - } - memset(md, 0, sizeof *md); - - if (dep_type & DEP_REVERSE_DESTROY) { - md->mod = mod; - md->next = this->sr_deps_destroy; - this->sr_deps_destroy = md; - } else { - md->mod = this; - md->next = mod->sr_deps_destroy; - mod->sr_deps_destroy = md; - } - - md = NULL; - dep_solved++; - } - } - } else { - for (dep_solved = 0, mod = modules; mod; mod = mod->next) { + + for (dep_solved = 0, mod = modules; mod; mod = mod->next) { + if (!byname) { + if (mod == this || mod->exports->type != mod_type) + continue; + } else { if (strcmp(mod->exports->name, md->dep.s) != 0) continue; - - /* quick sanity check */ - if (mod->exports->type != md->mod_type) + if (mod->exports->type != mod_type) LM_BUG("[%.*s %d] -> [%s %d]\n", md->dep.len, md->dep.s, - md->mod_type, mod->exports->name, - mod->exports->type); - - /* same re-purposing technique as above */ - if (dep_type & DEP_REVERSE_INIT) { - md->next = mod->sr_deps_init; - mod->sr_deps_init = md; - } else { - md->mod = mod; - md->next = this->sr_deps_init; - this->sr_deps_init = md; - } - - md = pkg_malloc(sizeof *md); - if (!md) { - LM_ERR("no more pkg\n"); - return -1; - } - memset(md, 0, sizeof *md); - - if (dep_type & DEP_REVERSE_DESTROY) { - md->mod = mod; - md->next = this->sr_deps_destroy; - this->sr_deps_destroy = md; - } else { - md->mod = this; - md->next = mod->sr_deps_destroy; - mod->sr_deps_destroy = md; - } - - dep_solved++; - break; + mod_type, mod->exports->name, + mod->exports->type); } + md = make_dep(md, dep_type, DEP_REVERSE_MINIT, this, mod); + if (!md) + return -1; + md = make_dep(NULL, dep_type, DEP_REVERSE_DESTROY, mod, this); + if (!md) + return -1; + md = make_dep(NULL, dep_type, DEP_REVERSE_CINIT, this, mod); + if (!md) + return -1; + + dep_solved++; + if (byname) + break; + md = NULL; } /* reverse-init dependencies are always solved! */ - if (dep_solved || dep_type & DEP_REVERSE_INIT) + if (dep_solved || dep_type & DEP_REVERSE_MINIT) continue; /* treat unmet dependencies using the intended behaviour */ diff --git a/sr_module_deps.h b/sr_module_deps.h index 2501dccc351..8b7b3087545 100644 --- a/sr_module_deps.h +++ b/sr_module_deps.h @@ -74,10 +74,12 @@ enum module_type { #define DEP_WARN (1 << 1) /* re-order init & destroy; warn if dep n/f */ #define DEP_ABORT (1 << 2) /* re-order init & destroy; exit if dep n/f */ /* in some cases, the dependency direction will be reversed */ -#define DEP_REVERSE_INIT (1 << 3) /* if A->B, A inits before B */ +#define DEP_REVERSE_MINIT (1 << 3) /* if A->B, A.mod_init() before B.mod_init() */ #define DEP_REVERSE_DESTROY (1 << 4) /* if A->B, B destroys before A */ +#define DEP_REVERSE_CINIT (1 << 5) /* if A->B, A.child_init() before B.child_init() */ -#define DEP_REVERSE (DEP_REVERSE_INIT|DEP_REVERSE_DESTROY) +#define DEP_REVERSE_INIT (DEP_REVERSE_MINIT|DEP_REVERSE_CINIT) +#define DEP_REVERSE (DEP_REVERSE_INIT|DEP_REVERSE_DESTROY) typedef struct module_dependency { enum module_type mod_type; From af129b11ea16c3f9019c50bee51de1bd01ed4dc9 Mon Sep 17 00:00:00 2001 From: Maksym Sobolyev Date: Wed, 18 Jun 2025 22:23:22 -0700 Subject: [PATCH 2/6] rtpproxy: collect all notification configuration into one struct, use struct rtpp_sock to keep FD and type. --- modules/rtpproxy/notification_process.c | 28 +++++++------- modules/rtpproxy/rtpproxy.c | 49 ++++++++++--------------- modules/rtpproxy/rtpproxy.h | 13 ++++++- 3 files changed, 45 insertions(+), 45 deletions(-) diff --git a/modules/rtpproxy/notification_process.c b/modules/rtpproxy/notification_process.c index 8376e201f4a..2efdbea37ca 100644 --- a/modules/rtpproxy/notification_process.c +++ b/modules/rtpproxy/notification_process.c @@ -273,19 +273,18 @@ static int rtpproxy_io_callback(int fd, void *fs, int was_timeout) static int rtpproxy_io_new_callback(int fd, void *fs, int was_timeout) { int size; - struct sockaddr_storage rtpp_info; + struct sockaddr_storage rtpp_info = {0}; struct rtpp_node *node; struct rtpp_notify *notify; size = sizeof(rtpp_info); - memset(&rtpp_info, 0, size); fd = accept(fd, (struct sockaddr *)&rtpp_info, (socklen_t *)&size); if(fd < 0) { LM_ERR("socket accept failed: %s(%d)\n", strerror(errno), errno); return -1; } - if (rtpp_notify_socket_un) { + if (rtpp_notify_cfg.sock.rn_umode == CM_UNIX) { LM_DBG("trusting unix socket connection\n"); if (reactor_proc_add_fd(fd, rtpproxy_io_callback, NULL)<0) { LM_CRIT("failed to add RTPProxy new connection to reactor\n"); @@ -337,29 +336,30 @@ void notification_listener_process(int rank) int len, n; int optval = 1; int socket_fd; + str *rn_name = &rtpp_notify_cfg.name; *rtpp_notify_process_no = process_no; - if (!rtpp_notify_socket_un) { - p = strrchr(rtpp_notify_socket.s, ':'); + if (rtpp_notify_cfg.sock.rn_umode == CM_TCP) { + p = strrchr(rn_name->s, ':'); if (!p) { - LM_ERR("invalid udp address <%.*s>\n", rtpp_notify_socket.len, rtpp_notify_socket.s); + LM_ERR("invalid udp address <%.*s>\n", rn_name->len, rn_name->s); return; } - n = p- rtpp_notify_socket.s; - rtpp_notify_socket.s[n] = 0; + n = p - rn_name->s; + rn_name->s[n] = 0; id.s = p+1; - id.len = rtpp_notify_socket.len - n -1; + id.len = rn_name->len - n - 1; port= str2s(id.s, id.len, &n); if(n) { LM_ERR("Bad format for socket name. Expected ip:port\n"); return; } /* skip here tcp part */ - rtpp_notify_socket.s += 4; + rn_name->s += 4; memset(&saddr_in, 0, sizeof(saddr_in)); - saddr_in.sin_addr.s_addr = inet_addr(rtpp_notify_socket.s); + saddr_in.sin_addr.s_addr = inet_addr(rn_name->s); saddr_in.sin_family = AF_INET; saddr_in.sin_port = htons(port); @@ -370,7 +370,7 @@ void notification_listener_process(int rank) } saddr = (struct sockaddr*)&saddr_in; len = sizeof(saddr_in); - LM_DBG("binding socket %d to %s:%d\n", socket_fd, rtpp_notify_socket.s, port); + LM_DBG("binding socket %d to %s:%d\n", socket_fd, rn_name->s, port); } else { /* create socket */ socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); @@ -381,11 +381,11 @@ void notification_listener_process(int rank) memset(&saddr_un, 0, sizeof(struct sockaddr_un)); saddr_un.sun_family = AF_LOCAL; - strncpy(saddr_un.sun_path, rtpp_notify_socket.s, + strncpy(saddr_un.sun_path, rn_name->s, sizeof(saddr_un.sun_path) - 1); saddr = (struct sockaddr*)&saddr_un; len = sizeof(saddr_un); - LM_DBG("binding unix socket %s\n", rtpp_notify_socket.s); + LM_DBG("binding unix socket %s\n", rn_name->s); } if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, (void*)&optval, diff --git a/modules/rtpproxy/rtpproxy.c b/modules/rtpproxy/rtpproxy.c index 8f1682207f5..d5a5da4f493 100644 --- a/modules/rtpproxy/rtpproxy.c +++ b/modules/rtpproxy/rtpproxy.c @@ -372,12 +372,7 @@ static int myrand = 0; static unsigned int myseqn = 0; static int myrank = 0; static str nortpproxy_str = str_init("a=nortpproxy:yes"); -str rtpp_notify_socket = {0, 0}; -/* - * 0 - Unix socket - * 1 - TCP socket - */ -int rtpp_notify_socket_un = 0; +struct rtpp_notify_cfg rtpp_notify_cfg = {.name = {0, 0}}; /* used in rtpproxy_set_store() */ static int rtpp_sets=0; @@ -388,11 +383,6 @@ struct rtpp_set_head ** rtpp_set_list =0; struct rtpp_set ** default_rtpp_set=0; static int default_rtpp_set_no = DEFAULT_RTPP_SET_ID; -struct rtpp_sock { - int fd; - enum comm_modes rn_umode; -}; - /* array with the sockets used by rtpporxy (per process)*/ static struct rtpp_sock *rtpp_socks = NULL; static unsigned int *rtpp_no = 0; @@ -641,8 +631,8 @@ static int rtpproxy_set_notify(modparam_t type, void * val) return 0; } - rtpp_notify_socket.s = p; - rtpp_notify_socket.len = strlen(p); + rtpp_notify_cfg.name.s = p; + rtpp_notify_cfg.name.len = strlen(p); exports.procs = procs; @@ -1161,6 +1151,7 @@ mod_init(void) int i; int tmp; float timeout; + str *rn_name = &rtpp_notify_cfg.name; if (rtpproxy_autobridge != 0) { LM_WARN("Auto bridging does not properly function when doing " @@ -1304,16 +1295,16 @@ mod_init(void) parse_bavp(¶m3_bavp_name, ¶m3_spec) < 0) LM_DBG("cannot parse bavps\n"); - if(rtpp_notify_socket.s) { - if (strncmp("tcp:", rtpp_notify_socket.s, 4) == 0) { - rtpp_notify_socket_un = 0; + if (rn_name->s) { + if (strncmp("tcp:", rn_name->s, 4) == 0) { + rtpp_notify_cfg.sock.rn_umode = CM_TCP; } else { - if (strncmp("unix:", rtpp_notify_socket.s, 5) == 0) - rtpp_notify_socket.s += 5; - rtpp_notify_socket_un = 1; + if (strncmp("unix:", rn_name->s, 5) == 0) + rn_name->s += 5; + rtpp_notify_cfg.sock.rn_umode = CM_UNIX; } /* check if the notify socket parameter is set */ - rtpp_notify_socket.len = strlen(rtpp_notify_socket.s); + rn_name->len = strlen(rn_name->s); if(dlg_api.get_dlg == 0) { LM_ERR("You need to load dialog module if you want to use the" " timeout notification feature\n"); @@ -1656,8 +1647,8 @@ static void mod_destroy(void) nh_lock = NULL; } - if (rtpp_notify_socket_un) { - if (unlink(rtpp_notify_socket.s)) { + if (rtpp_notify_cfg.sock.rn_umode == CM_UNIX) { + if (unlink(rtpp_notify_cfg.name.s)) { LM_ERR("cannot remove the notification socket(%s:%d)\n", strerror(errno), errno); } @@ -3592,7 +3583,7 @@ static int rtpproxy_offer_answer(struct sip_msg *msg, struct rtpp_args *args, char *adv_address = NULL; struct dlg_cell * dlg; str dtmf_tag = {0, 0}, timeout_tag = {0, 0}; - str notification_socket = rtpp_notify_socket; + str rn_name = rtpp_notify_cfg.name; int allocated_body = 0; str *did; int ret = -1; @@ -3689,9 +3680,9 @@ static int rtpproxy_offer_answer(struct sip_msg *msg, struct rtpp_args *args, enable_notification = 1; /* check to see if we have a notification socket */ if (cp[1] != '\0' && cp[1] == '<') { - notification_socket.s = &cp[2]; + rn_name.s = &cp[2]; for (; cp[1] != '\0' && cp[1] != '>'; cp++); - notification_socket.len = &cp[1] - notification_socket.s; + rn_name.len = &cp[1] - rn_name.s; cp++; } break; @@ -3821,7 +3812,7 @@ static int rtpproxy_offer_answer(struct sip_msg *msg, struct rtpp_args *args, STR2IOVEC(args->from_tag, vup.vu[11]); STR2IOVEC(args->to_tag, vup.vu[15]); - if (notification_socket.s == 0 || notification_socket.len == 0) { + if (rn_name.s == 0 || rn_name.len == 0) { if (enable_notification) LM_WARN("cannot receive notifications because" "notification socket is not specified\n"); @@ -4086,9 +4077,9 @@ static int rtpproxy_offer_answer(struct sip_msg *msg, struct rtpp_args *args, node_has_notification = enable_notification && HAS_CAP(args->node, NOTIFY); if (node_has_dtmf_catch || node_has_notification) { if (opts.s.s[0] == 'U') { - STR2IOVEC(notification_socket, vup.vu[vcnt + 1]); - if (!HAS_CAP(args->node, NOTIFY_WILD) && !rtpp_notify_socket_un && - notification_socket.s == rtpp_notify_socket.s) { + STR2IOVEC(rn_name, vup.vu[vcnt + 1]); + if (!HAS_CAP(args->node, NOTIFY_WILD) && rtpp_notify_cfg.sock.rn_umode == CM_TCP && + rn_name.s == rtpp_notify_cfg.name.s) { vup.vu[vcnt + 1].iov_base += 4; vup.vu[vcnt + 1].iov_len -= 4; } diff --git a/modules/rtpproxy/rtpproxy.h b/modules/rtpproxy/rtpproxy.h index b2e0c6839ab..387569a9501 100644 --- a/modules/rtpproxy/rtpproxy.h +++ b/modules/rtpproxy/rtpproxy.h @@ -57,6 +57,16 @@ struct rtpp_node { struct rtpp_node *rn_next; }; +struct rtpp_sock { + int fd; + enum comm_modes rn_umode; +}; + +struct rtpp_notify_cfg { + str name; + struct rtpp_sock sock; +}; + #define CM_STREAM(ndp) ((ndp)->rn_umode == CM_TCP || (ndp)->rn_umode == CM_TCP6 || \ (ndp)->rn_umode == CM_CUNIX || (ndp)->rn_umode == CM_RTPIO) @@ -150,8 +160,7 @@ struct rtpp_dtmf_event { int rtpproxy_raise_dtmf_event(struct rtpp_dtmf_event *dtmf); extern rw_lock_t *nh_lock; -extern str rtpp_notify_socket; -extern int rtpp_notify_socket_un; +extern struct rtpp_notify_cfg rtpp_notify_cfg; extern struct dlg_binds dlg_api; extern int detect_rtp_idle; extern int rtpproxy_tout; From a01e682c97f3b7cbead5e0334f7e8fe2c64bf4a4 Mon Sep 17 00:00:00 2001 From: Maksym Sobolyev Date: Wed, 18 Jun 2025 23:11:34 -0700 Subject: [PATCH 3/6] rtp.io: integrate notification channel Allocate a single notification socketpair to be shared by all opensips workers, pass one side to the hosted rtpproxy process and provide API for the rtpproxy module to tap into the other end. --- modules/rtp.io/rtp_io.c | 48 +++++++++++++++++++++++++++++++++--- modules/rtp.io/rtp_io.h | 13 ++++++++++ modules/rtp.io/rtp_io_api.h | 3 +++ modules/rtp.io/rtp_io_host.c | 5 ++++ modules/rtp.io/rtp_io_util.c | 21 ++++++++++++++-- modules/rtp.io/rtp_io_util.h | 1 + 6 files changed, 85 insertions(+), 6 deletions(-) diff --git a/modules/rtp.io/rtp_io.c b/modules/rtp.io/rtp_io.c index 5510bab4ba9..2f434f5552b 100644 --- a/modules/rtp.io/rtp_io.c +++ b/modules/rtp.io/rtp_io.c @@ -29,6 +29,8 @@ #include "../../dprint.h" #include "../../timer.h" +#include "../rtpproxy/rtpproxy.h" +#include "../rtpproxy/notification_process.h" #include "rtp_io.h" #include "rtp_io_util.h" #include "rtp_io_params.h" @@ -40,7 +42,7 @@ static void mod_destroy(void); static const dep_export_t deps = { { /* OpenSIPS module dependencies */ - { MOD_TYPE_DEFAULT, "rtpproxy", DEP_SILENT|DEP_REVERSE }, + { MOD_TYPE_DEFAULT, "rtpproxy", (DEP_SILENT|DEP_REVERSE) & (~DEP_REVERSE_CINIT) }, { MOD_TYPE_NULL, NULL, 0 }, }, { /* modparam dependencies */ @@ -49,12 +51,14 @@ static const dep_export_t deps = { }; static int rtp_io_getchildsock(int); +static int rtp_io_getrnsock(struct rtpp_notify_cfg *); /* * Exported functions */ static const cmd_export_t cmds[] = { {"rtp_io_getchildsock", (cmd_function)rtp_io_getchildsock, {0}, 0}, + {"rtp_io_getrnsock", (cmd_function)rtp_io_getrnsock, {0}, 0}, {0} }; @@ -162,17 +166,29 @@ static int mod_init(void) ENV_ADD(argv_stat[i], e1); } + struct rtpp_n_sock *n_sock = &rpi_descp->n_sock; + int *fdp = n_sock->_fds; + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fdp) < 0) + goto e1; + snprintf(n_sock->_name, sizeof(n_sock->_name), "fd:%d", n_sock->fds.rtpp); + n_sock->name.s = n_sock->_name; + n_sock->name.len = strlen(n_sock->_name); + ENV_ADD("-n", e2); + ENV_ADD("%s", e2, n_sock->name.s); for (int i = 0; i < nsocks; i++) { int *fdp = &rpi_descp->socks->holder[i * 2]; if (socketpair(AF_UNIX, SOCK_STREAM, 0, fdp) < 0) - goto e1; - ENV_ADD("-s", e1); - ENV_ADD("fd:%d", e1, fdp[0]); + goto e2; + ENV_ADD("-s", e2); + ENV_ADD("fd:%d", e2, fdp[0]); } rpi_descp->socks->n = nsocks; return 0; +e2: + close(n_sock->_fds[0]); + close(n_sock->_fds[1]); e1: free(rpi_descp->socks); e0: @@ -198,6 +214,7 @@ void mod_destroy(void) } rtp_io_close_serv_socks(); rtp_io_close_cnlt_socks(); + rtp_io_close_cnlt_nsock(); free(rpi_descp->socks); } @@ -206,11 +223,25 @@ void mod_destroy(void) static int child_init(int rank) { + rtpproxy_is_nproc_t is_nproc_f; + + is_nproc_f = (rtpproxy_is_nproc_t)find_export("rtpproxy_is_nproc", 0); + if (is_nproc_f == NULL) { + LM_ERR("rtpproxy_is_nproc() not found in the rtpproxy module\n"); + return -1; + } + int is_nproc = is_nproc_f(NPROC_CHECK); + LM_DBG("rtp.io: child_init(%d), notifier: %d\n", rank, is_nproc); if (rank > rpi_descp->socks->n) { LM_ERR("BUG: rank is higher than the number of sockets!\n"); return -1; } + if (!is_nproc && rtp_io_close_cnlt_nsock() != 0) { + LM_ERR("rtp_io_close_cnlt_nsock() failed\n"); + return -1; + } + if (rank <= 0) { if (rtp_io_close_cnlt_socks() != 0) { LM_ERR("rtp_io_close_cnlt_socks() failed\n"); @@ -240,3 +271,12 @@ static int rtp_io_getchildsock(int rank) int *fdp = &rpi_descp->socks->holder[(rank - 1) * 2]; return (fdp[1]); } + +static int rtp_io_getrnsock(struct rtpp_notify_cfg *rn_cfg) +{ + + rn_cfg->name = rpi_descp->n_sock.name; + rn_cfg->sock.rn_umode = CM_RTPIO; + rn_cfg->sock.fd = rpi_descp->n_sock.fds.osips; + return (0); +}; diff --git a/modules/rtp.io/rtp_io.h b/modules/rtp.io/rtp_io.h index 3990cdbe655..2bcec894136 100644 --- a/modules/rtp.io/rtp_io.h +++ b/modules/rtp.io/rtp_io.h @@ -25,10 +25,23 @@ struct rtp_io_socks { int holder[]; }; +struct rtpp_n_sock { + char _name[32]; + str name; + union { + struct { + int rtpp; + int osips; + } fds; + int _fds[2]; + }; +}; + struct rtp_io_desc { struct rtpp_cfg *rtpp_cfsp; struct rtpp_env_hd env; struct rtp_io_socks *socks; + struct rtpp_n_sock n_sock; }; extern struct rtp_io_desc *rpi_descp; diff --git a/modules/rtp.io/rtp_io_api.h b/modules/rtp.io/rtp_io_api.h index c09bf127e69..166454ca531 100644 --- a/modules/rtp.io/rtp_io_api.h +++ b/modules/rtp.io/rtp_io_api.h @@ -1,3 +1,6 @@ #pragma once +struct rtpp_notify_cfg; + typedef int (*rtp_io_getchildsock_t)(int); +typedef int (*rtp_io_getrnsock_t)(struct rtpp_notify_cfg *); diff --git a/modules/rtp.io/rtp_io_host.c b/modules/rtp.io/rtp_io_host.c index a5eddd49fd9..5f223bd9807 100644 --- a/modules/rtp.io/rtp_io_host.c +++ b/modules/rtp.io/rtp_io_host.c @@ -72,6 +72,8 @@ void rtpproxy_host_process(int rank) goto e1; if (rtp_io_close_cnlt_socks() != 0) goto e1; + if (rtp_io_close_cnlt_nsock() != 0) + goto e1; OPT_RESTORE(); rpi_descp->rtpp_cfsp = rtpp_main(argc, argv); @@ -101,5 +103,8 @@ ipc_shutdown_rtpp_host(int sender, void *param) if (rpi_descp->socks->holder[i] != -1) close(rpi_descp->socks->holder[i]); } + if (rpi_descp->n_sock.fds.rtpp != -1) { + close(rpi_descp->n_sock.fds.rtpp); + } free(rpi_descp->socks); } diff --git a/modules/rtp.io/rtp_io_util.c b/modules/rtp.io/rtp_io_util.c index a04c3697696..e7a54b5c6eb 100644 --- a/modules/rtp.io/rtp_io_util.c +++ b/modules/rtp.io/rtp_io_util.c @@ -85,19 +85,36 @@ int rtp_io_close_serv_socks(void) for (int i = 0; i < (rpi_descp->socks->n * 2); i+=2) { if (rpi_descp->socks->holder[i] != -1) { - close(rpi_descp->socks->holder[i]); + if (close(rpi_descp->socks->holder[i]) != 0) + return -1; rpi_descp->socks->holder[i] = -1; } } + if (rpi_descp->n_sock.fds.rtpp != -1) { + if (close(rpi_descp->n_sock.fds.rtpp) != 0) + return -1; + rpi_descp->n_sock.fds.rtpp = -1; + } return (0); } +int rtp_io_close_cnlt_nsock(void) +{ + if (rpi_descp->n_sock.fds.osips != -1) { + if (close(rpi_descp->n_sock.fds.osips) != 0) + return -1; + rpi_descp->n_sock.fds.osips = -1; + } + return 0; +} + int rtp_io_close_cnlt_socks(void) { for (int i = 0; i < (rpi_descp->socks->n * 2); i+=2) { if (rpi_descp->socks->holder[i+1] != -1) { - close(rpi_descp->socks->holder[i+1]); + if (close(rpi_descp->socks->holder[i+1]) != 0) + return -1; rpi_descp->socks->holder[i+1] = -1; } } diff --git a/modules/rtp.io/rtp_io_util.h b/modules/rtp.io/rtp_io_util.h index 6dc0cddaaa8..e9bcb9cd2b9 100644 --- a/modules/rtp.io/rtp_io_util.h +++ b/modules/rtp.io/rtp_io_util.h @@ -11,3 +11,4 @@ const char *const * rtp_io_env_gen_argv(struct rtpp_env_hd *, int *); int rtp_io_close_serv_socks(void); int rtp_io_close_cnlt_socks(void); +int rtp_io_close_cnlt_nsock(void); From 615cc069e1756e2c0b864b7e5efde1bbae58efe7 Mon Sep 17 00:00:00 2001 From: Maksym Sobolyev Date: Thu, 19 Jun 2025 00:11:52 -0700 Subject: [PATCH 4/6] rtpproxy: hook up notification socket into rtp.io automatically Do it when rtp.io is enabled. Start notification listener process always, turn it into dummy if rtp.io is not enabled and notification socket is not configured either. --- modules/rtpproxy/notification_process.c | 94 ++++++++++++++++++++----- modules/rtpproxy/notification_process.h | 7 ++ modules/rtpproxy/rtpproxy.c | 30 +++++--- modules/rtpproxy/rtpproxy.h | 2 + 4 files changed, 104 insertions(+), 29 deletions(-) create mode 100644 modules/rtpproxy/notification_process.h diff --git a/modules/rtpproxy/notification_process.c b/modules/rtpproxy/notification_process.c index 2efdbea37ca..b12ef2e26ee 100644 --- a/modules/rtpproxy/notification_process.c +++ b/modules/rtpproxy/notification_process.c @@ -43,6 +43,7 @@ #include "../../lib/list.h" #include "rtpproxy.h" +#include "notification_process.h" #define BUF_LEN 255 @@ -325,6 +326,26 @@ int init_rtpp_notify(void) return 0; } +static rtp_io_getrnsock_t +rtp_io_rnsock_f(void) +{ + static rtp_io_getrnsock_t _rtp_io_getrnsock = {0}; + if (_rtp_io_getrnsock == NULL) + _rtp_io_getrnsock = (rtp_io_getrnsock_t)find_export("rtp_io_getrnsock", 0); + return _rtp_io_getrnsock; +} + +int fill_rtp_io_rnsock(void) +{ + + rtp_io_getrnsock_t rtp_io_rnsock = rtp_io_rnsock_f(); + if (rtp_io_rnsock == NULL) + return -1; + if (rtp_io_rnsock(&rtpp_notify_cfg) != 0) + return -1; + return 0; +} + void notification_listener_process(int rank) { struct sockaddr_un saddr_un; @@ -335,12 +356,19 @@ void notification_listener_process(int rank) struct sockaddr* saddr; int len, n; int optval = 1; - int socket_fd; + int socket_fd = -1; str *rn_name = &rtpp_notify_cfg.name; + struct rtpp_sock *rn_sock = &rtpp_notify_cfg.sock; + + if (rn_name->s == NULL) { + if (fill_rtp_io_rnsock() != 0) + goto serve; + } *rtpp_notify_process_no = process_no; - if (rtpp_notify_cfg.sock.rn_umode == CM_TCP) { + switch (rn_sock->rn_umode) { + case CM_TCP: p = strrchr(rn_name->s, ':'); if (!p) { LM_ERR("invalid udp address <%.*s>\n", rn_name->len, rn_name->s); @@ -371,7 +399,8 @@ void notification_listener_process(int rank) saddr = (struct sockaddr*)&saddr_in; len = sizeof(saddr_in); LM_DBG("binding socket %d to %s:%d\n", socket_fd, rn_name->s, port); - } else { + break; + case CM_UNIX: /* create socket */ socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); if (socket_fd == -1) { @@ -386,34 +415,50 @@ void notification_listener_process(int rank) saddr = (struct sockaddr*)&saddr_un; len = sizeof(saddr_un); LM_DBG("binding unix socket %s\n", rn_name->s); + break; + case CM_RTPIO: + socket_fd = rn_sock->fd; + len = -1; + saddr = NULL; + LM_DBG("using rtp.io notification socket %d\n", socket_fd); + break; + default: + abort(); } - if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, (void*)&optval, + if (rn_sock->rn_umode != CM_RTPIO) { + if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, (void*)&optval, sizeof(optval)) == -1) { - LM_ERR("setsockopt failed %s\n", strerror(errno)); - return; - } + LM_ERR("setsockopt failed %s\n", strerror(errno)); + return; + } - if (bind(socket_fd, saddr, len) == -1) { - LM_ERR("failed to bind to socket: %s\n", strerror(errno)); - return; - } + if (bind(socket_fd, saddr, len) == -1) { + LM_ERR("failed to bind to socket: %s\n", strerror(errno)); + return; + } - /* open socket for listening */ - if(listen(socket_fd, 10) == -1) { - LM_ERR("socket listen failed: %s(%d)\n", strerror(errno), errno); - close(socket_fd); - return; + /* open socket for listening */ + if(listen(socket_fd, 10) == -1) { + LM_ERR("socket listen failed: %s(%d)\n", strerror(errno), errno); + close(socket_fd); + return; + } } +serve: if (reactor_proc_init("RTPProxy events") < 0) { LM_ERR("failed to init the RTPProxy events\n"); return; } - if (reactor_proc_add_fd( socket_fd, rtpproxy_io_new_callback, NULL) < 0) { - LM_CRIT("failed to add RTPProxy listen socket to reactor\n"); - return; + if (socket_fd != -1) { + reactor_proc_cb_f cb_f = (rn_sock->rn_umode != CM_RTPIO) ? rtpproxy_io_new_callback : + rtpproxy_io_callback; + if (reactor_proc_add_fd(socket_fd, cb_f, NULL) < 0) { + LM_CRIT("failed to add RTPProxy listen socket to reactor\n"); + return; + } } reactor_proc_loop(); @@ -449,3 +494,14 @@ void update_rtpp_notify(void) if (ipc_send_rpc(*rtpp_notify_process_no, ipc_update_rtpp_notify, NULL) != 0) LM_ERR("could not send RTPProxy update to notify process!\n"); } + +int rtpproxy_is_nproc(enum inp_op op) +{ + static int _rtpproxy_is_nproc = 0; + const int lastval = _rtpproxy_is_nproc; + + if (op == NPROC_SET) { + _rtpproxy_is_nproc = 1; + } + return lastval; +} diff --git a/modules/rtpproxy/notification_process.h b/modules/rtpproxy/notification_process.h new file mode 100644 index 00000000000..37909f60145 --- /dev/null +++ b/modules/rtpproxy/notification_process.h @@ -0,0 +1,7 @@ +#pragma once + +enum inp_op {NPROC_CHECK = 0, NPROC_SET}; + +int rtpproxy_is_nproc(enum inp_op); + +typedef int (*rtpproxy_is_nproc_t)(enum inp_op); diff --git a/modules/rtpproxy/rtpproxy.c b/modules/rtpproxy/rtpproxy.c index d5a5da4f493..6b3dbefabe3 100644 --- a/modules/rtpproxy/rtpproxy.c +++ b/modules/rtpproxy/rtpproxy.c @@ -188,7 +188,7 @@ #include "rtpproxy_vcmd.h" #include "rtppn_connect.h" #include "../rtp_relay/rtp_relay.h" -#include "../rtp.io/rtp_io_api.h" +#include "notification_process.h" #define NH_TABLE_VERSION 0 @@ -494,6 +494,7 @@ static const cmd_export_t cmds[] = { {CMD_PARAM_VAR | CMD_PARAM_OPT, 0, 0}, {0,0,0}}, REQUEST_ROUTE|FAILURE_ROUTE|ONREPLY_ROUTE|BRANCH_ROUTE|LOCAL_ROUTE}, {"load_rtpproxy", (cmd_function)load_rtpproxy, {{0,0,0}}, 0}, + {"rtpproxy_is_nproc", (cmd_function)rtpproxy_is_nproc, {{0}}, 0}, {0,0,{{0,0,0}},0} }; @@ -568,7 +569,7 @@ struct module_exports exports = { mi_cmds, /* exported MI functions */ 0, /* exported pseudo-variables */ 0, /* exported transformations */ - 0, /* extra processes */ + procs, /* extra processes */ mod_preinit, mod_init, 0, /* reply processing */ @@ -634,8 +635,6 @@ static int rtpproxy_set_notify(modparam_t type, void * val) rtpp_notify_cfg.name.s = p; rtpp_notify_cfg.name.len = strlen(p); - exports.procs = procs; - return 0; } @@ -1310,11 +1309,11 @@ mod_init(void) " timeout notification feature\n"); return -1; } + } - if (init_rtpp_notify() < 0) { - LM_ERR("cannot init notify handlers\n"); - return -1; - } + if (init_rtpp_notify() < 0) { + LM_ERR("cannot init notify handlers\n"); + return -1; } ei_id = evi_publish_event(event_name); @@ -1489,9 +1488,14 @@ static int _add_proxies_from_database(void) { static int child_init(int rank) { + LM_INFO("rtpproxy: child_init(%d)\n", rank); /* we need DB conn in the worker processes only */ - if (rank<1) + if (rank<1) { + if (rank < 0) { + assert(rtpproxy_is_nproc(NPROC_SET) == 0); + } return 0; + } if(*rtpp_set_list==NULL ) return 0; @@ -1542,6 +1546,12 @@ int connect_rtpproxies(struct rtpp_set *filter) LM_ERR("rtp.io is not loaded\n"); return -1; } + if (rtpp_notify_cfg.name.s == NULL) { + if (fill_rtp_io_rnsock() != 0) { + LM_ERR("rtp.io notification socket cannot be initialized\n"); + return -1; + } + } rtpp_socks[pnode->idx].fd = gcs_f(myrank); } break; @@ -1647,7 +1657,7 @@ static void mod_destroy(void) nh_lock = NULL; } - if (rtpp_notify_cfg.sock.rn_umode == CM_UNIX) { + if (rtpp_notify_cfg.name.s != NULL && rtpp_notify_cfg.sock.rn_umode == CM_UNIX) { if (unlink(rtpp_notify_cfg.name.s)) { LM_ERR("cannot remove the notification socket(%s:%d)\n", strerror(errno), errno); diff --git a/modules/rtpproxy/rtpproxy.h b/modules/rtpproxy/rtpproxy.h index 387569a9501..87e9eaee024 100644 --- a/modules/rtpproxy/rtpproxy.h +++ b/modules/rtpproxy/rtpproxy.h @@ -30,6 +30,7 @@ #include "../../pvar.h" #include "../dialog/dlg_load.h" #include "../../rw_locking.h" +#include "../rtp.io/rtp_io_api.h" struct rtpproxy_vcmd; @@ -168,6 +169,7 @@ extern struct rtpp_set_head ** rtpp_set_list; int init_rtpp_notify(); void update_rtpp_notify(); void notification_listener_process(int rank); +int fill_rtp_io_rnsock(void); /* Functions from nathelper */ struct rtpp_set *get_rtpp_set(nh_set_param_t *); From 06c8423eda389d8714682953a609889724dc4467 Mon Sep 17 00:00:00 2001 From: Maksym Sobolyev Date: Wed, 28 May 2025 05:11:14 -0700 Subject: [PATCH 5/6] rtp.io: enable dialog and related modules in the CI build --- docker/Dockerfile.rtp.io | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docker/Dockerfile.rtp.io b/docker/Dockerfile.rtp.io index 0134c0d0a3f..d6de70a144e 100644 --- a/docker/Dockerfile.rtp.io +++ b/docker/Dockerfile.rtp.io @@ -30,8 +30,10 @@ RUN --mount=type=bind,source=dist/voiptests/requirements.txt,target=requirements COPY --exclude=.git --exclude=.github --exclude=docker --exclude=dist \ . . -ARG KEEP_MODULES="dialog sipmsgops sl tm rr maxfwd rtp.io rtpproxy textops" -ARG SKIP_MODULES="usrloc event_routing clusterer rtp_relay" +ARG KEEP_MODULES="dialog sipmsgops sl tm rr maxfwd rtp.io rtpproxy textops \ + signaling mi_fifo usrloc registrar acc rtp_relay siprec b2b_entities \ + uac_auth presence pua alias_db b2b_logic" +ARG SKIP_MODULES="event_routing clusterer" RUN mkdir tmp && cd modules && mv ${KEEP_MODULES} ${SKIP_MODULES} ../tmp && \ rm -rf * && cd ../tmp && mv ${KEEP_MODULES} ${SKIP_MODULES} ../modules && \ cd .. && rmdir tmp From 14c89d09c1d489a9f0ded82510d1c894c5c03df0 Mon Sep 17 00:00:00 2001 From: Maksym Sobolyev Date: Thu, 19 Jun 2025 07:24:49 -0700 Subject: [PATCH 6/6] Extra debug. --- modules/rtp.io/rtp_io.c | 2 +- modules/rtpproxy/notification_process.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/rtp.io/rtp_io.c b/modules/rtp.io/rtp_io.c index 2f434f5552b..4d0c44dabad 100644 --- a/modules/rtp.io/rtp_io.c +++ b/modules/rtp.io/rtp_io.c @@ -231,7 +231,7 @@ child_init(int rank) return -1; } int is_nproc = is_nproc_f(NPROC_CHECK); - LM_DBG("rtp.io: child_init(%d), notifier: %d\n", rank, is_nproc); + LM_INFO("rtp.io: child_init(%d), notifier: %d\n", rank, is_nproc); if (rank > rpi_descp->socks->n) { LM_ERR("BUG: rank is higher than the number of sockets!\n"); return -1; diff --git a/modules/rtpproxy/notification_process.c b/modules/rtpproxy/notification_process.c index b12ef2e26ee..9814e28f461 100644 --- a/modules/rtpproxy/notification_process.c +++ b/modules/rtpproxy/notification_process.c @@ -420,7 +420,7 @@ void notification_listener_process(int rank) socket_fd = rn_sock->fd; len = -1; saddr = NULL; - LM_DBG("using rtp.io notification socket %d\n", socket_fd); + LM_INFO("using rtp.io notification socket %d\n", socket_fd); break; default: abort();