Object not serializable in a Java Spark Kafka stream (org.apache.kafka.clients.consumer.ConsumerRecord)

ijnw1ujt  posted 2021-06-07  in Kafka
Follow (0) | Answers (3) | Views (449)

I'm fairly sure I am only pushing string data and deserializing it as strings as well; the records I push even show up in the error message. So why does this error suddenly appear? Am I missing something?
Here is the code:

  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Arrays;
  import java.util.Collection;
  import java.util.Iterator;
  import java.util.Map;
  import java.util.Set;
  import java.util.concurrent.atomic.AtomicReference;
  import java.util.regex.Pattern;
  import scala.Tuple2;
  import kafka.serializer.StringDecoder;
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.function.*;
  import org.apache.spark.streaming.api.java.*;
  import org.apache.spark.streaming.kafka.HasOffsetRanges;
  import org.apache.spark.streaming.kafka10.*;
  import org.apache.spark.streaming.kafka.OffsetRange;
  import org.apache.spark.streaming.Duration;
  import org.apache.spark.streaming.Durations;

  public final class KafkaConsumerDirectStream {
      public static void main(String[] args) throws Exception {
          try {
              SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]");
              sparkConf.set("spark.streaming.concurrentJobs", "3");

              // Create the context with 2 seconds batch size
              JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

              Map<String, Object> kafkaParams = new HashMap<>();
              // kafkaParams.put("metadata.broker.list", "x.xx.xxx.xxx:9091,
              // x.xx.xxx.xxx:9092, x.xx.xxx.xxx:9093");
              kafkaParams.put("bootstrap.servers", "x.xx.xxx.xxx:9091");
              kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              kafkaParams.put("group.id", "11_ubiq_12dj");
              kafkaParams.put("enable.auto.commit", "true");
              kafkaParams.put("auto.commit.interval.ms", "1000");
              kafkaParams.put("session.timeout.ms", "30000");
              kafkaParams.put("auto.offset.reset", "earliest");
              kafkaParams.put("enable.auto.commit", true);

              Collection<String> topics = Arrays.asList("TopicQueue");

              JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
                      LocationStrategies.PreferBrokers(),
                      ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

              // stream.print();

              stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
                  @Override
                  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
                      final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                      rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
                          @Override
                          public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
                              OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
                              // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges);
                              System.out.println(
                                      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
                          }
                      });
                  }
              });

              jssc.start();
              jssc.awaitTermination();
          } catch (Exception e) {
              e.printStackTrace();
          }
      }
  }

Here is the error:

  16/11/24 00:19:14 ERROR JobScheduler: Error running job streaming job 1479964754000 ms.0
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 30.0 (TID 1500) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
  Serialization stack:
      - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = PartWithTopic02Queue, partition = 36, offset = 555, CreateTime = 1479964753779, checksum = 2582644462, serialized key size = -1, serialized value size = 6, key = null, value = Hello0))
      - element of array (index: 0)
      - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 1)
      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
      at scala.Option.foreach(Option.scala:236)
      at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      at java.lang.Thread.getStackTrace(Thread.java:1117)
      at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
      at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122)
      at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
      at scala.util.Try$.apply(Try.scala:161)
      at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.lang.Thread.run(Thread.java:785)

ulydmbyx1#

The org.apache.kafka.clients.consumer.ConsumerRecord class is not serializable, so it cannot be used with RMI and the like; for the same reason Spark cannot ship it from executors back to the driver, which is exactly what print() tries to do.
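A minimal sketch of the usual workaround (my addition, not part of the answer above): map each ConsumerRecord to its String value before any output operation such as print(), so only serializable Strings travel to the driver. It reuses the stream variable and the imports from the question's code.

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.spark.api.java.function.Function;
  import org.apache.spark.streaming.api.java.JavaDStream;

  // ... inside main(), right after createDirectStream() in the question:
  JavaDStream<String> values = stream.map(
          new Function<ConsumerRecord<String, String>, String>() {
              @Override
              public String call(ConsumerRecord<String, String> record) {
                  // String is serializable; ConsumerRecord is not
                  return record.value();
              }
          });
  values.print(); // print() now collects plain Strings to the driver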


fdbelqdn2#

It seems org.apache.spark.streaming.kafka10.* does not work well. I used only org.apache.spark.streaming.kafka and it worked for me.
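For reference, a rough sketch of the older 0.8-style direct stream API this answer refers to, reusing the jssc context, broker address, and topic name from the question as placeholders. That API returns already-deserialized (key, value) String pairs rather than ConsumerRecord objects, which would explain why the serialization error goes away.

  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Map;
  import java.util.Set;
  import kafka.serializer.StringDecoder;
  import org.apache.spark.streaming.api.java.JavaPairInputDStream;
  import org.apache.spark.streaming.kafka.KafkaUtils;

  // The 0.8 direct API takes Map<String, String> params and
  // "metadata.broker.list" instead of "bootstrap.servers".
  Map<String, String> kafkaParams = new HashMap<>();
  kafkaParams.put("metadata.broker.list", "x.xx.xxx.xxx:9091");
  Set<String> topics = new HashSet<>(Arrays.asList("TopicQueue"));

  // Returns plain (String, String) pairs, both serializable, so
  // print()/collect() never try to ship ConsumerRecord objects.
  JavaPairInputDStream<String, String> pairs = KafkaUtils.createDirectStream(
          jssc,
          String.class, String.class,
          StringDecoder.class, StringDecoder.class,
          kafkaParams, topics);
  pairs.print();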


h79rfbju3#

You just need to declare public final class KafkaConsumerDirectStream implements java.io.Serializable. That worked for me, even with org.apache.spark.streaming.kafka10.*. Hope this helps, thanks :-)
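For completeness, a sketch of the declaration change this answer describes. Making the enclosing class Serializable is the standard cure for "task not serializable" errors, where an anonymous inner function drags its enclosing instance into a closure; it is less obvious why it would cure a "not serializable result" on ConsumerRecord, so treat it as one reported data point rather than a general fix.

  // Declaration change suggested in this answer; the class body stays
  // exactly as in the question. If an anonymous function captures the
  // enclosing class, this lets that capture be serialized.
  public final class KafkaConsumerDirectStream implements java.io.Serializable {
      // ... body unchanged from the question ...
  }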
