Kafka Streams processor is not initialized

cuxqih21 posted on 2021-06-05 in Kafka

I run a script that creates 10 Kafka Streams application instances:

    mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors1 VectorSink1 VectorSink2 VectorSourceNodeName1 VectorProcessorName1 StateStoreName1 sk1-client sk1-group sk1-appid" &
    mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors2 VectorSink3 VectorSink4 VectorSourceNodeName2 VectorProcessorName2 StateStoreName2 sk2-client sk2-group sk2-appid" &
    mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors3 VectorSink5 VectorSink6 VectorSourceNodeName3 VectorProcessorName3 StateStoreName3 sk3-client sk3-group sk3-appid" &
    mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors4 VectorSink7 VectorSink8 VectorSourceNodeName4 VectorProcessorName4 StateStoreName4 sk4-client sk4-group sk4-appid" &
    mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors5 VectorSink9 VectorSink10 VectorSourceNodeName5 VectorProcessorName5 StateStoreName5 sk5-client sk5-group sk5-appid" &
    mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors6 VectorSink11 VectorSink12 VectorSourceNodeName6 VectorProcessorName6 StateStoreName6 sk6-client sk6-group sk6-appid" &
    mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors7 VectorSink13 VectorSink14 VectorSourceNodeName7 VectorProcessorName7 StateStoreName7 sk7-client sk7-group sk7-appid" &
    mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors8 VectorSink15 VectorSink16 VectorSourceNodeName8 VectorProcessorName8 StateStoreName8 sk8-client sk8-group sk8-appid" &
    mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors9 VectorSink17 VectorSink18 VectorSourceNodeName9 VectorProcessorName9 StateStoreName9 sk9-client sk9-group sk9-appid" &
    mvn exec:java -Dexec.mainClass="SiteApplication" -Dexec.args="Vectors10 VectorSink19 VectorSink20 VectorSourceNodeName10 VectorProcessorName10 StateStoreName10 sk10-client sk10-group sk10-appid" &

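With the help of a logger, I found that the processor's init method is never called in some of the application instances, so their processors are never initialized.
The topology of my Streams application is: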

    toplogy.addSource(vectorSourceNodeName, stringDeserializer, integerDeserializer, args[0])
        .addProcessor(vectorProcessorName,
            () -> new SiteProcessor(vectorSink1, vectorSink2),
            vectorSourceNodeName)
        .addSink(vectorSink1, "IncreaseOfC", stringSerializer, byteArraySerializer, vectorProcessorName)
        .addSink(vectorSink2, "Messages", stringSerializer, byteArraySerializer1, vectorProcessorName);

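I don't know why the processor is not initialized or how to debug this.
I do not use state stores.
(Earlier, a bug in my code caused it to get stuck in a while loop, and I ran pkill -u user to stop the execution. I have since fixed that bug. Could something be wrong with the Kafka installation? How can I find out?)
Edit: through the logger I found that, for the applications whose processors are initialized, the following is logged: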

    INFO AbstractCoordinator:677 - [Consumer clientId=sk5-client-StreamThread-1-consumer, groupId=sk5-appid] Discovered group coordinator myserver:6667 (id: 2147482646 rack: null)
    INFO ConsumerCoordinator:472 - [Consumer clientId=sk5-client-StreamThread-1-consumer, groupId=sk5-appid] Revoking previously assigned partitions []
    INFO StreamThread:209 - stream-thread [sk5-client-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
    INFO KafkaStreams:261 - stream-client [sk5-client] State transition from RUNNING to REBALANCING
    INFO StreamThread:320 - stream-thread [sk5-client-StreamThread-1] partition revocation took 1 ms.
        suspended active tasks: []
        suspended standby tasks: []
    StreamsPartitionAssignor:579 - stream-thread [sk5-client-StreamThread-1-consumer] Assigned tasks to clients as {=[activeTasks: ([0_0]) standbyTasks: ([]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
    INFO AbstractCoordinator:473 - [Consumer clientId=sk5-client-StreamThread-1-consumer, groupId=sk5-appid] Successfully joined group with generation 1
    INFO ConsumerCoordinator:280 - [Consumer clientId=sk5-client-StreamThread-1-consumer, groupId=sk5-appid] Setting newly assigned partitions [Vectors5-0]
    INFO StreamThread:209 - stream-thread [sk5-client-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
    INFO StreamThread:280 - stream-thread [sk5-client-StreamThread-1] partition assignment took 23 ms.
        current active tasks: [0_0]
        current standby tasks: []
        previous active tasks: []
    INFO StreamThread:209 - stream-thread [sk5-client-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
    INFO KafkaStreams:261 - stream-client [sk5-client] State transition from REBALANCING to RUNNING

For the applications whose processors are not initialized, only "State transition from CREATED to RUNNING" is logged, and no rebalance ever takes place.
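With ten instances running, it may help to check mechanically which application IDs never logged the coordinator discovery line. Below is a minimal sketch using only the Java standard library. It assumes the logs of all instances are collected in one text file passed as the first argument; the class name CoordinatorLogCheck and the method missingCoordinator are hypothetical names for illustration, and the pattern relies only on the "groupId=...]" prefix visible in the log excerpt.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: not part of Kafka, just a log scanner for this question.
class CoordinatorLogCheck {
    // Captures the group id from a prefix such as
    // "[Consumer clientId=sk5-client-StreamThread-1-consumer, groupId=sk5-appid]".
    private static final Pattern GROUP_ID = Pattern.compile("groupId=([^\\]\\s,]+)\\]");

    /** Returns the expected group ids that have no "Discovered group coordinator" line. */
    static List<String> missingCoordinator(List<String> expectedGroupIds, List<String> logLines) {
        Set<String> discovered = new HashSet<>();
        for (String line : logLines) {
            if (!line.contains("Discovered group coordinator")) {
                continue;
            }
            Matcher m = GROUP_ID.matcher(line);
            if (m.find()) {
                discovered.add(m.group(1));
            }
        }
        List<String> missing = new ArrayList<>();
        for (String groupId : expectedGroupIds) {
            if (!discovered.contains(groupId)) {
                missing.add(groupId);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws Exception {
        // Assumption: args[0] is a file holding the combined log output of all instances.
        List<String> logLines = Files.readAllLines(Paths.get(args[0]));
        List<String> expected = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            expected.add("sk" + i + "-appid"); // application ids used in the launch script
        }
        System.out.println("No coordinator discovered for: " + missingCoordinator(expected, logLines));
    }
}
```

The application IDs it reports are the ones whose consumers never logged a coordinator discovery, which narrows down which instances to inspect further.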
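As the logs suggest, the Streams applications whose processors are not initialized cannot discover the group coordinator. Is this a problem I should report to the administrator of the cluster running Apache Kafka, or is it a bug in my own code that I need to track down? Thanks in advance.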
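One rough first step toward separating a cluster problem from a code problem is to check whether the broker address seen in the logs accepts TCP connections from the machine running the instances. The sketch below uses only the Java standard library. The host myserver and port 6667 are taken from the log excerpt; the class name BrokerReachability and the 3000 ms timeout are assumptions for illustration. A successful connection shows only that the port is open, not that the broker is healthy or that coordinator lookup will succeed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: a plain TCP check, it does not speak the Kafka protocol.
class BrokerReachability {
    /** Returns true if a TCP connection to host:port is established within the timeout. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers unknown host, connection refused, and connect timeout.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults come from the log line "Discovered group coordinator myserver:6667".
        String host = args.length > 0 ? args[0] : "myserver";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 6667;
        boolean reachable = canConnect(host, port, 3000);
        System.out.println(host + ":" + port + (reachable ? " is reachable" : " is NOT reachable"));
    }
}
```

If the cluster has several brokers, each one would need to be checked, since the coordinator of a consumer group can live on any of them.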
