python3实现ftp服务功能(服务端 For Linux)

647次阅读  |  发布于5年以前

本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下

功能介绍:

可执行的命令:

ls
pwd
cd
put
rm
get
mkdir

1、用户加密认证

2、允许多用户同时登陆

3、每个用户有自己的家目录,且只可以访问自己的家目录

4、运行在自己家目录下随意切换目录

5、允许上传下载文件,且文件一致

6、传输过程中显示进度条
server main 代码:


    # Author by Andy
    # _*_ coding:utf-8 _*_
    import os, sys, json, hashlib, socketserver, time

    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    sys.path.append(base_dir)
    from conf import userdb_set
    class Ftp_server(socketserver.BaseRequestHandler):
     user_home_dir = ''
     def auth(self, *args):
      '''验证用户名及密码'''
      cmd_dic = args[0]
      username = cmd_dic["username"]
      password = cmd_dic["password"]
      f = open(userdb_set.userdb_set(), 'r')
      user_info = json.load(f)
      if username in user_info.keys():
       if password == user_info[username]:
        self.request.send('0'.encode())
        os.chdir('/home/%s' % username)
        self.user_home_dir = os.popen('pwd').read().strip()
        data = "%s login successed" % username
        self.loging(data)
       else:
        self.request.send('1'.encode())
        data = "%s login failed" % username
        self.loging(data)
        f.close
      else:
       self.request.send('1'.encode())
       data = "%s login failed" % username
       self.loging(data)
       f.close
       ##########################################

     def get(self, *args):
      '''给客户端传输文件'''
      request_code = {
       '0': 'file is ready to get',
       '1': 'file not found!'
      }
      cmd_dic = args[0]
      self.loging(json.dumps(cmd_dic))
      filename = cmd_dic["filename"]
      if os.path.isfile(filename):
       self.request.send('0'.encode('utf-8')) # 确认文件存在
       self.request.recv(1024)
       self.request.send(str(os.stat(filename).st_size).encode('utf-8'))
       self.request.recv(1024)
       m = hashlib.md5()
       f = open(filename, 'rb')
       for line in f:
        m.update(line)
        self.request.send(line)
       self.request.send(m.hexdigest().encode('utf-8'))
       print('From server:Md5 value has been sended!')
       f.close()
      else:
       self.request.send('1'.encode('utf-8'))
       ###########################################

     def cd(self, *args):
      '''执行cd命令'''
      user_current_dir = os.popen('pwd').read().strip()
      cmd_dic = args[0]
      self.loging(json.dumps(cmd_dic))
      path = cmd_dic['path']
      if path.startswith('/'):
       if self.user_home_dir in path:
        os.chdir(path)
        new_dir = os.popen('pwd').read()
        user_current_dir = new_dir
        self.request.send('Change dir successfully!'.encode("utf-8"))
        data = 'Change dir successfully!'
        self.loging(data)
       elif os.path.exists(path):
        self.request.send('Permission Denied!'.encode("utf-8"))
        data = 'Permission Denied!'
        self.loging(data)
       else:
        self.request.send('Directory not found!'.encode("utf-8"))
        data = 'Directory not found!'
        self.loging(data)
      elif os.path.exists(path):
       os.chdir(path)
       new_dir = os.popen('pwd').read().strip()
       if self.user_home_dir in new_dir:
        self.request.send('Change dir successfully!'.encode("utf-8"))
        user_current_dir = new_dir
        data = 'Change dir successfully!'
        self.loging(data)
       else:
        os.chdir(user_current_dir)
        self.request.send('Permission Denied!'.encode("utf-8"))
        data = 'Permission Denied!'
        self.loging(data)
      else:
       self.request.send('Directory not found!'.encode("utf-8"))
       data = 'Directory not found!'
       self.loging(data)
       ###########################################

     def rm(self, *args):
      request_code = {
       '0': 'file exist,and Please confirm whether to rm',
       '1': 'file not found!'
      }
      cmd_dic = args[0]
      self.loging(json.dumps(cmd_dic))
      filename = cmd_dic['filename']
      if os.path.exists(filename):
       self.request.send('0'.encode("utf-8")) # 确认文件存在
       client_response = self.request.recv(1024).decode()
       if client_response == '0':
        os.popen('rm -rf %s' % filename)
        self.request.send(('File %s has been deleted!' % filename).encode("utf-8"))
        self.loging('File %s has been deleted!' % filename)
       else:
        self.request.send(('File %s not deleted!' % filename).encode("utf-8"))
        self.loging('File %s not deleted!' % filename)
      else:
       self.request.send('1'.encode("utf-8"))
       ########################################

     def pwd(self, *args):
      '''执行pwd命令'''
      cmd_dic = args[0]
      self.loging(json.dumps(cmd_dic))
      server_response = os.popen('pwd').read().strip().encode("utf-8")
      self.request.send(server_response)

     #############################################
     def ls(self, *args):
      '''执行ls命名'''
      cmd_dic = args[0]
      self.loging(json.dumps(cmd_dic))
      path = cmd_dic['path']
      cmd = 'ls -l %s' % path
      server_response = os.popen(cmd).read().encode("utf-8")
      self.request.send(server_response)

     ############################################
     def put(self, *args):
      '''接收客户端文件'''
      cmd_dic = args[0]
      self.loging(json.dumps(cmd_dic))
      filename = cmd_dic["filename"]
      filesize = cmd_dic["size"]
      if os.path.isfile(filename):
       f = open(filename + '.new', 'wb')
      else:
       f = open(filename, 'wb')
      request_code = {
       '200': 'Ready to recceive data!',
       '210': 'Not ready to received data!'
      }
      self.request.send('200'.encode())
      receive_size = 0
      while True:
       if receive_size < filesize:
        data = self.request.recv(1024)
        f.write(data)
        receive_size += len(data)
       else:
        data = "File %s has been uploaded successfully!" % filename
        self.loging(data)
        print(data)
        break

        ################################################

     def mkdir(self, *args):
      request_code = {
       '0': 'Directory has been made!',
       '1': 'Directory is aleady exist!'
      }
      cmd_dic = args[0]
      self.loging(json.dumps(cmd_dic))
      dir_name = cmd_dic['dir_name']
      if os.path.exists(dir_name):
       self.request.send('1'.encode("utf-8"))
      else:
       os.popen('mkdir %s' % dir_name)
       self.request.send('0'.encode("utf-8"))

       #############################################

      def loging(self, data):
      '''日志记录'''
      localtime = time.asctime(time.localtime(time.time()))
      log_file = '/root/ftp/ftpserver/log/server.log'
      with open(log_file, 'a', encoding='utf-8') as f:
       f.write('%s-->' % localtime + data + '\n')
       ##############################################

     def handle(self):
      # print("您本次访问使用的IP为:%s" %self.client_address[0])
      # localtime = time.asctime( time.localtime(time.time()))
      # print(localtime)

      while True:
       try:
        self.data = self.request.recv(1024).decode() #
        # print(self.data)
        cmd_dic = json.loads(self.data)
        action = cmd_dic["action"]
        # print("用户请求%s"%action)
        if hasattr(self, action):
         func = getattr(self, action)
         func(cmd_dic)
       except Exception as e:
        self.loging(str(e))
        break

    def run():
     HOST, PORT = '0.0.0.0', 6969
     print("The server is started,and listenning at port 6969")
     server = socketserver.ThreadingTCPServer((HOST, PORT), Ftp_server)
     server.serve_forever()
    if __name__ == '__main__':
     run()


设置用户口令代码:


    #Author by Andy
    #_*_ coding:utf-8 _*_
    import os,json,hashlib,sys

    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    userdb_file = base_dir+"\data\\userdb"
    # print(userdb_file)
    def userdb_set():
     if os.path.isfile(userdb_file):
      # print(userdb_file)
      return userdb_file
     else:
      print('请先为您的服务器创建用户!')
      user_data = {}
      dict={}
      Exit_flags = True
      while Exit_flags:
       username = input("Please input username:")
       if username != 'exit':
        password = input("Please input passwod:")
        if password != 'exit':
          user_data.update({username:password})
          m = hashlib.md5()
          # m.update('hello')
          # print(m.hexdigest())
          for i in user_data:
           # print(i,user_data[i])
           m.update(user_data[i].encode())
           dict.update({i:m.hexdigest()})
        else:
         break
       else:
        break
      f = open(userdb_file,'w')
      json.dump(dict,f)
      f.close()
     return userdb_file

目录结构:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8