Python自定义进程池实例分析【生产者、消费者模型问题】

363次阅读  |  发布于5年以前

本文实例分析了Python自定义进程池。分享给大家供大家参考,具体如下:

代码说明一切:


    #encoding=utf-8
    #author: walker
    #date: 2014-05-21
    #function: 自定义进程池遍历目录下文件
    from multiprocessing import Process, Queue, Lock
    import time, os
    #消费者
    class Consumer(Process):
      def __init__(self, queue, ioLock):
        super(Consumer, self).__init__()
        self.queue = queue
        self.ioLock = ioLock
      def run(self):
        while True:
          task = self.queue.get()  #队列中无任务时,会阻塞进程
          if isinstance(task, str) and task == 'quit':
            break;
          time.sleep(1)  #假定任务处理需要1秒钟
          self.ioLock.acquire()
          print( str(os.getpid()) + ' ' + task)
          self.ioLock.release()
        self.ioLock.acquire()
        print 'Bye-bye'
        self.ioLock.release()
    #生产者
    def Producer():
      queue = Queue()  #这个队列是进程/线程安全的
      ioLock = Lock()
      subNum = 4  #子进程数量
      workers = build_worker_pool(queue, ioLock, subNum)
      start_time = time.time()
      for parent, dirnames, filenames in os.walk(r'D:\test'):
        for filename in filenames:
          queue.put(filename)
          ioLock.acquire()
          print('qsize:' + str(queue.qsize()))
          ioLock.release()
          while queue.qsize() > subNum * 10: #控制队列中任务数量
            time.sleep(1)
      for worker in workers:
        queue.put('quit')
      for worker in workers:
        worker.join()
      ioLock.acquire()
      print('Done! Time taken: {}'.format(time.time() - start_time))
      ioLock.release()
    #创建进程池
    def build_worker_pool(queue, ioLock, size):
      workers = []
      for _ in range(size):
        worker = Consumer(queue, ioLock)
        worker.start()
        workers.append(worker)
      return workers
    if __name__ == '__main__':
      Producer()

ps:


    self.ioLock.acquire()
    ...
    self.ioLock.release()

可用:


    with self.ioLock:
      ...

替代。

再来一个好玩的例子:


    #encoding=utf-8
    #author: walker
    #date: 2016-01-06
    #function: 一个多进程的好玩例子
    import os, sys, time
    from multiprocessing import Pool
    cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
    g_List = ['a']
    #修改全局变量g_List
    def ModifyDict_1():
      global g_List
      g_List.append('b')
    #修改全局变量g_List
    def ModifyDict_2():
      global g_List
      g_List.append('c')
    #处理一个
    def ProcOne(num):
      print('ProcOne ' + str(num) + ', g_List:' + repr(g_List))
    #处理所有
    def ProcAll():
      pool = Pool(processes = 4)
      for i in range(1, 20):
        #ProcOne(i)
        #pool.apply(ProcOne, (i,))
        pool.apply_async(ProcOne, (i,))
      pool.close()
      pool.join()
    ModifyDict_1() #修改全局变量g_List
    if __name__ == '__main__':
      ModifyDict_2() #修改全局变量g_List
      print('In main g_List :' + repr(g_List))
      ProcAll()

Windows7 下运行的结果:


    λ python3 demo.py
    In main g_List :['a', 'b', 'c']
    ProcOne 1, g_List:['a', 'b']
    ProcOne 2, g_List:['a', 'b']
    ProcOne 3, g_List:['a', 'b']
    ProcOne 4, g_List:['a', 'b']
    ProcOne 5, g_List:['a', 'b']
    ProcOne 6, g_List:['a', 'b']
    ProcOne 7, g_List:['a', 'b']
    ProcOne 8, g_List:['a', 'b']
    ProcOne 9, g_List:['a', 'b']
    ProcOne 10, g_List:['a', 'b']
    ProcOne 11, g_List:['a', 'b']
    ProcOne 12, g_List:['a', 'b']
    ProcOne 13, g_List:['a', 'b']
    ProcOne 14, g_List:['a', 'b']
    ProcOne 15, g_List:['a', 'b']
    ProcOne 16, g_List:['a', 'b']
    ProcOne 17, g_List:['a', 'b']
    ProcOne 18, g_List:['a', 'b']
    ProcOne 19, g_List:['a', 'b']

Ubuntu 14.04下运行的结果:


    In main g_List :['a', 'b', 'c']
    ProcOne 1, g_List:['a', 'b', 'c']
    ProcOne 2, g_List:['a', 'b', 'c']
    ProcOne 3, g_List:['a', 'b', 'c']
    ProcOne 5, g_List:['a', 'b', 'c']
    ProcOne 4, g_List:['a', 'b', 'c']
    ProcOne 8, g_List:['a', 'b', 'c']
    ProcOne 9, g_List:['a', 'b', 'c']
    ProcOne 7, g_List:['a', 'b', 'c']
    ProcOne 11, g_List:['a', 'b', 'c']
    ProcOne 6, g_List:['a', 'b', 'c']
    ProcOne 12, g_List:['a', 'b', 'c']
    ProcOne 13, g_List:['a', 'b', 'c']
    ProcOne 10, g_List:['a', 'b', 'c']
    ProcOne 14, g_List:['a', 'b', 'c']
    ProcOne 15, g_List:['a', 'b', 'c']
    ProcOne 16, g_List:['a', 'b', 'c']
    ProcOne 17, g_List:['a', 'b', 'c']
    ProcOne 18, g_List:['a', 'b', 'c']
    ProcOne 19, g_List:['a', 'b', 'c']

可以看见Windows7下第二次修改没有成功,而Ubuntu下修改成功了。据uliweb作者limodou讲,原因是Windows下是充重启实现的子进程;Linux下是fork实现的。

更多关于Python相关内容感兴趣的读者可查看本站专题:《Python URL操作技巧总结》、《Python图片操作技巧总结》、《Python数据结构与算法教程》、《Python Socket编程技巧总结》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总

希望本文所述对大家Python程序设计有所帮助。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8