尝试理解作为 api 网关 websockets 连接逻辑一部分的 lambda 函数
TLDR:如何将 mqtt 请求中的短负载发送到 aws iot 到 aws lambda,该请求通过 apigateway 与在 linux 中本地运行的 electron 应用程序建立开放连接。
我有一个 esp8266,其代码如下
init.js
此代码成功将其消息发送到 aws iot,并设置规则以触发名为 sendmessage 的 lambda。现在,此 sendmessage lambda 通过 websockets 连接到我 linux 机器上的本地 Electon 应用程序。我能够通过 websockets 将消息从 Electron 应用程序发送到 api 网关 wss url。我按照
此处
的示例操作,使用 api 网关和 aws lambda(其中一个是 sendmessage lambda)设置所有 websocket。
load("api_config.js");
load("api_gpio.js");
load("api_mqtt.js");
load("api_sys.js");
load("api_timer.js");
let pin = 0;
GPIO.set_button_handler(
pin,
GPIO.PULL_UP,
GPIO.INT_EDGE_NEG,
50,
function (x) {
let res = MQTT.pub(
"mOS/topic1",
JSON.stringify({ action: "sendmessage", data: "pushed" }),
1
);
print(res);
print("Published:", res ? "yes" : "no");
let connected = MQTT.isConnected();
print(connected);
},
true
);
print("Flash button is configured on GPIO pin", pin);
print("Press the flash button now!");
我知道从 iot 到 sendmessage lambda 的消息需要是 websockets 消息,但它只有最小的对象
{"action":"sendmessage","data":"hello world">
它缺少 websocket 所需的大量信息。但我不需要 aws iot 和 sendmessage lambda 之间的 websocket 连接,我需要它从 IOT 转到 sendmessage lambda,并且使用最小的有效负载 转到 sendmessage lambda。电子应用程序通过 websockets 从 IOT 接收有效载荷。
SENDMESSAGE LAMBDA
// Copyright 2018-2020Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });
const { TABLE_NAME } = process.env;
exports.handler = async event => {
let connectionData;
try {
connectionData = await ddb.scan({ TableName: TABLE_NAME, ProjectionExpression: 'connectionId' }).promise();
} catch (e) {
return { statusCode: 500, body: e.stack };
}
const apigwManagementApi = new AWS.ApiGatewayManagementApi({
apiVersion: '2018-11-29',
endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
});
const postData = JSON.parse(event.body).data;
const postCalls = connectionData.Items.map(async ({ connectionId }) => {
try {
await apigwManagementApi.postToConnection({ ConnectionId: connectionId, Data: postData }).promise();
} catch (e) {
if (e.statusCode === 410) {
console.log(`Found stale connection, deleting ${connectionId}`);
await ddb.delete({ TableName: TABLE_NAME, Key: { connectionId } }).promise();
} else {
throw e;
}
}
});
try {
await Promise.all(postCalls);
} catch (e) {
return { statusCode: 500, body: e.stack };
}
return { statusCode: 200, body: 'Data sent.' };
};
onconnect lambda
// SPDX-License-Identifier: MIT-0
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });
exports.handler = async event => {
const putParams = {
TableName: process.env.TABLE_NAME,
Item: {
connectionId: event.requestContext.connectionId
}
};
try {
await ddb.put(putParams).promise();
} catch (err) {
return { statusCode: 500, body: 'Failed to connect: ' + JSON.stringify(err) };
}
return { statusCode: 200, body: 'Connected.' };
};
ondisconnect lambda
// Copyright 2018-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0
// https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-websocket-api-route-keys-connect-disconnect.html
// The $disconnect route is executed after the connection is closed.
// The connection can be closed by the server or by the client. As the connection is already closed when it is executed,
// $disconnect is a best-effort event.
// API Gateway will try its best to deliver the $disconnect event to your integration, but it cannot guarantee delivery.
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });
exports.handler = async event => {
const deleteParams = {
TableName: process.env.TABLE_NAME,
Key: {
connectionId: event.requestContext.connectionId
}
};
try {
await ddb.delete(deleteParams).promise();
} catch (err) {
return { statusCode: 500, body: 'Failed to disconnect: ' + JSON.stringify(err) };
}
return { statusCode: 200, body: 'Disconnected.' };
};
在我的电子应用程序中,我有以下代码来测试 websocket,但我收到了禁止错误。但是使用 wscat 它可以工作...
"use strict";
const { app, BrowserWindow } = require("electron");
const { Notification } = require("electron");
const WebSocket = require("ws");
function createWindow() {
const win = new BrowserWindow({
width: 800,
height: 600,
webPreferences: {
nodeIntegration: true,
},
});
win.loadFile("index.html");
win.webContents.openDevTools();
}
app.whenReady().then(createWindow);
app.on("window-all-closed", () => {
if (process.platform !== "darwin") {
app.quit();
}
});
app.on("activate", () => {
if (BrowserWindow.getAllWindows().length === 0) {
createWindow();
}
});
// Tell express to use the body-parser middleware and to not parse extended bodies
const url = "wss://random.execute-api.us-east-1.amazonaws.com/Prod";
const connection = new WebSocket(url);
connection.onopen = () => {
connection.send("hello world");
};
connection.onmessage = (e) => {
console.log(e.data);
};
connection.onerror = (error) => {
console.log(`WebSocket error: ${error}`);
};
function showNotification() {
const notification = {
title: "Basic Notification",
body: `notification`,
};
new Notification(notification).show();
}
app.whenReady().then(createWindow).then(showNotification);
我现在设置了我的 mqtt 事件以将相同的数据发送到 lambda,但我在 lambda 中收到以下错误
{
"errorType": "TypeError",
"errorMessage": "Cannot read property 'domainName' of undefined",
"stack": [
"TypeError: Cannot read property 'domainName' of undefined",
" at Runtime.exports.handler (/var/task/app.js:29:28)",
" at processTicksAndRejections (internal/process/task_queues.js:97:5)"
]
}
更新: 这是我的最后一个 lambda,我在从 IOT 收到事件后向 wss 地址发送了一条消息,但它不起作用,控制台记录了事件,但没有触发任何 ws.on 函数
// const axios = require('axios')
// const url = 'http://checkip.amazonaws.com/';
const WebSocket = require("ws");
let response;
/**
*
* Event doc: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-input-format
* @param {Object} event - API Gateway Lambda Proxy Input Format
*
* Context doc: https://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-context.html
* @param {Object} context
*
* Return doc: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html
* @returns {Object} object - API Gateway Lambda Proxy Output Format
*
*/
exports.lambdaHandler = async (event, context) => {
try {
// const ret = await axios(url);
console.log(event);
const url = "wss://obsf.execute-api.us-east-1.amazonaws.com/Prod";
const ws = new WebSocket(url);
var test = { action: "sendmessage", data: "hello world from button" };
ws.on("open", function open() {
ws.send(JSON.stringify(test));
});
ws.on("message", function incoming(data) {
console.log(data);
});
response = {
statusCode: 200,
body: JSON.stringify({
message: "hello world",
// location: ret.data.trim()
}),
};
} catch (err) {
console.log(err);
return err;
}
return response;
};
更新:最后我试过了,我甚至没有得到一个错误,我知道 ws 在那里,因为如果我控制台它,它会返回一个带有一堆函数的大对象
console.log(ws); this returns a large object
ws.on("error", console.error); this does nothing
您似乎设置了 1 个 lambda 来处理 2 个触发源,一个是 IoT 服务,另一个是 API Gateway Websocket。由于您使用 1 个 lambda,因此您必须处理请求来自以下源的情况:
-
虽然从 API Gateway 触发请求时
event.requestContext
可用,但从 IoT 服务触发请求时不可用(在此处检查 IoT 事件对象 https://docs.aws.amazon.com/lambda/latest/dg/services-iotevents.html )。因此,您遇到的错误(即无法读取未定义的属性“domainName”
)与此有关。您应该关闭来自 IoT 服务的 lambda 触发器,或者在请求来自 IoT 服务时处理该请求。 -
我不确定禁止错误,但它更像是您向 API 网关 WS 发送了非结构化消息,它应该是
connection.send(JSON.stringify({ action: "sendmessage", data: "hello world" }));
,而不是connection.send("hello world");
根据帖子更新编辑 :
I know ws is there because if I console it it returns a big object with a bunch of functions
Lambda 函数实际上不是服务器,它是一个实例 Node 环境(这就是它被称为 FUNCTION 的原因),Lambda 函数不像您认为的普通 Nodejs 应用程序那样工作,它的容器(节点环境)通常会停止(或冻结)每当其工作完成时,因此您无法像普通服务器一样保持其容器处于活动状态。 这就是虽然您可以控制台记录 Websocket 对象但无法使其保持活动状态的原因,每当您返回/响应时,NodeJS 容器已经停止。
由于您无法使用 Websocket 对象在 Lambda 中打开 WS 连接,因此 Amazon 提供了一种通过 API 网关执行此操作的方法。我们使用 API 网关 Websocket 的方式与普通服务器也不同,它将类似于:
- 用户 -> 请求 API 网关连接到 websocket -> 调用 Lambda 1(onconnect 函数)
- 用户 -> 请求 API 网关通过 Websocket 发送消息 -> 调用 Lambda 2(sendmessage 函数)
- 用户 -> 请求 API 网关关闭连接 ->调用 Lambda 3 (ondisconnect 函数)
上述 3 个设置在 API Gateway 中配置 (
https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-websocket-api-integrations.html
),3 个函数
onconnect
、
sendmessage
、
ondisconnect
的逻辑可以根据我们的设计方式在 1 个 lambda 或 3 个 lambda 函数中处理,我检查了您的 3 个 lambda 函数,看起来没问题。
我知道您想使用 IoT,但我不确定为什么。您应该先测试您的 Websocket API,不要使用任何与 IoT 相关的东西。如果您能告诉您想要在这里实现什么就更好了,因为物联网更像是一个发布/订阅/消息传递渠道,我认为没有必要在这里使用它。
我发布此信息只是为了展示我如何解决该问题。
我创建了一个节点 express ec2 服务器来处理来自 iot 设备的请求,然后将其传递给我的 wss 服务器,这是其中正常工作的主要部分
这就是我在这里所做的 Express node js webssockets 正在从 websocket 服务器接收消息但无法发送它们
这是其中正常工作的主要部分
app.post('/doorbell', (req, res) => {
var hmm1 = { action: 'sendmessage', data: 'hello world from post request' };
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(hmm1));
res.send('heya hey');
} else {
res.send('The WebSocket connection is not open');
}
});