66import subprocess
77import simplejson as json
88import logging
9+ import shlex
910from datetime import datetime
1011from lxml .etree import iterparse
1112from functools import reduce
@@ -47,8 +48,10 @@ def cleanup(*args):
4748table_structure = {}
4849
4950DUMP_CMD = 'mysqldump -h {host} -P {port} -u {user} --password={password} {db} {table} ' \
50- '--default-character-set=utf8 -X' \
51+ '--default-character-set=utf8 -X' \
5152 .format (** config ['mysql' ])
53+
54+
5255BINLOG_CFG = {key : config ['mysql' ][key ] for key in ['host' , 'port' , 'user' , 'password' , 'db' ]}
5356BULK_SIZE = config .get ('elastic' ).get ('bulk_size' )
5457
@@ -257,7 +260,6 @@ def parse_and_remove(f, path):
257260 doc = iterparse (f , ('start' , 'end' ), recover = True , encoding = 'utf-8' )
258261 # Skip the root element
259262 next (doc )
260-
261263 tag_stack = []
262264 elem_stack = []
263265 for event , elem in doc :
@@ -289,9 +291,7 @@ def xml_parser(f_obj):
289291 k = field .attrib .get ('name' )
290292 v = field .text
291293 doc [k ] = v
292- # print(rv)
293294 yield {'action' : 'index' , 'doc' : doc }
294- # print(doc)
295295
296296
297297def save_binlog_record ():
@@ -303,9 +303,10 @@ def save_binlog_record():
303303
304304def xml_dump_loader ():
305305 p = subprocess .Popen (
306- DUMP_CMD .split (),
306+ shlex .split (DUMP_CMD ),
307307 stdout = subprocess .PIPE ,
308- stderr = subprocess .DEVNULL )
308+ stderr = subprocess .DEVNULL ,
309+ close_fds = True )
309310 return p .stdout # can be used as file object. (stream io)
310311
311312
0 commit comments