实现由 GCS finalize 触发的发布到 pubsub 的云功能
我一直在尝试用 Python 编写和部署云函数。(由于文档混乱且更改速度相对较快,我放弃了 node.js)
它旨在将消息发布到 Pub/Sub 主题,当文件上传到 Google Cloud Bucket 完成(“完成”)时触发。
我用来部署该函数的代码是
gcloud functions deploy hello_gcs_generic --runtime python37 --trigger-resource bucketcfpubsub
我一直在尝试使用 Google 提供的此脚本
import time
from google.cloud import pubsub_v1
project_id = "bucketcfpubsub"
topic_name = "projects/bucketcfpubsub/topics/pubsub"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# When you publish a message, the client returns a Future.
message_future = publisher.publish(topic_path, data=data)
message_future.add_done_callback(callback)
print('Published message IDs:')
# We must keep the main thread from exiting to allow it to process
# messages in the background.
while True:
time.sleep(60)
我在 Google Cloud Console 中收到这些错误
ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
_function_handler.load_user_function()
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
spec.loader.exec_module(main)
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/user_code/main.py", line 3, in <module>
from google.cloud import pubsub_v1
ImportError: cannot import name 'pubsub_v1' from 'google.cloud' (unknown location)
按照 这 两篇 帖子,我从 helloworld 代码示例中复制了 requirements.txt,其中仅包含
google-cloud-error-reporting==0.30.0
并更新了其他云功能,如 bigquery、存储和日志记录。然后我收到这些错误:
ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
_function_handler.load_user_function()
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
spec.loader.exec_module(main)
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/user_code/main.py", line 3, in <module>
from google.cloud import pubsub_v1`
并且我还发现[此线程]( ImportError: 无法从“google.cloud”(未知位置)导入名称“pubsub_v1” 但我真的不明白解决方案是什么,我尝试用 google-cloud-pubsub==0.38.0 替换 pubsub_v1,但没有帮助。我收到此错误:
Deploying function (may take a while - up to 2 minutes)...failed.
ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
_function_handler.load_user_function()
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
spec.loader.exec_module(main)
File "<frozen importlib._bootstrap_external>", line 724, in exec_module
File "<frozen importlib._bootstrap_external>", line 860, in get_code
File "<frozen importlib._bootstrap_external>", line 791, in source_to_code
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/user_code/main.py", line 3
此外,如果一旦 Google 将 pubsub 更新到新版本,代码就会中断,这似乎不是一个可持续的解决方案?
所以我是一个初学者,很迷茫,但我希望本文档可以帮助你们帮助我。
更新:
似乎 pubsub 和 pubsub_v1 都可以使用,但不确定它们有什么区别。
@dustin 我执行了 pip install -r requirements.txt,最终与您提供的内容匹配。 我还注意到将函数部署为“hello-gcs-generic”时出现错误,应将其更改为“callback”。
python 代码现在在本地运行良好,但使用上述代码(OP 中的第一个代码行)将其部署到云时始终返回此错误
ERROR: (gcloud.functions.deploy) OperationError: code=3, messa
ge=Function load error: Error: function load attempt timed out
.
您需要将
google-cloud-pubsub
添加到您的
requirements.txt
文件中,而不是添加到您的
main.py
文件中。它应如下所示:
google-cloud-error-reporting==0.30.0
google-cloud-pubsub==0.38.0
有一个 更简单的 Python 快速入门示例 可以满足您的需求。;-)
您引用的示例更高级。它展示了如何发布带有错误处理的消息。高级示例中的
while(True): sleep(60)
行用于保持主线程处于活动状态,除非发出
Ctrl+C
或其等效命令来停止程序运行。此
sleep
函数存在的原因是为了让我们可以等待发布期货的回调调用完成,而不是在发布调用后立即退出程序。同样,对于您尝试学习使用 Cloud Pub/Sub 和 Cloud Functions 进行的操作而言,这可能有点太复杂了。我建议避开高级示例并使用快速入门示例。
from google.cloud import pubsub_v1
# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"
publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_name}`
topic_path = publisher.topic_path(project_id, topic_name)
for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data)
print('Published {} of message ID {}.'.format(data, future.result()))
print('Published messages.')