使用 dask 延迟函数返回列表
2017-12-11
2336
我正在尝试使用 dask.delayed 来构建任务图。这在大多数情况下都运行良好,但我经常遇到这样的情况,我有许多延迟对象,它们的方法返回一个对象列表,该列表的长度无法根据我目前掌握的信息轻松计算出来:
items = get_collection() # known length
def do_work(item):
# get_list_of_things returns list of "unknown" length
return map(lambda x: x.DoStuff(), item.get_list_of_things())
results = [delayed(do_work(x)) for x in items]
这给出了
TypeError: Delayed objects of unspecified length are not iterable
在 dask 中是否有任何方法可以解决这个问题,最好不必在中间结果上调用 .compute(),因为这会破坏拥有任务图的大部分优势?它基本上意味着图形无法完全解析,直到它的一些步骤运行完毕,但唯一可变的是并行部分的宽度,它不会改变图形的结构或深度。
1个回答
不幸的是,如果您想对列表中的每个元素调用单独的函数,那么这 是 图形结构的一部分,如果您想使用 dask.delayed,则必须在图形构建时知道。
一般来说,我看到两个选项:
-
不要为列表的每个元素创建单独的任务,而是为前 10%、后 10% 等创建任务。这是 dask.bag 中采用的相同方法,它还处理未知数量元素的并行性(这可能值得考虑。
-
切换到实时的 parallel.futures 接口,并等待列表的结果在提交更多工作之前
from dask.distributed import Client client = Client() list_future = client.submit(do_work, *args) len_future = client.submit(len, list_future) n = len_future.result() # 等待长度计算完成 futures = [client.submit(operator.getitem, list_future, i) for i in range(n)] ... 使用 Futures 做更多事情
MRocklin
2017-12-12