How do I fix the MicroBatchExecution error for a socket stream in Spark using Scala?

5lhxktic  posted on 2021-05-27  in  Spark

I have successfully used Spark Streaming (DStreams) as follows.

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._

    // `sc` is assumed to be the SparkContext that spark-shell provides,
    // so `conf` below is defined but not actually used.
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sc, Seconds(30))
    // Read lines from the TCP server on localhost:8888
    val lines = ssc.socketTextStream("localhost", 8888)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))  // pair each word with a count of 1
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()

    // In a separate terminal, the data server was started with:
    //   nc -lvp 8888

I then wanted to try a word count on data received over a socket with Structured Streaming, as shown below.

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
    import spark.implicits._  // needs `spark` in scope, so it follows the session creation

    spark.conf.set("spark.sql.shuffle.partitions", 5)

    // Connect as a client to a TCP server on localhost:9999
    val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
    // Split each line into words
    val words = lines.as[String].flatMap(_.split(" "))
    // Running count per word
    val wordCounts = words.groupBy("value").count()
    // Print the complete counts table to the console after each micro-batch
    val query = wordCounts.writeStream.outputMode("complete").format("console").start()

But I got the following error.

    ERROR MicroBatchExecution: Query [id = 36c54725-eeb5-415e-829a-e73c079d47d4, runId = 048f9d74-6456-4b01-b058-a4bd8e105b91] terminated with error
    java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:607)
        at java.net.Socket.connect(Socket.java:556)
        at java.net.Socket.<init>(Socket.java:452)
        at java.net.Socket.<init>(Socket.java:229)

What is going wrong here, and what is the difference between these two streaming approaches? Thank you!
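
A possible starting point, not a confirmed diagnosis: `Connection refused` means that nothing accepted the TCP connection on the host and port the socket source tried to reach. In the snippets above, `nc` listens on 8888 while the Structured Streaming query connects to 9999, so it may be worth checking that a server is actually listening on 9999 before calling `start()`. The sketch below uses only `java.net`; `PortCheck` and `isListening` are hypothetical names introduced here for illustration and are not part of Spark.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper, not a Spark API.
object PortCheck {
  // Returns true if a TCP server accepts a connection on host:port
  // within timeoutMs milliseconds, false if the connection fails or times out.
  def isListening(host: String, port: Int, timeoutMs: Int = 1000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false // includes ConnectException (connection refused)
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // 8888 is the port from the DStream example, 9999 the one from the query.
    Seq(8888, 9999).foreach { port =>
      println(s"localhost:$port listening = ${isListening("localhost", port)}")
    }
  }
}
```

If 9999 reports `false`, starting a listener on that port first (for example `nc -lvp 9999`, matching the command used above) or pointing the query at 8888 should let the socket source connect.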
