PostgreSQL 表的分布式处理
我有一个包含数百万行的 PostgreSQL 表,需要使用相同的算法进行处理。 我使用 Python 和 SQLAlchemy.Core 执行此任务。
此算法接受一行或多行作为输入,并返回相同数量的行和一些更新的值。
id1, id2, NULL, NULL, NULL -> id1, id2, value1, value2, value3
id1, id3, NULL, NULL, NULL -> id1, id3, value4, value5, value6
id2, id3, NULL, NULL, NULL -> id2, id3, value7, value8, value9
...
id_n, id_m, NULL, NULL, NULL -> id_n, id_m, value_xxx, value_yyy, value_zzz
我使用 PC 集群执行此任务。该集群运行
dask.distributed
调度程序和工作程序。
我认为,可以使用
map
函数有效地实现此任务。我的想法是每个工作人员查询数据库,选择处理一些具有 NULL 值的行,然后用结果更新它们。
我的问题是:如何编写 SQL 查询,以便将表的各个部分分配给工作人员?
我试图在 SQL 查询中使用
offset
和
limit
为每个工作人员定义行的子集,每个工作人员都会发出这些子集:
SQL:
select * from table where value1 is NULL offset N limit 100;
...
update table where id1 = ... and id2 = ...
set value1 = value...;
Python:
from sqlalchemy import create_engine, bindparam, select, func
from distributed import Executor, progress
def process(offset, limit):
engine = create_engine(...)
# get next piece of work
query = select(...).where(...).limit(limit).offset(offset)
rows = engine.execute([select]).fetchall()
# process rows
# submit values to table
update_stmt = table.update().where(...).where(...).values(...)
up_values = ...
engine.execute(update_stmt, up_values)
if __name__ == '__main__':
e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
port=config('SERVER_PORT')))
n_rows = count_rows_to_process()
chunk_size = 100
progress(e.map(process, range(0, n_rows, chunk_size)))
但是,这不起作用。
在计算开始之前,
range
函数已经返回了偏移量列表,并且
map
函数在启动
process
函数之前已经将它们分配给工作人员。
然后一些工作人员成功完成了他们的工作块处理,将他们的结果提交到表中,并更新了值。
然后开始新的迭代,新的
SELECT ...WHERE value1 is NULL LIMIT 100 OFFSET ...
查询被发送到数据库,但偏移量现在无效,因为它是在先前的工作人员更新表之前计算的。 NULL 值的数量现在减少了,并且工作人员可以从数据库接收空集。
在开始计算之前,我无法使用一个
SELECT
查询,因为它将返回无法放入 RAM 的巨大表。
SQLAlchemy 手册还说,对于分布式处理,应该为每个 python 进程在本地创建引擎实例。 因此,我无法查询数据库一次并将返回的光标发送到
process
函数。
因此,解决方案是正确构建 SQL 查询。
要考虑的一个选项是随机化:
SELECT *
FROM table
WHERE value1 IS NULL
ORDER BY random()
LIMIT 100;
在最坏的情况下,您将有多个工作程序并行计算同一件事。如果您不介意,这是最简单的方法之一。
另一个选项是将各个行专用于特定工作程序:
UPDATE table
SET value1 = -9999
WHERE id IN (
SELECT id
FROM table
WHERE value1 IS NULL
ORDER BY random()
LIMIT 100
) RETURNING * ;
这样,您用 -9999“标记”特定工作程序已“获取”的行。所有其他工作程序将跳过这些行,因为 value1 不再为 NULL。这里的风险是,如果工作程序失败,您将没有简单的方法返回这些行 - 您必须手动将它们更新回 NULL。