我正在尝试将我的应用程序升级到spark 3.0.1。对于表的创建,我使用cassandra驱动程序python cassandra连接器拖放并创建一个表。然后我使用spark cassandra连接器将Dataframe写入表中。仅仅使用spark-cassandra连接器来创建和删除表并不是一个好的选择。
对于spark 2.4,drop create write流没有问题。但是在Spark3.0中,应用程序似乎没有按照特定的顺序来做这些事情,通常是在删除和创建之前先写。我不知道如何确保先删除和创建表。我知道即使应用程序在写的时候出错,删除和创建也会发生,因为当我通过cqlsh查询cassandra时,我可以看到表被删除和重新创建。对spark 3.0中的这种行为有什么想法吗?
注意:由于模式发生了变化,需要删除并重新创建这个表,而不是直接覆盖。
请求的代码段:
session = self._get_python_cassandra_session(self.env_conf, self.database)
# build drop table query
drop_table_query = 'DROP TABLE IF EXISTS {}.{}'.format(self.database, tablename)
session.execute(drop_table_query)
df, table_columns, table_keys = self._create_table_metadata(df, keys=keys)
# build create query
create_table_query = 'CREATE TABLE IF NOT EXISTS {}.{} ({} PRIMARY KEY({}), );'.format(self.database, tablename, table_columns, table_keys)
# execute table creation
session.execute(create_table_query)
session.shutdown()
# spark-cassandra connection options
copts = _cassandra_cluster_spark_options(self.env_conf)
# set write mode
copts['confirm.truncate'] = overwrite
mode = 'overwrite' if overwrite else 'append'
# write dataframe to cassandra
get_dataframe_writer(df, 'cassandra', keyspace=self.database,
table=tablename, mode=mode, copts=copts).save()
2条答案
按热度按时间fzsnzjdm1#
我最终建立了一个time.sleep(5)延迟和100秒超时,周期性地ping cassandra查找表,然后写入是否找到了表。
qgzx9mmu2#
在spark cassandra connector 3.0+中,您可以使用新功能—通过catalogs api操纵键空间和表。您可以使用sparksql创建/更改/删除键空间和表。例如,可以使用以下命令在cassandra中创建表:
正如您在这里看到的,您可以指定非常复杂的主键,还可以指定表选项。这个
casscatalog
piece是一个前缀,链接到特定的cassandra集群(您可以同时使用多个)-它是在启动spark作业时指定的,例如:更多的例子可以在文档中找到: