数据是在本地录入的,要把本地的数据同步到线上比较麻烦,所以用Python写了一个socket脚本来同步本地新录入的数据。client是用Java写的,它根据业务逻辑导出需要同步的数据,用zlib压缩成字节流并以'$end$'作为结束标记,通过socket发送给Python写的server端;server端收到后先解压,再解析JSON,并按command交给对应的业务函数处理。以下是代码片段。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from database import *
from threading import Thread
import socket
import zlib
import json
import sys
import time
import struct
# Address and port the sync server binds to.
host, port = "192.168.1.192", 23211

# MySQL endpoint ("host:port") shared by both databases.
dbhost = '192.168.1.222:3306'

# db_onebyone is opened for command 6102; db_base for every other command.
db_onebyone, db_base = 'hk_onebyone', 'hk_base'

# NOTE(review): credentials are hard-coded in source; consider moving them
# to configuration outside the repository.
dbuser = dbpwd = 'hiker'
'''
@name 行程单业务处理
@param Resource db
@param Array data
@return String
'''
def it_itinerary(db,data):
    """Process itinerary records and return a JSON status string.

    Args:
        db: database connection handed in by the caller.
        data: the request's ``data`` payload (the caller only dispatches
            here when it is a list).

    Returns:
        A JSON string with ``status`` ``'T'`` on success, or ``status``
        ``'F'`` plus the exception text when processing raised.
    """
    try:
        # NOTE(review): the actual processing is elided ("....") in this
        # listing; presumably it also sets the counter ``i`` printed below.
        ....
        respone = json.dumps({'status':'T','message':'处理完成'})
        print '行程单数据处理成功,总共处理:%d条' %i
    except Exception,e:
        # Any failure is logged with a traceback and reported to the client
        # as a failure response instead of propagating.
        print e
        #print data
        import traceback
        traceback.print_exc()
        respone = json.dumps({'status':'F','message':'行程单数据处理出现异常。异常信息:' + str(e)})
    return respone
'''
@name 商家业务处理
@param Resource db
@param Array data
@return String
'''
def hk_merchant(db,data):
    """Process merchant records and return a JSON status string.

    Args:
        db: database connection handed in by the caller.
        data: the request's ``data`` payload (the caller only dispatches
            here when it is a list).

    Returns:
        A JSON string with ``status`` ``'T'`` on success, or ``status``
        ``'F'`` plus the exception text when processing raised.
    """
    try:
        # NOTE(review): the actual processing is elided ("....") in this
        # listing; presumably it also sets the counter ``i`` printed below.
        ....
        respone = json.dumps({'status':'T','message':'商家数据处理完成'})
        print '商家数据处理成功,总共处理:%d条' %i
    except Exception,e:
        # Any failure is logged with a traceback and reported to the client
        # as a failure response instead of propagating.
        print e
        #print data
        import traceback
        traceback.print_exc()
        respone = json.dumps({'status':'F','message':'商家数据处理出现异常。异常信息:' + str(e)})
    return respone
'''
@name 景区业务处理
@param Resource db
@param Array data
@return String
'''
def hk_scenic(db,data):
    """Process scenic-spot records and return a JSON status string.

    Args:
        db: database connection handed in by the caller.
        data: the request's ``data`` payload (the caller only dispatches
            here when it is a list).

    Returns:
        A JSON string with ``status`` ``'T'`` on success, or ``status``
        ``'F'`` plus the exception text when processing raised.
    """
    try:
        # NOTE(review): the actual processing is elided ("....") in this
        # listing; presumably it also sets the counter ``i`` printed below.
        ....
        respone = json.dumps({'status':'T','message':'处理完成'})
        print '景区数据处理成功,总共处理:%d条' %i
    except Exception,e:
        # Any failure is logged with a traceback and reported to the client
        # as a failure response instead of propagating.
        print e
        #print data
        import traceback
        traceback.print_exc()
        respone = json.dumps({'status':'F','message':'景区数据处理出现异常。异常信息:' + str(e)})
    return respone
def recv_basic(the_socket):
    """Read from ``the_socket`` until the peer closes or a read fails.

    Every ``recv`` waits at most five seconds. A timeout or any other
    socket error ends the read and whatever arrived so far is returned,
    so this is a best-effort reader that never raises socket errors.

    Args:
        the_socket: a connected socket object.

    Returns:
        All received bytes joined into one byte string (empty when nothing
        arrived).
    """
    chunks = []
    try:
        # The timeout is constant, so set it once instead of on every pass.
        the_socket.settimeout(5)
        while True:
            data = the_socket.recv(8192)
            if not data:
                # An empty read means the peer closed the connection.
                break
            chunks.append(data)
    except socket.error:
        # Timeout or connection failure: keep what was received. Only
        # socket errors are absorbed, so KeyboardInterrupt/SystemExit and
        # programming errors are no longer silently swallowed.
        pass
    return b''.join(chunks)
def recv_timeout(the_socket,timeout=2):
    """Collect data from a non-blocking socket until it goes quiet.

    The socket is switched to non-blocking mode. Reading stops when
      * some data has arrived and nothing new came for ``timeout`` seconds,
      * nothing has arrived at all for ``2 * timeout`` seconds, or
      * the peer closed the connection (no more data can ever arrive).

    Args:
        the_socket: a connected socket object.
        timeout: quiet period in seconds (see above).

    Returns:
        All received bytes joined into one byte string (possibly empty).
    """
    poll_interval = 0.01  # pause between polls while no data is available
    the_socket.setblocking(0)
    chunks = []
    begin = time.time()
    while True:
        idle = time.time() - begin
        # Got some data already: stop after one quiet period.
        if chunks and idle > timeout:
            break
        # Got nothing at all: wait twice as long before giving up.
        if idle > timeout * 2:
            break
        try:
            data = the_socket.recv(8192)
        except socket.error:
            # Usually "would block" on a non-blocking socket. Sleep briefly
            # so the loop does not spin at full CPU, then poll again; other
            # socket errors are retried until the deadline, keeping the
            # best-effort behaviour.
            time.sleep(poll_interval)
            continue
        if not data:
            # Empty read on a stream socket means EOF, not "no data yet".
            break
        chunks.append(data)
        begin = time.time()  # restart the quiet-period clock
    return b''.join(chunks)
# Terminator the client appends after the payload. Written as a bytes
# literal: identical to '$end$' on Python 2, and comparable with recv()
# output on Python 3.
End = b'$end$'


def recv_end(the_socket):
    """Read from ``the_socket`` up to (not including) the ``End`` marker.

    The marker is searched across chunk boundaries, however the stream is
    split. If the peer closes the connection before sending the marker,
    the data received so far is returned instead of looping forever.

    Args:
        the_socket: a connected socket object.

    Returns:
        The bytes that precede the first ``End`` marker, or everything
        received when the connection ended without a marker.
    """
    received = bytearray()
    while True:
        data = the_socket.recv(8192)
        if not data:
            # EOF without a terminator: hand back what we have so the
            # caller can reject it, rather than spinning on empty reads.
            return bytes(received)
        # A marker may straddle the old tail and the new chunk, so resume
        # the search len(End) - 1 bytes before the previous end of data.
        search_from = max(0, len(received) - len(End) + 1)
        received += data
        marker_at = received.find(End, search_from)
        if marker_at != -1:
            return bytes(received[:marker_at])
def _recv_exact(the_socket, count, max_chunk=524288):
    """Read up to ``count`` bytes, stopping early only if the peer closes."""
    chunks = []
    remaining = count
    while remaining > 0:
        # Cap each request so a huge declared size does not ask the kernel
        # for one enormous buffer.
        data = the_socket.recv(min(remaining, max_chunk))
        if not data:
            break
        chunks.append(data)
        remaining -= len(data)
    return b''.join(chunks)


def recv_size(the_socket):
    """Read one length-prefixed message from ``the_socket``.

    The message starts with a 4-byte big-endian signed integer (``'>i'``)
    giving the payload length, followed by the payload itself.

    NOTE(review): the original loop header and ``recv`` call were lost from
    this listing (``while total_len4:``), leaving invalid syntax. This body
    is a reconstruction of the length-prefix protocol that the surviving
    lines describe; confirm against the client before relying on it.

    Args:
        the_socket: a connected socket object.

    Returns:
        The payload bytes. Shorter than the declared length if the peer
        closed early; empty if the header was incomplete or the declared
        length was not positive.
    """
    header = _recv_exact(the_socket, 4)
    if len(header) < 4:
        # Connection closed before a full length prefix arrived.
        return b''
    size = struct.unpack('>i', header)[0]
    # Exactly ``size`` bytes are consumed, so bytes of a following message
    # stay in the socket buffer.
    return _recv_exact(the_socket, size)
# Per-connection worker, run in its own thread.
def _error_response(message):
    """Build the JSON failure reply carrying ``message``."""
    return json.dumps({'status': 'F', 'message': message})


def _build_response(value):
    """Validate one raw request, run its command and return the JSON reply."""
    if not len(value):
        return _error_response('请求参数不能为空')

    try:
        value = zlib.decompress(value)
    except zlib.error as e:
        print(e)
        return _error_response('字符流解压失败,请检查你所压缩的字符流是否正确')

    try:
        value = json.loads(value)
    except ValueError:
        # json raises ValueError (or a subclass) for malformed input.
        print('解压失败:' + str(value))
        return _error_response("json解压失败,数据包可能接收不完整")

    if not isinstance(value, dict):
        print('数据类型不是字典:' + str(value))
        return _error_response('数据类型不正确,请检查数据类型是否是字典型')

    if 'data' not in value:
        print('参数中无data' + str(value.keys()))
        return _error_response('参数不正确,请检查参数中是否有data')
    data = value['data']

    if 'command' not in value:
        print('参数中无command:' + str(value.keys()))
        return _error_response('参数不正确,请检查参数中是否有command命令')

    if not isinstance(data, list):
        print('data的数据类型不是list型:' + str(value))
        return _error_response('data数据类型不正确,请检查是否是list型')

    # Resolve the command BEFORE opening a connection, so an unknown command
    # no longer opens (and leaks) a database connection.
    command = value['command']
    if command == 6100:
        database, handler = db_base, hk_merchant
    elif command == 6101:
        database, handler = db_base, hk_scenic
    elif command == 6102:
        database, handler = db_onebyone, it_itinerary
    else:
        print('命令不正确:' + str(command))
        return _error_response('无此命令')

    db = Connection(host=dbhost, database=database, user=dbuser, password=dbpwd)
    try:
        return handler(db, data)
    finally:
        # Always release the connection, even if the handler raises.
        db.close()


def handlechild(clientsock):
    """Serve one client connection: read a request, reply, then close.

    The request is read up to the ``$end$`` terminator, zlib-decompressed
    and parsed as a JSON object with ``command`` (6100 merchant, 6101
    scenic, 6102 itinerary) and ``data`` (a list). Exactly one JSON reply
    is written back and the connection is then closed.

    Args:
        clientsock: the accepted client socket.
    """
    clientInfo = str(clientsock.getpeername())
    print("Got connect from %s time:%s" % (
        clientInfo, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))))
    try:
        response = _build_response(recv_end(clientsock))
        # sendall keeps writing until the whole reply is out; a single
        # send() may transmit only part of it.
        clientsock.sendall(response)
    finally:
        # Close the socket even when reading, processing or sending raised,
        # so a failed request cannot leak the connection.
        print('close %s connection. time:%s' % (
            clientInfo, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))))
        clientsock.close()
# Listening TCP socket; SO_REUSEADDR allows rebinding the port right after
# a restart.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.listen(1)

# Accept loop: every connection is served by its own daemon thread.
while True:
    try:
        clientsock, clientaddr = sock.accept()
    except Exception:
        # A failed accept is skipped. KeyboardInterrupt and SystemExit do
        # not derive from Exception, so they still propagate and stop the
        # server.
        continue
    worker = Thread(target=handlechild, args=(clientsock,))
    worker.daemon = True
    worker.start()