我对scala和databricks流媒体都是新手。我正在将流式事件读入一个Dataframe,我想使用if else语句根据Dataframe是否为空来触发另一个笔记本。下面的简单代码(及其变体)
if(finalDF.isEmpty){
print("0")
}
else{
print("1")
}
持续导致以下错误
AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
eventhubs
如何将writestream.start()合并到上述代码中?或者,如果Dataframe是由流式事件填充的,我如何评估Dataframe内容并在此基础上执行一个或另一个操作?
2条答案
按热度按时间4smxwvx51#
流df不能是空的或者不是设计的-流是无限的,如果你现在没有数据,那么你可以在下一秒得到一些新的东西。所以你的代码不起作用。
您可以使用foreachbatch来处理数据的“当前”快照,这样您就可以像处理“正常”的非流式Dataframe一样处理这些快照,但是您可能无法从笔记本内部触发笔记本,因此这两种情况的代码应该在同一个函数中,而不是在不同的笔记本中。
1sbrub3j2#
我测试了这段代码,它是一种引入if-else并根据事件内容决定操作的方法。