我使用的是spark sql 2.4.x版本,datastax spark cassandra connector for cassandra-3.x版本。还有Kafka。
我有一个来自Kafka主题的财务数据的场景。如公司ID、年度、季度、销售额、上一个销售数据。
val kafkaDf = sc.parallelize(Seq((15,2016, 4, 100.5,"")).toDF("companyId", "year","quarter", "sales","prev_sales")
我需要用Cassandra表格上的上一年同一季度的数据来预测销售额,如下所示
val cassandraTabledf = sc.parallelize(Seq(
(15,2016, 3, 120.6, 320.6),
(15,2016, 2, 450.2,650.2),
(15,2016, 1, 200.7,700.7),
(15,2015, 4, 221.4,400),
(15,2015, 3, 320.6,300),
(15,2015, 2, 650.2,200),
(15,2015, 1, 700.7,100))).toDF("companyId", "year","quarter", "sales","prev_sales")
i、 e.对于seq((152016,4100.5,“”)数据,应为2015年第四季度数据,即221.4
所以新的数据
(15,2016, 4, 100.5,221.4)
如何做到这一点?我们可以显式地进行查询,但是有没有办法在cassandra表上使用join来使用“lag”函数呢?
1条答案
按热度按时间3okqufwl1#
我不认为这需要什么
leg
以及lead
功能。你可以通过join
我也是。检查以下代码以供参考:注意:我在中添加了更多数据
kafkaDF
更多了解。