Why doesn't Spark Streaming read from the Kafka topic?

ttvkxqim  asked on 2021-06-07  in  Kafka

Spark Streaming 1.6.0
Apache Kafka 10.0.1
I am using Spark Streaming to read from the sample topic. The code runs without any errors or exceptions, but I get no data on the console via the print() method.
I checked whether there are messages in the topic:

./bin/kafka-console-consumer.sh \
  --zookeeper ip-172-xx-xx-xxx:2181 \
  --topic sample \
  --from-beginning

The messages I get are:

message no. 1
message no. 2
message no. 3
message no. 4
message no. 5
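For reference, new test messages can also be pushed from Java while the job is running. Below is a minimal producer sketch, assuming the standard kafka-clients library is on the classpath; the broker address mirrors the one used in the streaming code further down:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SampleProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    // broker address is assumed to match the one used by the streaming job
    props.put("bootstrap.servers", "ip-172-xx-xx-xxx:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (int i = 1; i <= 5; i++) {
        // push a plain-text test message to the "sample" topic
        producer.send(new ProducerRecord<>("sample", "message no. " + i));
      }
    }
  }
}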

The command that runs the streaming job:

./bin/spark-submit \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MaxDirectMemorySize=512m" \
  --jars /home/ubuntu/zifferlabs/target/ZifferLabs-1-jar-with-dependencies.jar \
  --class "com.zifferlabs.stream.SampleStream" \
  /home/ubuntu/zifferlabs/src/main/java/com/zifferlabs/stream/SampleStream.java

Here is the full code:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class SampleStream {

  private static void processStream() {
    SparkConf conf = new SparkConf().setAppName("sampleStream")
        .setMaster("local[3]")
        .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
        .set("spark.driver.memory", "2g").set("spark.streaming.blockInterval", "1000")
        .set("spark.driver.allowMultipleContexts", "true")
        .set("spark.scheduler.mode", "FAIR");

    JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(Long.parseLong("2000")));

    String[] topics = "sample".split(",");
    Set<String> topicSet = new HashSet<String>(Arrays.asList(topics));

    Map<String, String> props = new HashMap<String, String>();
    props.put("metadata.broker.list", "ip-172-xx-xx-xxx:9092");
    props.put("kafka.consumer.id", "sample_con");
    props.put("group.id", "sample_group");
    props.put("zookeeper.connect", "ip-172-xx-xx-xxx:2181");
    props.put("zookeeper.connection.timeout.ms", "16000");

    JavaPairInputDStream<String, byte[]> kafkaStream =
        KafkaUtils.createDirectStream(jsc, String.class, byte[].class, StringDecoder.class,
            DefaultDecoder.class, props, topicSet);

    JavaDStream<String> data = kafkaStream.map(new Function<Tuple2<String, byte[]>, String>() {
      public String call(Tuple2<String, byte[]> arg0) throws Exception {
        System.out.println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ value is: " + arg0._2().toString());
        return arg0._2().toString();
      }
    });

    data.print();

    System.out.println("Spark Streaming started....");
    jsc.checkpoint("/home/spark/sparkChkPoint");
    jsc.start();
    jsc.awaitTermination();
    System.out.println("Stopped Spark Streaming");
  }

  public static void main(String[] args) {
    processStream();
  }
}

guykilcj #1

I think your code is right, but the command line that executes it is not.
You spark-submit the application as follows (formatting is mine, and spark.executor.extraJavaOptions removed for simplicity):

./bin/spark-submit \
  --jars /home/ubuntu/zifferlabs/target/ZifferLabs-1-jar-with-dependencies.jar \
  --class "com.zifferlabs.stream.SampleStream" \
  /home/ubuntu/zifferlabs/src/main/java/com/zifferlabs/stream/SampleStream.java

I don't think it can work, since spark-submit is given the Java source code rather than executable (compiled) code.
Please spark-submit your application as follows:

./bin/spark-submit \
  --class "com.zifferlabs.stream.SampleStream" \
  /home/ubuntu/zifferlabs/target/ZifferLabs-1-jar-with-dependencies.jar

where --class defines the "entry point" of your Spark application, and the jar with the dependencies is the code you hand to spark-submit.
Give it a shot and report back!
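A side note, unrelated to why nothing prints: arg0._2() is a byte[], and calling toString() on a Java array returns only the JVM identity string (something like [B@1540e19d), not the message text. A minimal sketch of the map function body decoding the payload instead, assuming the messages are UTF-8 text:

// inside the map function: decode the byte[] payload instead of calling toString()
public String call(Tuple2<String, byte[]> arg0) throws Exception {
  String value = new String(arg0._2(), java.nio.charset.StandardCharsets.UTF_8);
  System.out.println("value is: " + value);
  return value;
}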

