Kafka 通过Lambda函数重新启动MSK代理节点

jmp7cifd  于 2024-01-06  发布在  Apache
关注(0)|答案(1)|浏览(111)

我尝试使用部署在lambda函数上的boto3 API重新启动MSK代理节点。但是,调用一直超时,没有可能有用的错误消息。此外,在意识到我尝试与之交谈的MSK API服务位于另一个VPC中,而不是调用客户端的VPC中时,(lambda函数),我们尝试了另一种方法:使用API网关,将VPC端点连接到该网关,并将lambda触发器添加到API网关上的端点,这将再次调用与我们之前在普通lambda函数中使用的相同的boto3 API代码。然而,呼叫超时。
我们已经验证:

  • 执行重新启动代理操作所需的权限。它存在于附加到调用API调用的lambda函数的IAM策略中。
  • 如果我们能够通过VPCe调用网关的API端点:是的,我们能够,因为如果我们注解API调用以重新启动MSK API服务上的代理端点,它将返回200响应。
    总的来说,我正在寻找一个使用lambda函数调用boto3 API来重新启动MSK代理节点的示例。截至目前,我在AWS的文档中没有看到这个示例场景。
jv4diomz

jv4diomz1#

这是我的代码:

import boto3

def lambda_handler(event, context):
    cluster_arn = event.get('arn')
    broker_id = event.get('brokerId')

    # Create an MSK client
    msk_client = boto3.client('kafka')

    # Reboot the broker
    response = msk_client.reboot_broker(
        ClusterArn=cluster_arn,
        BrokerIds=[broker_id]
    )

    # Check the response for confirmation
    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        return "Broker reboot initiated successfully"
    else:
        return "Broker reboot failed"

字符串
这段代码工作正常,我可以观察到我的broker此时正在重新启动。API调用成功代码200表明服务成功接受了API。重新启动本身是重启动的,需要花费任何时间。
至于您遇到的调用超时问题,我想提一下:

  1. Boto3 reboot API是MSK级别的API,而不是Kafka级别。
    1.作为#1的结果,您不需要在集群运行的VPC中调用它。它不使用broker,也不使用bootstrap,也不使用Zookeeper来重新启动broker。它是一个服务端点。
    1.不确定它可能与IAM有关,但以防万一,请检查您是否具有执行MSK API所需的所有权限
    重新引导代理的IAM权限:
{
            "Sid": "AllowMskApis",
            "Effect": "Allow",
            "Action": [
                "kafka:ListClusters",
                "kafka:ListClustersV2",
                "kafka:ListNodes",
                "kafka:DescribeCluster",
                "kafka:GetBootstrapBrokers",
                "kafka:RebootBroker"
            ],
            "Resource": [
                "*"
            ]
        }

相关问题