Dask延迟函数调用,不传递参数
2020-04-02
566
我希望更好地理解使用
dask.delayed
调用依赖于参数的函数时出现的以下行为。当在 configparser 读取的参数文件中指定参数时,似乎会出现问题。这是一个完整的示例:
参数文件:
#zpar.ini: parameter file for configparser
[my pars]
my_zpar = 2.
解析器:
#zippy_parser
import configparser
def read(_rundir):
global rundir
rundir = _rundir
cp = configparser.ConfigParser()
cp.read(rundir + '/zpar.ini')
#[my pars]
global my_zpar
my_zpar = cp['my pars'].getfloat('my_zpar')
和主 python 文件:
# dask test with configparser
import dask
from dask.distributed import Client
import zippy_parser as zpar
def my_func(x, y):
# print stuff
print("parameter from main is: {}".format(main_par))
print("parameter from configparser is: {}".format(zpar.my_zpar))
# do stuff
return x + y
if __name__ == '__main__':
client = Client(n_workers = 4)
#read parameters from input file
rundir = '/path/to/parameter/file'
zpar.read(rundir)
#test zpar
print("zpar is {}".format(zpar.my_zpar))
#define parameter and call my_func
main_par = 5.
z = dask.delayed(my_func)(1., 2.)
z.compute()
client.close()
my_func() 中的第一个打印语句执行正常,但第二个打印语句引发异常。输出为:
zpar is 2.0 parameter from main is: 5.0 distributed.worker - WARNING - Compute Failed Function: my_func args: (1.0, 2.0) kwargs: {} Exception: AttributeError("module 'zippy_parser' has no attribute 'my_zpar'",)
我是 dask 新手。我想这与序列化有关,我不明白。有人可以启发我和/或指出相关文档吗?谢谢!
2个回答
我会尽量简短。
当一个函数被序列化以便发送给 worker 时,python 还会发送该函数所需的局部变量和函数(它的“闭包”)。但是,它会按名称存储它引用的模块,而不会尝试序列化整个运行时。
这意味着
zippy_parser
被
导入
到 worker 中,而不是反序列化。由于函数
read
从未在 worker 中被调用,因此
global
变量从未被初始化。
因此,您可以在 worker 中将
read
作为函数的一部分调用,或者以其他方式调用,但使用函数设置模块全局变量的模式可能不太好。 Dask 的延迟机制更倾向于功能纯度,即您获得的结果不应依赖于运行时的当前状态。
(请注意,如果您在主脚本中调用
read
后创建了客户端,则工作者
可能
已经获得了内存版本,具体取决于如何在您的系统上配置子进程的创建方式)
mdurant
2020-04-03
我鼓励您明确地将所有参数传递给您的 dask 延迟函数,而不是依赖于全局命名空间。
MRocklin
2020-04-04