我使用的是spark 3.0的结构化流媒体。
我要做的是将数据写入多个接收器。我需要在kafka中编写一些Dataframe,以便在另一个进程中使用,还需要将同一个Dataframe存储在cassandra中,以便以后使用(一些 Jmeter 板等)。
对于目标过程,我编写了如下代码。我从这里提到了官方文件。
merged_stream.writeStream
//.trigger(Trigger.ProcessingTime("3 seconds"))
.foreachBatch((batchDF: DataFrame, batchId: Long) => {
batchDF.persist()
batchDF.write
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("kafka.compression.type", sinkCompressionType)
.option("topic", mergeTopic)
.mode("append")
.save()
batchDF.write
.format("org.apache.spark.sql.cassandra")
.cassandraFormat(cassandraTable, cassandraKeyspace, cassandraCluster)
.mode("append")
.save()
batchDF.unpersist() //**this is the problem!!**//
})
.option("checkpointLocation", checkpointDir)
.start()
.awaitTermination()
然而,每当我写 batchDF.unpersist()
在foreachbatch的最后一部分,出现编译错误:
[error] (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
[error] (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
[error] cannot be applied to ((org.apache.spark.sql.DataFrame, scala.Long) => org.apache.spark.sql.DataFrame)
[error] .foreachBatch({(batchDF: DataFrame, batchId: Long) => {
[error] ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
当我移除 batchDF.unpersist()
,它工作正常,我检查了Kafka和Cassandra的数据。然而,很明显,它很快就出现了内存不足错误,因为缓存的数据仍保留在内存中。
我也试着用 sparkSession.catalog.clearCache()
,但它似乎没有按我的意图工作。
既然我的代码与文档完全相同,为什么会发生此错误?还有,我该怎么修?
提前谢谢。
1条答案
按热度按时间7fhtutme1#
spark为scala和java提供了两种不同的方法,因为scala在Scala2.12之前不生成JavaLambda。
这是为了方便java用户,但是一旦spark开始支持Scala2.12,这些方法就会互相冲突。
spark社区进行了相关的讨论,但该决定似乎是为了保持api的兼容性。也就是说,不幸的是,您需要在两个方法之间“严格”匹配其中一个签名,例如add
Unit
在lambda的末尾。