Spark与Cassandra python设置

oo7oh9g9  于 2022-11-05  发布在  Cassandra
关注(0)|答案(3)|浏览(107)

我正在尝试使用spark对 cassandra 表做一些简单的计算,但我相当迷失。
我努力跟着:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md
所以我运行PySpark shell:与

./bin/pyspark \
  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3

但是我不知道如何设置。我如何让Spark知道我的Cassandra集群在哪里?我已经看到CassandraSQLContext可以用于此,但我也读到它被弃用了。
我读过这篇文章:How to connect spark with cassandra using spark-cassandra-connector?
但如果我用

import com.datastax.spark.connector._

Python说它找不到这个模块,有人能给我指出正确的方向,告诉我如何正确设置吗?

3zwtqj6y

3zwtqj6y1#

1.复制pyspark-cassandra连接器Spark文件夹/jar。
1.下面的代码将连接到cassandra。

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession

spark = SparkSession.builder \
  .appName('SparkCassandraApp') \
  .config('spark.cassandra.connection.host', 'localhost') \
  .config('spark.cassandra.connection.port', '9042') \
  .config('spark.cassandra.output.consistency.level','ONE') \
  .master('local[2]') \
  .getOrCreate()

sqlContext = SQLContext(spark)
ds = sqlContext \
  .read \
  .format('org.apache.spark.sql.cassandra') \
  .options(table='emp', keyspace='demo') \
  .load()

ds.show(10)
ovfsdjhp

ovfsdjhp2#

Cassandra连接器不提供任何Python模块。所有功能都是由Data Source API提供的,只要有所需的jar,所有功能都可以开箱即用。
我怎么才能让星火知道我的 cassandra 星团在哪里?
使用spark.cassandra.connection.host属性。例如,您可以将其作为spark-submit/pyspark的参数传递:

pyspark ... --conf spark.cassandra.connection.host=x.y.z.v

或在配置中设置:

(SparkSession.builder
    .config("cassandra.connection.host", "x.y.z.v"))

可以直接在读取器上设置表名或密钥空间等配置:

(spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table="kv", keyspace="test", cluster="cluster")
    .load())

因此,您可以遵循Dataframes文档。
作为旁注

import com.datastax.spark.connector._

是一种Scala语法,在Python中只是偶然被接受。

egdjgwm8

egdjgwm83#

用户名和密码:

spark = SparkSession.builder \
  .appName('SparkCassandraApp') \
  .config('spark.cassandra.connection.host', 'localhost') \
  .config('spark.cassandra.connection.port', '9042') \
  .config("spark.cassandra.auth.username","cassandrauser")\
  .config("spark.cassandra.auth.password","cassandrapwd")\
  .master('local[2]') \
  .getOrCreate()

df = spark.read.format("org.apache.spark.sql.cassandra")\
   .options(table="tablename", keyspace="keyspacename").load()

df.show()

相关问题