Python多进程通信Queue、Pipe、Value、Array实例

806次阅读  |  发布于5年以前

queue和pipe的区别: pipe用来在两个进程间通信。queue用来在多个进程间实现通信。 此两种方法为所有系统多进程通信的基本方法,几乎所有的语言都支持此两种方法。

1)Queue & JoinableQueue

queue用来在进程间传递消息,任何可以pickle-able的对象都可以在加入到queue。

multiprocessing.JoinableQueue 是 Queue的子类,增加了task_done()和join()方法。

task_done()用来告诉queue一个task完成。一般地在调用get()获得一个task,在task结束后调用task_done()来通知Queue当前task完成。

join() 阻塞直到queue中的所有的task都被处理(即task_done方法被调用)。

代码:

复制代码 代码如下:

import multiprocessing
import time

class Consumer(multiprocessing.Process):

def __init__(self, task_queue, result_queue):  
    multiprocessing.Process.__init__(self)  
    self.task_queue = task_queue  
    self.result_queue = result_queue

def run(self):  
    proc_name = self.name  
    while True:  
        next_task = self.task_queue.get()  
        if next_task is None:  
            # Poison pill means shutdown  
            print ('%s: Exiting' % proc_name)  
            self.task_queue.task_done()  
            break  
        print ('%s: %s' % (proc_name, next_task))  
        answer = next_task() # __call__()  
        self.task_queue.task_done()  
        self.result_queue.put(answer)  
    return

class Task(object):
def init(self, a, b):
self.a = a
self.b = b
def call(self):
time.sleep(0.1) # pretend to take some time to do the work
return '%s %s = %s' % (self.a, self.b, self.a self.b)
def str(self):
return '%s * %s' % (self.a, self.b)

if name == 'main':

Establish communication queues

tasks = multiprocessing.JoinableQueue()  
results = multiprocessing.Queue()  

# Start consumers  
num_consumers = multiprocessing.cpu_count()  
print ('Creating %d consumers' % num_consumers)  
consumers = [ Consumer(tasks, results)  
              for i in range(num_consumers) ]  
for w in consumers:  
    w.start()  

# Enqueue jobs  
num_jobs = 10  
for i in range(num_jobs):  
    tasks.put(Task(i, i))  

# Add a poison pill for each consumer  
for i in range(num_consumers):  
    tasks.put(None)

# Wait for all of the tasks to finish  
tasks.join()  

# Start printing results  
while num_jobs:  
    result = results.get()  
    print ('Result:', result)  
    num_jobs -= 1  

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8