@@ -342,6 +342,8 @@ def __init__(
342342 self .owners = rs .load_tenants (self )
343343 self .start_ts = time .time ()
344344 self .msg_cache : list [rp .PubSubMsg ] = []
345+ self .msg_topic_cache : dict [str , List [rp .PubSubMsg ]] = {}
346+ self .tables : dict [str , rdb .Table ] = {}
345347 self ._builtins ()
346348 if data_at_rest :
347349 self .db = rdb .init_db (self , schema )
@@ -423,8 +425,9 @@ async def _pubsub_msg(self, msg: rp.PubSubMsg):
423425
424426 data = rp .tag2df (msg .data )
425427
426- # save the message into msg_cache
427- self .msg_cache .append (msg )
428+ if self .db is not None :
429+ # save the message into msg_cache
430+ self .append_message (msg )
428431
429432 try :
430433 if msg .topic in self .handler :
@@ -441,6 +444,16 @@ async def _pubsub_msg(self, msg: rp.PubSubMsg):
441444 traceback .print_exc ()
442445 return
443446
447+ def append_message (self , msg : rp .PubSubMsg ):
448+ """Append a message to the message cache."""
449+ topic = msg .topic
450+ rdb .msg_table (self , msg )
451+ self .msg_cache .append (msg )
452+ if msg .table in self .tables :
453+ if topic not in self .msg_topic_cache :
454+ self .msg_topic_cache [msg .table ] = []
455+ self .msg_topic_cache [msg .table ].append (msg )
456+
444457 async def send_message (self , msg ):
445458 """Send message to remote node using a twin from the pool of twins"""
446459 for t in self .id_twin .values ():
@@ -763,7 +776,7 @@ def __lt__(self, other):
763776 def db (self ):
764777 """Return the database associated with this twin."""
765778 return self .router .db
766-
779+
767780 @property
768781 def rid (self ):
769782 """Return the unique id of the rembus component."""
0 commit comments