我想把spark结构化流数据写入cassandra。我的spark版本是2.4.0。
我来自kafka的输入源带有json,因此在写入控制台时,这是可以的,但是当我在cqlsh cassandra中查询时,没有记录附加到表中。你能告诉我怎么了吗?
schema = StructType() \
.add("humidity", IntegerType(), True) \
.add("time", TimestampType(), True) \
.add("temperature", IntegerType(), True) \
.add("ph", IntegerType(), True) \
.add("sensor", StringType(), True) \
.add("id", StringType(), True)
def writeToCassandra(writeDF, epochId):
writeDF.write \
.format("org.apache.spark.sql.cassandra") \
.mode('append') \
.options("spark.cassandra.connection.host", "cassnode1, cassnode2") \
.options(table="sensor", keyspace="sensordb") \
.save()
# Load json format to dataframe
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafkanode") \
.option("subscribe", "iot-data-sensor") \
.load() \
.select([
get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
for c in ["humidity", "time", "temperature", "ph", "sensor", "id"]])
df.writeStream \
.foreachBatch(writeToCassandra) \
.outputMode("update") \
.start()
1条答案
按热度按时间oknwwptz1#
我在Pypark也有同样的问题。尝试以下步骤
首先,验证它是否连接到cassandra。您可以指向一个不可用的表,查看它是否因为“找不到表”而失败
尝试如下所示的writestream(在调用cassandra update之前包括触发器和输出模式)
df.writeStream \ .trigger(processingTime="10 seconds") \ .outputMode("update") \ .foreachBatch(writeToCassandra) \