@@ -5,7 +5,9 @@
 * Make this more robust across multiple machines. If you started two instances
 talking to the same database backend things could go really badly.
 """
+from collections import defaultdict
 import gevent
+from gevent.coros import BoundedSemaphore
 from sqlalchemy import asc

 from inbox.util.concurrency import retry_with_logging
@@ -43,14 +45,18 @@
 }


+CONCURRENCY_LIMIT = 3
+
+
 class SyncbackService(gevent.Greenlet):
     """Asynchronously consumes the action log and executes syncback actions."""

-    def __init__(self, poll_interval=1, chunk_size=100, max_pool_size=22):
+    def __init__(self, poll_interval=1, chunk_size=100):
+        semaphore_factory = lambda: BoundedSemaphore(CONCURRENCY_LIMIT)
+        self.semaphore_map = defaultdict(semaphore_factory)
         self.keep_running = True
         self.running = False
         self.log = logger.new(component='syncback')
-        self.worker_pool = gevent.pool.Pool(max_pool_size)
         self.poll_interval = poll_interval
         self.chunk_size = chunk_size
         self._scheduled_actions = set()
@@ -72,15 +78,15 @@ def _process_log(self):
                 namespace = db_session.query(Namespace). \
                     get(log_entry.namespace_id)
                 self._scheduled_actions.add(log_entry.id)
-                worker = SyncbackWorker(action_function, log_entry.id,
-                                        log_entry.record_id,
-                                        namespace.account_id,
-                                        syncback_service=self,
-                                        extra_args=log_entry.extra_args)
                 self.log.info('delegating action',
                               action_id=log_entry.id,
                               msg=log_entry.action)
-                self.worker_pool.start(worker)
+                semaphore = self.semaphore_map[(namespace.account_id,
+                                                log_entry.action)]
+                gevent.spawn(syncback_worker, semaphore, action_function,
+                             log_entry.id, log_entry.record_id,
+                             namespace.account_id, syncback_service=self,
+                             extra_args=log_entry.extra_args)

     def remove_from_schedule(self, log_entry_id):
         self._scheduled_actions.discard(log_entry_id)
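
The two hunks above replace the fixed-size worker pool with a per-(account_id, action) throttle: semaphore_map is a defaultdict, so the first action of a given kind for a given account lazily creates a BoundedSemaphore(CONCURRENCY_LIMIT), and every worker spawned for that key must acquire it before doing any work. A minimal standalone sketch of the pattern, with illustrative names only (it imports BoundedSemaphore from gevent.lock, where current gevent releases expose the class this file pulls from gevent.coros):

from collections import defaultdict

import gevent
from gevent.lock import BoundedSemaphore

CONCURRENCY_LIMIT = 3

# One semaphore per key, created lazily on first access; each distinct key
# gets its own independent limit of CONCURRENCY_LIMIT concurrent greenlets.
semaphore_map = defaultdict(lambda: BoundedSemaphore(CONCURRENCY_LIMIT))


def worker(semaphore, key, i):
    with semaphore:
        # At most CONCURRENCY_LIMIT greenlets per key get past this point at
        # once; the rest block here until a slot is released.
        gevent.sleep(0.1)  # stand-in for a slow syncback action
        print(key, i, 'done')


greenlets = []
for key in [(1, 'save_draft'), (2, 'save_draft')]:
    for i in range(10):
        # Same shape as _process_log above: look up the per-key semaphore,
        # then hand it to the spawned greenlet.
        greenlets.append(gevent.spawn(worker, semaphore_map[key], key, i))
gevent.joinall(greenlets)

Because the acquire happens inside the spawned greenlet rather than via a shared pool, _process_log itself never blocks: a backlog for one account/action pair just accumulates as waiting greenlets without delaying actions for other accounts.
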
@@ -117,46 +123,32 @@ def stop(self):
         gevent.sleep()


-class SyncbackWorker(gevent.Greenlet):
-    """A greenlet spawned to execute a single syncback action."""
-    def __init__(self, func, action_log_id, record_id, account_id,
-                 syncback_service, retry_interval=30, extra_args=None):
-        self.func = func
-        self.action_log_id = action_log_id
-        self.record_id = record_id
-        self.account_id = account_id
-        self.syncback_service = syncback_service
-        self.retry_interval = retry_interval
-        self.extra_args = extra_args
-
-        self.log = logger.new(record_id=record_id, action_log_id=action_log_id,
-                              action=self.func, account_id=self.account_id,
-                              extra_args=extra_args)
-        gevent.Greenlet.__init__(self)
-
-    def _run(self):
-        # Not ignoring soft-deleted objects here because if you, say, delete a
-        # draft, we still need to access the object to delete it on the remote.
-        with session_scope(ignore_soft_deletes=False) as db_session:
+def syncback_worker(semaphore, func, action_log_id, record_id, account_id,
+                    syncback_service, retry_interval=30, extra_args=None):
+        with semaphore:
+            log = logger.new(record_id=record_id, action_log_id=action_log_id,
+                             action=func, account_id=account_id,
+                             extra_args=extra_args)
+            # Not ignoring soft-deleted objects here because if you, say,
+            # delete a draft, we still need to access the object to delete it
+            # on the remote.
             try:
-                if self.extra_args:
-                    self.func(self.account_id, self.record_id, db_session,
-                              self.extra_args)
-                else:
-                    self.func(self.account_id, self.record_id, db_session)
+                with session_scope(ignore_soft_deletes=False) as db_session:
+                    if extra_args:
+                        func(account_id, record_id, db_session, extra_args)
+                    else:
+                        func(account_id, record_id, db_session)
+                    action_log_entry = db_session.query(ActionLog).get(
+                        action_log_id)
+                    action_log_entry.executed = True
+                    db_session.commit()
+                    log.info('syncback action completed',
+                             action_id=action_log_id)
+                    syncback_service.remove_from_schedule(action_log_id)
             except Exception:
-                log_uncaught_errors(self.log)
+                log_uncaught_errors(log)
                 # Wait for a bit, then remove the log id from the scheduled set
                 # so that it can be retried.
-                gevent.sleep(self.retry_interval)
-                self.syncback_service.remove_from_schedule(self.action_log_id)
+                gevent.sleep(retry_interval)
+                syncback_service.remove_from_schedule(action_log_id)
                 raise
-            else:
-                action_log_entry = db_session.query(ActionLog).get(
-                    self.action_log_id)
-                action_log_entry.executed = True
-                db_session.commit()
-
-                self.log.info('syncback action completed',
-                              action_id=self.action_log_id)
-                self.syncback_service.remove_from_schedule(self.action_log_id)
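
A behavioral detail of the rewritten worker: the whole session_scope block, including the commit that marks the action log entry executed, now sits inside the try, so any failure (the action itself or the commit) takes the except path, which backs off for retry_interval seconds, drops the id from the service's in-memory scheduled set, and re-raises; the still-unexecuted row is then picked up again by a later _process_log pass. A compressed sketch of that control flow, using hypothetical stand-ins for the database work:

import gevent


def run_action(execute_and_commit, unschedule, action_log_id,
               retry_interval=30):
    # execute_and_commit and unschedule are hypothetical stand-ins for the
    # session_scope block and SyncbackService.remove_from_schedule above.
    try:
        execute_and_commit(action_log_id)  # run the action, mark it executed
        unschedule(action_log_id)          # success: nothing left to retry
    except Exception:
        gevent.sleep(retry_interval)       # back off before the entry becomes
        unschedule(action_log_id)          # schedulable again on a later pass
        raise                              # still surface the error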