cassandra Spark从一个 Dataframe 创建多个 Dataframe

jum4pzuy  于 2022-11-05  发布在  Cassandra
关注(0)|答案(1)|浏览(192)

我使用Spark 2.1和Cassandra(3.9)作为数据源。C* 有一个包含50列的大表,这对于我的用例来说不是一个好的数据模型。所以我为每个传感器创建了拆分表,沿着分区键和集群键列。

All sensor table
-----------------------------------------------------
| Device |   Time     | Sensor1 | Sensor2 | Sensor3 |
|  dev1  | 1507436000 |  50.3   |    1    |    1    |
|  dev2  | 1507436100 |  90.2   |    0    |    1    |
|  dev1  | 1507436100 |  28.1   |    1    |    1    |
-----------------------------------------------------
Sensor1 table
-------------------------------
| Device |   Time     | value |
|  dev1  | 1507436000 | 50.3  |
|  dev2  | 1507436100 | 90.2  |
|  dev1  | 1507436100 | 28.1  |
-------------------------------

现在我正在使用spark将数据从旧表复制到新表。

df = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="allsensortables", keyspace="dataks")\
    .load().cache()
df.createOrReplaceTempView("data")
query = ('''select device,time,sensor1 as value from data  ''' )
vgDF = spark.sql(query)
vgDF.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table="sensor1", keyspace="dataks")\
    .save()

一个接一个地复制数据对于一个表来说要花很多时间(2.1)小时。有没有什么方法可以select *并为每个传感器创建多个df,然后一次保存?(或者甚至顺序保存)。

bnl4lu3b

bnl4lu3b1#

代码中的一个问题是该高速缓存

df = spark.read\
.format("org.apache.spark.sql.cassandra")\
.options(table="allsensortables", keyspace="dataks")\
.load().cache()

在这里,除了保存之外,我看不到df是如何被多次使用的。因此,这里的cache是反生产的。你阅读数据,过滤它,并将它保存到一个单独的cassandra表中。现在,在 Dataframe 上发生的唯一操作是save,没有其他操作。
所以在这里缓存数据没有任何好处。删除该高速缓存会给予你一些速度提升。
要按顺序创建多个表,我建议使用partitionBy,首先将数据写入HDFS,作为传感器的分区数据,然后再将其写回cassandra。

相关问题