在最新的Kafka版本0.10.1.1中,将使用什么来代替Kafka

uqdfh47h  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(389)

我已经升级了我的Kafka和KafkaSpark流,但我面临着一些挑战,而改变他们的一些方法。像kafkautils一样,迭代器也在抛出错误。我的Kafka版本是0.10.1.1。所以,如果有人知道如何修复这些变化,那就太好了。谢谢

fykwrbwg

fykwrbwg1#

以前的kafkautils包是“org.apache.spark.streaming.kafka”。最新的包是“org.apache.spark.streaming.kafka010”。
要设置kafkaparams和主题详细信息,请检查以下代码段,

  1. import java.util.*;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.TaskContext;
  4. import org.apache.spark.api.java.*;
  5. import org.apache.spark.api.java.function.*;
  6. import org.apache.spark.streaming.api.java.*;
  7. import org.apache.spark.streaming.kafka010.*;
  8. import org.apache.kafka.clients.consumer.ConsumerRecord;
  9. import org.apache.kafka.common.TopicPartition;
  10. import org.apache.kafka.common.serialization.StringDeserializer;
  11. import scala.Tuple2;
  12. Map<String, Object> kafkaParams = new HashMap<>();
  13. kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
  14. kafkaParams.put("key.deserializer", StringDeserializer.class);
  15. kafkaParams.put("value.deserializer", StringDeserializer.class);
  16. kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
  17. kafkaParams.put("auto.offset.reset", "latest");
  18. kafkaParams.put("enable.auto.commit", false);
  19. Collection<String> topics = Arrays.asList("topicA", "topicB");
  20. final JavaInputDStream<ConsumerRecord<String, String>> stream =
  21. KafkaUtils.createDirectStream(
  22. streamingContext,
  23. LocationStrategies.PreferConsistent(),
  24. ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  25. );
  26. stream.mapToPair(
  27. new PairFunction<ConsumerRecord<String, String>, String, String>() {
  28. @Override
  29. public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
  30. return new Tuple2<>(record.key(), record.value());
  31. }
  32. })

如需进一步参考,请访问以下链接https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

展开查看全部
pes8fvy9

pes8fvy92#

kafkautils是apachespark流媒体的一部分,而不是apachekafka的一部分
org.apache.spark.streaming.kafka.kafkautils网站

相关问题