1- import os
2- import inspect
3- import csv
1+ import os , inspect
42
53from code42cli .compat import open , str
64from code42cli .worker import Worker
75from code42cli .logger import get_main_cli_logger
86from code42cli .args import SDK_ARG_NAME , PROFILE_ARG_NAME
7+ from code42cli .progress_bar import ProgressBar
8+
9+
10+ _logger = get_main_cli_logger ()
911
1012
1113class BulkCommandType (object ):
@@ -29,7 +31,7 @@ def generate_template(handler, path=None):
2931 ]
3032
3133 if len (args ) <= 1 :
32- get_main_cli_logger () .print_info (
34+ _logger .print_info (
3335 u"A blank file was generated because there are no csv headers needed for this command. "
3436 u"Simply enter one {} per line." .format (args [0 ])
3537 )
@@ -45,31 +47,28 @@ def _write_template_file(path, columns=None):
4547 new_file .write (u"," .join (columns ))
4648
4749
48- def run_bulk_process (file_path , row_handler , reader = None ):
50+ def run_bulk_process (row_handler , reader ):
4951 """Runs a bulk process.
5052
5153 Args:
52- file_path (str or unicode): The path to the file feeding the data for the bulk process.
5354 row_handler (callable): A callable that you define to process values from the row as
5455 either *args or **kwargs.
5556 reader: (CSVReader or FlatFileReader, optional): A generator that reads rows and yields data into
5657 `row_handler`. If None, it will use a CSVReader. Defaults to None.
5758 """
58- reader = reader or CSVReader ()
59- processor = _create_bulk_processor (file_path , row_handler , reader )
59+ processor = _create_bulk_processor (row_handler , reader )
6060 processor .run ()
6161
6262
63- def _create_bulk_processor (file_path , row_handler , reader ):
63+ def _create_bulk_processor (row_handler , reader ):
6464 """A factory method to create the bulk processor, useful for testing purposes."""
65- return BulkProcessor (file_path , row_handler , reader )
65+ return BulkProcessor (row_handler , reader )
6666
6767
6868class BulkProcessor (object ):
6969 """A class for bulk processing a file.
7070
7171 Args:
72- file_path (str or unicode): The path to the file for processing.
7372 row_handler (callable): A callable that you define to process values from the row as
7473 either *args or **kwargs. For example, if it's a csv file with header `prop_a,prop_b`
7574 and first row `1,test`, then `row_handler` should receive kwargs
@@ -78,19 +77,22 @@ class BulkProcessor(object):
7877 reader (CSVReader or FlatFileReader): A generator that reads rows and yields data into `row_handler`.
7978 """
8079
81- def __init__ (self , file_path , row_handler , reader ):
82- self .file_path = file_path
80+ def __init__ (self , row_handler , reader , worker = None , progress_bar = None ):
81+ total = reader .get_rows_count ()
82+ self .file_path = reader .file_path
8383 self ._row_handler = row_handler
8484 self ._reader = reader
85- self .__worker = Worker (5 )
85+ self .__worker = worker or Worker (5 , total )
86+ self ._stats = self .__worker .stats
87+ self ._progress_bar = progress_bar or ProgressBar (total )
8688
8789 def run (self ):
8890 """Processes the csv file specified in the ctor, calling `self.row_handler` on each row."""
8991 with open (self .file_path , newline = u"" , encoding = u"utf8" ) as bulk_file :
9092 for row in self ._reader (bulk_file = bulk_file ):
9193 self ._process_row (row )
9294 self .__worker .wait ()
93- self ._print_result ()
95+ self ._print_results ()
9496
9597 def _process_row (self , row ):
9698 if isinstance (row , dict ):
@@ -104,35 +106,20 @@ def _process_csv_row(self, row):
104106 row .pop (None , None )
105107 row_values = {key : val if val != u"" else None for key , val in row .items ()}
106108 self .__worker .do_async (
107- lambda * args , ** kwargs : self ._row_handler (* args , ** kwargs ), ** row_values
109+ lambda * args , ** kwargs : self ._handle_row (* args , ** kwargs ), ** row_values
108110 )
109111
110112 def _process_flat_file_row (self , row ):
111113 if row :
112- self .__worker .do_async (lambda * args , ** kwargs : self ._row_handler (* args , ** kwargs ), row )
113-
114- def _print_result (self ):
115- stats = self .__worker .stats
116- successes = stats .total - stats .total_errors
117- logger = get_main_cli_logger ()
118- logger .print_and_log_info (
119- u"{} processed successfully out of {}." .format (successes , stats .total )
120- )
121- if stats .total_errors :
122- logger .print_errors_occurred_message ()
123-
114+ self .__worker .do_async (lambda * args , ** kwargs : self ._handle_row (* args , ** kwargs ), row )
124115
125- class CSVReader (object ):
126- """A generator that yields header keys mapped to row values from a csv file."""
116+ def _handle_row (self , * args , ** kwargs ):
117+ message = str (self ._stats )
118+ self ._progress_bar .update (self ._stats .total_processed , message )
119+ self ._row_handler (* args , ** kwargs )
127120
128- def __call__ (self , * args , ** kwargs ):
129- for row in csv .DictReader (kwargs .get (u"bulk_file" )):
130- yield row
131-
132-
133- class FlatFileReader (object ):
134- """A generator that yields a single-value per row from a file."""
135-
136- def __call__ (self , * args , ** kwargs ):
137- for row in kwargs [u"bulk_file" ]:
138- yield row
121+ def _print_results (self ):
122+ self ._progress_bar .clear_bar_and_print_final (str (self ._stats ))
123+ if self ._stats .total_errors :
124+ logger = get_main_cli_logger ()
125+ logger .print_errors_occurred_message ()
0 commit comments