使用aws lambda读取kafka(msk)事件源

fumotvh3  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(430)

我正在尝试使用aws lambda读取kafka主题(aws msk)中的值。
从lambda打印的事件记录如下所示:
{'eventsource':'aws:kafka','eventsourcearn':'arn:aws:kafka:ap-northeast-1:987654321:cluster/mskcluster/79y80c66-813a-4f-af0e-4ea47ba107e6','records':{'transactions-0':[{'topic':'transactions','partition':0,'offset':4798,'timestamp':1603565835915,'timestamptype':'create\u time','value':'eyjfdmvudfrpbwuioiiaimjaymc0xmc0yncaxodo1nzoxns45mtuzmjqilcaisvaiaiaiaimtgwlji0ms4xntkumje4iiwgikfjy291bnrodw1izxiioiiwimtq2oda4odyilcaivxnlck5hbwuioi67iqw1izxigum9tyxjviiwgikftb3vudci6ici1ntyiwgilyyyyw5zwn0aw9usuqioiaitzi4qlg3tlbjbwzmxexwcidb3vuthj5ijogik9tyw4ifq='}]}
如何提取“topic”和“value”字段?值1是base64编码的。我得到以下错误:
名称错误:未定义名称“record”
我正在尝试以下代码:

import json
import base64

def lambda_handler(event, context):
    print(event)
    message = event['records']
    payload=base64.b64decode(record["message"]["value"])
    print("Decoded payload: " + str(payload))

msk事件结构示例

6tr1vspr

6tr1vspr1#

在代码片段中 record 试图传递给解码函数的变量不存在。迭代记录的示例如下:

records = event['records']['Transactions-0']
for record in records:
    payload=base64.b64decode(record["message"]["value"])
    print("Decoded payload: " + str(payload))

每个函数调用都包含每个主题的多个记录。如果你有多个类似的 Transactions-1 ,...

相关问题