多个进程可以协同工作来完成一项任务。通常需要共享数据。所以在多进程之间保持数据的一致性就很重要了。需要共享数据协同的进程必须以适当的策略来读写数据。相关的同步原语和线程的库很类似。
进程的同步原语如下:
acquire()
release()
Event
set()
clear()
wait()
notify_all()
Threading
下面的代码展示了如何使用 barrier() 函数来同步两个进程。我们有4个进程,进程1和进程2由barrier语句管理,进程3和进程4没有同步策略。 :
barrier()
import multiprocessing from multiprocessing import Barrier, Lock, Process from time import time from datetime import datetime def test_with_barrier(synchronizer, serializer): name = multiprocessing.current_process().name synchronizer.wait() now = time() with serializer: print("process %s ----> %s" % (name, datetime.fromtimestamp(now))) def test_without_barrier(): name = multiprocessing.current_process().name now = time() print("process %s ----> %s" % (name, datetime.fromtimestamp(now))) if __name__ == '__main__': synchronizer = Barrier(2) serializer = Lock() Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start() Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start() Process(name='p3 - test_without_barrier', target=test_without_barrier).start() Process(name='p4 - test_without_barrier', target=test_without_barrier).start()
运行下面的代码,将看到进程1和进程2在同一时间打印: :
$ python process_barrier.py process p1 - test_with_barrier ----> 2015-05-09 11:11:33.291229 process p2 - test_with_barrier ----> 2015-05-09 11:11:33.291229 process p3 - test_without_barrier ----> 2015-05-09 11:11:33.310230 process p4 - test_without_barrier ----> 2015-05-09 11:11:33.333231
(译者注:译者在实际运行了10次,没有一次时间是相同的,感觉这个地方同一时间打印出来的影响因素很多。只能看到 with_barrier 的进程1和2比 without_barrier 的进程3和4时间差的小很多。)
with_barrier
without_barrier
if __name__ == '__main__': synchronizer = Barrier(2) serializer = Lock() Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start() Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start() Process(name='p3 - test_without_barrier', target=test_without_barrier).start() Process(name='p4 - test_without_barrier', target=test_without_barrier).start()
test_with_barrier 函数调用了barrier的 wait() 方法: :
test_with_barrier
def test_with_barrier(synchronizer, serializer): name = multiprocessing.current_process().name synchronizer.wait()
当两个进程都调用 wait() 方法的时候,它们会一起继续执行: :
now = time() with serializer: print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))
下面这幅图表示了barrier如何同步两个进程:
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8