当处理 list 或其他类型的序列时,一种很常用的操作是对序列中的每一个元素都执行相同的操作,然后收集结果。举例说,通过 Python IDLE 可以对一个 list 这样更新: :
>>>items = [1,2,3,4,5,6,7,8,9,10] >>>updated_items = [] >>>for x in items: >>> updated_items.append(x*2) >>> updated_items >>> [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
这是比较常见的一个操作,但是 Python 内置了一些功能可以更优雅地完成类似操作。
Python 的函数 map(aFunction, aSequence) 将在序列的每一个元素上调用传入的函数,并将结果以 list 的形式返回。用这个函数完成上面的操作: :
map(aFunction, aSequence)
>>>items = [1,2,3,4,5,6,7,8,9,10] >>>def multiplyFor2(x):return x*2 >>>print(list(map(multiplyFor2,items))) >>>[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
这里,我们将自定义的 multiplyFor2 函数传给 map . 它将对 items 中所有的元素执行此函数,最后以 list 的形式返回结果。
multiplyFor2
map
items
我们也可以将 lambda 函数(不用绑定变量名的匿名函数)当做参数传给 map 函数。这段代码就变成以下这样: :
>>>items = [1,2,3,4,5,6,7,8,9,10] >>>print(list(map(lambda x:x*2,items))) >>>[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
内置的 map 函数比手动写 for 循环的性能更高。
SCOOP 模块提供了多个 map 函数可以将异步计算任务下发到多个计算节点:
futures.map(func, iterables, kargs)
futures.map_as_completed(func, iterables, kargs)
futures.scoop.futures.mapReduce(mapFunc, reductionOp, iterables, kargs)
在这个例子中,我们将对比 SCOOP 实现 MapReduce 和 Python 内置函数实现: :
""" Compare SCOOP MapReduce with a serial implementation """ import operator import time from scoop import futures def simulateWorkload(inputData): time.sleep(0.01) return sum(inputData) def CompareMapReduce(): mapScoopTime = time.time() res = futures.mapReduce( simulateWorkload, operator.add, list([a] * a for a in range(1000)), ) mapScoopTime = time.time() - mapScoopTime print("futures.map in SCOOP executed in {0:.3f}s with result:{1}".format( mapScoopTime, res)) mapPythonTime = time.time() res = sum(map(simulateWorkload, list([a] * a for a in range(1000)))) mapPythonTime = time.time() - mapPythonTime print("map Python executed in: {0:.3f}s with result: {1}".format( mapPythonTime, res)) if __name__ == '__main__': CompareMapReduce()
这段代码通过以下命令来执行: :
python -m scoop map_reduce.py > [2015-06-12 20:13:25,602] launcher INFO SCOOP 0.7.2 dev on win32 using Python 3.4.3 (v3.4.3:9b73f1c3e601, Feb 24 2015, 22:43:06) [MSC v.1600 32 bit (Intel)], API: 1013 [2015-06-12 20:13:25,602] launcher INFO Deploying 2 worker(s) over 1 host(s). [2015-06-12 20:13:25,602] launcher INFO Worker d--istribution: [2015-06-12 20:13:25,602] launcher INFO 127.0.0.1: 1 + origin Launching 2 worker(s) using an unknown shell. futures.map in SCOOP executed in 8.459s with result: 332833500 map Python executed in: 10.034s with result: 332833500 [2015-06-12 20:13:45,344] launcher (127.0.0.1:2559) INFO is done. [2015-06-12 20:13:45,368] launcher (127.0.0.1:2559) INFO cleaning spawned subprocesses.
在这个例子中,我们将 SCOOP 实现的 MapReduce 和 Python 内置函数的 MapReduce 来对比。主要的函数是 CompareMapReduce() ,里面有两种实现和对时间的统计。程序结构如下: :
CompareMapReduce()
mapScoopTime = tme.time() #Run SCOOP MapReduce mapScoopTime = time.time() – mapScoopTime mapPythonTime = time.time() #Run serial MapReduce mapPythonTime = time.time() - mapPythonTime
在输出中,我们打印了执行时间: :
futures.map in SCOOP executed in 8.459s with result: 332833500 map Python executed in: 10.034s with result: 332833500
为了得到比较明显的时间比较,我们在 simulatedWordload 函数中引入了 time.sleep 来延长计算时间。 :
simulatedWordload
time.sleep
def simulateWorkload(inputData, chose=None): time.sleep(0.01) return sum(inputData)
SCOOP 版本的 MapReduce 如下: :
res = futures.mapReduce( simulateWorkload, operator.add, list([a] * a for a in range(1000)), )
futures.mapReduce 函数需要以下参数:
futures.mapReduce
simulateWork
operator.add
list(...)
使用 Python 内置函数实现的 MapReduce 如下: :
res = sum(map(simulateWorkload, list([a] * a for a in range(1000))))
Python 内置的 map() 函数接受两个参数: simulateWorkload 函数和可迭代的 list() 对象。Reduce 操作我们只是简单地用 Python 内置的 sum() 函数。
map()
simulateWorkload
list()
sum()
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8