Spark流套接字流示例不工作

oyjwcjzk  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(429)

我正在尝试使用spark流媒体,但我被困在第一个示例:

  1. import java.util.Arrays;
  2. import org.apache.spark.*;
  3. import org.apache.spark.api.java.function.*;
  4. import org.apache.spark.streaming.*;
  5. import org.apache.spark.streaming.api.java.*;
  6. import scala.Tuple2;
  7. public class NetworkWordCount {
  8. public static void main(String[] args) {
  9. // Create a local StreamingContext with two working thread and batch interval of 1 second
  10. SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount");
  11. JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
  12. JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
  13. JavaDStream<String> words = lines.flatMap(
  14. new FlatMapFunction<String, String>() {
  15. @Override public Iterable<String> call(String x) {
  16. return Arrays.asList(x.split(" "));
  17. }
  18. });
  19. // Count each word in each batch
  20. JavaPairDStream<String, Integer> pairs = words.mapToPair(
  21. new PairFunction<String, String, Integer>() {
  22. @Override public Tuple2<String, Integer> call(String s) {
  23. return new Tuple2<String, Integer>(s, 1);
  24. }
  25. });
  26. JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
  27. new Function2<Integer, Integer, Integer>() {
  28. @Override public Integer call(Integer i1, Integer i2) {
  29. return i1 + i2;
  30. }
  31. });
  32. // Print the first ten elements of each RDD generated in this DStream to the console
  33. wordCounts.print();
  34. jssc.start(); // Start the computation
  35. jssc.awaitTermination();
  36. }
  37. }

这段代码实际上是文档的副本。
https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html
我设置了一个netcat服务器,如:
北卡罗来纳州-lk 9999
以及obne nc客户端,如:
nc本地主机9999
我在其中键入如下句子:
你好,世界!世界万岁\n
在netcat服务器上正确显示。
但它不起作用。每一批我都有一张空印。

  1. 21/02/16 00:36:41 INFO SocketInputDStream: Removing blocks of RDD BlockRDD[137] at socketTextStream at NetworkWordCount.java:17 of time 1613432201000 ms
  2. 21/02/16 00:36:41 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1613432199000 ms)
  3. 21/02/16 00:36:41 INFO InputInfoTracker: remove old batch metadata: 1613432199000 ms
  4. -------------------------------------------
  5. Time: 1613432201000 ms
  6. -------------------------------------------

我尝试了不同的价值观 setMaster 例如 local[4] , local[2] 以及 local[*] 但结果是一样的。
另外,如果我在netcat客户端之前运行spark流代码,我甚至看不到nc服务器上的字符串。

44u64gxh

44u64gxh1#

我找到了解决我问题的办法。
简而言之,您需要将消息直接写入运行tcp服务器的终端,而不需要另一个netcat客户端。
没有错误或缺少配置这只是对netcat工作方式的误解。
我从电话里听懂了 man nc-k 选项允许netcat管理多个连接,但部分是错误的。

  1. -k When a connection is completed, listen for another one. Requires -l. When used together with the -u option, the server socket is not connected and it can receive UDP datagrams from multiple
  2. hosts.

但这并不是我想的那样。如果使用 -k 选项,则它将接受多个连接,但仍将一次处理一个连接。
这意味着,如果您有两个nc客户端,并且如果您在两个客户端中都键入了一些文本,那么在您关闭第一个连接之前,服务器将只接收其中一个客户端的文本。

相关问题