开发者问题收集

实现由 GCS finalize 触发的发布到 pubsub 的云功能

2018-12-06
5528

我一直在尝试用 Python 编写和部署云函数。(由于文档混乱且更改速度相对较快,我放弃了 node.js)

它旨在将消息发布到 Pub/Sub 主题,当文件上传到 Google Cloud Bucket 完成(“完成”)时触发。

我用来部署该函数的代码是

gcloud functions deploy hello_gcs_generic --runtime python37 --trigger-resource bucketcfpubsub

我一直在尝试使用 Google 提供的此脚本

import time

from google.cloud import pubsub_v1

project_id = "bucketcfpubsub"
topic_name = "projects/bucketcfpubsub/topics/pubsub"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
    print('Publishing message on {} threw an Exception {}.'.format(
        topic_name, message_future.exception()))
else:
    print(message_future.result())

for n in range(1, 10):
    data = u'Message number {}'.format(n)
# Data must be a bytestring
    data = data.encode('utf-8')
# When you publish a message, the client returns a Future.
    message_future = publisher.publish(topic_path, data=data)
    message_future.add_done_callback(callback)

print('Published message IDs:')

# We must keep the main thread from exiting to allow it to process
# messages in the background.
while True:
    time.sleep(60)

我在 Google Cloud Console 中收到这些错误

ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
    _function_handler.load_user_function()
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
    spec.loader.exec_module(main)
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/user_code/main.py", line 3, in <module>
    from google.cloud import pubsub_v1
ImportError: cannot import name 'pubsub_v1' from 'google.cloud' (unknown location)

按照 两篇 帖子,我从 helloworld 代码示例中复制了 requirements.txt,其中仅包含

google-cloud-error-reporting==0.30.0

并更新了其他云功能,如 bigquery、存储和日志记录。然后我收到这些错误:

ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
    _function_handler.load_user_function()
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
    spec.loader.exec_module(main)
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/user_code/main.py", line 3, in <module>
from google.cloud import pubsub_v1`

并且我还发现[此线程]( ImportError: 无法从“google.cloud”(未知位置)导入名称“pubsub_v1” 但我真的不明白解决方案是什么,我尝试用 google-cloud-pubsub==0.38.0 替换 pubsub_v1,但没有帮助。我收到此错误:

Deploying function (may take a while - up to 2 minutes)...failed.
ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
    _function_handler.load_user_function()
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
    spec.loader.exec_module(main)
  File "<frozen importlib._bootstrap_external>", line 724, in exec_module
  File "<frozen importlib._bootstrap_external>", line 860, in get_code
  File "<frozen importlib._bootstrap_external>", line 791, in source_to_code
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/user_code/main.py", line 3

此外,如果一旦 Google 将 pubsub 更新到新版本,代码就会中断,这似乎不是一个可持续的解决方案?

所以我是一个初学者,很迷茫,但我希望本文档可以帮助你们帮助我。

更新:

似乎 pubsub 和 pubsub_v1 都可以使用,但不确定它们有什么区别。

@dustin 我执行了 pip install -r requirements.txt,最终与您提供的内容匹配。 我还注意到将函数部署为“hello-gcs-generic”时出现错误,应将其更改为“callback”。

python 代码现在在本地运行良好,但使用上述代码(OP 中的第一个代码行)将其部署到云时始终返回此错误

ERROR: (gcloud.functions.deploy) OperationError: code=3, messa
ge=Function load error: Error: function load attempt timed out
.
2个回答

您需要将 google-cloud-pubsub 添加到您的 requirements.txt 文件中,而不是添加到您的 main.py 文件中。它应如下所示:

google-cloud-error-reporting==0.30.0
google-cloud-pubsub==0.38.0
Dustin Ingram
2018-12-06

有一个 更简单的 Python 快速入门示例 可以满足您的需求。;-)

您引用的示例更高级。它展示了如何发布带有错误处理的消息。高级示例中的 while(True): sleep(60) 行用于保持主线程处于活动状态,除非发出 Ctrl+C 或其等效命令来停止程序运行。此 sleep 函数存在的原因是为了让我们可以等待发布期货的回调调用完成,而不是在发布调用后立即退出程序。同样,对于您尝试学习使用 Cloud Pub/Sub 和 Cloud Functions 进行的操作而言,这可能有点太复杂了。我建议避开高级示例并使用快速入门示例。

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_name}`
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data)
    print('Published {} of message ID {}.'.format(data, future.result()))

print('Published messages.')

tianzi
2019-06-14