如何在spark流媒体应用中使用滞后/超前功能?

u2nhd7ah  于 2021-06-10  发布在  Cassandra
关注(0)|答案(1)|浏览(401)

我使用的是spark sql 2.4.x版本,datastax spark cassandra connector for cassandra-3.x版本。还有Kafka。
我有一个来自Kafka主题的财务数据的场景。如公司ID、年度、季度、销售额、上一个销售数据。

val kafkaDf = sc.parallelize(Seq((15,2016, 4, 100.5,"")).toDF("companyId", "year","quarter", "sales","prev_sales")

我需要用Cassandra表格上的上一年同一季度的数据来预测销售额,如下所示

val cassandraTabledf = sc.parallelize(Seq(
  (15,2016, 3, 120.6, 320.6),
  (15,2016, 2, 450.2,650.2),
  (15,2016, 1, 200.7,700.7),
  (15,2015, 4, 221.4,400),
  (15,2015, 3, 320.6,300),
  (15,2015, 2, 650.2,200),
  (15,2015, 1, 700.7,100))).toDF("companyId", "year","quarter", "sales","prev_sales")

i、 e.对于seq((152016,4100.5,“”)数据,应为2015年第四季度数据,即221.4
所以新的数据
(15,2016, 4, 100.5,221.4)
如何做到这一点?我们可以显式地进行查询,但是有没有办法在cassandra表上使用join来使用“lag”函数呢?

3okqufwl

3okqufwl1#

我不认为这需要什么 leg 以及 lead 功能。你可以通过 join 我也是。检查以下代码以供参考:
注意:我在中添加了更多数据 kafkaDF 更多了解。

scala> kafkaDf.show(false)
+---------+----+-------+-----+----------+
|companyId|year|quarter|sales|prev_sales|
+---------+----+-------+-----+----------+
|15       |2016|4      |100.5|          |
|15       |2016|1      |115.8|          |
|15       |2016|3      |150.1|          |
+---------+----+-------+-----+----------+

scala> cassandraTabledf.show
+---------+----+-------+-----+----------+
|companyId|year|quarter|sales|prev_sales|
+---------+----+-------+-----+----------+
|       15|2016|      3|120.6|     320.6|
|       15|2016|      2|450.2|     650.2|
|       15|2016|      1|200.7|     700.7|
|       15|2015|      4|221.4|       400|
|       15|2015|      3|320.6|       300|
|       15|2015|      2|650.2|       200|
|       15|2015|      1|700.7|       100|
+---------+----+-------+-----+----------+

scala>kafkaDf.alias("k").join(
                              cassandraTabledf.alias("c"), 
                              col("k.companyId") === col("c.companyId") && 
                              col("k.quarter") === col("c.quarter") && 
                              (col("k.year") - 1) === col("c.year"),
                              "left"
                             )
                       .drop("prev_sales")
                       .select(col("k.*"), col("c.sales").alias("prev_sales"))
                       .withColumn("prev_sales", when(col("prev_sales").isNull, col("sales")).otherwise(col("prev_sales")))
                       .show()
+---------+----+-------+-----+----------+
|companyId|year|quarter|sales|prev_sales|
+---------+----+-------+-----+----------+
|       15|2016|      1|115.8|     700.7|
|       15|2016|      3|150.1|     320.6|
|       15|2016|      4|100.5|     221.4|
+---------+----+-------+-----+----------+

相关问题