对象存储与函数计算集成后,对象存储事件能触发相关函数执行,实现对对象存储中的数据的自定义处理。
背景信息
对象存储和函数计算通过对象存储触发器实现无缝集成,可以编写函数对对象存储事件进行自定义处理,当对象存储捕获到指定类型的事件后,对象存储事件触发相应的函数执行。例如,可以设置函数来处理PutObject事件,当上传图片到对象存储后,相关联的函数会自动被触发来处理该图片。
对象存储和函数计算集成后,可以自由地调用各种函数处理图像或音频数据,再把结果写回到多种存储服务中。整个架构中,只需要专注于函数逻辑的编写,系统将以实时的、可靠的、大规模并行的方式处理海量的数据。
对象存储事件定义
当对象存储系统捕获到相关事件后,会将事件信息编码为JSON字符串,传递给事件处理函数。
已支持的对象存储事件定义如下表所示。
事件类型 | 事件名称 | 说明 |
---|---|---|
ObjectCreated | s3:ObjectCreated:Put | 原子上传对象到Bucket。 |
s3:ObjectCreated:Post | Post上传对象到Bucket。 | - |
s3:ObjectCreated:CompleteMultipartUpload | 分段上传对象到Bucket。 | - |
s3:ObjectCreated:Copy | 拷贝上传。 | - |
ObjectRemoved | s3:ObjectRemoved:Delete | 删除Bucket中对象。 |
s3:ObjectRemoved:MultiDelete | 批量删除Bucket中对象。 | - |
s3:ObjectRemoved:DeleteMarkerCreated | 不指定版本号删除多版本Bucket中对象。 | - |
ObjectRestore | s3:ObjectRestore:Post | 对象解冻。 |
触发器触发规则
避免循环触发
使用对象存储触发器时,请注意避免循环触发。例如,一个典型的循环触发场景是对象存储的某个bucket上传文件事件触发函数执行,此函数执行完成后又生成了一个或多个文件再写回到对象存储的bucket里,这个写入动作又触发了函数执行,形成了链状循环。
为了避免循环触发函数产生不必要的费用,建议配置文件前缀或文件后缀,例如将触发函数的文件的文件前缀设置为 src
,函数执行完成后生成文件的文件前缀设置为 dst
,生成的文件将不会再次触发函数。如果不设置文件前缀和文件后缀,表示匹配任意文件前缀和文件后缀。
配置对象存储触发器
创建触发器
- 登录函数计算控制台,在左侧导航栏,单击函数。
- 在顶部菜单栏,选择地域,然后在函数页面,单击目标函数。
- 在函数配置页面,选择配置页签,在左侧导航栏,单击触发器,然后单击创建触发器。
- 在创建触发器面板,填写相关信息,然后单击确定。
配置项说明如下。
配置项 | 操作 | 示例 |
---|---|---|
触发器类型 | 选择对象存储触发器。 | 对象存储触发器 |
名称 | 填写自定义的触发器名称。 | zos-trigger |
版本或别名 | 默认值为LATEST。 | LATEST |
Bucket 名称 | 选择已创建的对象存储Bucket。 | testbucket |
文件前缀 | 输入要匹配的文件名称的前缀。建议配置文件前缀和后缀,避免触发事件嵌套循环触发引起额外费用。 | source |
文件后缀 | 输入要匹配的文件名称的后缀。强烈建议配置前缀和后缀,避免触发事件嵌套循环触发引起额外费用。 | png |
事件类型 | 选择一个或多个触发事件。关于对象存储ZOS的事件类型,请参见对象存储事件定义。 | s3:ObjectCreated:Put ,s3:ObjectCreated:Post ,s3:ObjectCreated:CompleteMultipartUpload |
调用方式 | 选择函数调用方式,默认为同步调用。 同步调用:事件触发函数执行,等待函数调用完成后,函数计算返回执行结果。 异步调用:事件触发函数执行后,函数计算立即返回响应结果并且确保函数至少被成功执行一次,但不会返回具体执行结果。适用于调度延时较长的函数。 |
同步调用 |
开通事件上报
并非所有Bucket都会主动上报事件,需要通过对象存储产品的Openapi打开指定Bucket的事件上报,方法步骤如下:
-
Python调用openapi示例
-
结合上述示例,执行如下代码,打开指定存储桶的事件上报开关
def main(): params = { "regionID": "bb9fdb42056f11eda1610242ac110002", # 华东1地域ID "bucket": "您的桶名称", "bucketEventBridgeEnabled": True } result=post("https://zos-global.ctapi.ctyun.cn/v4/oss/put-bucket-event-bridge","", params) print(result) if __name__ == "__main__": main()
-
接口返回以下内容即可
{"message": "SUCCESS", "description": "成功", "statusCode": 800}
配置函数入口参数
对象存储触发器事件源会以CloudEvents模板作为输入参数传递给函数,可以手动将Event传给函数模拟触发事件。
Event是函数计算的入口参数,当指定的对象存储Bucket发生指定事件时,会将事件数据以JSON格式发送给绑定的函数。具体格式如下所示。
{
"id": "1723530386.250878.3f7f5c***",
"source": "ctyun.zos",
"specversion": "1.0",
"type": "s3:ObjectCreated:Post",
"subject": "ctyun.zos:b342b77ef26b11e***:5d4ce56a08db4a***:bucket-***:abc.txt",
"time": "2024-03-05T13:52:18.374Z",
"data": {
"userIdentity": {
"principalId": "testuser_sub1"
},
"responseElements": {
"x-amz-request-id": "ec2906f5-e72t-494f-9aa1-f621108***",
"x-amz-id-2": "1286b9-zone2-zon***"
},
"s3": {
"bucket": {
"name": "bucket-***",
"ownerIdentity": {
"principalId": "testuser"
},
"arn": "arn:aws:s3:::bucket-***",
"id": "ec2906f5-e72f-494f-9aa1-f6211***"
},
"object": {
"key": "abc.txt",
"size": 417791,
"etag": "3f7f 5c925b10c789e3e1389***",
"versionId": "",
"sequencer": "92FCBA66FA6***",
"metadata": []
}
}
},
"datacontenttype": "application/json;charset=utf-8",
"ctyunaccountid": "5d4ce56a08db4ac19***",
"ctyunuserid": "usertest-userid",
"ctyunresourceid": "",
"ctyuneventbusname": "default",
"ctyunregion": "b342b77ef26b11ecb0***"
}
Event参数中不同属性字段的解释如下表所示。
参数 | 类型 | 示例值 | 描述 |
---|---|---|---|
id | String | b5771f76-6cdf-48ed-b1ba-d15418c***** | 事件ID。标识事件的唯一值。 |
source | String | ctyun.zos | 事件源。对象存储触发器固定为ctyun.zos。 |
specversion | String | 1.0 | CloudEvents协议版本。 |
type | String | s3:ObjectCreated:Post | 事件类型。 |
subject | String | ctyun.zos:b342b77ef26***:5d4ce56a08d***:bucketname***:objectname*** | 事件主体。格式为ctyun.zos::::,其中是资源池ID,是天翼云账号ID,是对象存储产生事件的bucket名称,是对象存储产生事件的文件名称。 |
time | Timestamp | 2024-03-05T13:52:18.374Z | 事件产生的时间。 |
datacontenttype | String | application/json;charset=utf-8 | 参数data的内容形式。 |
data | Struct | {"abc":"1111", "def":"xxxx"} | 事件内容。JSON对象,内容由发起事件的服务决定。CloudEvents可能包含事件发生时由事件生产者给定的上下文,data中封装了这些信息。 |
ctyunaccountid | String | 123456789**** | 天翼云账号ID。 |
ctyunuserid | String | 123456789**** | 天翼云用户ID。 |
ctyunresourceid | String | 27aadda4-db94-11ee-a6fc-e8b47009**** | 天翼云资源ID |
ctyuneventbusname | String | default | 接收事件的事件总线名称,天翼云产品事件为default。 |
ctyunregion | String | bb9fdb42056fl1eda161**** | 接收事件的地域。 |
编写函数代码并测试
对象存储触发器创建完成后,可以开始编写函数代码并测试,以验证代码的正确性。在实际操作过程中发生对象存储事件时,会自动触发函数执行。
代码中一定要避免循环触发,否则会产生不必要的费用。一个典型的循环触发场景是对象存储的某个bucket上传文件事件触发函数执行,此函数执行完成后又生成了一个或多个文件再写回到对象存储的bucket里,这个写入动作又触发了函数执行,形成了链状循环。
Python代码示例:
# -*- coding: utf-8 -*-
import json
import logging
def handler(event, context):
logger = logging.getLogger()
logger.info("print event payload:")
if event: # 检查是否为空
try:
json_data = json.loads(event)
logger.info(json.dumps(json_data, indent=4))
except json.JSONDecodeError as e:
logger.info(
"Event is not of JSON type, print raw event data: %s", event)
else:
logger.info("event is empty")
# 测试 CloudEvents 支持(CloudEvents 相关参数编码在 headers 中)
logger.info("print headers:")
headers_map = context.headersMap # 遵循PEP 8命名规范
logger.info(json.dumps(headers_map, indent=4))
return 'hello world\n'