msk的流媒体功能和性能

yc0p9oo0  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(479)

我正在评估aws托管服务kafka(msk),我知道目前它处于预览模式,因此可能没有所有功能或适当的文档。我试着建立msk集群,并验证msk是否能够满足我们公司的所有用例/需求,但目前,它缺乏文档和示例。
https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html
我有以下疑问:
i) 我怎样才能访问aws msk与Kafka客户端运行在我的内部系统?
ii)msk是否支持模式演化和精确一次语义?
iii)msk会提供一些方法来更新一些集群或调优配置吗?像aws一样,glue在其托管环境中为spark executr和驱动程序内存提供参数更改。
iv)是否可以将msk与其他aws服务(如红移、emr等)集成?
v) 我可以通过ksql与msk一起使用流式sql吗?如何使用msk设置ksql?
vi)如何对流经msk的数据进行实时预测分析?
vii)msk与来自azure/confluent的其他基于云的kafka集群相比,可靠性如何?与vanilla kafka相比,任何性能基准测试的可靠性如何?集群中可以启动的代理的最大数量是多少?

np8igboo

np8igboo1#

msk基本上是由aws定制和管理的香草apachekafka集群(具有基于集群示例类型、代理数量等的预定义配置设置),针对云环境进行了调优。
理想情况下,它应该能够执行开源kafka支持的所有/大多数事情。另外,如果您有未记录的特定用例或需求,我建议您联系aws支持人员,以进一步了解kafka集群的托管部分(允许的代理的最大数量、可靠性和成本)。
我将根据我的个人经验回答你的问题:
i) 我怎样才能访问aws msk与Kafka客户端运行在我的内部系统?
您不能使用kafka客户端或kafka流直接从内部或本地计算机访问msk。因为broker url、zookeeper连接字符串是msk集群vpc/子网的私有ip。要通过kafka客户端访问,需要在msk的同一vpc中启动ec2示例,执行kafka客户端(生产者/消费者)访问msk集群。
要从本地计算机或本地系统访问msk集群,可以通过confluent设置kafka rest代理框架open-sourced,通过restapi从外部世界访问msk集群。该框架不是成熟的kafka客户端,不允许kafka客户端的所有操作,但您可以从获取集群元数据、主题信息、生成和消费消息等方面对集群进行大部分操作。
首先设置confluent repo和ec2示例安全组(请参阅-第1节:预安装或设置-其他kafka组件),然后安装/设置kafka rest代理。

sudo yum install confluent-kafka-rest

创建文件名kafka-rest.properties并添加以下内容-

bootstrap.servers=PLAINTEXT://10.0.10.106:9092,PLAINTEXT://10.0.20.27:9092,PLAINTEXT://10.0.0.119:9092
zookeeper.connect=10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181
schema.registry.url=http://localhost:8081

**修改bootstrapserver和zookeeper url/ips。

启动rest服务器

kafka-rest-start kafka-rest.properties &

使用curl或rest客户端/浏览器通过restapi访问msk。
获取主题列表

curl "http://localhost:8082/topics"

curl "http://<ec2 instance public ip>:8082/topics"

为了从本地或本地机器进行访问,请确保运行rest服务器的ec2示例连接了公共ip或弹性ip。
更多rest api操作https://github.com/confluentinc/kafka-rest
ii)msk是否支持模式演化和精确一次语义?
您可以使用avro消息和“schema registry”来实现模式演化和模式维护。
安装和设置模式注册表类似于confluent kafka rest代理。

sudo yum install confluent-schema-registry

创建文件名schema-registry.property并添加以下内容-

listeners=http://0.0.0.0:8081
kafkastore.connection.url=10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181
kafkastore.bootstrap.servers=PLAINTEXT://10.0.10.106:9092,PLAINTEXT://10.0.20.27:9092,PLAINTEXT://10.0.0.119:9092
kafkastore.topic=_schemas
debug=false

**修改bootstrapserver和zookeeper(连接)url/ips。

启动架构注册表服务

schema-registry-start schema-registry.properties &

有关更多信息,请参阅:https://github.com/confluentinc/schema-registry
https://docs.confluent.io/current/schema-registry/docs/schema_registry_tutorial.html
语义曾经是apachekafka的特性,尽管我还没有在msk上测试过它,但我相信它应该支持这个特性,因为它只是开放源码apachekafka的一部分。
iii)msk会提供一些方法来更新一些集群或调优配置吗?像aws一样,glue在其托管环境中为spark执行器和驱动程序内存提供参数更改。
是的,可以在运行时更改配置参数。我已经通过使用kafka配置工具更改retention.ms参数进行了测试,该更改立即应用于主题。因此,我认为您也可以更新其他参数,但msk可能不允许所有配置更改,就像aws glue只允许少量spark配置参数更改一样,因为允许用户更改所有参数可能容易受到托管环境的影响。
通过Kafka配置工具更改

kafka-configs.sh --zookeeper 10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181  --entity-type topics --entity-name jsontest --alter --add-config retention.ms=128000

使用rest验证更改

curl "http://localhost:8082/topics/jsontest"

现在amazonmsk允许您创建一个自定义的msk配置。
有关可更新的配置/参数,请参阅以下文档:
https://docs.aws.amazon.com/msk/latest/developerguide/msk-configuration-properties.html
还有msk kafka的默认配置:
https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html
iv)是否可以将msk与其他aws服务(如红移、emr等)集成?
是的,您可以使用msk连接/集成到其他aws服务。例如,您可以运行kafka客户机(consumer)从kafka读取数据并写入redshift、rds、s3或dynamodb。确保kafka客户机在ec2示例(msk vpc内部)上运行,该示例具有访问这些服务的适当iam角色,并且ec2示例位于公用子网或专用子网(对于s3具有nat或vpc端点)。
您还可以在msk集群vpc/子网内启动emr,然后通过emr(spark)连接到其他服务。
使用aws托管服务kafka的spark结构流
在msk cluster的vpc中启动emr cluster允许端口9092的msk clusters安全组入站规则中的emr master和slave安全组
启动Spark壳

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0

从spark结构流连接到msk群集

val kafka = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "10.0.10.106:9092,10.0.20.27:9092,10.0.0.119:9092").option("subscribe", "jsontest") .load()

开始在控制台上阅读/打印消息

val df=kafka.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("console").start()

或者

val df=kafka.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()



v) 我可以通过ksql与msk一起使用流式sql吗?如何使用msk设置ksql?
是的,您可以用msk cluster设置ksql。基本上,您需要在msk集群的同一vpc/子网中启动一个ec2示例。然后在ec2示例中安装ksqlserver+客户端并使用它。
首先设置confluent repo和ec2示例安全组(请参阅-第1节:预安装或设置-其他kafka组件),然后安装/设置ksql服务器/客户端。
然后安装ksql服务器

sudo yum install confluent-ksql

创建文件名ksql-server.properties并添加以下内容-

bootstrap.servers=10.0.10.106:9092,10.0.20.27:9092,10.0.0.119:9092
listeners=http://localhost:8088

**修改引导服务器IP/url。

启动ksql服务器

ksql-server-start ksql-server.properties &

然后启动ksqlcli

ksql http://localhost:8088

最后运行命令获取主题列表

ksql> SHOW TOPICS;

 Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups 
-----------------------------------------------------------------------------------------
 _schemas    | false      | 1          | 3                  | 0         | 0              
 jsontest    | false      | 1          | 3                  | 1         | 1              
----------------------------- --------------------------------------------------

有关更多信息,请参阅-https://github.com/confluentinc/ksql
vi)如何对流经msk的数据进行实时预测分析?
做预测性分析或实时机器学习的东西并不是msk特有的。使用kafka集群(或任何流媒体管道)的方法也适用于msk。根据您的具体要求,有多种方法可供选择,但我将介绍行业内最常见或最广泛使用的方法:
使用spark和msk(kafka)并通过structure streaming和mlib(有你的预测模型)进行分析。
您可以在h20.ai框架中训练您的预测模型,然后将模型导出为javapojo。然后将javapojo模型与kafka消费代码集成,处理msk(kafka)主题的消息并进行实时分析。
您可以在sagemaker中训练模型并进行部署,然后根据kafka数据/消息调用sagemaker模型推断端点,从kafka客户机消费代码中调用,以获得实时预测。
vii)msk与来自azure/confluent的其他基于云的kafka集群相比,可靠性如何?与vanilla kafka相比,任何性能基准测试的可靠性如何?集群中可以启动的代理的最大数量是多少?
正如你已经知道的,msk正在预览中,所以现在说它的可靠性还为时过早。但总的来说,和所有其他aws服务一样,随着时间的推移,它应该会变得更加可靠,并有新的特性和更好的文档。
我不认为aws或任何云供应商azure、googlecloud提供了他们服务的性能基准,所以你必须从你的Angular 来尝试性能测试。kafka客户机/工具(kafka-producer-perf-test.sh、kafka-consumer-perf-test.sh)提供了一个性能基准脚本,可以执行该脚本来了解集群的性能。同样,在实际生产场景中对服务的性能测试将根据各种因素(消息大小、到达kafka的数据量、sync或async producer、有多少消费者等)而变化很大,性能将取决于特定的用例,而不是通用的基准测试。
关于集群中支持的代理的最大数量,最好通过他们的支持系统询问aws人员。
第1节:预安装或设置-附加kafka组件:
在msk集群的vpc/子网中启动ec2示例。
登录到ec2示例
设置yum-repo以通过yum下载合流kafka组件包

sudo yum install curl which
  sudo rpm --import https://packages.confluent.io/rpm/5.1/archive.key

导航到/etc/yum.repos.d/,创建一个名为confluent.repo的文件并添加以下内容

[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1

下一个干净的百胜回购

sudo yum clean all

允许msk群集安全组的端口9092(连接代理)和2081(连接管理员)的入站规则中的ec2示例的安全组。
第2节:获取msk集群代理和zookeeper url/ip信息的命令
zookeeper连接u

相关问题