我正在使用spark-sql-2.4.1v在poc中进行流式处理。
我有一个场景如下:dataset staticdf=//来自hdfs/cassandra表dataset streamingdf=//来自kafka主题的数据流
Dataset<Row> joinDs = streamingDs.join(staticDs, streamingDs.col("companyId").equalTo(staticDs.col("company_id"), "inner"));
即使这是工作良好,我有一个加入的时间问题。目前我的流跳跳虎时间约为10秒。这个连接在哪里运行了将近1分钟。所以我没有在预期的时间内得到结果。
我怎样才能让我的加入每10秒触发一次?
谢谢您。
1条答案
按热度按时间oug3syen1#
在您的情况下,要执行join spark,需要从cassandra读取所有数据,这很慢。正如我前面提到的,如果您想在数据集/Dataframe上执行高效连接,或者使用
joinWithCassandra
/leftJoinWithCassandra
来自rdd api。2020年9月更新:spark cassandra connector 2.5.0中添加了对在Dataframe中与cassandra连接的支持