I run a script that creates 10 Kafka Streams application instances:
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors1 VectorSink1 VectorSink2 VectorSourceNodeName1 VectorProcessorName1 StateStoreName1 sk1-client sk1-group sk1-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors2 VectorSink3 VectorSink4 VectorSourceNodeName2 VectorProcessorName2 StateStoreName2 sk2-client sk2-group sk2-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors3 VectorSink5 VectorSink6 VectorSourceNodeName3 VectorProcessorName3 StateStoreName3 sk3-client sk3-group sk3-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors4 VectorSink7 VectorSink8 VectorSourceNodeName4 VectorProcessorName4 StateStoreName4 sk4-client sk4-group sk4-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors5 VectorSink9 VectorSink10 VectorSourceNodeName5 VectorProcessorName5 StateStoreName5 sk5-client sk5-group sk5-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors6 VectorSink11 VectorSink12 VectorSourceNodeName6 VectorProcessorName6 StateStoreName6 sk6-client sk6-group sk6-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors7 VectorSink13 VectorSink14 VectorSourceNodeName7 VectorProcessorName7 StateStoreName7 sk7-client sk7-group sk7-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors8 VectorSink15 VectorSink16 VectorSourceNodeName8 VectorProcessorName8 StateStoreName8 sk8-client sk8-group sk8-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors9 VectorSink17 VectorSink18 VectorSourceNodeName9 VectorProcessorName9 StateStoreName9 sk9-client sk9-group sk9-appid" &
mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors10 VectorSink19 VectorSink20 VectorSourceNodeName10 VectorProcessorName10 StateStoreName10 sk10-client sk10-group sk10-appid" &
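The ten commands above follow a single naming pattern, so the script could also be written as a loop. This is only a sketch: the function names `build_args` and `launch_all` and the `DRY_RUN` variable are hypothetical and are not part of my actual script.

```shell
#!/usr/bin/env bash
# Sketch only: build_args, launch_all and DRY_RUN are hypothetical names.

# Build the exec.args string for instance i, following the pattern above.
build_args() {
  local i=$1
  local sink_a=$((2 * i - 1))   # first sink node name, e.g. i=5 gives VectorSink9
  local sink_b=$((2 * i))       # second sink node name, e.g. i=5 gives VectorSink10
  echo "Vectors${i} VectorSink${sink_a} VectorSink${sink_b} VectorSourceNodeName${i} VectorProcessorName${i} StateStoreName${i} sk${i}-client sk${i}-group sk${i}-appid"
}

# Start all ten instances in the background.
# With DRY_RUN=1 the commands are only printed, not executed.
launch_all() {
  local i
  for i in $(seq 1 10); do
    if [ "${DRY_RUN:-0}" = "1" ]; then
      echo "mvn exec:java -Dexec.mainClass=\"SiteApplication\" -Dexec.args=\"$(build_args "$i")\""
    else
      mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="$(build_args "$i")" &
    fi
  done
}
```

Running `DRY_RUN=1 launch_all` prints the ten commands so they can be compared with the list above before anything is started.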
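With the help of a logger I found that the processor's init method is never called in some of the application instances, so those processors are never initialized.
The topology of my streams application is: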
toplogy.addSource(vectorSourceNodeName, stringDeserializer, integerDeserializer, args[0])
.addProcessor(vectorProcessorName,
()->new SiteProcessor(vectorSink1, vectorSink2),
vectorSourceNodeName)
.addSink(vectorSink1,"IncreaseOfC", stringSerializer, byteArraySerializer, vectorProcessorName)
.addSink(vectorSink2, "Messages", stringSerializer, byteArraySerializer1, vectorProcessorName);
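I don't know why the processor is not initialized or how to debug this.
I don't use state stores.
(The problem appeared after a bug in my code left the application stuck in a while loop and I ran pkill -u user to stop it. I have since fixed that bug. Could something be wrong with the Kafka installation? How can I find out?)
Edit: Through the logger I found that, for the applications whose processor is initialized, the following is logged: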
INFO AbstractCoordinator:677 - [Consumer clientId=sk5-client-StreamThread-1-consumer, groupId=sk5-appid] Discovered group coordinator myserver:6667 (id: 2147482646 rack: null)
INFO ConsumerCoordinator:472 - [Consumer clientId=sk5-client-StreamThread-1-consumer, groupId=sk5-appid] Revoking previously assigned partitions []
INFO StreamThread:209 - stream-thread [sk5-client-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
INFO KafkaStreams:261 - stream-client [sk5-client] State transition from RUNNING to REBALANCING
INFO StreamThread:320 - stream-thread [sk5-client-StreamThread-1] partition revocation took 1 ms.
suspended active tasks: []
suspended standby tasks: []
StreamsPartitionAssignor:579 - stream-thread [sk5-client-StreamThread-1-consumer] Assigned tasks to clients as {=[activeTasks: ([0_0]) standbyTasks: ([]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
INFO AbstractCoordinator:473 - [Consumer clientId=sk5-client-StreamThread-1-consumer, groupId=sk5-appid] Successfully joined group with generation 1
INFO ConsumerCoordinator:280 - [Consumer clientId=sk5-client-StreamThread-1-consumer, groupId=sk5-appid] Setting newly assigned partitions [Vectors5-0]
INFO StreamThread:209 - stream-thread [sk5-client-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
INFO StreamThread:280 - stream-thread [sk5-client-StreamThread-1] partition assignment took 23 ms.
current active tasks: [0_0]
current standby tasks: []
previous active tasks: []
INFO StreamThread:209 - stream-thread [sk5-client-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
INFO KafkaStreams:261 - stream-client [sk5-client] State transition from REBALANCING to RUNNING
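For the applications whose processor is not initialized, only State transition from CREATED to RUNNING is logged, and no rebalance happens.
As the logs suggest, the stream applications whose processors are not initialized never discover the group coordinator. Is this a problem I have to report to the administrator of the cluster running Apache Kafka, or is it a bug in my code that I have to track down? Thanks in advance.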
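To tell the two cases apart quickly, each instance's log can be searched for the "Discovered group coordinator" line shown above. The sketch below assumes every instance writes its output to its own file; the function name `check_coordinator` and file names such as sk5.log are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch only: check_coordinator and the log file names are hypothetical.

# For each log file given, report whether the consumer logged
# "Discovered group coordinator" (the line seen in the healthy instances).
check_coordinator() {
  local f
  for f in "$@"; do
    if grep -q "Discovered group coordinator" "$f"; then
      echo "$f: coordinator discovered"
    else
      echo "$f: coordinator NOT discovered"
    fi
  done
}
```

For example, `check_coordinator sk1.log sk2.log` prints one line per file, which makes it easy to list the application ids that never reached the coordinator.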