开发者问题收集

尝试理解作为 api 网关 websockets 连接逻辑一部分的 lambda 函数

2020-11-09
1186

TLDR:如何将 mqtt 请求中的短负载发送到 aws iot 到 aws lambda,该请求通过 apigateway 与在 linux 中本地运行的 electron 应用程序建立开放连接。

我有一个 esp8266,其代码如下 init.js 此代码成功将其消息发送到 aws iot,并设置规则以触发名为 sendmessage 的 lambda。现在,此 sendmessage lambda 通过 websockets 连接到我 linux 机器上的本地 Electon 应用程序。我能够通过 websockets 将消息从 Electron 应用程序发送到 api 网关 wss url。我按照 此处 的示例操作,使用 api 网关和 aws lambda(其中一个是 sendmessage lambda)设置所有 websocket。

load("api_config.js");
load("api_gpio.js");
load("api_mqtt.js");
load("api_sys.js");
load("api_timer.js");

let pin = 0;
GPIO.set_button_handler(
  pin,
  GPIO.PULL_UP,
  GPIO.INT_EDGE_NEG,
  50,
  function (x) {
    let res = MQTT.pub(
      "mOS/topic1",
      JSON.stringify({ action: "sendmessage", data: "pushed" }),
      1
    );

    print(res);
    print("Published:", res ? "yes" : "no");
    let connected = MQTT.isConnected();

    print(connected);
  },
  true
);
print("Flash button is configured on GPIO pin", pin);
print("Press the flash button now!");

我知道从 iot 到 sendmessage lambda 的消息需要是 websockets 消息,但它只有最小的对象 {"action":"sendmessage","data":"hello world"> 它缺少 websocket 所需的大量信息。但我不需要 aws iot 和 sendmessage lambda 之间的 websocket 连接,我需要它从 IOT 转到 sendmessage lambda,并且使用最小的有效负载 转到 sendmessage lambda。电子应用程序通过 websockets 从 IOT 接收有效载荷。

SENDMESSAGE LAMBDA

// Copyright 2018-2020Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

const { TABLE_NAME } = process.env;

exports.handler = async event => {
  let connectionData;
  
  try {
    connectionData = await ddb.scan({ TableName: TABLE_NAME, ProjectionExpression: 'connectionId' }).promise();
  } catch (e) {
    return { statusCode: 500, body: e.stack };
  }
  
  const apigwManagementApi = new AWS.ApiGatewayManagementApi({
    apiVersion: '2018-11-29',
    endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
  });
  
  const postData = JSON.parse(event.body).data;
  
  const postCalls = connectionData.Items.map(async ({ connectionId }) => {
    try {
      await apigwManagementApi.postToConnection({ ConnectionId: connectionId, Data: postData }).promise();
    } catch (e) {
      if (e.statusCode === 410) {
        console.log(`Found stale connection, deleting ${connectionId}`);
        await ddb.delete({ TableName: TABLE_NAME, Key: { connectionId } }).promise();
      } else {
        throw e;
      }
    }
  });
  
  try {
    await Promise.all(postCalls);
  } catch (e) {
    return { statusCode: 500, body: e.stack };
  }

  return { statusCode: 200, body: 'Data sent.' };
};

onconnect lambda

// SPDX-License-Identifier: MIT-0

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

exports.handler = async event => {
  const putParams = {
    TableName: process.env.TABLE_NAME,
    Item: {
      connectionId: event.requestContext.connectionId
    }
  };

  try {
    await ddb.put(putParams).promise();
  } catch (err) {
    return { statusCode: 500, body: 'Failed to connect: ' + JSON.stringify(err) };
  }

  return { statusCode: 200, body: 'Connected.' };
};

ondisconnect lambda

// Copyright 2018-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0

// https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-websocket-api-route-keys-connect-disconnect.html
// The $disconnect route is executed after the connection is closed.
// The connection can be closed by the server or by the client. As the connection is already closed when it is executed, 
// $disconnect is a best-effort event. 
// API Gateway will try its best to deliver the $disconnect event to your integration, but it cannot guarantee delivery.

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

exports.handler = async event => {
  const deleteParams = {
    TableName: process.env.TABLE_NAME,
    Key: {
      connectionId: event.requestContext.connectionId
    }
  };

  try {
    await ddb.delete(deleteParams).promise();
  } catch (err) {
    return { statusCode: 500, body: 'Failed to disconnect: ' + JSON.stringify(err) };
  }

  return { statusCode: 200, body: 'Disconnected.' };
};

在我的电子应用程序中,我有以下代码来测试 websocket,但我收到了禁止错误。但是使用 wscat 它可以工作...

"use strict";
const { app, BrowserWindow } = require("electron");
const { Notification } = require("electron");
const WebSocket = require("ws");

function createWindow() {
  const win = new BrowserWindow({
    width: 800,
    height: 600,
    webPreferences: {
      nodeIntegration: true,
    },
  });

  win.loadFile("index.html");
  win.webContents.openDevTools();
}

app.whenReady().then(createWindow);

app.on("window-all-closed", () => {
  if (process.platform !== "darwin") {
    app.quit();
  }
});

app.on("activate", () => {
  if (BrowserWindow.getAllWindows().length === 0) {
    createWindow();
  }
});

// Tell express to use the body-parser middleware and to not parse extended bodies

const url = "wss://random.execute-api.us-east-1.amazonaws.com/Prod";
const connection = new WebSocket(url);

connection.onopen = () => {
  connection.send("hello world");
};

connection.onmessage = (e) => {
  console.log(e.data);
};

connection.onerror = (error) => {
  console.log(`WebSocket error: ${error}`);
};

function showNotification() {
  const notification = {
    title: "Basic Notification",
    body: `notification`,
  };

  new Notification(notification).show();
}

app.whenReady().then(createWindow).then(showNotification);

我现在设置了我的 mqtt 事件以将相同的数据发送到 lambda,但我在 lambda 中收到以下错误

{
    "errorType": "TypeError",
    "errorMessage": "Cannot read property 'domainName' of undefined",
    "stack": [
        "TypeError: Cannot read property 'domainName' of undefined",
        "    at Runtime.exports.handler (/var/task/app.js:29:28)",
        "    at processTicksAndRejections (internal/process/task_queues.js:97:5)"
    ]
}

更新: 这是我的最后一个 lambda,我在从 IOT 收到事件后向 wss 地址发送了一条消息,但它不起作用,控制台记录了事件,但没有触发任何 ws.on 函数

// const axios = require('axios')
// const url = 'http://checkip.amazonaws.com/';
const WebSocket = require("ws");
let response;

/**
 *
 * Event doc: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-input-format
 * @param {Object} event - API Gateway Lambda Proxy Input Format
 *
 * Context doc: https://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-context.html
 * @param {Object} context
 *
 * Return doc: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html
 * @returns {Object} object - API Gateway Lambda Proxy Output Format
 *
 */
exports.lambdaHandler = async (event, context) => {
  try {
    // const ret = await axios(url);

    console.log(event);

    const url = "wss://obsf.execute-api.us-east-1.amazonaws.com/Prod";
    const ws = new WebSocket(url);

    var test = { action: "sendmessage", data: "hello world from button" };

    ws.on("open", function open() {
      ws.send(JSON.stringify(test));
    });

    ws.on("message", function incoming(data) {
      console.log(data);
    });

    response = {
      statusCode: 200,
      body: JSON.stringify({
        message: "hello world",
        // location: ret.data.trim()
      }),
    };
  } catch (err) {
    console.log(err);
    return err;
  }

  return response;
};

更新:最后我试过了,我甚至没有得到一个错误,我知道 ws 在那里,因为如果我控制台它,它会返回一个带有一堆函数的大对象

    console.log(ws); this returns a large object

    ws.on("error", console.error); this does nothing
2个回答

您似乎设置了 1 个 lambda 来处理 2 个触发源,一个是 IoT 服务,另一个是 API Gateway Websocket。由于您使用 1 个 lambda,因此您必须处理请求来自以下源的情况:

  1. 虽然从 API Gateway 触发请求时 event.requestContext 可用,但从 IoT 服务触发请求时不可用(在此处检查 IoT 事件对象 https://docs.aws.amazon.com/lambda/latest/dg/services-iotevents.html )。因此,您遇到的错误(即 无法读取未定义的属性“domainName” )与此有关。您应该关闭来自 IoT 服务的 lambda 触发器,或者在请求来自 IoT 服务时处理该请求。
  2. 我不确定禁止错误,但它更像是您向 API 网关 WS 发送了非结构化消息,它应该是 connection.send(JSON.stringify({ action: "sendmessage", data: "hello world" })); ,而不是 connection.send("hello world");

根据帖子更新编辑

I know ws is there because if I console it it returns a big object with a bunch of functions

Lambda 函数实际上不是服务器,它是一个实例 Node 环境(这就是它被称为 FUNCTION 的原因),Lambda 函数不像您认为的普通 Nodejs 应用程序那样工作,它的容器(节点环境)通常会停止(或冻结)每当其工作完成时,因此您无法像普通服务器一样保持其容器处于活动状态。 这就是虽然您可以控制台记录 Websocket 对象但无法使其保持活动状态的原因,每当您返回/响应时,NodeJS 容器已经停止。

由于您无法使用 Websocket 对象在 Lambda 中打开 WS 连接,因此 Amazon 提供了一种通过 API 网关执行此操作的方法。我们使用 API 网关 Websocket 的方式与普通服务器也不同,它将类似于:

  • 用户 -> 请求 API 网关连接到 websocket -> 调用 Lambda 1(onconnect 函数)
  • 用户 -> 请求 API 网关通过 Websocket 发送消息 -> 调用 Lambda 2(sendmessage 函数)
  • 用户 -> 请求 API 网关关闭连接 ->调用 Lambda 3 (ondisconnect 函数)

上述 3 个设置在 API Gateway 中配置 ( https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-websocket-api-integrations.html ),3 个函数 onconnectsendmessageondisconnect 的逻辑可以根据我们的设计方式在 1 个 lambda 或 3 个 lambda 函数中处理,我检查了您的 3 个 lambda 函数,看起来没问题。

我知道您想使用 IoT,但我不确定为什么。您应该先测试您的 Websocket API,不要使用任何与 IoT 相关的东西。如果您能告诉您想要在这里实现什么就更好了,因为物联网更像是一个发布/订阅/消息传递渠道,我认为没有必要在这里使用它。

Hoang Dao
2020-11-16

我发布此信息只是为了展示我如何解决该问题。

我创建了一个节点 express ec2 服务器来处理来自 iot 设备的请求,然后将其传递给我的 wss 服务器,这是其中正常工作的主要部分

这就是我在这里所做的 Express node js webssockets 正在从 websocket 服务器接收消息但无法发送它们

这是其中正常工作的主要部分

app.post('/doorbell', (req, res) => {
  var hmm1 = { action: 'sendmessage', data: 'hello world from post request' };

  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify(hmm1));
    res.send('heya hey');
  } else {
    res.send('The WebSocket connection is not open');
  }
});
Anders Kitson
2020-11-17