pyspark rdd/dataframe没有在cassandra中自动创建表

pbossiut  于 2021-07-12  发布在  Spark
关注(0)|答案(2)|浏览(398)

在检查了所有源代码后发现,cassandra连接器支持在cassandra中用scala和java中的rdd自动创建表。对于pyspark,可以使用另一个包来执行此任务--https://github.com/anguenot/pyspark-cassandra. 但即使使用这个包也无法自动创建表。对于dataframe,我没有找到任何选择。我是pyspark和cassandra的新手,非常感谢您的帮助。也尝试只使用anguenot包作为依赖项。spark版本:2.4.7Cassandra:最新docker图片

Pyspark shell >> pyspark --packages anguenot/pyspark-cassandra:2.4.0,com.datastax.spark:spark-cassandra-connector_2.11:2.5.1
>>> spark = SparkSession.builder.master('local[*]').appName('cassandra').config("spark.cassandra.connection.host", "ip").config("spark.cassandra.connection.port", "port").config("spark.cassandra.auth.username", "username").config("spark.cassandra.auth.password", "password").getOrCreate()
>>> from datetime import datetime
>>> rdd = sc.parallelize([{
...     "key": k,
...     "stamp": datetime.now(),
...     "tags": ["a", "b", "c"],
...     "options": {
...             "foo": "bar",
...             "baz": "qux",
...     }
... } for k in ["x", "y", "z"]])

>>> rdd.saveToCassandra("test", "testTable")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'RDD' object has no attribute 'saveToCassandra'
mutmk8jj

mutmk8jj1#

通常,可以从spark cassandra connector为RDD(saveAsPassandraTable或saveAsPassandraTableEx)或dataframes(createcassandratable和createcassandratableex)创建表,但此功能仅在scala api中可用。
由于版本3.0,spark cassandra connector支持catalogs api(spark 3+),因此您可以使用spark sql使用keyspace&tables(创建/更改/删除),如下所示:

spark.sql("""
CREATE TABLE casscatalog.ksname.testTable (
     key_1 Int, key_2 Int, key_3 Int, 
     cc1 STRING, cc2 String, cc3 String, value String) 
  USING cassandra
  PARTITIONED BY (key_1, key_2, key_3)
  TBLPROPERTIES (
    clustering_key='cc1.asc, cc2.desc, cc3.asc'
  )
""")
vfwfrxfs

vfwfrxfs2#

你应该导入 pyspark_cassandra 创建rdd之前:

>>> import pyspark_cassandra
>>> rdd = sc.parallelize(...)
>>> rdd.saveToCassandra("test", "testTable")

看到了吗https://github.com/anguenot/pyspark-cassandra#examples.

相关问题