数据是在本地录入的,要把本地数据同步到线上比较麻烦,所以用python写了一个socket服务脚本,用来同步本地新录入的数据。client端是用java写的,它根据业务逻辑导出需要同步的数据,经zlib压缩后以字节流的形式通过socket发送给python写的server端,从而减小传输的数据量。以下是代码片段。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from database import *
from threading import Thread
import socket
import zlib
import json
import sys
import time
import struct
 
# Address and TCP port the sync server binds its listening socket to.
host = "192.168.1.192"
port = 23211
 
# Database endpoint and credentials handed to Connection(...) in handlechild.
# db_onebyone is selected for command 6102, db_base for every other command.
# NOTE(review): credentials are hard-coded in source -- consider moving them
# to configuration outside the repository.
dbhost = '192.168.1.222:3306'
db_onebyone = 'hk_onebyone'
db_base = 'hk_base'
dbuser = 'hiker'
dbpwd = 'hiker'
 
'''
	@name 行程单业务处理
	@param Resource db
	@param Array data
	@return String
'''
def it_itinerary(db,data):
	"""Handle an itinerary sync request and return the reply as a JSON string.

	Invoked by handlechild for command 6102 with an open connection `db` and
	the request's `data` list.

	Returns a JSON object string: 'status' is 'T' when the try block finishes,
	or 'F' with the exception text appended to the message when it raises.
	"""
	try:
		# `....` stands for statements omitted from this listing; presumably
		# they store `data` through `db` and set the counter `i` printed
		# below -- confirm against the full source.
		....
		respone = json.dumps({'status':'T','message':'处理完成'})
		print '行程单数据处理成功,总共处理:%d条' %i
	except Exception,e:
		# Any failure is logged with its traceback and reported to the caller
		# in the reply instead of propagating.
		print e
		#print data
		import traceback
		traceback.print_exc()
		respone = json.dumps({'status':'F','message':'行程单数据处理出现异常。异常信息:' + str(e)})
 
	return respone
 
'''
	@name 商家业务处理
	@param Resource db
	@param Array data
	@return String
'''
def hk_merchant(db,data):
	"""Handle a merchant sync request and return the reply as a JSON string.

	Invoked by handlechild for command 6100 with an open connection `db` and
	the request's `data` list.

	Returns a JSON object string: 'status' is 'T' when the try block finishes,
	or 'F' with the exception text appended to the message when it raises.
	"""
	try:
		# `....` stands for statements omitted from this listing; presumably
		# they store `data` through `db` and set the counter `i` printed
		# below -- confirm against the full source.
		....
		respone = json.dumps({'status':'T','message':'商家数据处理完成'})
		print '商家数据处理成功,总共处理:%d条' %i
	except Exception,e:
		# Any failure is logged with its traceback and reported to the caller
		# in the reply instead of propagating.
		print e
		#print data
		import traceback
		traceback.print_exc()		
		respone = json.dumps({'status':'F','message':'商家数据处理出现异常。异常信息:' + str(e)})
 
	return respone
 
'''
	@name 景区业务处理
	@param Resource db
	@param Array data
	@return String
'''	
def hk_scenic(db,data):
	"""Handle a scenic-spot sync request and return the reply as a JSON string.

	Invoked by handlechild for command 6101 with an open connection `db` and
	the request's `data` list.

	Returns a JSON object string: 'status' is 'T' when the try block finishes,
	or 'F' with the exception text appended to the message when it raises.
	"""
	try:
		# `....` stands for statements omitted from this listing; presumably
		# they store `data` through `db` and set the counter `i` printed
		# below -- confirm against the full source.
		....
		respone = json.dumps({'status':'T','message':'处理完成'})
		print '景区数据处理成功,总共处理:%d条' %i
	except Exception,e:
		# Any failure is logged with its traceback and reported to the caller
		# in the reply instead of propagating.
		print e
		#print data
		import traceback
		traceback.print_exc()		
		respone = json.dumps({'status':'F','message':'景区数据处理出现异常。异常信息:' + str(e)})
 
	return respone		
 
 
def recv_basic(the_socket):
    """Read from the socket until the peer closes it or a read fails.

    A 5 second timeout is applied to the socket, so a stalled peer ends the
    read instead of blocking forever.

    @param the_socket: connected socket object to read from
    @return: every byte received before EOF, timeout or socket error
             (an empty byte string if nothing arrived)
    """
    chunks = []
    try:
        # The timeout is a property of the socket: set it once, not per read.
        the_socket.settimeout(5)
        while True:
            data = the_socket.recv(8192)
            if not data:
                # Empty read means the peer closed the connection.
                break
            chunks.append(data)
    except socket.error:
        # socket.timeout derives from socket.error. A network failure ends
        # the read but keeps what was already received; unrelated exceptions
        # (KeyboardInterrupt, programming errors) are no longer swallowed.
        pass
    # b'' is identical to '' on Python 2 and keeps the join valid for bytes.
    return b''.join(chunks)
 
def recv_timeout(the_socket,timeout=2):
    """Collect data from a non-blocking socket until it goes quiet.

    The socket is switched to non-blocking mode and polled. Reading stops when
    - some data has arrived and nothing new came for `timeout` seconds, or
    - nothing arrived at all for `timeout * 2` seconds, or
    - the peer closed the connection.

    @param the_socket: connected socket object to read from
    @param timeout: quiet period in seconds that ends the read
    @return: all bytes received (an empty byte string if none)
    """
    the_socket.setblocking(0)
    chunks = []
    begin = time.time()
    while True:
        elapsed = time.time() - begin
        # Once something has arrived, one quiet period is enough to stop.
        if chunks and elapsed > timeout:
            break
        # With nothing received yet, wait twice as long before giving up.
        if elapsed > timeout * 2:
            break
        try:
            data = the_socket.recv(8192)
        except socket.error:
            # No data ready on the non-blocking socket: back off briefly
            # instead of spinning the CPU until the deadline.
            time.sleep(0.01)
            continue
        if not data:
            # Peer closed the connection; nothing more can arrive.
            break
        chunks.append(data)
        begin = time.time()
    # b'' is identical to '' on Python 2 and keeps the join valid for bytes.
    return b''.join(chunks)
 
# Terminator the client appends to every request.
# (b'...' is the same as '...' on Python 2.)
End = b'$end$'


def recv_end(the_socket):
    """Read from the socket until the End terminator is seen.

    The terminator may arrive split across any number of recv() chunks. Only
    the bytes before its first occurrence are returned; anything after it is
    discarded.

    If the peer closes the connection before sending the terminator, the
    bytes received so far are returned rather than looping forever.

    NOTE(review): the payload is compressed binary data, so the terminator
    bytes could in principle occur inside it and cut the message short; a
    length prefix would be more robust.

    @param the_socket: connected socket object to read from
    @return: the request bytes without the terminator
    """
    chunks = []
    received = 0           # total length of the chunks collected so far
    keep = len(End) - 1    # longest terminator prefix a chunk can end with
    tail = b''             # last `keep` bytes of the stream seen so far
    while True:
        data = the_socket.recv(8192)
        if not data:
            # Peer closed without a terminator: return what was received.
            break
        # Searching tail + data finds a terminator that straddles chunks
        # without rescanning everything already received.
        window = tail + data
        pos = window.find(End)
        chunks.append(data)
        if pos != -1:
            # Translate the position in `window` back to the whole stream.
            end_at = received - len(tail) + pos
            return b''.join(chunks)[:end_at]
        received += len(data)
        tail = window[-keep:] if keep else b''
    return b''.join(chunks)
 
def _recv_exact(the_socket, count, max_chunk=524288):
    """Read `count` bytes from the socket, fewer only if the peer closes."""
    chunks = []
    remaining = count
    while remaining > 0:
        # Never ask for more than is still owed, so bytes belonging to a
        # following message stay in the socket buffer.
        data = the_socket.recv(min(remaining, max_chunk))
        if not data:
            break
        chunks.append(data)
        remaining -= len(data)
    # b'' is identical to '' on Python 2 and keeps the join valid for bytes.
    return b''.join(chunks)


def recv_size(the_socket):
    """Read one length-prefixed message from the socket.

    The message starts with its payload length packed as a 4-byte big-endian
    signed integer ('>i'), followed by that many payload bytes.

    @param the_socket: connected socket object to read from
    @return: the payload bytes; shorter than announced if the peer closed
             early, and empty if the header is incomplete or the announced
             length is not positive
    """
    header = _recv_exact(the_socket, 4)
    if len(header) < 4:
        return b''
    size = struct.unpack('>i', header)[0]
    return _recv_exact(the_socket, size)
 
#线程处理
def handlechild(clientsock):
	clientInfo = str(clientsock.getpeername())
	print "Got connect from %s time:%s" %(clientInfo,time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())))
	while True:
 
		value = recv_end(clientsock)
 
		if not len(value):
			respone = json.dumps({'status':'F','message':'请求参数不能为空'})
			clientsock.send(respone)
			break
 
		try:
			value = zlib.decompress(value)			
		except Exception,e:
			print e
			respone = json.dumps({'status':'F','message':'字符流解压失败,请检查你所压缩的字符流是否正确'})
			clientsock.send(respone)
			break
 
		try:
			value = json.loads(value)
		except:
			print '解压失败:' + str(value)
			respone = json.dumps({'status':'F','message':"json解压失败,数据包可能接收不完整"})
			clientsock.send(respone)
			break
 
		if not isinstance(value, dict):
			print '数据类型不是字典:' + str(value)
			respone = json.dumps({'status':'F','message':'数据类型不正确,请检查数据类型是否是字典型'})
			clientsock.send(respone)
			break
 
		if ('data' in value.keys()):
			data = value['data']
		else:
			print '参数中无data' + str(value.keys())
			respone = json.dumps({'status':'F','message':'参数不正确,请检查参数中是否有data'})
			clientsock.send(respone)
			break			

		if ('command' in value.keys()):
			if isinstance(data, list):
 
				if (value['command'] == 6102):
					db=Connection(host=dbhost,database=db_onebyone,user=dbuser,password=dbpwd)
				else:
					db=Connection(host=dbhost,database=db_base,user=dbuser,password=dbpwd)
 
				if (value['command'] == 6100):
					respone = hk_merchant(db,data)
				elif (value['command'] == 6101):
					respone = hk_scenic(db,data)
				elif (value['command'] == 6102):
					respone = it_itinerary(db,data)
				else:
					print '命令不正确:' + str(value['command'])
					respone = json.dumps({'status':'F','message':'无此命令'})
					clientsock.send(respone)
					break	
 
				db.close()
 
				clientsock.send(respone)
				break
 
			else:
				print 'data的数据类型不是list型:' + str(value)
				respone = json.dumps({'status':'F','message':'data数据类型不正确,请检查是否是list型'})
				clientsock.send(respone)
				break				
		else:
			print '参数中无command:' + str(value.keys())
			respone = json.dumps({'status':'F','message':'参数不正确,请检查参数中是否有command命令'})
			clientsock.send(respone)
			break			
 
	print 'close %s connection. time:%s' %(clientInfo,time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())))
	clientsock.close()
 
 
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.listen(1)
 
while True:
    try:
        clientsock, clientaddr = sock.accept()
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        continue
 
    t = Thread(target=handlechild, args=[clientsock])
    t.setDaemon(1)
    t.start()