本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下:
服务端使用asyncore, 收到文件后保存到本地。
客户端使用pyinotify监视目录的变化 ,把变动的文件发送到服务端。
重点:
1. 使用structs打包发送文件的信息,服务端收到后,根据文件信息来接收客户端传送过来的文件。
2. 客户端使用多线程,pyinotify监视到文件变化,放到队列中,由另外一个线程发送。
上代码:
服务端:
# receive file from client and store them into file use asyncore.#
#/usr/bin/python
#coding: utf-8
import asyncore
import socket
from socket import errno
import logging
import time
import sys
import struct
import os
import fcntl
import threading
from rrd_graph import MakeGraph
try:
import rrdtool
except (ImportError, ImportWarnning):
print "Hope this information can help you:"
print "Can not find pyinotify module in sys path, just run [apt-get install python-rrdtool] in ubuntu."
sys.exit(1)
class RequestHandler(asyncore.dispatcher):
def __init__(self, sock, map=None, chunk_size=1024):
self.logger = logging.getLogger('%s-%s' % (self.__class__.__name__, str(sock.getsockname())))
self.chunk_size = chunk_size
asyncore.dispatcher.__init__(self,sock,map)
self.data_to_write = list()
def readable(self):
#self.logger.debug("readable() called.")
return True
def writable(self):
response = (not self.connected) or len(self.data_to_write)
#self.logger.debug('writable() -> %s data length -> %s' % (response, len(self.data_to_write)))
return response
def handle_write(self):
data = self.data_to_write.pop()
#self.logger.debug("handle_write()->%s size: %s",data.rstrip('\r\n'),len(data))
sent = self.send(data[:self.chunk_size])
if sent < len(data):
remaining = data[sent:]
self.data_to_write.append(remaining)
def handle_read(self):
self.writen_size = 0
nagios_perfdata = '../perfdata'
head_packet_format = "!LL128s128sL"
head_packet_size = struct.calcsize(head_packet_format)
data = self.recv(head_packet_size)
if not data:
return
filepath_len, filename_len, filepath,filename, filesize = struct.unpack(head_packet_format,data)
filepath = os.path.join(nagios_perfdata, filepath[:filepath_len])
filename = filename[:filename_len]
self.logger.debug("update file: %s" % filepath + '/' + filename)
try:
if not os.path.exists(filepath):
os.makedirs(filepath)
except OSError:
pass
self.fd = open(os.path.join(filepath,filename), 'w')
#self.fd = open(filename,'w')
if filesize > self.chunk_size:
times = filesize / self.chunk_size
first_part_size = times * self.chunk_size
second_part_size = filesize % self.chunk_size
while 1:
try:
data = self.recv(self.chunk_size)
#self.logger.debug("handle_read()->%s size.",len(data))
except socket.error,e:
if e.args[0] == errno.EWOULDBLOCK:
print "EWOULDBLOCK"
time.sleep(1)
else:
#self.logger.debug("Error happend while receive data: %s" % e)
break
else:
self.fd.write(data)
self.fd.flush()
self.writen_size += len(data)
if self.writen_size == first_part_size:
break
#receive the packet at last
while 1:
try:
data = self.recv(second_part_size)
#self.logger.debug("handle_read()->%s size.",len(data))
except socket.error,e:
if e.args[0] == errno.EWOULDBLOCK:
print "EWOULDBLOCK"
time.sleep(1)
else:
#self.logger.debug("Error happend while receive data: %s" % e)
break
else:
self.fd.write(data)
self.fd.flush()
self.writen_size += len(data)
if len(data) == second_part_size:
break
elif filesize <= self.chunk_size:
while 1:
try:
data = self.recv(filesize)
#self.logger.debug("handle_read()->%s size.",len(data))
except socket.error,e:
if e.args[0] == errno.EWOULDBLOCK:
print "EWOULDBLOCK"
time.sleep(1)
else:
#self.logger.debug("Error happend while receive data: %s" % e)
break
else:
self.fd.write(data)
self.fd.flush()
self.writen_size += len(data)
if len(data) == filesize:
break
self.logger.debug("File size: %s" % self.writen_size)
class SyncServer(asyncore.dispatcher):
def __init__(self,host,port):
asyncore.dispatcher.__init__(self)
self.debug = True
self.logger = logging.getLogger(self.__class__.__name__)
self.create_socket(socket.AF_INET,socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host,port))
self.listen(2000)
def handle_accept(self):
client_socket = self.accept()
if client_socket is None:
pass
else:
sock, addr = client_socket
#self.logger.debug("Incoming connection from %s" % repr(addr))
handler = RequestHandler(sock=sock)
class RunServer(threading.Thread):
def __init__(self):
super(RunServer,self).__init__()
self.daemon = False
def run(self):
server = SyncServer('',9999)
asyncore.loop(use_poll=True)
def StartServer():
logging.basicConfig(level=logging.DEBUG,
format='%(name)s: %(message)s',
)
RunServer().start()
#MakeGraph().start()
if __name__ == '__main__':
StartServer()
客户端:
# monitor path with inotify(python module), and send them to remote server.#
# use sendfile(2) instead of send function in socket, if we have python-sendfile installed.#
import socket
import time
import os
import sys
import struct
import threading
import Queue
try:
import pyinotify
except (ImportError, ImportWarnning):
print "Hope this information can help you:"
print "Can not find pyinotify module in sys path, just run [apt-get install python-pyinotify] in ubuntu."
sys.exit(1)
try:
from sendfile import sendfile
except (ImportError,ImportWarnning):
pass
filetype_filter = [".rrd",".xml"]
def check_filetype(pathname):
for suffix_name in filetype_filter:
if pathname[-4:] == suffix_name:
return True
try:
end_string = pathname.rsplit('.')[-1:][0]
end_int = int(end_string)
except:
pass
else:
# means pathname endwith digit
return False
class sync_file(threading.Thread):
def __init__(self, addr, events_queue):
super(sync_file,self).__init__()
self.daemon = False
self.queue = events_queue
self.addr = addr
self.chunk_size = 1024
def run(self):
while 1:
event = self.queue.get()
if check_filetype(event.pathname):
print time.asctime(),event.maskname, event.pathname
filepath = event.path.split('/')[-1:][0]
filename = event.name
filesize = os.stat(os.path.join(event.path, filename)).st_size
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
filepath_len = len(filepath)
filename_len = len(filename)
sock.connect(self.addr)
offset = 0
data = struct.pack("!LL128s128sL",filepath_len, filename_len, filepath,filename,filesize)
fd = open(event.pathname,'rb')
sock.sendall(data)
if "sendfile" in sys.modules:
# print "use sendfile(2)"
while 1:
sent = sendfile(sock.fileno(), fd.fileno(), offset, self.chunk_size)
if sent == 0:
break
offset += sent
else:
# print "use original send function"
while 1:
data = fd.read(self.chunk_size)
if not data: break
sock.send(data)
sock.close()
fd.close()
class EventHandler(pyinotify.ProcessEvent):
def __init__(self, events_queue):
super(EventHandler,self).__init__()
self.events_queue = events_queue
def my_init(self):
pass
def process_IN_CLOSE_WRITE(self,event):
self.events_queue.put(event)
def process_IN_MOVED_TO(self,event):
self.events_queue.put(event)
def start_notify(path, mask, sync_server):
events_queue = Queue.Queue()
sync_thread_pool = list()
for i in range(500):
sync_thread_pool.append(sync_file(sync_server, events_queue))
for i in sync_thread_pool:
i.start()
wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm,EventHandler(events_queue))
wdd = wm.add_watch(path,mask,rec=True)
notifier.loop()
def do_notify():
perfdata_path = '/var/lib/pnp4nagios/perfdata'
mask = pyinotify.IN_CLOSE_WRITE|pyinotify.IN_MOVED_TO
sync_server = ('127.0.0.1',9999)
start_notify(perfdata_path,mask,sync_server)
if __name__ == '__main__':
do_notify()
python监视线程池
#!/usr/bin/python
import threading
import time
class Monitor(threading.Thread):
def __init__(self, *args,**kwargs):
super(Monitor,self).__init__()
self.daemon = False
self.args = args
self.kwargs = kwargs
self.pool_list = []
def run(self):
print self.args
print self.kwargs
for name,value in self.kwargs.items():
obj = value[0]
temp = {}
temp[name] = obj
self.pool_list.append(temp)
while 1:
print self.pool_list
for name,value in self.kwargs.items():
obj = value[0]
parameters = value[1:]
died_threads = self.cal_died_thread(self.pool_list,name)
print "died_threads", died_threads
if died_threads >0:
for i in range(died_threads):
print "start %s thread..." % name
t = obj[0].__class__(*parameters)
t.start()
self.add_to_pool_list(t,name)
else:
break
time.sleep(0.5)
def cal_died_thread(self,pool_list,name):
i = 0
for item in self.pool_list:
for k,v in item.items():
if name == k:
lists = v
for t in lists:
if not t.isAlive():
self.remove_from_pool_list(t)
i +=1
return i
def add_to_pool_list(self,obj,name):
for item in self.pool_list:
for k,v in item.items():
if name == k:
v.append(obj)
def remove_from_pool_list(self, obj):
for item in self.pool_list:
for k,v in item.items():
try:
v.remove(obj)
except:
pass
else:
return
使用方法:
rrds_queue = Queue.Queue()
make_rrds_pool = []
for i in range(5):
make_rrds_pool.append(MakeRrds(rrds_queue))
for i in make_rrds_pool:
i.start()
make_graph_pool = []
for i in range(5):
make_graph_pool.append(MakeGraph(rrds_queue))
for i in make_graph_pool:
i.start()
monitor = Monitor(make_rrds_pool=(make_rrds_pool, rrds_queue), \
make_graph_pool=(make_graph_pool, rrds_queue))
monitor.start()
解析:
1. 接受字典参数,value为一个元组,第一个元素是线程池,后面的都是参数。
2. 每0.5秒监视线程池中的线程数量,如果线程死掉了,记录死掉线程的数目,再启动同样数量的线程。
3. 如果没有线程死去,则什么也不做。
从外部调用Django模块
import os
import sys
sys.path.insert(0,'/data/cloud_manage')
from django.core.management import setup_environ
import settings
setup_environ(settings)
from common.monitor import Monitor
from django.db import connection, transaction
前提就是,要新建一个django的project,这里我们新建了一个cloud_manage.
这样不仅可以调用django自身的模块,还能调用project本身的东西。
希望本文所述对大家的Python程序设计有所帮助。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8