Commit 08bbd83

init project

committed (1 parent 4cdccd3, commit 08bbd83)

File tree: 4 files changed, +427 -0 lines changed

main.py

Lines changed: 364 additions & 0 deletions
@@ -0,0 +1,364 @@
import os.path
import sys
import yaml
import signal
import requests
import subprocess
import simplejson as json
import logging
from datetime import datetime
from lxml.etree import iterparse
from functools import reduce
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent


def cleanup(*args):
    logger.info('Received stop signal')
    logger.info('Shutdown')
    sys.exit(0)

signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGTERM, cleanup)

try:
    config = yaml.load(open(sys.argv[1]))
except IndexError:
    print('Error: no config file specified')
    exit(1)

logging.basicConfig(filename=config['logging']['file'],
                    level=logging.INFO,
                    format='[%(levelname)s] %(asctime)s %(message)s')
logger = logging.getLogger(__name__)
logging.getLogger("requests").setLevel(logging.WARNING)  # disable requests info logging

endpoint = 'http://{host}:{port}/{index}/{type}/_bulk'.format(host=config['elastic']['host'],
                                                              port=config['elastic']['port'],
                                                              index=config['elastic']['index'],
                                                              type=config['elastic']['type'])
# todo: support multiple indices
MAPPING = config.get('mapping')
if MAPPING.get('_id'):
    id_key = MAPPING.pop('_id')
else:
    id_key = None

table_structure = {}

DUMP_CMD = 'mysqldump -h {host} -P {port} -u {user} --password={password} {db} {table} --default-character-set=utf8' \
           ' -X' \
    .format(**config['mysql'])
BINLOG_CFG = {key: config['mysql'][key] for key in ['host', 'port', 'user', 'password', 'db']}
BULK_SIZE = config.get('elastic').get('bulk_size')

log_file = None
log_pos = None

record_path = config['binlog_sync']['record_file']
if os.path.isfile(record_path):
    with open(record_path, 'r') as f:
        record = yaml.load(f)
        log_file = record.get('log_file')
        log_pos = record.get('log_pos')


def post_to_es(data):
    """
    send a bulk POST request to the ES REST API
    """
    resp = requests.post(endpoint, data=data)
    if resp.json().get('errors'):  # top-level boolean flag set when any item failed
        for item in resp.json()['items']:
            if list(item.values())[0].get('error'):
                logging.error(item)
    else:
        save_binlog_record()


def bulker(bulk_size=BULK_SIZE):
    """
    Example:
        u = bulker()
        u.send(None)              # initialize the generator
        u.send(json_str)          # feed one json item
        u.send(another_json_str)  # feed another json item
        ...
        u.send(None)              # force the current bulk to finish and be posted
    """
    while True:
        data = ""
        for i in range(bulk_size):
            item = yield
            if item:
                data = data + item + "\n"
            else:
                break
        # print(data)
        print('-' * 10)
        if data:
            post_to_es(data)


def updater(data):
    u = bulker()
    u.send(None)  # push the generator to the first yield
    for item in data:
        u.send(item)
    u.send(None)  # tell the generator it's the end


def json_serializer(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError('Type not serializable')


def processor(data):
    """
    The action must be one of the following:
        create
            Create a document only if the document does not already exist.
        index
            Create a new document or replace an existing document.
        update
            Do a partial update on a document.
        delete
            Delete a document.
    """
    for item in data:
        if id_key:
            action_content = {'_id': item['doc'][id_key]}
        else:
            action_content = {}
        meta = json.dumps({item['action']: action_content})
        if item['action'] == 'index':
            body = json.dumps(item['doc'], default=json_serializer)
            rv = meta + '\n' + body
        elif item['action'] == 'update':
            body = json.dumps({'doc': item['doc']}, default=json_serializer)
            rv = meta + '\n' + body
        elif item['action'] == 'delete':
            rv = json.dumps(item['doc'], default=json_serializer) + '\n'
        elif item['action'] == 'create':
            body = json.dumps(item['doc'], default=json_serializer)
            rv = meta + '\n' + body
        else:
            logging.error('unknown action type')
            raise TypeError('unknown action type')
        yield rv

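# Example (illustrative values, assuming the config maps _id to a column named 'id'):
# an input item such as
#     {'action': 'index', 'doc': {'id': 1, 'title': 'hello'}}
# is rendered by processor() into the two NDJSON lines the ES bulk API expects:
#     {"index": {"_id": 1}}
#     {"id": 1, "title": "hello"}
# bulker() then joins such pairs with newlines and post_to_es() ships them to
# `endpoint` in batches of BULK_SIZE.

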
def mapper(data):
    """
    map old keys to new keys
    """
    for item in data:
        for k, v in MAPPING.items():
            item['doc'][k] = item['doc'][v]
            del item['doc'][v]
        # print(doc)
        yield item


def formatter(data):
    # cast the raw string values from the XML dump into the types recorded in table_structure
    for item in data:
        for field, serializer in table_structure.items():
            if item['doc'][field]:
                item['doc'][field] = serializer(item['doc'][field])
        # print(item)
        yield item


def binlog_loader():
    global log_file
    global log_pos
    if log_file and log_pos:
        resume_stream = True
        logging.info("Resume from binlog_file: {} binlog_pos: {}".format(log_file, log_pos))
    else:
        resume_stream = False

    stream = BinLogStreamReader(connection_settings=BINLOG_CFG, server_id=config['mysql']['server_id'],
                                only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
                                only_tables=[config['mysql']['table']],
                                resume_stream=resume_stream,
                                blocking=True,
                                log_file=log_file,
                                log_pos=log_pos)
    for binlogevent in stream:
        log_file = stream.log_file
        log_pos = stream.log_pos
        for row in binlogevent.rows:
            if isinstance(binlogevent, DeleteRowsEvent):
                rv = {
                    'action': 'delete',
                    'doc': row['values']
                }
            elif isinstance(binlogevent, UpdateRowsEvent):
                rv = {
                    'action': 'update',
                    'doc': row['after_values']
                }
            elif isinstance(binlogevent, WriteRowsEvent):
                rv = {
                    'action': 'index',
                    'doc': row['values']
                }
            yield rv
            # print(rv)
    stream.close()


def parse_table_structure(data):
    global table_structure
    for item in data.iter():
        if item.tag == 'field':
            field = item.attrib.get('Field')
            type = item.attrib.get('Type')
            if 'int' in type:
                serializer = int
            elif 'float' in type:
                serializer = float
            elif 'datetime' in type:
                if '(6)' in type:
                    serializer = lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f')
                else:
                    serializer = lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S')
            elif 'char' in type:
                serializer = str
            elif 'text' in type:
                serializer = str
            else:
                serializer = str
            table_structure[field] = serializer


def parse_and_remove(f, path):
    """
    snippet from the Python Cookbook, for incrementally parsing a large XML file
    """
    path_parts = path.split('/')
    doc = iterparse(f, ('start', 'end'), recover=True, encoding='utf-8')
    # Skip the root element
    next(doc)

    tag_stack = []
    elem_stack = []
    for event, elem in doc:
        if event == 'start':
            tag_stack.append(elem.tag)
            elem_stack.append(elem)
        elif event == 'end':
            if tag_stack == path_parts:
                yield elem
                elem_stack[-2].remove(elem)
            if tag_stack == ['database', 'table_structure']:  # dirty hack for grabbing the table structure
                parse_table_structure(elem)
                elem_stack[-2].remove(elem)
            try:
                tag_stack.pop()
                elem_stack.pop()
            except IndexError:
                pass


def xml_parser(f_obj):
    """
    parse the mysqldump XML stream and convert every 'database/table_data/row' element to a dict
    """

    for row in parse_and_remove(f_obj, 'database/table_data/row'):
        doc = {}
        for field in row.iter():
            k = field.attrib.get('name')
            v = field.text
            doc[k] = v
        # print(rv)
        yield {'action': 'index', 'doc': doc}
        # print(doc)


def save_binlog_record():
    if log_file and log_pos:
        with open(config['binlog_sync']['record_file'], 'w') as f:
            logging.info("Sync binlog_file: {} binlog_pos: {}".format(log_file, log_pos))
            yaml.dump({"log_file": log_file, "log_pos": log_pos}, f)


def xml_dump_loader():
    p = subprocess.Popen(
        DUMP_CMD.split(),
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL)
    return p.stdout  # can be used as a file object (streaming IO)


def xml_file_loader(filename):
    f = open(filename, 'rb')  # bytes required
    return f


def send_email(title, content):
    if not config.get('email'):
        return

    import smtplib
    from email.mime.text import MIMEText

    msg = MIMEText(content)
    msg['Subject'] = title
    msg['From'] = config['email']['from']['username']
    msg['To'] = ', '.join(config['email']['to'])

    # Send the message via our own SMTP server.
    s = smtplib.SMTP()
    s.connect(config['email']['from']['host'])
    s.login(config['email']['from']['username'], config['email']['from']['password'])
    s.send_message(msg)
    s.quit()


def sync_from_stream():
    logging.info("Start to dump from stream")
    # compose the generator pipeline: processor(mapper(formatter(xml_parser(dump_stream))))
    docs = reduce(lambda x, y: y(x), [xml_parser, formatter, mapper, processor], xml_dump_loader())
    updater(docs)
    logging.info("Dump success")


def sync_from_file():
    logging.info("Start to dump from file")
    docs = reduce(lambda x, y: y(x), [xml_parser, formatter, mapper, processor],
                  xml_file_loader(config['xml_file']['filename']))
    updater(docs)
    logging.info("Dump success")


def sync_from_binlog():
    logging.info("Start to sync binlog")
    docs = reduce(lambda x, y: y(x), [mapper, processor], binlog_loader())
    updater(docs)


def main():
    """
    workflow:
    1. sync dump data
    2. sync binlog
    """
    try:
        if not log_file and not log_pos:
            if len(sys.argv) > 2 and sys.argv[2] == 'fromfile':
                sync_from_file()
            else:
                sync_from_stream()
        sync_from_binlog()
    except Exception:
        import traceback
        logging.error(traceback.format_exc())
        send_email('es sync error', traceback.format_exc())
        raise


if __name__ == '__main__':
    main()
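
For reference, a minimal sketch of the YAML config file that main.py expects as its first argument. The key names are taken from the code above; every value is an illustrative placeholder, not part of the commit. An optional email block (from: username/password/host, to: a list of addresses) enables the failure notification in send_email.

    mysql:
      host: 127.0.0.1
      port: 3306
      user: root
      password: secret
      db: mydb
      table: mytable
      server_id: 1
    elastic:
      host: 127.0.0.1
      port: 9200
      index: myindex
      type: mytype
      bulk_size: 1000
    mapping:
      _id: id
    binlog_sync:
      record_file: binlog.info
    logging:
      file: sync.log
    xml_file:
      filename: dump.xml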

requirements.txt

Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
mysql-replication
requests
simplejson
pyyaml
elasticsearch==1.9.0
elasticsearch_dsl
lxml==3.4.4
# yum install libxslt-devel libxml2-devel
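
To try the project (commands inferred from main() and this requirements list; the config file name is an illustrative placeholder):

    pip install -r requirements.txt
    python main.py config.yml            # initial dump via mysqldump stream, then follow the binlog
    python main.py config.yml fromfile   # read the initial dump from config['xml_file']['filename'] instead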
