Developer Questions Collection

How do I use dask to make parts of a script asynchronous?

2016-08-15

Suppose I have a set of documents. I need to tokenize them and then convert them into vectors for further work. I found that Elasticsearch's tokenizer works much better than my own solution, so I am switching to it. However, it is considerably slower. The end result is then expected to be fed into the vectorizer as a stream.

The whole process can be done with a chain of generators:

def fetch_documents(_cursor):
    with _cursor:
        # a lot of documents expected, may not fit in memory
        _cursor.execute('select ... from ...')

        for doc in _cursor:
            yield doc

def tokenize(documents):
    for doc in documents:
        yield elasticsearch_tokenize_me(doc)

def build_model(documents):
    some_model = SomeModel()

    for doc in documents:
        some_model.add_document(doc)

    return some_model

build_model(tokenize(fetch_documents(cursor)))
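For reference, the generator chain above can be run end-to-end with stand-ins: a whitespace split in place of the Elasticsearch tokenizer, a plain list in place of the database cursor, and a minimal `SomeModel` (the original question elides its definition):

```python
def fetch_documents(rows):
    # stands in for iterating a server-side database cursor
    for doc in rows:
        yield doc

def tokenize(documents):
    # stand-in tokenizer: whitespace split instead of Elasticsearch
    for doc in documents:
        yield doc.split()

class SomeModel:
    # minimal stand-in for the model in the question
    def __init__(self):
        self.docs = []

    def add_document(self, doc):
        self.docs.append(doc)

def build_model(documents):
    some_model = SomeModel()
    for doc in documents:
        some_model.add_document(doc)
    return some_model

model = build_model(tokenize(fetch_documents(["a b", "c d e"])))
print(model.docs)  # [['a', 'b'], ['c', 'd', 'e']]
```

Because every stage is a generator, documents flow through one at a time and the full set never has to sit in memory at once.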

This basically works, but it does not make use of all the available processing power. Since dask is used in other related projects, I tried to adapt it and came up with the following (I am using psycopg2 for database access).

from itertools import chain

from dask import delayed
import psycopg2
import psycopg2.extras
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient

def loader():
    conn = psycopg2.connect()

    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cur.execute('''
                SELECT document, ... FROM ...
                ''')
    return cur

@delayed
def tokenize(partition):
    result = []

    client = IndicesClient(Elasticsearch())

    for row in partition:
        _result = client.analyze(analyzer='standard', text=row['document'])
        result.append(dict(row,
                           tokens=tuple(item['token'] for item in _result['tokens'])))

    return result

@delayed
def build_model(sequence_of_data):
    some_model = SomeModel()

    for item in chain.from_iterable(sequence_of_data):
        some_model.add_document(item)

    return some_model

with loader() as cur:
    partitions = []

    for idx_start in range(0, cur.rowcount, 200):
        partitions.append(delayed(cur.fetchmany)(200))

    tokenized = []
    for partition in partitions:
        tokenized.append(tokenize(partition))

    result = build_model(tokenized)
    result.compute()

The code more or less works, except that in the end all documents are tokenized before being fed into the model. While this is fine for smaller collections of data, it is not for large amounts of data (due to huge memory consumption). Should I just use plain concurrent.futures for this, or am I using dask incorrectly?

2 Answers

A simple solution would be to load the data locally on your machine (it is hard to partition a single SQL query), then send it to the dask cluster for the expensive tokenization step. Perhaps something like the following:

rows = cur.execute(''' SELECT document, ... FROM ... ''')

from toolz import partition_all, concat
partitions = partition_all(10000, rows)

from dask.distributed import Executor  # renamed to Client in later dask.distributed releases
e = Executor('scheduler-address:8786')

futures = []

for part in partitions:
    x = e.submit(tokenize, part)
    y = e.submit(process, x)
    futures.append(y)

results = e.gather(futures)
result = list(concat(results))

In this example the functions tokenize and process expect to consume and return lists of elements.
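To see what the partitioning step does in isolation: `toolz.partition_all(n, seq)` chunks any iterable into tuples of at most `n` items (the last chunk may be shorter), and `toolz.concat` flattens them back. The same behavior can be sketched with the standard library alone:

```python
from itertools import chain, islice

def partition_all(n, seq):
    """Yield tuples of at most n items, mimicking toolz.partition_all."""
    it = iter(seq)
    while True:
        chunk = tuple(islice(it, n))
        if not chunk:
            return
        yield chunk

parts = list(partition_all(4, range(10)))
print(parts)  # [(0, 1, 2, 3), (4, 5, 6, 7), (8, 9)]

# chain.from_iterable plays the role of toolz.concat here
print(list(chain.from_iterable(parts)))  # 0 through 9
```

Each chunk becomes one task for the cluster, so the chunk size (10000 above) trades scheduling overhead against per-task memory.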

MRocklin
2016-08-16

Doing the work with just concurrent.futures:

from concurrent.futures import ProcessPoolExecutor

import psycopg2
import psycopg2.extras
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient

def loader():
    conn = psycopg2.connect()

    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cur.execute('''
                SELECT document, ... FROM ...
                ''')
    return cur

def tokenize(partition):
    result = []

    client = IndicesClient(Elasticsearch())

    for row in partition:
        _result = client.analyze(analyzer='standard', text=row['document'])
        result.append(dict(row,
                           tokens=tuple(item['token'] for item in _result['tokens'])))

    return result

def build_model(partitions):
    some_model = SomeModel()

    for partition in partitions:
        # .result() blocks until the submitted tokenize call has finished
        result = partition.result()

        for item in result:
            some_model.add_document(item)

    return some_model

with loader() as cur, \
    ProcessPoolExecutor(max_workers=8) as executor:
    print(cur.rowcount)
    partitions = []

    for idx_start in range(0, cur.rowcount, 200):
        partitions.append(executor.submit(tokenize,
                                          cur.fetchmany(200)))

    build_model(partitions)
Jeffrey04
2016-08-15
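The answer above consumes futures in submission order, so a slow early partition can hold up everything behind it. A variant using `concurrent.futures.as_completed` feeds each partition to the model as soon as it finishes, which also lets finished results be freed early. This is a minimal self-contained sketch: a whitespace split stands in for the Elasticsearch call, a plain list stands in for `SomeModel`, and `ThreadPoolExecutor` replaces `ProcessPoolExecutor` (the real tokenization is a network call to Elasticsearch, so threads suffice; `as_completed` works with either executor):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def tokenize(partition):
    # stand-in for the Elasticsearch analyze call: whitespace split
    return [doc.split() for doc in partition]

docs = ['a b', 'c d', 'e f g', 'h']
partitions = [docs[i:i + 2] for i in range(0, len(docs), 2)]

tokenized = []  # stand-in for SomeModel
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(tokenize, part) for part in partitions]
    # consume partitions as they finish rather than in submission order,
    # so each result can be fed to the model and released early
    for future in as_completed(futures):
        tokenized.extend(future.result())

print(sorted(tokenized))  # [['a', 'b'], ['c', 'd'], ['e', 'f', 'g'], ['h']]
```

Note that all futures are still created up front; for truly huge inputs you would also cap the number of in-flight futures, submitting a new partition only as an old one completes.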