Skip to content

Commit 4282933

Browse files
committed
more fix
1 parent 612108b commit 4282933

File tree

2 files changed

+69
-70
lines changed

2 files changed

+69
-70
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
],
1919
entry_points={
2020
'console_scripts': [
21-
'es-sync=src:run',
21+
'es-sync=src:start',
2222
]
2323
},
2424
include_package_data=True

src/__init__.py

Lines changed: 68 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@
1616
__version__ = '0.2.0'
1717

1818

19-
# For removing invalid characters in xml stream.
19+
# The magic spell for removing invalid characters in xml stream.
2020
REMOVE_INVALID_PIPE = r'tr -d "\00\01\02\03\04\05\06\07\10\13\14\16\17\20\21\22\23\24\25\26\27\30\31\32\33\34\35\36\37"'
2121

2222
DEFAULT_BULKSIZE = 100
2323

2424

2525
class ElasticSync(object):
26-
26+
table_structure = {}
27+
log_file = None
28+
log_pos = None
2729
def __init__(self):
2830
try:
2931
self.config = yaml.load(open(sys.argv[1]))
@@ -41,19 +43,14 @@ def __init__(self):
4143
port=self.config['elastic']['port'],
4244
index=self.config['elastic']['index'],
4345
type=self.config['elastic']['type']
44-
)
45-
# todo: supporting multi-index
46+
) # todo: supporting multi-index
47+
4648
self.mapping = self.config.get('mapping')
4749
if self.mapping.get('_id'):
4850
self.id_key = self.mapping.pop('_id')
4951
else:
5052
self.id_key = None
5153

52-
self.table_structure = {}
53-
54-
self.log_file = None
55-
self.log_pos = None
56-
5754
record_path = self.config['binlog_sync']['record_file']
5855
if os.path.isfile(record_path):
5956
with open(record_path, 'r') as f:
@@ -79,7 +76,7 @@ def cleanup(*args):
7976
signal.signal(signal.SIGINT, cleanup)
8077
signal.signal(signal.SIGTERM, cleanup)
8178

82-
def post_to_es(self, data):
79+
def _post_to_es(self, data):
8380
"""
8481
send post requests to es restful api
8582
"""
@@ -89,9 +86,9 @@ def post_to_es(self, data):
8986
if list(item.values())[0].get('error'):
9087
logging.error(item)
9188
else:
92-
self.save_binlog_record()
89+
self._save_binlog_record()
9390

94-
def bulker(self, bulk_size=DEFAULT_BULKSIZE):
91+
def _bulker(self, bulk_size=DEFAULT_BULKSIZE):
9592
"""
9693
Example:
9794
u = bulker()
@@ -112,27 +109,27 @@ def bulker(self, bulk_size=DEFAULT_BULKSIZE):
112109
# print(data)
113110
print('-'*10)
114111
if data:
115-
self.post_to_es(data)
112+
self._post_to_es(data)
116113

117-
def updater(self, data):
114+
def _updater(self, data):
118115
"""
119116
encapsulation of bulker
120117
"""
121-
u = self.bulker()
118+
u = self._bulker()
122119
u.send(None) # push the generator to first yield
123120
for item in data:
124121
u.send(item)
125122
u.send(None) # tell the generator it's the end
126123

127-
def json_serializer(self, obj):
124+
def _json_serializer(self, obj):
128125
"""
129126
format the object which json not supported
130127
"""
131128
if isinstance(obj, datetime):
132129
return obj.isoformat()
133130
raise TypeError('Type not serializable')
134131

135-
def processor(self, data):
132+
def _processor(self, data):
136133
"""
137134
The action must be one of the following:
138135
create
@@ -151,22 +148,22 @@ def processor(self, data):
151148
action_content = {}
152149
meta = json.dumps({item['action']: action_content})
153150
if item['action'] == 'index':
154-
body = json.dumps(item['doc'], default=self.json_serializer)
151+
body = json.dumps(item['doc'], default=self._json_serializer)
155152
rv = meta + '\n' + body
156153
elif item['action'] == 'update':
157-
body = json.dumps({'doc': item['doc']}, default=self.json_serializer)
154+
body = json.dumps({'doc': item['doc']}, default=self._json_serializer)
158155
rv = meta + '\n' + body
159156
elif item['action'] == 'delete':
160157
rv = meta + '\n'
161158
elif item['action'] == 'create':
162-
body = json.dumps(item['doc'], default=self.json_serializer)
159+
body = json.dumps(item['doc'], default=self._json_serializer)
163160
rv = meta + '\n' + body
164161
else:
165162
logging.error('unknown action type in doc')
166163
raise TypeError('unknown action type in doc')
167164
yield rv
168165

169-
def mapper(self, data):
166+
def _mapper(self, data):
170167
"""
171168
mapping old key to new key
172169
"""
@@ -177,12 +174,12 @@ def mapper(self, data):
177174
# print(doc)
178175
yield item
179176

180-
def formatter(self, data):
177+
def _formatter(self, data):
181178
"""
182179
format every field from xml, according to parsed table structure
183180
"""
184181
for item in data:
185-
for field, serializer in table_structure.items():
182+
for field, serializer in self.table_structure.items():
186183
if item['doc'][field]:
187184
try:
188185
item['doc'][field] = serializer(item['doc'][field])
@@ -193,15 +190,13 @@ def formatter(self, data):
193190
# print(item)
194191
yield item
195192

196-
def binlog_loader(self):
193+
def _binlog_loader(self):
197194
"""
198195
read row from binlog
199196
"""
200-
global log_file
201-
global log_pos
202-
if log_file and log_pos:
197+
if self.log_file and self.log_pos:
203198
resume_stream = True
204-
logging.info("Resume from binlog_file: {} binlog_pos: {}".format(log_file, log_pos))
199+
logging.info("Resume from binlog_file: {} binlog_pos: {}".format(self.log_file, self.log_pos))
205200
else:
206201
resume_stream = False
207202

@@ -211,11 +206,11 @@ def binlog_loader(self):
211206
only_tables=[self.config['mysql']['table']],
212207
resume_stream=resume_stream,
213208
blocking=True,
214-
log_file=log_file,
215-
log_pos=log_pos)
209+
log_file=self.log_file,
210+
log_pos=self.log_pos)
216211
for binlogevent in stream:
217-
log_file = stream.log_file
218-
log_pos = stream.log_pos
212+
self.log_file = stream.log_file
213+
self.log_pos = stream.log_pos
219214
for row in binlogevent.rows:
220215
if isinstance(binlogevent, DeleteRowsEvent):
221216
rv = {
@@ -239,11 +234,10 @@ def binlog_loader(self):
239234
# print(rv)
240235
stream.close()
241236

242-
def parse_table_structure(self, data):
237+
def _parse_table_structure(self, data):
243238
"""
244239
parse the table structure
245240
"""
246-
global table_structure
247241
for item in data.iter():
248242
if item.tag == 'field':
249243
field = item.attrib.get('Field')
@@ -263,9 +257,9 @@ def parse_table_structure(self, data):
263257
serializer = str
264258
else:
265259
serializer = str
266-
table_structure[field] = serializer
260+
self.table_structure[field] = serializer
267261

268-
def parse_and_remove(self, f, path):
262+
def _parse_and_remove(self, f, path):
269263
"""
270264
snippet from python cookbook, for parsing large xml file
271265
"""
@@ -284,33 +278,33 @@ def parse_and_remove(self, f, path):
284278
yield elem
285279
elem_stack[-2].remove(elem)
286280
if tag_stack == ['database', 'table_structure']: # dirty hack for getting the tables structure
287-
self.parse_table_structure(elem)
281+
self._parse_table_structure(elem)
288282
elem_stack[-2].remove(elem)
289283
try:
290284
tag_stack.pop()
291285
elem_stack.pop()
292286
except IndexError:
293287
pass
294288

295-
def xml_parser(self, f_obj):
289+
def _xml_parser(self, f_obj):
296290
"""
297291
parse mysqldump XML streaming, convert every item to dict object. 'database/table_data/row'
298292
"""
299-
for row in self.parse_and_remove(f_obj, 'database/table_data/row'):
293+
for row in self._parse_and_remove(f_obj, 'database/table_data/row'):
300294
doc = {}
301295
for field in row.iter(tag='field'):
302296
k = field.attrib.get('name')
303297
v = field.text
304298
doc[k] = v
305299
yield {'action': 'index', 'doc': doc}
306300

307-
def save_binlog_record(self):
308-
if log_file and log_pos:
301+
def _save_binlog_record(self):
302+
if self.log_file and self.log_pos:
309303
with open(self.config['binlog_sync']['record_file'], 'w') as f:
310-
logging.info("Sync binlog_file: {} binlog_pos: {}".format(log_file, log_pos))
311-
yaml.dump({"log_file": log_file, "log_pos": log_pos}, f)
304+
logging.info("Sync binlog_file: {} binlog_pos: {}".format(self.log_file, self.log_pos))
305+
yaml.dump({"log_file": self.log_file, "log_pos": self.log_pos}, f)
312306

313-
def xml_dump_loader(self):
307+
def _xml_dump_loader(self):
314308
mysqldump = subprocess.Popen(
315309
shlex.split(self.dump_cmd),
316310
stdout=subprocess.PIPE,
@@ -326,11 +320,11 @@ def xml_dump_loader(self):
326320

327321
return remove_invalid_pipe.stdout
328322

329-
def xml_file_loader(self, filename):
323+
def _xml_file_loader(self, filename):
330324
f = open(filename, 'rb') # bytes required
331325
return f
332326

333-
def send_email(self, title, content):
327+
def _send_email(self, title, content):
334328
"""
335329
send notification email
336330
"""
@@ -353,30 +347,31 @@ def send_email(self, title, content):
353347
s.send_message(msg)
354348
s.quit()
355349

356-
def sync_from_stream(self):
350+
def _sync_from_stream(self):
357351
logging.info("Start to dump from stream")
358-
docs = reduce(lambda x, y: y(x), [self.xml_parser,
359-
self.formatter,
360-
self.mapper,
361-
self.processor],
362-
self.xml_dump_loader())
363-
self.updater(docs)
352+
docs = reduce(lambda x, y: y(x), [self._xml_parser,
353+
self._formatter,
354+
self._mapper,
355+
self._processor],
356+
self._xml_dump_loader())
357+
self._updater(docs)
364358
logging.info("Dump success")
365359

366-
def sync_from_file(self):
360+
def _sync_from_file(self):
367361
logging.info("Start to dump from xml file")
368-
docs = reduce(lambda x, y: y(x), [self.xml_parser,
369-
self.formatter,
370-
self.mapper,
371-
self.processor],
372-
self.xml_file_loader(self.config['xml_file']['filename']))
373-
self.updater(docs)
362+
logging.info("Filename: {}".format(self.config['xml_file']['filename']))
363+
docs = reduce(lambda x, y: y(x), [self._xml_parser,
364+
self._formatter,
365+
self._mapper,
366+
self._processor],
367+
self._xml_file_loader(self.config['xml_file']['filename']))
368+
self._updater(docs)
374369
logging.info("Dump success")
375370

376-
def sync_from_binlog(self):
371+
def _sync_from_binlog(self):
377372
logging.info("Start to sync binlog")
378-
docs = reduce(lambda x, y: y(x), [self.mapper, self.processor], self.binlog_loader())
379-
self.updater(docs)
373+
docs = reduce(lambda x, y: y(x), [self._mapper, self._processor], self._binlog_loader())
374+
self._updater(docs)
380375

381376
def run(self):
382377
"""
@@ -385,18 +380,22 @@ def run(self):
385380
2. sync binlog
386381
"""
387382
try:
388-
if not log_file and not log_pos:
383+
if not self.log_file and not self.log_pos:
389384
if len(sys.argv) > 2 and sys.argv[2] == '--fromfile':
390-
self.sync_from_file()
385+
self._sync_from_file()
391386
else:
392-
self.sync_from_stream()
393-
self.sync_from_binlog()
387+
self._sync_from_stream()
388+
self._sync_from_binlog()
394389
except Exception:
395390
import traceback
396391
logging.error(traceback.format_exc())
397-
self.send_email('es sync error', traceback.format_exc())
392+
self._send_email('es sync error', traceback.format_exc())
398393
raise
399394

400395

396+
def start():
397+
instance = ElasticSync()
398+
instance.run()
399+
401400
if __name__ == '__main__':
402-
pass
401+
start()

0 commit comments

Comments (0)