spark 2.3流连接丢失左表键

h4cxqtbf  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(345)

我正在尝试用spark 2.3.0实现一个流连接玩具
当条件匹配时,流连接可以正常工作,但当条件不匹配时,即使使用leftouterjoin,也会丢失left stream值。
提前谢谢
这是我的源代码和数据,基本上,我正在创建两个套接字,一个是9999作为右流源,另一个是9998作为左流源。

val spark = SparkSession
      .builder
      .appName("StreamStream")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    spark.sparkContext.setLogLevel("ERROR")

    val s9999: DataFrame = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    val s9999Dataset: Dataset[S9999] = s9999
      .map(line => {
        val strings = line.get(0).toString.split(",")
        val id = strings(0).toInt
        val time = Timestamp.valueOf(strings(1))
        S9999(id, time)
      })
      .withWatermark("timestamp99", "30 seconds")

    val s9998Dataset: Dataset[S9998] = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9998)
      .load()
      .map(line => {
        val strings = line.get(0).toString.split(",")
        val id = strings(0).toInt
        val time = Timestamp.valueOf(strings(1))
        S9998(id, time)
      })

    val resultDataset = s9998Dataset
      .join(s9999Dataset,
        joinExprs = expr(
          """
                id99 = id98 AND
                timestamp98 >= timestamp99 AND
                timestamp98 <= timestamp99 + interval 6 seconds
        """),
        joinType = "leftOuter")

    val streamingQuery = resultDataset
      .writeStream
      .outputMode("append")
      .format("console")
      .start()

    streamingQuery.awaitTermination()
  }

  case class S9999(id99: Int, timestamp99: Timestamp)

  case class S9998(id98: Int, timestamp98: Timestamp)

样本数据:
左插座:

1,2011-10-02 18:50:20.123
2,2011-10-02 18:50:25.123
3,2011-10-02 18:50:30.123
4,2011-10-02 18:50:35.123
5,2011-10-02 18:50:40.123
6,2011-10-02 18:50:45.123
7,2011-10-02 18:50:50.123
8,2011-10-02 18:50:55.123
9,2011-10-02 18:51:00.123
10,2011-10-02 18:51:05.123
11,2011-10-02 18:51:10.123
12,2011-10-02 18:51:15.123
13,2011-10-02 18:51:20.123
14,2011-10-02 18:51:25.123
15,2011-10-02 18:51:30.123

右流数据:

1,2011-10-02 18:50:20.123
3,2011-10-02 18:50:30.123
7,2011-10-02 18:50:50.123
8,2011-10-02 18:50:55.123
9,2011-10-02 18:51:00.123
13,2011-10-02 18:51:20.123
14,2011-10-02 18:51:25.123
15,2011-10-02 18:51:30.123
fv2wmkja

fv2wmkja1#

在这个问题上花了6个小时之后,我发现左边的可选水印实际上是强制性的

相关问题