pyspark结构化流数据写入cassandra而不是填充数据

suzh9iv8  于 2021-06-14  发布在  Cassandra
关注(0)|答案(1)|浏览(362)

我想把spark结构化流数据写入cassandra。我的spark版本是2.4.0。
我来自kafka的输入源带有json,因此在写入控制台时,这是可以的,但是当我在cqlsh cassandra中查询时,没有记录附加到表中。你能告诉我怎么了吗?

  1. schema = StructType() \
  2. .add("humidity", IntegerType(), True) \
  3. .add("time", TimestampType(), True) \
  4. .add("temperature", IntegerType(), True) \
  5. .add("ph", IntegerType(), True) \
  6. .add("sensor", StringType(), True) \
  7. .add("id", StringType(), True)
  8. def writeToCassandra(writeDF, epochId):
  9. writeDF.write \
  10. .format("org.apache.spark.sql.cassandra") \
  11. .mode('append') \
  12. .options("spark.cassandra.connection.host", "cassnode1, cassnode2") \
  13. .options(table="sensor", keyspace="sensordb") \
  14. .save()
  15. # Load json format to dataframe
  16. df = spark \
  17. .readStream \
  18. .format("kafka") \
  19. .option("kafka.bootstrap.servers", "kafkanode") \
  20. .option("subscribe", "iot-data-sensor") \
  21. .load() \
  22. .select([
  23. get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
  24. for c in ["humidity", "time", "temperature", "ph", "sensor", "id"]])
  25. df.writeStream \
  26. .foreachBatch(writeToCassandra) \
  27. .outputMode("update") \
  28. .start()
oknwwptz

oknwwptz1#

我在Pypark也有同样的问题。尝试以下步骤
首先,验证它是否连接到cassandra。您可以指向一个不可用的表,查看它是否因为“找不到表”而失败
尝试如下所示的writestream(在调用cassandra update之前包括触发器和输出模式) df.writeStream \ .trigger(processingTime="10 seconds") \ .outputMode("update") \ .foreachBatch(writeToCassandra) \

相关问题