我正在尝试将来自amazon kinesis数据流的流数据加载到amazon es中,如教程中所示:https://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/es-aws-integrations.html#es-aws整合运动
如本教程所示,我的lambda函数是:
import base64
import boto3
import json
import requests
from requests_aws4auth import AWS4Auth
region = 'us-east-1'
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
host = '' # the ES domain has been specified here
index = 'lambda-kine-index'
type = 'lambda-kine-type'
url = host + '/' + index + '/' + type + '/'
headers = { "Content-Type": "application/json" }
def handler(event, context):
count = 0
for record in event['Records']:
id = record['eventID']
timestamp = record['kinesis']['approximateArrivalTimestamp']
# Kinesis data is base64-encoded, so decode here
message = base64.b64decode(record['kinesis']['data'])
# Create the JSON document
document = { "id": id, "timestamp": timestamp, "message": message }
# Index the document
r = requests.put(url + id, auth=awsauth, json=document, headers=headers)
count += 1
return 'Processed ' + str(count) + ' items.'
此外,如本教程所述,iam的角色是:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"es:ESHttpPost",
"es:ESHttpPut",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:DescribeStream",
"kinesis:ListStreams"
],
"Resource": "*"
}
]
}
信任关系是:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
执行此操作后,运行lambda时得到的响应是: <Response [403]>
任何帮助解决这一点是感激的。
1条答案
按热度按时间a0zr77ik1#
确保您的凭据正常工作。你可以使用
aws-cli
. 请参阅此处的文档。