1010import logging
1111import multiprocessing
1212import os
13+ import traceback
1314import warnings
1415from concurrent import futures
1516
@@ -67,7 +68,7 @@ class InsufficientResourcesError(Exception):
6768def process_advisories (
6869 advisories ,
6970 advisory_func ,
70- progress_logger = None ,
71+ log = None ,
7172 batch_size = 1000 ,
7273):
7374 """
@@ -83,23 +84,27 @@ def process_advisories(
8384 """
8485 advisories_count = advisories .count ()
8586 logger .info (f"Process { advisories_count } advisories with { advisory_func .__name__ } " )
86- progress = LoopProgress (advisories_count , logger = progress_logger )
87+ progress = LoopProgress (advisories_count , logger = log )
8788 max_workers = get_max_workers (keep_available = 4 )
8889
89- advisory_batches = get_advisory_batches (advisories , batch_size )
90+ advisory_batches = get_advisory_batches (
91+ advisories = advisories ,
92+ batch_size = batch_size ,
93+ log = log ,
94+ )
9095
9196 if max_workers <= 0 :
9297 for advisory_ids in progress .iter (advisory_batches ):
9398 progress .log_progress ()
9499 logger .debug (f"{ advisory_func .__name__ } len={ len (advisory_ids )} " )
95- advisory_func (advisory_ids = advisory_ids , logger = progress_logger )
100+ advisory_func (advisory_ids = advisory_ids , logger = log )
96101 return
97102
98103 logger .info (f"Starting ProcessPoolExecutor with { max_workers } max_workers" )
99104
100105 with futures .ProcessPoolExecutor (max_workers ) as executor :
101106 future_to_advisories = {
102- executor .submit (advisory_func , advisory_ids , progress_logger ): advisory_ids
107+ executor .submit (advisory_func , advisory_ids , log ): advisory_ids
103108 for advisory_ids in advisory_batches
104109 }
105110
@@ -120,14 +125,22 @@ def process_advisories(
120125 raise broken_pool_error from InsufficientResourcesError (message )
121126
122127
def get_advisory_batches(advisories, batch_size=1000, log=None):
    """
    Yield lists of advisory ids, each of up to ``batch_size`` length.

    ``advisories`` is a queryset-like object accepted by Django's
    ``Paginator``. ``log`` is an optional callable taking a message and a
    ``level`` keyword argument (e.g. a management command's ``self.log``);
    when provided, batch errors are reported through it before being
    re-raised.
    """
    paginator = Paginator(advisories, per_page=batch_size)
    for page_number in paginator.page_range:
        page = paginator.page(page_number)
        try:
            advisory_ids = [obj.id for obj in page.object_list]
        except Exception:
            if log:
                log(f"Error getting advisory batch {traceback.format_exc()}", level=logging.ERROR)
                # Log the page number that failed: the ids list was never
                # built when the comprehension raised, so logging the list
                # (as the previous code did) always printed None.
                log(f"While processing advisories page {page_number}", level=logging.ERROR)
            raise
        yield advisory_ids
131144
132145
133146def recompute_content_ids (advisory_ids , logger ):
@@ -193,6 +206,6 @@ def recompute_content_ids(self):
193206 process_advisories (
194207 advisories = advisories ,
195208 advisory_func = recompute_content_ids ,
196- progress_logger = self .log ,
209+ log = self .log ,
197210 batch_size = 1000 ,
198211 )
0 commit comments