Graylog 3.1, plain-text input from Kafka 1.1.1 (AWS)

Posted by mo49yndu on 2021-06-04 in Kafka

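We have an AWS MSK Kafka cluster running version 1.1.1 and want to receive log messages from it in Graylog 3.1, which is deployed on Kubernetes. From what I have read, there is a version incompatibility caused by the Kafka client that Graylog uses to talk to Kafka. Right now, if the ZooKeeper host and port are set on the plain-text Kafka input, an exception is thrown.
The log first shows some INFO messages along with a Kafka exception, and then: ERROR 2020-07-15 17:04:33,283 ERROR [graylog-eventbus] - Exception thrown by subscriber method inputStateChanged(org.graylog2.plugin.events.inputs.IOStateChangedEvent).
Any pointers or references are highly appreciated.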

kafka.common.KafkaException: Failed to parse the broker info from zookeeper: {"listener_security_protocol_map":{"CLIENT":"PLAINTEXT","CLIENT_SECURE":"SSL","REPLICATION":"PLAINTEXT","REPLICATION_SECURE":"SSL"},"endpoints":["CLIENT://b-1.tooling-stack-kafka.pg6kf7.c3.kafka.eu-central-1.amazonaws.com:9092","CLIENT_SECURE://b-1.tooling-stack-kafka.pg6kf7.c3.kafka.eu-central-1.amazonaws.com:9094","REPLICATION://b-1-internal.tooling-stack-kafka.pg6kf7.c3.kafka.eu-central-1.amazonaws.com:9093","REPLICATION_SECURE://b-1-internal.tooling-stack-kafka.pg6kf7.c3.kafka.eu-central-1.amazonaws.com:9095"],"rack":"euc1-az3","jmx_port":9099,"host":"b-1.tooling-stack-kafka.pg6kf7.c3.kafka.eu-central-1.amazonaws.com","timestamp":"1593632284417","port":9092,"version":4}
 at kafka.cluster.Broker$.createBroker(Broker.scala:85) ~[graylog.jar:?]
 at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:563) ~[graylog.jar:?]
 at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:561) ~[graylog.jar:?]
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) ~[graylog.jar:?]
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) ~[graylog.jar:?]
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[graylog.jar:?]
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[graylog.jar:?]
 at kafka.utils.ZkUtils.getCluster(ZkUtils.scala:561) ~[graylog.jar:?]
 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:635) [graylog.jar:?]
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) [graylog.jar:?]
 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:627) [graylog.jar:?]
 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:627) [graylog.jar:?]
 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:627) [graylog.jar:?]
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [graylog.jar:?]
 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626) [graylog.jar:?]
 at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:967) [graylog.jar:?]
 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:1001) [graylog.jar:?]
 at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:163) [graylog.jar:?]
 at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101) [graylog.jar:?]
 at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:105) [graylog.jar:?]
 at org.graylog2.inputs.transports.KafkaTransport.doLaunch(KafkaTransport.java:208) [graylog.jar:?]
 at org.graylog2.plugin.inputs.transports.ThrottleableTransport.launch(ThrottleableTransport.java:76) [graylog.jar:?]
 at org.graylog2.plugin.inputs.MessageInput.launch(MessageInput.java:155) [graylog.jar:?]
 at org.graylog2.shared.inputs.InputLauncher$1.run(InputLauncher.java:84) [graylog.jar:?]
 at com.codahale.metrics.InstrumentedExecutorService$InstrumentedRunnable.run(InstrumentedExecutorService.java:181) [graylog.jar:?]
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_232]
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_232]
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.protocol.SecurityProtocol.CLIENT
 at java.lang.Enum.valueOf(Enum.java:238) ~[?:1.8.0_232]
 at org.apache.kafka.common.protocol.SecurityProtocol.valueOf(SecurityProtocol.java:27) ~[graylog.jar:?]
 at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:48) ~[graylog.jar:?]
 at kafka.cluster.Broker$$anonfun$1.apply(Broker.scala:73) ~[graylog.jar:?]
 at kafka.cluster.Broker$$anonfun$1.apply(Broker.scala:72) ~[graylog.jar:?]
 at scala.collection.immutable.List.map(List.scala:273) ~[graylog.jar:?]
 at kafka.cluster.Broker$.createBroker(Broker.scala:72) ~[graylog.jar:?]
 ... 29 more
2020-07-15 17:04:31,274 INFO    [ZookeeperConsumerConnector] - [graylog2_graylogingress-0.graylogingress.graylog-ingress.svc.cluster.local-1594832664499-d9267393], end rebalancing consumer graylog2_graylogingress-0.graylogingress.graylog-ingress.svc.cluster.local-1594832664499-d9267393 try #3 - {}
2020-07-15 17:04:31,274 INFO    [ZookeeperConsumerConnector] - [graylog2_graylogingress-0.graylogingress.graylog-ingress.svc.cluster.local-1594832664499-d9267393], Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered - {}
2020-07-15 17:04:31,274 INFO    [ConsumerFetcherManager] - [ConsumerFetcherManager-1594832664731] Stopping leader finder thread - {}
2020-07-15 17:04:31,275 INFO    [ConsumerFetcherManager] - [ConsumerFetcherManager-1594832664731] Stopping all fetchers - {}
2020-07-15 17:04:31,275 INFO    [ConsumerFetcherManager] - [ConsumerFetcherManager-1594832664731] All connections stopped - {}
2020-07-15 17:04:31,275 INFO    [ZookeeperConsumerConnector] - [graylog2_graylogingress-0.graylogingress.graylog-ingress.svc.cluster.local-1594832664499-d9267393], Cleared all relevant queues for this fetcher - {}
2020-07-15 17:04:31,276 INFO    [ZookeeperConsumerConnector] - [graylog2_graylogingress-0.graylogingress.graylog-ingress.svc.cluster.local-1594832664499-d9267393], Cleared the data chunks in all the consumer message iterators - {}
2020-07-15 17:04:31,276 INFO    [ZookeeperConsumerConnector] - [graylog2_graylogingress-0.graylogingress.graylog-ingress.svc.cluster.local-1594832664499-d9267393], Committing all offsets after clearing the fetcher queues - {}
2020-07-15 17:04:33,278 ERROR   [InputLauncher] - The [org.graylog2.inputs.raw.kafka.RawKafkaInput] input with ID <5f0f3717843514002049ba65> misfired. Reason: graylog2_graylogingress-0.graylogingress.graylog-ingress.svc.cluster.local-1594832664499-d9267393 can't rebalance after 4 retries. - {}
org.graylog2.plugin.inputs.MisfireException: kafka.common.ConsumerRebalanceFailedException: graylog2_graylogingress-0.graylogingress.graylog-ingress.svc.cluster.local-1594832664499-d9267393 can't rebalance after 4 retries
 at org.graylog2.plugin.inputs.MessageInput.launch(MessageInput.java:158) ~[graylog.jar:?]
 at org.graylog2.shared.inputs.InputLauncher$1.run(InputLauncher.java:84) [graylog.jar:?]
 at com.codahale.metrics.InstrumentedExecutorService$InstrumentedRunnable.run(InstrumentedExecutorService.java:181) [graylog.jar:?]
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_232]
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_232]
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
Caused by: kafka.common.ConsumerRebalanceFailedException: graylog2_graylogingress-0.graylogingress.graylog-ingress.svc.cluster.local-1594832664499-d9267393 can't rebalance after 4 retries
 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:660) ~[graylog.jar:?]
 at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:967) ~[graylog.jar:?]
 at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:1001) ~[graylog.jar:?]
 at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:163) ~[graylog.jar:?]
 at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101) ~[graylog.jar:?]
 at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:105) ~[graylog.jar:?]
 at org.graylog2.inputs.transports.KafkaTransport.doLaunch(KafkaTransport.java:208) ~[graylog.jar:?]
 at org.graylog2.plugin.inputs.transports.ThrottleableTransport.launch(ThrottleableTransport.java:76) ~[graylog.jar:?]
 at org.graylog2.plugin.inputs.MessageInput.launch(MessageInput.java:155) ~[graylog.jar:?]
 ... 7 more
2020-07-15 17:04:33,283 ERROR   [graylog-eventbus] - Exception thrown by subscriber method inputStateChanged(org.graylog2.plugin.events.inputs.IOStateChangedEvent) on subscriber org.graylog2.inputs.InputStateListener@702b656a when dispatching event: IOStateChangedEvent{oldState=STARTING, newState=FAILED, changedState=InputState{stoppable=RawKafkaInput{title=kk-logs, type=org.graylog2.inputs.raw.kafka.RawKafkaInput, nodeId=null}, state=FAILED, startedAt=2020-07-15T17:04:24.343Z, detailedMessage='graylog2_graylogingress-0.graylogingress.graylog-ingress.svc.cluster.local-1594832664499-d9267393 can't rebalance after 4 retries.'}} - {}
java.lang.NullPointerException: null
 at java.util.Objects.requireNonNull(Objects.java:203) ~[?:1.8.0_232]
 at org.graylog2.shared.system.activities.Activity.<init>(Activity.java:34) ~[graylog.jar:?]
 at org.graylog2.inputs.InputStateListener.inputStateChanged(InputStateListener.java:57) ~[graylog.jar:?]
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_232]
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_232]
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_232]
 at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_232]
 at com.google.common.eventbus.Subscriber.invokeSubscriberMethod(Subscriber.java:87) ~[graylog.jar:?]
 at com.google.common.eventbus.Subscriber$SynchronizedSubscriber.invokeSubscriberMethod(Subscriber.java:144) ~[graylog.jar:?]
 at com.google.common.eventbus.Subscriber$1.run(Subscriber.java:72) [graylog.jar:?]
 at com.codahale.metrics.InstrumentedExecutorService$InstrumentedRunnable.run(InstrumentedExecutorService.java:181) [graylog.jar:?]
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
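
For readers following the trace: the root cause line (`No enum constant org.apache.kafka.common.protocol.SecurityProtocol.CLIENT`) suggests that the client bundled in graylog.jar takes the prefix of each endpoint registered in ZooKeeper (here `CLIENT://...`) and looks it up as a security protocol name. The sketch below reproduces only that failure mode in isolation. Everything in it is an assumption made for illustration: the `SecurityProtocol` enum is a local stand-in rather than the Kafka class, `ListenerNameDemo`, `listenerName`, `protocolOf`, and `resolve` are hypothetical names, and `broker.example` is a placeholder host. The `resolve` method shows one way a parser could consult the `listener_security_protocol_map` from the broker JSON instead; it is not taken from any Kafka release.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: a local stand-in, not the Kafka client code.
class ListenerNameDemo {

    // Stand-in for the enum named in the trace (hypothetical copy).
    enum SecurityProtocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

    // Returns the part before "://", e.g. "CLIENT" for "CLIENT://host:9092".
    static String listenerName(String endpoint) {
        int idx = endpoint.indexOf("://");
        if (idx < 0) {
            throw new IllegalArgumentException("Not an endpoint URI: " + endpoint);
        }
        return endpoint.substring(0, idx);
    }

    // Mimics the failing step: the listener name is used directly as a protocol name.
    // Enum.valueOf throws IllegalArgumentException for a custom name such as "CLIENT".
    static SecurityProtocol protocolOf(String endpoint) {
        return SecurityProtocol.valueOf(listenerName(endpoint));
    }

    // Alternative: map the listener name to a protocol first, using the
    // listener_security_protocol_map values from the broker registration.
    static SecurityProtocol resolve(String endpoint, Map<String, String> listenerMap) {
        String name = listenerName(endpoint);
        String protocol = listenerMap.getOrDefault(name, name);
        return SecurityProtocol.valueOf(protocol);
    }

    public static void main(String[] args) {
        // Same mapping as in the broker JSON from the exception message.
        Map<String, String> listenerMap = new HashMap<>();
        listenerMap.put("CLIENT", "PLAINTEXT");
        listenerMap.put("CLIENT_SECURE", "SSL");
        listenerMap.put("REPLICATION", "PLAINTEXT");
        listenerMap.put("REPLICATION_SECURE", "SSL");

        String endpoint = "CLIENT://broker.example:9092";
        try {
            System.out.println("direct lookup: " + protocolOf(endpoint));
        } catch (IllegalArgumentException ex) {
            System.out.println("direct lookup failed: " + ex.getMessage());
        }
        System.out.println("lookup via map: " + resolve(endpoint, listenerMap));
    }
}
```

If this reading of the trace is right, the ZooKeeper host and port in the input configuration are not themselves the problem; the failure comes from how the broker registration is parsed.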
