在检查了所有源代码后发现,cassandra连接器支持在cassandra中用scala和java中的rdd自动创建表。对于pyspark,可以使用另一个包来执行此任务--https://github.com/anguenot/pyspark-cassandra. 但即使使用这个包也无法自动创建表。对于dataframe,我没有找到任何选择。我是pyspark和cassandra的新手,非常感谢您的帮助。也尝试只使用anguenot包作为依赖项。spark版本:2.4.7Cassandra:最新docker图片
Pyspark shell >> pyspark --packages anguenot/pyspark-cassandra:2.4.0,com.datastax.spark:spark-cassandra-connector_2.11:2.5.1
>>> spark = SparkSession.builder.master('local[*]').appName('cassandra').config("spark.cassandra.connection.host", "ip").config("spark.cassandra.connection.port", "port").config("spark.cassandra.auth.username", "username").config("spark.cassandra.auth.password", "password").getOrCreate()
>>> from datetime import datetime
>>> rdd = sc.parallelize([{
... "key": k,
... "stamp": datetime.now(),
... "tags": ["a", "b", "c"],
... "options": {
... "foo": "bar",
... "baz": "qux",
... }
... } for k in ["x", "y", "z"]])
>>> rdd.saveToCassandra("test", "testTable")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: 'RDD' object has no attribute 'saveToCassandra'
2条答案
按热度按时间mutmk8jj1#
通常,可以从spark cassandra connector为RDD(saveAsPassandraTable或saveAsPassandraTableEx)或dataframes(createcassandratable和createcassandratableex)创建表,但此功能仅在scala api中可用。
由于版本3.0,spark cassandra connector支持catalogs api(spark 3+),因此您可以使用spark sql使用keyspace&tables(创建/更改/删除),如下所示:
vfwfrxfs2#
你应该导入
pyspark_cassandra
创建rdd之前:看到了吗https://github.com/anguenot/pyspark-cassandra#examples.