Skip to content

Commit fa23801

Browse files
committed
update bulk size control, #5
1 parent 5bc4fa6 commit fa23801

File tree

2 files changed

+20
-8
lines changed

2 files changed

+20
-8
lines changed

src/__init__.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,18 @@ def encode_in_py2(s):
3434
REMOVE_INVALID_PIPE = r'tr -d "\00\01\02\03\04\05\06\07\10\13\14\16\17\20\21\22\23\24\25\26\27\30\31\32\33\34\35\36\37"'
3535

3636
DEFAULT_BULKSIZE = 100
37-
37+
DEFAULT_BINLOG_BULKSIZE = 1
3838

3939
class ElasticSync(object):
4040
table_structure = {}
4141
log_file = None
4242
log_pos = None
4343

44+
@property
45+
def is_binlog_sync(self):
46+
rv = bool(self.log_file and self.log_pos)
47+
return rv
48+
4449
def __init__(self):
4550
try:
4651
self.config = yaml.load(open(sys.argv[1]))
@@ -75,7 +80,9 @@ def __init__(self):
7580
self.log_file = record.get('log_file')
7681
self.log_pos = record.get('log_pos')
7782

78-
self.bulk_size = self.config.get('elastic').get('bulk_size')
83+
self.bulk_size = self.config.get('elastic').get('bulk_size') or DEFAULT_BULKSIZE
84+
self.binlog_bulk_size = self.config.get('elastic').get('binlog_bulk_size') or DEFAULT_BINLOG_BULKSIZE
85+
7986
self._init_logging()
8087

8188
def _init_logging(self):
@@ -105,7 +112,7 @@ def _post_to_es(self, data):
105112
else:
106113
self._save_binlog_record()
107114

108-
def _bulker(self, bulk_size=DEFAULT_BULKSIZE):
115+
def _bulker(self, bulk_size):
109116
"""
110117
Example:
111118
u = bulker()
@@ -132,7 +139,11 @@ def _updater(self, data):
132139
"""
133140
encapsulation of bulker
134141
"""
135-
u = self._bulker()
142+
if self.is_binlog_sync:
143+
u = self._bulker(bulk_size=self.binlog_bulk_size)
144+
else:
145+
u = self._bulker(bulk_size=self.bulk_size)
146+
136147
u.send(None) # push the generator to first yield
137148
for item in data:
138149
u.send(item)
@@ -320,7 +331,7 @@ def _xml_parser(self, f_obj):
320331
yield {'action': 'index', 'doc': doc}
321332

322333
def _save_binlog_record(self):
323-
if self.log_file and self.log_pos:
334+
if self.is_binlog_sync:
324335
with open(self.config['binlog_sync']['record_file'], 'w') as f:
325336
logging.info("Sync binlog_file: {file} binlog_pos: {pos}".format(
326337
file=self.log_file,
@@ -404,7 +415,7 @@ def run(self):
404415
2. sync binlog
405416
"""
406417
try:
407-
if not self.log_file and not self.log_pos:
418+
if not self.is_binlog_sync:
408419
if len(sys.argv) > 2 and sys.argv[2] == '--fromfile':
409420
self._sync_from_file()
410421
else:

src/sample.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ mysql:
1111
elastic:
1212
host: 127.0.0.1
1313
port: 9200
14-
bulk_size: 100
14+
bulk_size: 200 # the update bulk size when mysqldump, default is 100 if not specified
15+
binlog_bulk_size: 10 # the update bulk size when syncing binlog, default is 1 if not specified
1516
index: article
1617
type: article
1718

18-
# path to your own xml file, if you want to initialize dump from xml file
19+
# path to your own xml file, if you want to initialize dump from xml file. run with argument --fromfile in command
1920
xml_file:
2021
filename: a.xml
2122

0 commit comments

Comments
 (0)