Websocket async feedback within a long process
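I am trying to implement feedback in a web page that lets the user start a long process from an Excel sheet (sigh, yes...). Each row of data takes about 1 second to process, and a typical sheet has between 40 and 100 items, so the whole run can take more than a minute.
I display a preview of the data in the page, start the process through a websocket, and would like to show the progress coming back over that same websocket.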
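The processing itself is done by an external package and the page is very simple, so I wrapped everything in a single Mojolicious::Lite file.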
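My problem is that the long processing started in the websocket route blocks the feedback until it completes, and then all the progress events are sent at once at the end. As far as I understand, this is related to the Mojolicious event loop, and I should start the processing separately so that it does not freeze the websocket handling.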
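Note that I have also tried a separate feedback channel using EventSource to push some progress to the client during the processing, but it behaves the same way: everything shows up at once at the end.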
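Here is my simplified code; I use sleep() to simulate the long process. I start it with: perl mojo_notify_ws.pl daemon
Can you suggest how to modify the websocket route to allow real-time feedback?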
use Mojolicious::Lite;
use Mojo::JSON qw(encode_json decode_json j);
use Data::Dumper;
$|++;
any '/' => sub {
my $c = shift;
$c->render('index');
};
my $peer;
websocket '/go' => sub {
use Carp::Always;
my $ws = shift;
$peer = $ws->tx;
app->log->debug(sprintf 'Client connected: %s', Dumper $peer->remote_address);
# do not subscribe to 'text' else 'json' won't work
#$ws->on(text => sub {
# my ($ws, $msg) = @_;
# app->log->debug("Received text from websocket: `$msg`");
# });
# $peer->send('{"type": "test"}');
# say 'default inactivity timeout='. (p $ws->inactivity_timeout());
$ws->inactivity_timeout(120);
$ws->on(json => sub {
my ($ws, $msg) = @_;
app->log->debug('Received from websocket:', Dumper(\$msg));
unless($msg){
app->log->debug('Received empty message? WTF?!');
return;
}
my $prompt = $msg->{cmd};
return unless $prompt;
app->log->debug(sprintf 'Received: `%s`', $prompt // 'empty??');
# simulate
my $loop = Mojo::IOLoop->singleton;
# $loop->subprocess( sub {
# my $sp = shift;
for my $cell (1..3) {
# $loop->delay( sub {
app->log->debug("sending cell $cell");
my $payload = {
type => 'ticket',
cell => $cell,
result => $cell % 2 ? 'OK' : 'NOK'
};
$ws->send( { json => $payload } );
sleep(2);
# $loop->timer(2, sub {say 'we have waited 2 secs!';})->wait;
# });
};
# }, sub {} );#subprocess
app->log->debug('sending end of process ->websocket');
$ws->send({json => { type => 'end' } });
});
$ws->on(finish => sub {
my ($ws, $code, $reason) = @_;
$reason = '' unless defined $reason;
app->log->debug("Client disconnected: $code ($reason)");
});
app->log->debug('Reached end of ws route definition');
};
app->start;
__DATA__
@@ index.html.ep
<html>
<head>
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.js"></script>
<script>
var timerId = 0; // must match the name used in keepAlive() and cancelKeepAlive()
function keepAlive(ws) {
var timeout = 20000;
if (ws.readyState == ws.OPEN) {
ws.send('ping');
}
timerId = setTimeout(function(){keepAlive(ws);}, timeout);
}
function cancelKeepAlive() {
if (timerId) {
clearTimeout(timerId);
}
}
function flagCell(cell, result){
var id='#CELL_' + cell;
var $cell = $(id);
// a jQuery object is always truthy, so check that the element was actually found
if ($cell.length) {
if (result=='OK') {
$cell.css('color', 'green');
$cell.text('⯲');
} else {
$cell.css('color','red');
$cell.text('✘');
}
}
}
function process(){
//debugger;
console.log('Opening WebSocket');
var ws = new WebSocket('<%= url_for('go')->to_abs %>');
ws.onopen = function (){
console.log('Websocket Open');
//keepAlive(ws);
ws.send(JSON.stringify({cmd: "let's go Perl"}));
};
//incoming
ws.onmessage = function(evt){
var data = JSON.parse(evt.data);
console.log('WS received '+JSON.stringify(data));
if (data.type == 'ticket') {
console.log('Server has send a status');
console.log('Cell:'+data.cell + ' res:' + data.result);
flagCell(data.cell, data.result);
} else if (data.type == 'end') {
console.log('Server has finished.');
//cancelKeepAlive();
ws.close();
} else {
console.log('Unknown message:' + evt.data);
}
};
ws.onerror = function (evt) {
console.log('ws error:', evt.data);
}
ws.onclose = function (evt) {
if(evt.wasClean) {
console.log('Connection closed cleanly');
} else {
console.log('Connection reset');
}
console.log('Code:'+ evt.code + ' Reason:' + evt.reason);
}
}
</script>
</head>
<body>
<button type=button id='upload' onclick="process();">Go</button><br>
<div style='font-family:sans;'>
<table border="1px">
<tr><td id="CELL_1"> </td><td>Foo</td></tr>
<tr><td id="CELL_2"> </td><td>Bar</td></tr>
<tr><td id="CELL_3"> </td><td>Baz</td></tr>
</table>
</div>
</body>
</html>
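Edit:
Grinnz provided a suitable solution, but for the record, here is my attempt with Mojo::IOLoop::Subprocess callbacks, with which I got no feedback at all. I am running on Linux; Subprocess seems to fork, and the parent process seems to terminate the websocket immediately.
Edit:
No: I finally found out that $ws->send() was in the wrong place. It belongs in the second sub{}, which runs on the parent side, not in the first one, which runs in the child process. This code should be refactored to use one subprocess per loop iteration, plus a final step to notify the end.
Here is the modified on(json) handler: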
$ws->on(json => sub {
my ($ws, $msg) = @_;
app->log->debug('Received from websocket:', Dumper(\$msg));
unless($msg){
app->log->debug('Received empty message? WTF?!');
return;
}
my $prompt = $msg->{cmd};
return unless $prompt;
app->log->debug(sprintf 'Received: `%s`', $prompt // '<empty??>');
# my $loop = Mojo::IOLoop->singleton;
my $subprocess = Mojo::IOLoop::Subprocess->new;
app->log->debug("we are pid $$");
$subprocess->run(
sub {
my $sp = shift;
for my $cell (1..3) {
app->log->debug("starting process for cell $cell in pid $$");
sleep(2);
app->log->debug("sending cell $cell to ws");
my $payload = {
type => 'ticket',
cell => $cell,
result => $cell % 2 ? 'OK' : 'NOK'
};
$ws->send( { json => $payload } ); # FIXME: actually this line is in the wrong place
# and should be in the second sub{}
};
},
sub {
my ($sp, $err, @results) = @_;
$ws->reply->exception($err) and return if $err;
app->log->debug('sending end of process ->websocket');
$ws->send({json => { type => 'end' } });
});
# Start event loop if necessary
$subprocess->ioloop->start unless $subprocess->ioloop->is_running;
});
And the corresponding log:
[Wed Oct 3 19:51:58 2018] [debug] Received: `let's go Perl`
[Wed Oct 3 19:51:58 2018] [debug] we are pid 8898
[Wed Oct 3 19:51:58 2018] [debug] Client disconnected: 1006 ()
[Wed Oct 3 19:51:58 2018] [debug] starting process for cell 1 in pid 8915
[Wed Oct 3 19:52:00 2018] [debug] sending cell 1 to ws
[Wed Oct 3 19:52:00 2018] [debug] starting process for cell 2 in pid 8915
[Wed Oct 3 19:52:02 2018] [debug] sending cell 2 to ws
[Wed Oct 3 19:52:02 2018] [debug] starting process for cell 3 in pid 8915
[Wed Oct 3 19:52:04 2018] [debug] sending cell 3 to ws
[Wed Oct 3 19:52:04 2018] [debug] sending end of process ->websocket
[Wed Oct 3 19:52:04 2018] [debug] Client disconnected: 1005 ()
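I also tried to use Mojo::IOLoop->delay to build a complex sequence of steps, similar to the Promise solution, but this one also sends all the notifications at once at the end: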
$ws->on(json => sub {
my ($ws, $msg) = @_;
app->log->debug('Received from websocket:', Dumper(\$msg));
unless($msg){
app->log->debug('Received empty message? WTF?!');
return;
}
my $prompt = $msg->{cmd};
return unless $prompt;
app->log->debug(sprintf 'Received: `%s`', $prompt // '<empty??>');
app->log->debug("we are pid $$");
my @steps;
for my $cell (1..3) {
push @steps,
sub {
app->log->debug("subprocess for cell pid $cell");
# my $sp = shift;
my $delay = shift;
sleep(2);
app->log->debug("end of sleep for cell $cell");
$delay->pass($cell % 2 ? 'OK' : 'NOK');
},
sub {
my $delay = shift;
my $result = shift;
app->log->debug("sending cell $cell from pid $$ - result was $result");
my $payload = {
type => 'ticket',
cell => $cell,
result => $result
};
$ws->send( { json => $payload } );
$delay->pass;
};
}
# add final step to notify end of processing
push @steps, sub {
my $delay = shift;
app->log->debug('sending end of process ->websocket');
$ws->send({json => { type => 'end' } });
$delay->pass;
};
my $delay = Mojo::IOLoop::Delay->new;
app->log->debug("Starting delay...");
$delay->steps( @steps );
app->log->debug("After the delay");
});
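You can't magically make Perl code non-blocking. That's why your blocking operation holds up the websocket responses and the event loop.
A single subprocess won't work for this, because only the original worker process that handled the request can respond on the websocket, and a subprocess can only return once. You can, however, use a subprocess to prepare each response you want to send. Your use of subprocesses is not quite right, though.
The first subroutine passed to the subprocess is executed in a fork, so it does not block the main process. Once the subprocess has finished, the second subroutine is executed in the parent and receives the return values of the first one. That is where you need to send your responses.
Any code outside of those callbacks is executed before the subprocess has even started, because this is asynchronous code, so you need to sequence your logic through callbacks. You can use promises to make complex sequencing simpler.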
use Mojo::Promise;

$ws->on(json => sub {
    my ($ws, $msg) = @_;
    app->log->debug('Received from websocket:', Dumper(\$msg));
    unless ($msg) {
        app->log->debug('Received empty message? WTF?!');
        return;
    }
    my $prompt = $msg->{cmd};
    return unless $prompt;
    app->log->debug(sprintf 'Received: `%s`', $prompt // 'empty??');

    my $promise = Mojo::Promise->new->resolve;    # starting point
    # attach follow-up code for each cell, returning a new promise representing the whole chain so far
    for my $cell (1..3) {
        $promise = $promise->then(sub {
            my $promise = Mojo::Promise->new;
            Mojo::IOLoop->subprocess(sub {
                # runs in the forked child
                app->log->debug("sending cell $cell");
                sleep(2);
                my $payload = {
                    type   => 'ticket',
                    cell   => $cell,
                    result => $cell % 2 ? 'OK' : 'NOK'
                };
                return $payload;
            }, sub {
                # runs in the parent once the child has exited
                my ($sp, $err, $payload) = @_;
                return $promise->reject($err) if $err;    # indicates subprocess died
                $ws->send({ json => $payload }, sub { $promise->resolve });
            });
            # here, the subprocess has not been started yet
            # it will be started when this handler returns to the event loop
            # then the second callback will run once the subprocess exits
            return $promise;
        });
    }
    # chain from last promise
    $promise->then(sub {
        app->log->debug('sending end of process ->websocket');
        $ws->send({ json => { type => 'end' } });
    })->catch(sub {
        my $err = shift;
        # you can send or log something here to indicate an error occurred in one of the subprocesses
    });
});
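To make the two-callback contract described in this answer concrete outside Mojolicious (the first sub runs in a fork and hands back its return values exactly once; the second sub runs in the parent with the error or the results), here is a rough core-Perl approximation built on fork(), a pipe and JSON::PP. It is only a sketch under stated assumptions, not how Mojo::IOLoop::Subprocess is implemented: the helper run_subprocess is hypothetical, and its parent simply blocks while reading the pipe, whereas Mojo::IOLoop::Subprocess watches the pipe from the event loop so the worker can keep serving other events.

```perl
use strict;
use warnings;
use JSON::PP qw(encode_json decode_json);
use POSIX ();

# Hypothetical helper approximating the two-callback contract:
# $work runs in a forked child, its return values are serialized back over
# a pipe exactly once, and $done runs in the parent with ($err, @results).
# Unlike Mojo::IOLoop::Subprocess, this parent blocks while reading.
sub run_subprocess {
    my ($work, $done) = @_;
    pipe(my $reader, my $writer) or die "pipe failed: $!";
    my $pid = fork() // die "fork failed: $!";

    if ($pid == 0) {
        # Child: run the work, then report either its results or its error
        close $reader;
        my @results = eval { $work->() };
        my $msg = $@ ? { err => "$@" } : { results => \@results };
        print {$writer} encode_json($msg);
        close $writer;
        POSIX::_exit(0);    # skip END blocks and destructors inherited from the parent
    }

    # Parent: read everything the child wrote (until EOF), then reap it
    close $writer;
    my $json = do { local $/; <$reader> };
    close $reader;
    waitpid($pid, 0);

    my $msg = decode_json($json);
    $done->($msg->{err}, @{ $msg->{results} || [] });
    return;
}

# One subprocess per cell, as in the answer; the parent-side callback is
# where $ws->send(...) would go in the Mojolicious version.
my @sent;
for my $cell (1 .. 3) {
    run_subprocess(
        sub { return { type => 'ticket', cell => $cell, result => $cell % 2 ? 'OK' : 'NOK' } },
        sub {
            my ($err, $payload) = @_;
            die $err if $err;
            push @sent, "$payload->{cell}:$payload->{result}";
        },
    );
}
print join(',', @sent), "\n";    # prints 1:OK,2:NOK,3:OK
```

The key point the sketch mirrors is that nothing reaches the client from inside the child: the child only returns data, and the parent decides what to send.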
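I can go into more detail on some other options if appropriate: Mojo::IOLoop::ReadWriteFork, which lets you start just one subprocess and continuously receive its STDOUT (you would need to serialize your payloads to STDOUT yourself, for example with Mojo::JSON); or a regular subprocess that sends status information back to the parent through an external pub/sub broker that both processes can connect to, such as Postgres, Redis or Mercury (this also requires serialization).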
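The ReadWriteFork option boils down to one long-lived child that writes each progress update to its STDOUT as a serialized line, which the parent decodes as soon as it arrives. Below is a minimal core-Perl sketch of that idea, assuming one JSON document per line; stream_from_child is a hypothetical name, and the parent uses a plain blocking read loop for brevity. Inside the Mojolicious event loop the output would instead arrive as events, and because a chunk of stream output need not end on a line boundary, you would typically have to buffer and split on newlines yourself.

```perl
use strict;
use warnings;
use JSON::PP qw(encode_json decode_json);
use POSIX ();

# Hypothetical helper: fork one child whose STDOUT is a pipe to the parent.
# The child emits one JSON document per line; the parent decodes each line
# and hands it to $on_progress as soon as it arrives.
sub stream_from_child {
    my ($cells, $on_progress) = @_;
    my $pid = open(my $from_child, '-|') // die "fork failed: $!";

    if ($pid == 0) {
        # Child: flush after every line so updates are not held in a buffer
        $| = 1;
        for my $cell (1 .. $cells) {
            # ... the real per-row work would go here ...
            print encode_json({
                type   => 'ticket',
                cell   => $cell,
                result => $cell % 2 ? 'OK' : 'NOK',
            }), "\n";
        }
        print encode_json({ type => 'end' }), "\n";
        POSIX::_exit(0);
    }

    # Parent: each readline returns as soon as the child has flushed a full line
    while (my $line = <$from_child>) {
        $on_progress->(decode_json($line));
    }
    close $from_child;    # also waits for the child to exit
    return;
}

my @seen;
stream_from_child(3, sub {
    my ($msg) = @_;
    # In the Mojolicious app this is where $ws->send({ json => $msg }) would go
    push @seen, $msg->{type} eq 'ticket' ? "$msg->{cell}:$msg->{result}" : $msg->{type};
});
print join(',', @seen), "\n";    # prints 1:OK,2:NOK,3:OK,end
```

Compared with one subprocess per row, this keeps a single fork for the whole sheet, at the cost of defining your own line-based message format.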
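You can use a thread instead of a subprocess to do the work. Once the thread has been created, you need a loop that updates the progress over the websocket.
If you are handling a critical workload that really must complete under all circumstances (the websocket goes away, the network goes down, etc.), you should delegate it to another daemon that keeps running on its own and communicates its state through a file or a socket.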
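For the daemon approach, one simple way to communicate state through a file is for the worker to rewrite a small JSON status file after each row: it writes to a temporary file and rename()s it over the real one, so a reader never sees a half-written file (rename is atomic within a single filesystem on POSIX systems). This is a sketch under that assumption: write_status, read_status and the status fields are hypothetical, and the web app would call read_status periodically (for example from a Mojo::IOLoop->recurring timer) and forward the result over the websocket.

```perl
use strict;
use warnings;
use File::Spec;
use File::Temp qw(tempdir);
use JSON::PP qw(encode_json decode_json);

# Worker side: write the new status to a temporary file in the same
# directory, then rename() it over the status file, so readers see either
# the previous status or the new one, never a partially written file.
sub write_status {
    my ($path, $status) = @_;
    my $tmp = "$path.tmp.$$";
    open(my $fh, '>', $tmp) or die "cannot open $tmp: $!";
    print {$fh} encode_json($status);
    close($fh) or die "cannot close $tmp: $!";
    rename($tmp, $path) or die "cannot rename $tmp to $path: $!";
    return;
}

# Web-app side: returns undef until the worker has written a first status.
sub read_status {
    my ($path) = @_;
    open(my $fh, '<', $path) or return;
    my $json = do { local $/; <$fh> };
    close $fh;
    return decode_json($json);
}

# Simulated worker processing 3 rows, with a read after each update
my $dir  = tempdir(CLEANUP => 1);
my $path = File::Spec->catfile($dir, 'status.json');
my @progress;
for my $row (1 .. 3) {
    write_status($path, { done => $row, total => 3, finished => $row == 3 ? 1 : 0 });
    my $status = read_status($path);
    push @progress, "$status->{done}/$status->{total}";
}
print join(',', @progress), "\n";    # prints 1/3,2/3,3/3
```

Because the status lives on disk, a reconnecting client can pick up the current state even if the original websocket was lost.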
If it is a non-critical workload that you can easily restart, this may be a suitable template for you:
# Put these at the top of your script/module:
use threads;
use Thread::Queue;

my $queue  = Thread::Queue->new();
my $worker = threads->create(sub {
    # dummy workload. do your work here
    my $count = 60;
    for (1 .. $count) {
        sleep 1;
        $queue->enqueue($_ / $count);    # fraction of the work completed so far
    }
    # undef to signal end of work
    $queue->enqueue(undef);
    return;
});

# blocking dequeuing ends when retrieving an undef'd value
# NOTE: dequeue() blocks until an item is available, so if this loop runs as-is
# inside a Mojolicious handler it stalls the event loop just like sleep() does
while (defined(my $item = $queue->dequeue)) {
    # update progress via websocket
    printf("%.0f%%\n", $item * 100);
}
# join thread
$worker->join;
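I made a few small changes to your updated example to make it work as expected. You can use the progress feature of the Subprocess module to make sure the right data is sent asynchronously over the websocket from the long-running subprocess.
The code now works as expected: the client updates the table state each time the subprocess completes an iteration.
The relevant part of the source code looks like this: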
$ws->on(
    json => sub {
        my ( $ws, $msg ) = @_;
        app->log->debug( 'Received from websocket:', Dumper( \$msg ) );
        unless ($msg) {
            app->log->debug('Received empty message? WTF?!');
            return;
        }
        my $prompt = $msg->{cmd};
        return unless $prompt;
        app->log->debug( sprintf 'Received: `%s`', $prompt // '<empty??>' );

        my $subprocess = Mojo::IOLoop::Subprocess->new;
        app->log->debug("we are pid $$");
        $subprocess->run(
            # First sub: runs in the forked child; reports each row via progress()
            sub {
                my $sp = shift;
                for my $cell ( 1 .. 3 ) {
                    app->log->debug("starting process for cell $cell in pid $$");
                    sleep(2);
                    app->log->debug("sending cell $cell to ws");
                    my $payload = {
                        type   => 'ticket',
                        cell   => $cell,
                        result => $cell % 2 ? 'OK' : 'NOK'
                    };
                    $sp->progress($payload);
                }
            },
            # Second sub: runs in the parent once the child has finished
            sub {
                my ( $sp, $err, @results ) = @_;
                if ($err) {
                    # reply->exception renders an HTTP error page, which does not
                    # reach the client over an established websocket
                    app->log->error("Subprocess failed: $err");
                    $ws->send( { json => { type => 'error', message => "$err" } } );
                    return;
                }
                app->log->debug('sending end of process ->websocket');
                $ws->send( { json => { type => 'end' } } );
            }
        );

        # Runs in the parent for every progress() call made by the child
        $subprocess->on(
            progress => sub {
                my ( $subprocess, $payload ) = @_;
                $ws->send( { json => $payload } );
            }
        );

        # Start event loop if necessary
        $subprocess->ioloop->start unless $subprocess->ioloop->is_running;
    }
);