如何在结构化流媒体中正确使用foreachbatch.batchdf.unpersist()((有错误)

g6ll5ycj  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(509)

我使用的是spark 3.0的结构化流媒体。
我要做的是将数据写入多个接收器。我需要在kafka中编写一些Dataframe,以便在另一个进程中使用,还需要将同一个Dataframe存储在cassandra中,以便以后使用(一些 Jmeter 板等)。
对于目标过程,我编写了如下代码。我从这里提到了官方文件。

merged_stream.writeStream
      //.trigger(Trigger.ProcessingTime("3 seconds"))
      .foreachBatch((batchDF: DataFrame, batchId: Long) => {
        batchDF.persist()
        batchDF.write
          .format("kafka")
          .option("kafka.bootstrap.servers", brokers)
          .option("kafka.compression.type", sinkCompressionType)
          .option("topic", mergeTopic)
          .mode("append")
          .save()
        batchDF.write
          .format("org.apache.spark.sql.cassandra")
          .cassandraFormat(cassandraTable, cassandraKeyspace, cassandraCluster)
          .mode("append")
          .save()
        batchDF.unpersist() //**this is the problem!!**//
      })
      .option("checkpointLocation", checkpointDir)
      .start()
      .awaitTermination()

然而,每当我写 batchDF.unpersist() 在foreachbatch的最后一部分,出现编译错误:

[error]   (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
[error]   (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
[error]  cannot be applied to ((org.apache.spark.sql.DataFrame, scala.Long) => org.apache.spark.sql.DataFrame)
[error]       .foreachBatch({(batchDF: DataFrame, batchId: Long) => {
[error]        ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed

当我移除 batchDF.unpersist() ,它工作正常,我检查了Kafka和Cassandra的数据。然而,很明显,它很快就出现了内存不足错误,因为缓存的数据仍保留在内存中。
我也试着用 sparkSession.catalog.clearCache() ,但它似乎没有按我的意图工作。
既然我的代码与文档完全相同,为什么会发生此错误?还有,我该怎么修?
提前谢谢。

7fhtutme

7fhtutme1#

spark为scala和java提供了两种不同的方法,因为scala在Scala2.12之前不生成JavaLambda。

/**
   * Applies a function `f` to all rows.
   *
   * @group action
   * @since 1.6.0
   */
  def foreach(f: T => Unit): Unit = withNewRDDExecutionId {
    rdd.foreach(f)
  }

  /**
   * (Java-specific)
   * Runs `func` on each element of this Dataset.
   *
   * @group action
   * @since 1.6.0
   */
  def foreach(func: ForeachFunction[T]): Unit = foreach(func.call(_))

这是为了方便java用户,但是一旦spark开始支持Scala2.12,这些方法就会互相冲突。
spark社区进行了相关的讨论,但该决定似乎是为了保持api的兼容性。也就是说,不幸的是,您需要在两个方法之间“严格”匹配其中一个签名,例如add Unit 在lambda的末尾。

相关问题