pyspark和cassandra

x759pob2  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(494)

我很困惑如何使用pyspark和cassandra连接器。一些帖子说,这只是一个使用pyspark的sparkcontext和sparksession的问题,其他帖子说这些都不起作用,我必须使用pyspark cassandra。有人能告诉我什么是正确的方式连接datastax远程Cassandra数据库与Pypark?
这就是我想做的:
使用secure bundle.zip(astra datastax)将pyspark连接到远程datastax数据库
从数据库中检索我的信息,将其用于机器学习包,将数据检索到pythonDataframe或其他东西中。
这就是我想做的,但是我看到了这么多的帖子,没有一篇是完全有效的,我不想直接使用pyspark shell,如果可能的话,我想在一些代码编辑器中用python代码完成所有的工作,我的意思是,不在spark终端中。
谢谢

o2g1uqev

o2g1uqev1#

当人们提到 pyspark-cassandra -他们提到它主要是因为它公开了spark cassandra connector(scc)的rdd部分,而scc本身没有公开这个部分(对于python,它只公开了dataframeapi)。
如何将scc与astra结合使用在scc2.5.0发布公告博客和文档中有很好的描述。使用以下命令启动pyspark(可以指定用户名、密码和其他参数,除了 --packages 在代码中,不需要在命令行中):

pyspark --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1\ 
  --files path_to/secure-connect-test.zip \
  --conf spark.cassandra.connection.config.cloud.path=secure-connect-test.zip \
  --conf spark.cassandra.auth.username=UserName \
  --conf spark.cassandra.auth.password=Password \
  --conf spark.dse.continuousPagingEnabled=false

请注意禁用连续分页的标志-这是必需的,正如本文所描述的。
进程启动后,只需执行读取、转换和写入数据的spark命令:

>>> from pyspark.sql.functions import col

# read data

>>> data = park.read.format("org.apache.spark.sql.cassandra")\
   .options(table="t2", keyspace="test").load()
>>> data.count()
5
>>> data.show(5, truncate = False)
+---+-----------------------+
|id |tm                     |
+---+-----------------------+
|4  |2020-06-23 10:37:25.825|
|3  |2020-06-23 10:37:25.754|
|5  |2020-06-23 10:37:25.852|
|1  |2020-06-23 10:37:25.701|
|2  |2020-06-23 10:37:25.726|
+---+-----------------------+

# generate new data frame

>>> data2 = data.select((col("id") + 10).alias("id"), col("tm"))
>>> data2.show()
+---+--------------------+
| id|                  tm|
+---+--------------------+
| 13|2020-06-23 10:37:...|
| 14|2020-06-23 10:37:...|
| 15|2020-06-23 10:37:...|
| 11|2020-06-23 10:37:...|
| 12|2020-06-23 10:37:...|
+---+--------------------+

# write the data

>>> data2.write.format("org.apache.spark.sql.cassandra")\
  .options(table="t2", keyspace="test").mode("append").save()

# check that data is written

>>> spark.read.format("org.apache.spark.sql.cassandra")\
  .options(table="t2", keyspace="test").load().count()
10

相关问题