From e1d8ffca0af4031684ca0cc0bf7f86257bce6c87 Mon Sep 17 00:00:00 2001 From: David Trihy Date: Thu, 2 Oct 2025 12:54:51 +0100 Subject: [PATCH 1/2] sql_cacher: add support for reads of sql_cached_value while table is being reloaded --- atomic.h | 3 + modules/sql_cacher/README | 12 +- modules/sql_cacher/sql_cacher.c | 188 +++++++++++++++++++++++++------- modules/sql_cacher/sql_cacher.h | 21 +++- 4 files changed, 181 insertions(+), 43 deletions(-) diff --git a/atomic.h b/atomic.h index de454ff8653..63ab499b2cc 100644 --- a/atomic.h +++ b/atomic.h @@ -43,6 +43,7 @@ #undef NO_ATOMIC_OPS typedef _Atomic(unsigned long) atomic_t; +typedef _Atomic(_Bool) atomic_bool_t; #else /* HAVE_STDATOMIC */ /************************* i386 & x86_64 ARCH ****************************/ @@ -72,6 +73,7 @@ typedef _Atomic(unsigned long) atomic_t; * not some alias that contains the same information. */ typedef struct { volatile unsigned int counter; } atomic_t; +typedef atomic_t atomic_bool_t; /*! \brief * atomic_add - add integer to atomic variable @@ -141,6 +143,7 @@ static __inline__ void atomic_dec(atomic_t *v) * not some alias that contains the same information. */ typedef struct { volatile unsigned long counter; } atomic_t; +typedef atomic_t atomic_bool_t; /*! \brief * atomic_set - set atomic variable diff --git a/modules/sql_cacher/README b/modules/sql_cacher/README index 85e114414d3..143452d88ee 100644 --- a/modules/sql_cacher/README +++ b/modules/sql_cacher/README @@ -148,6 +148,15 @@ Chapter 1. 
Admin Guide * expire : expire period for the values stored in the cache for the on demand caching type in seconds If not present, default value is “1 hour” + * full_caching_lock_scope : whether readers of the cache are locked + out per table or per row while the table is being loaded into + the cache. With “table”, loading a large table blocks every + read of an sql_cached_value from that table, which can stall SIP + processing but guarantees full table consistency. With “row”, reads + are allowed during the load on rows not currently being written to. + + row + + table + If not present, default value is “row” The parameters must be given in the exact order specified above. @@ -164,7 +173,8 @@ cachedb_url=mongodb:mycluster://127.0.0.1:27017/db.col table=table_name key=column_name_0 columns=column_name_1 column_name_2 column_name_3 -on_demand=0") +on_demand=0 +full_caching_lock_scope=row") 1.3.2. spec_delimiter (string) diff --git a/modules/sql_cacher/sql_cacher.c b/modules/sql_cacher/sql_cacher.c index b635f9a6706..60b74302944 100644 --- a/modules/sql_cacher/sql_cacher.c +++ b/modules/sql_cacher/sql_cacher.c @@ -37,6 +37,48 @@ #include "sql_cacher.h" #include "../../lib/csv.h" +/* + * cache_lock helpers. With atomic ops available, readers take the rw lock + * only when full_caching_lock_scope is "table"; writers always take it and + * raise is_writing only while they hold it, so the flag tracks the real writer. + * Without atomic ops the parameter has no effect: every read takes the lock. + */ +#ifndef NO_ATOMIC_OPS +#define lock_cache_writes(cache_lock) \ + do { \ + lock_start_write((cache_lock).ref_lock); \ + atomic_store(&(cache_lock).is_writing, 1); \ + } while (0) +#define unlock_cache_writes(cache_lock) \ + do { \ + atomic_store(&(cache_lock).is_writing, 0); \ + lock_stop_write((cache_lock).ref_lock); \ + } while (0) +#define acquire_read_lock(cache_lock) \ + do { \ + if ((cache_lock).scope == CACHE_LOCK_TABLE) { \ + lock_start_read((cache_lock).ref_lock); \ + } \ + } while (0) +#define release_read_lock(cache_lock) \ + do { \ + if
((cache_lock).scope == CACHE_LOCK_TABLE) { \ + lock_stop_read((cache_lock).ref_lock); \ + } \ + } while (0) +#define is_cache_writing(cache_lock) atomic_load(&(cache_lock).is_writing) +#define init_cache_lock ((cache_lock_t){.ref_lock = NULL, .scope = CACHE_LOCK_ROW}) +#else +#define lock_cache_writes(cache_lock) lock_start_write((cache_lock).ref_lock) +#define unlock_cache_writes(cache_lock) lock_stop_write((cache_lock).ref_lock) +#define acquire_read_lock(cache_lock) lock_start_read((cache_lock).ref_lock) +#define release_read_lock(cache_lock) lock_stop_read((cache_lock).ref_lock) +#define is_cache_writing(cache_lock) 0 +#define init_cache_lock ((cache_lock_t){.ref_lock = NULL, .scope = CACHE_LOCK_ROW}) +#endif + +/* end of cache_lock conditional compilation */ + static int mod_init(void); static void destroy(void); static int child_init(int rank); @@ -164,7 +206,7 @@ static int parse_cache_entry(unsigned int type, void *val) int len; str parse_str_copy, parse_str; - if(!entry_list){ + if (!entry_list) { entry_list = shm_malloc(sizeof(cache_entry_t*)); if (!entry_list) { LM_ERR("No more memory for cache entries list\n"); @@ -175,26 +217,28 @@ static int parse_cache_entry(unsigned int type, void *val) parse_str.len = strlen((char *)val); parse_str.s = pkg_malloc(parse_str.len); - if(!parse_str.s){ + if (!parse_str.s) { LM_ERR("No more pkg memory\n"); - return -1; + goto setup_err; } memcpy(parse_str.s, (char *)val, parse_str.len); - new_entry = shm_malloc(sizeof(cache_entry_t)); - if (!new_entry) { - LM_ERR("No more memory for cache entry struct\n"); - return -1; - } - new_entry->id.s = NULL; - new_entry->columns = NULL; - new_entry->key_type = DB_STR; - new_entry->nr_columns = 0; - new_entry->on_demand = 0; - new_entry->expire = DEFAULT_ON_DEMAND_EXPIRE; - new_entry->nr_ints = 0; - new_entry->nr_strs = 0; - new_entry->column_types = 0; - new_entry->ref_lock = NULL; + + new_entry = shm_malloc(sizeof(cache_entry_t)); + if (!new_entry) { + LM_ERR("No more memory for cache entry struct\n"); + goto setup_err; + } + +
new_entry->id.s = NULL; + new_entry->columns = NULL; + new_entry->key_type = DB_STR; + new_entry->nr_columns = 0; + new_entry->on_demand = 0; + new_entry->expire = DEFAULT_ON_DEMAND_EXPIRE; + new_entry->nr_ints = 0; + new_entry->nr_strs = 0; + new_entry->column_types = 0; + new_entry->cache_lock = init_cache_lock; #define PARSE_TOKEN(_ptr1, _ptr2, field, field_name_str, field_name_len) \ do { \ @@ -450,6 +494,55 @@ static int parse_cache_entry(unsigned int type, void *val) EXPIRE_STR_LEN, EXPIRE_STR, str_val.len, str_val.s); goto parse_err; } + + if (!tmp) /* delimiter not found, reached the end of the string to parse */ + goto end_parsing; + else { + p1 = tmp + 1; + p2 = memchr(p1, '=', parse_str.len - (p1 - parse_str.s)); + if (!p2) { + LM_ERR("expected: '='\n"); + goto parse_err; + } + } + } + /* full_caching_lock_scope=row|table; the scope stays CACHE_LOCK_ROW when absent */ + if (!memcmp(p1, FULL_CACHING_LOCK_SCOPE, FULL_CACHING_LOCK_SCOPE_STR_LEN)) { + if (*(p1+FULL_CACHING_LOCK_SCOPE_STR_LEN) != '=') { + LM_ERR("expected: '=' after: %.*s\n", FULL_CACHING_LOCK_SCOPE_STR_LEN, FULL_CACHING_LOCK_SCOPE); + goto parse_err; + } + /* the value starts at p2 + 1, so only the bytes after '=' may be scanned */ + tmp = memchr(p2 + 1, spec_delimiter.s[0], + parse_str.len - (p2 + 1 - parse_str.s)); + if (!tmp) + len = parse_str.len - (p2 - parse_str.s + 1); + else + len = tmp - p2 - 1; + + if (len <= 0) { + LM_ERR("expected value of: %.*s\n", FULL_CACHING_LOCK_SCOPE_STR_LEN, FULL_CACHING_LOCK_SCOPE); + goto parse_err; + } + + if (len == LOCK_SCOPE_ROW_STR_LEN && !memcmp(p2+1, LOCK_SCOPE_ROW, len)) + new_entry->cache_lock.scope = CACHE_LOCK_ROW; + else if (len == LOCK_SCOPE_TABLE_STR_LEN && !memcmp(p2+1, LOCK_SCOPE_TABLE, len)) + new_entry->cache_lock.scope = CACHE_LOCK_TABLE; + else { + LM_ERR("Unsupported cache lock scope '%.*s' defaulting to row\n", len, p2+1); + } + + if (!tmp) /* delimiter not found, reached the end of the string to parse */ + goto end_parsing; + else { + p1 = tmp + 1; + p2 = memchr(p1, '=', parse_str.len - (p1 - parse_str.s)); + if (!p2) { + LM_ERR("expected: '='\n"); + goto parse_err; + } + } } else if
(parse_str.len - (p1 - parse_str.s) > 0) { LM_ERR("unknown parameter: %.*s\n", (int)(parse_str.len - (p1 - parse_str.s)), p1); @@ -483,6 +576,16 @@ static int parse_cache_entry(unsigned int type, void *val) pkg_free(parse_str_copy.s); return rc; +setup_err: + if (parse_str.s) { + pkg_free(parse_str.s); + } + + if (new_entry) { + shm_free(new_entry); + } + + return -1; } /* get the column types from the sql query result */ @@ -1001,10 +1104,10 @@ static int load_entire_table(cache_entry_t *c_entry, db_handlers_t *db_hdls, pkg_free(query_cols); - lock_start_write(db_hdls->c_entry->ref_lock); + lock_cache_writes(db_hdls->c_entry->cache_lock); if (inc_rld_vers && inc_cache_rld_vers(db_hdls, &reload_vers) < 0) { - lock_stop_write(db_hdls->c_entry->ref_lock); + unlock_cache_writes(db_hdls->c_entry->cache_lock); goto error; } @@ -1015,7 +1118,7 @@ static int load_entire_table(cache_entry_t *c_entry, db_handlers_t *db_hdls, row = RES_ROWS(sql_res); values = ROW_VALUES(row); if (get_column_types(c_entry, values + 1, ROW_N(row) - 1) < 0) { - lock_stop_write(db_hdls->c_entry->ref_lock); + unlock_cache_writes(db_hdls->c_entry->cache_lock); goto error; } @@ -1027,7 +1130,7 @@ static int load_entire_table(cache_entry_t *c_entry, db_handlers_t *db_hdls, if (!VAL_NULL(values)) { if (insert_in_cachedb(c_entry, db_hdls, values ,values + 1, reload_vers, ROW_N(row) - 1) < 0) { - lock_stop_write(db_hdls->c_entry->ref_lock); + unlock_cache_writes(db_hdls->c_entry->cache_lock); goto error; } loaded_rec++; @@ -1038,7 +1141,7 @@ static int load_entire_table(cache_entry_t *c_entry, db_handlers_t *db_hdls, if (db_hdls->db_funcs.fetch_result(db_hdls->db_con,&sql_res,fetch_nr_rows)<0) { LM_ERR("Error fetching rows (1) from SQL DB: %s\n", db_url_escape(&c_entry->db_url)); - lock_stop_write(db_hdls->c_entry->ref_lock); + unlock_cache_writes(db_hdls->c_entry->cache_lock); goto error; } } else { @@ -1047,7 +1150,7 @@ static int load_entire_table(cache_entry_t *c_entry, db_handlers_t *db_hdls, } 
while (RES_ROW_N(sql_res) > 0); done: - lock_stop_write(db_hdls->c_entry->ref_lock); + unlock_cache_writes(db_hdls->c_entry->cache_lock); db_hdls->db_funcs.free_result(db_hdls->db_con, sql_res); @@ -1247,7 +1350,7 @@ static mi_item_t *mi_reload(const mi_params_t *params, str *key) lock_get(it->wait_sql_query); } } else { - lock_start_write(db_hdls->c_entry->ref_lock); + lock_cache_writes(db_hdls->c_entry->cache_lock); } if ((rld_vers = get_rld_vers_from_cache(db_hdls->c_entry, db_hdls)) < 0) { @@ -1258,7 +1361,7 @@ static mi_item_t *mi_reload(const mi_params_t *params, str *key) else lock_release(queries_lock); } else { - lock_stop_write(db_hdls->c_entry->ref_lock); + unlock_cache_writes(db_hdls->c_entry->cache_lock); } return init_mi_error(500, MI_SSTR("ERROR Reloading key from SQL" @@ -1275,7 +1378,7 @@ static mi_item_t *mi_reload(const mi_params_t *params, str *key) else lock_release(queries_lock); } else { - lock_stop_write(db_hdls->c_entry->ref_lock); + unlock_cache_writes(db_hdls->c_entry->cache_lock); } if (rc == -1) @@ -1424,8 +1527,8 @@ static int mod_init(void) if (!c_entry->on_demand) { use_timer = 1; c_entry->expire = full_caching_expire; - c_entry->ref_lock = lock_init_rw(); - if (!c_entry->ref_lock) { + c_entry->cache_lock.ref_lock = lock_init_rw(); + if (!c_entry->cache_lock.ref_lock) { LM_ERR("Failed to init readers-writers lock\n"); continue; } @@ -1537,8 +1640,11 @@ static int cdb_val_decode(const pv_name_fix_t *pv_name, const str *cdb_val, int goto error; memcpy(&int_val, int_buf, 4); - if (reload_version != int_val) - return 3; + if (reload_version != int_val) { + /* while a reload is writing, a row one version behind is still accepted; is_writing is read once */ + if (!is_cache_writing(pv_name->c_entry->cache_lock) || reload_version != int_val + 1) + return 3; + } /* null integer value in db */ if (!memcmp(cdb_val->s + pv_name->col_offset, zeroes, INT_B64_ENC_LEN)) @@ -1982,26 +2088,26 @@ int pv_get_sql_cached_value(struct sip_msg *msg, pv_param_t *param, pv_value_t } if
(!pv_name->c_entry->on_demand) - lock_start_read(pv_name->c_entry->ref_lock); + acquire_read_lock(pv_name->c_entry->cache_lock); rc = cdb_fetch(pv_name, &cdb_res, &entry_rld_vers); if (rc == -1) { LM_ERR("Error fetching from cachedb\n"); if (!pv_name->c_entry->on_demand) - lock_stop_read(pv_name->c_entry->ref_lock); + release_read_lock(pv_name->c_entry->cache_lock); return pv_get_null(msg, param, res); } if (!pv_name->c_entry->on_demand) { if (rc == -2) { LM_DBG("key: %.*s not found\n", pv_name->key.len, pv_name->key.s); - lock_stop_read(pv_name->c_entry->ref_lock); + release_read_lock(pv_name->c_entry->cache_lock); return pv_get_null(msg, param, res); } else { if (cdb_res.len == 0 || !cdb_res.s) { LM_DBG("key: %.*s not found in SQL db\n", pv_name->key.len, pv_name->key.s); - lock_stop_read(pv_name->c_entry->ref_lock); + release_read_lock(pv_name->c_entry->cache_lock); pkg_free(cdb_res.s); return pv_get_null(msg, param, res); } @@ -2011,7 +2117,7 @@ int pv_get_sql_cached_value(struct sip_msg *msg, pv_param_t *param, pv_value_t rc2 = cdb_val_decode(pv_name, &cdb_res, entry_rld_vers, &str_res, &int_res); - lock_stop_read(pv_name->c_entry->ref_lock); + release_read_lock(pv_name->c_entry->cache_lock); if (rc2 == 2) goto out_free_null; @@ -2115,7 +2221,7 @@ static void free_c_entry(cache_entry_t *c) shm_free(c->columns[i]); } shm_free(c->columns); - lock_destroy_rw(c->ref_lock); + lock_destroy_rw(c->cache_lock.ref_lock); shm_free(c); } @@ -2330,11 +2436,11 @@ static int sql_cache_dump(struct sip_msg *msg, db_handlers_t *dbh, LM_DBG("dumping data from cache: %.*s\n", cache->id.len, cache->id.s); - lock_start_read(cache->ref_lock); + acquire_read_lock(cache->cache_lock); ver = get_rld_vers_from_cache(cache, dbh); if (ver < 0) { - lock_stop_read(cache->ref_lock); + release_read_lock(cache->cache_lock); LM_ERR("failed to get reload version\n"); return -1; } @@ -2357,13 +2463,13 @@ static int sql_cache_dump(struct sip_msg *msg, db_handlers_t *dbh, n = 
dbh->cdbf.iter_keys(dbh->cdbcon, decode_kv2avps); if (n < 0) { - lock_stop_read(cache->ref_lock); + release_read_lock(cache->cache_lock); LM_ERR("failed to fully iterate through cache '%.*s'\n", cache->id.len, cache->id.s); return -1; } - lock_stop_read(cache->ref_lock); + release_read_lock(cache->cache_lock); return n == 0 ? -2 : n; } diff --git a/modules/sql_cacher/sql_cacher.h b/modules/sql_cacher/sql_cacher.h index 6fa90b77fc0..8ec4eefef8a 100644 --- a/modules/sql_cacher/sql_cacher.h +++ b/modules/sql_cacher/sql_cacher.h @@ -28,6 +28,7 @@ #include "../../db/db.h" #include "../../cachedb/cachedb.h" +#include "../../atomic.h" #define DEFAULT_SPEC_DELIM "\n" #define DEFAULT_COLUMNS_DELIM " " @@ -51,6 +52,12 @@ #define ONDEMAND_STR_LEN ((int)(sizeof(ONDEMAND_STR) - 1)) #define EXPIRE_STR "expire" #define EXPIRE_STR_LEN ((int)(sizeof(EXPIRE_STR) - 1)) +#define FULL_CACHING_LOCK_SCOPE "full_caching_lock_scope" +#define FULL_CACHING_LOCK_SCOPE_STR_LEN ((int)(sizeof(FULL_CACHING_LOCK_SCOPE) - 1)) +#define LOCK_SCOPE_ROW "row" +#define LOCK_SCOPE_ROW_STR_LEN ((int)(sizeof(LOCK_SCOPE_ROW) - 1)) +#define LOCK_SCOPE_TABLE "table" +#define LOCK_SCOPE_TABLE_STR_LEN ((int)(sizeof(LOCK_SCOPE_TABLE) - 1)) #define TYPE_STR_STR "string" #define TYPE_STR_LEN ((int)(sizeof(TYPE_STR_STR) - 1)) @@ -73,6 +80,18 @@ #define is_str_column(pv_name_fix_p) \ ((pv_name_fix_p)->c_entry->column_types & (1LL << (pv_name_fix_p)->col_nr)) +typedef enum _caching_lock_scope { + CACHE_LOCK_ROW, CACHE_LOCK_TABLE +} caching_lock_scope; + +typedef struct _cache_lock { + rw_lock_t *ref_lock; + caching_lock_scope scope; +#ifndef NO_ATOMIC_OPS + atomic_bool_t is_writing; +#endif +} cache_lock_t; + typedef struct _cache_entry { str id; str db_url; @@ -90,7 +109,7 @@ typedef struct _cache_entry { * where n = (entry.nr_columns - 1) */ long long column_types; - rw_lock_t *ref_lock; + cache_lock_t cache_lock; struct _cache_entry *next; } cache_entry_t; From d5fbaa8ec9963b5a7149947ebdd27b542f9a3457 Mon Sep 17 
00:00:00 2001 From: David Trihy Date: Thu, 2 Oct 2025 14:18:57 +0100 Subject: [PATCH 2/2] Fixing compile warning --- modules/sql_cacher/sql_cacher.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/sql_cacher/sql_cacher.c b/modules/sql_cacher/sql_cacher.c index 60b74302944..537642f072e 100644 --- a/modules/sql_cacher/sql_cacher.c +++ b/modules/sql_cacher/sql_cacher.c @@ -198,7 +198,7 @@ struct module_exports exports = { static int parse_cache_entry(unsigned int type, void *val) { - cache_entry_t *new_entry; + cache_entry_t *new_entry = NULL; char *p1, *p2, *tmp, *c_tmp1, *c_tmp2; int col_idx; int rc = -1;