Flink Kafka EXACTLY_ONCE causes KafkaException: ByteArraySerializer is not an instance of Serializer

h7wcgrx3  ·  Published 2021-06-04  ·  in Kafka

I am trying to enable exactly-once semantics in my Flink Kafka streaming job, together with checkpointing.
However, I could not get it to work, so I downloaded the test sample code from GitHub: https://github.com/apache/flink/blob/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/kafkaexample.java
Running it as-is works fine. But as soon as I enable checkpointing, I get an error. If I change EXACTLY_ONCE to AT_LEAST_ONCE semantics and enable checkpointing, it works. But when I change it back to EXACTLY_ONCE, I get the error again.
The exception I get is:

  org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
      at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
      at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
      at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
      at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
      at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
      at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
      at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
      at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
      at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
      at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
      at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650)
      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1099)
      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1036)
      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
      at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
      at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
      at java.lang.Thread.run(Thread.java:748)
  Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
      at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:76)
      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
      at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
      at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
      at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
      at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
      at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
      at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
      at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
      at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
      at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
  Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
      at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
      at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:360)
      ... 12 more

I made some minor changes to the code so it works in my environment. I am running it in the Flink operations playground inside Docker (https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/docker-playgrounds/flink-operations-playground.html), latest version: Flink 1.10, and the Kafka provided inside it is version 2.2.1.

  public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.enableCheckpointing(1_000);

      String inputTopic = "my-input";
      String outputTopic = "my-output";
      String kafkaHost = "kafka:9092";

      Properties kafkaProps = new Properties();
      kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
      kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");

      DataStream<KafkaEvent> input = env
              .addSource(new FlinkKafkaConsumer<>(inputTopic, new KafkaEventSchema(), kafkaProps)
                      .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
              .keyBy("word")
              .map(new RollingAdditionMapper());

      input.addSink(
              new FlinkKafkaProducer<>(
                      outputTopic,
                      new KafkaEventSerializationSchema(outputTopic),
                      kafkaProps,
                      FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

      env.execute("Modern Kafka Example");
  }

The other classes used in the example can be found here: https://github.com/apache/flink/tree/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base
I tried changing the serialization to use KafkaSerializationSchema rather than the sample code's SerializationSchema. However, the code below did not help either; I get the same error.

  import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
  import org.apache.kafka.clients.producer.ProducerRecord;

  public class KafkaEventSerializationSchema implements KafkaSerializationSchema<KafkaEvent> {

      private static final long serialVersionUID = 1L;

      private final String topic;

      public KafkaEventSerializationSchema(String topic) {
          this.topic = topic;
      }

      @Override
      public ProducerRecord<byte[], byte[]> serialize(KafkaEvent element, Long timestamp) {
          return new ProducerRecord<>(topic, element.toString().getBytes());
      }
  }

All help is appreciated. I have not been able to find any working code online for exactly-once guarantees between Flink and Kafka, only articles about it, never actual working code. That is all I am trying to achieve here.

f0ofjuux


I ran into the same problem, and explicitly setting the producer's transaction timeout helped:

  properties.setProperty("transaction.timeout.ms", "900000");
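For context, a minimal sketch of how that property fits into the producer configuration used in the question. The class and method names here are illustrative, not from the original code; plain string keys are used instead of the `ProducerConfig` constants so the sketch compiles without the kafka-clients jar. Note that Kafka brokers cap this value at `transaction.max.timeout.ms` (15 minutes by default), while Flink's `FlinkKafkaProducer` defaults `transaction.timeout.ms` to 1 hour, so an explicit value at or below the broker cap avoids the mismatch.

```java
import java.util.Properties;

public class ProducerProps {
    // Builds the properties passed to FlinkKafkaProducer for an
    // EXACTLY_ONCE sink. The string keys match the standard Kafka
    // producer settings (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG etc.).
    public static Properties create(String bootstrapServers) {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", bootstrapServers);
        // Keep the transaction timeout at or below the broker's
        // transaction.max.timeout.ms (default: 15 minutes = 900000 ms):
        kafkaProps.setProperty("transaction.timeout.ms", "900000");
        return kafkaProps;
    }
}
```

Usage: `new FlinkKafkaProducer<>(outputTopic, new KafkaEventSerializationSchema(outputTopic), ProducerProps.create("kafka:9092"), FlinkKafkaProducer.Semantic.EXACTLY_ONCE)`.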
