我使用Spark 2.1和Cassandra(3.9)作为数据源。C* 有一个包含50列的大表,这对于我的用例来说不是一个好的数据模型。所以我为每个传感器创建了拆分表,沿着分区键和集群键列。
All sensor table
-----------------------------------------------------
| Device | Time | Sensor1 | Sensor2 | Sensor3 |
| dev1 | 1507436000 | 50.3 | 1 | 1 |
| dev2 | 1507436100 | 90.2 | 0 | 1 |
| dev1 | 1507436100 | 28.1 | 1 | 1 |
-----------------------------------------------------
Sensor1 table
-------------------------------
| Device | Time | value |
| dev1 | 1507436000 | 50.3 |
| dev2 | 1507436100 | 90.2 |
| dev1 | 1507436100 | 28.1 |
-------------------------------
现在我正在使用spark将数据从旧表复制到新表。
df = spark.read\
.format("org.apache.spark.sql.cassandra")\
.options(table="allsensortables", keyspace="dataks")\
.load().cache()
df.createOrReplaceTempView("data")
query = ('''select device,time,sensor1 as value from data ''' )
vgDF = spark.sql(query)
vgDF.write\
.format("org.apache.spark.sql.cassandra")\
.mode('append')\
.options(table="sensor1", keyspace="dataks")\
.save()
一个接一个地复制数据对于一个表来说要花很多时间(2.1)小时。有没有什么方法可以select *
并为每个传感器创建多个df,然后一次保存?(或者甚至顺序保存)。
1条答案
按热度按时间bnl4lu3b1#
代码中的一个问题是该高速缓存
在这里,除了保存之外,我看不到df是如何被多次使用的。因此,这里的cache是反生产的。你阅读数据,过滤它,并将它保存到一个单独的cassandra表中。现在,在 Dataframe 上发生的唯一操作是save,没有其他操作。
所以在这里缓存数据没有任何好处。删除该高速缓存会给予你一些速度提升。
要按顺序创建多个表,我建议使用partitionBy,首先将数据写入HDFS,作为传感器的分区数据,然后再将其写回cassandra。