apache-kafka: Start reading a stream from Kafka using Delta tables

uqdfh47h · Posted 2022-11-01 in Apache

I am trying to read a stream from a Kafka topic using Spark Streaming/Python. I can read the messages and dump them into a bronze table with the default Kafka message schema, but I cannot convert the key and value from binary to string. I tried the following approaches, and none of them worked:
Approach 1:

  import dlt

  raw_kafka_events = (spark.readStream
      .format("kafka")
      .option("subscribe", TOPIC)
      .option("kafka.bootstrap.servers", KAFKA_BROKER)
      .option("startingOffsets", "earliest")
      .option("kafka.security.protocol", "SSL")
      .option("kafka.ssl.truststore.location", SSL_TRUST_STORE_FILE_LOCATION)
      .option("kafka.ssl.keystore.location", SSL_KEY_STORE_FILE_LOCATION)
      .option("kafka.ssl.keystore.password", SSL_KEY_STORE_PASSWORD)
      .option("kafka.ssl.truststore.password", SSL_TRUST_STORE_PASSWORD)
      .option("kafka.ssl.key.password", SSL_KEY_PASSWORD)
      .option("kafka.ssl.keystore.type", "JKS")
      .option("kafka.ssl.truststore.type", "JKS")
      .option("failOnDataLoss", "false")
      .load()).selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

  @dlt.table(
      comment="the raw message from kafka topic",
      table_properties={"pipelines.reset.allowed": "false"}
  )
  def kafka_bronze():
      return raw_kafka_events

Error:

  Failed to merge fields 'key' and 'key'. Failed to merge incompatible data types BinaryType and StringType

Approach 2:

  import dlt

  raw_kafka_events = (spark.readStream
      .format("kafka")
      .option("subscribe", TOPIC)
      .option("kafka.bootstrap.servers", KAFKA_BROKER)
      .option("startingOffsets", "earliest")
      .option("kafka.security.protocol", "SSL")
      .option("kafka.ssl.truststore.location", SSL_TRUST_STORE_FILE_LOCATION)
      .option("kafka.ssl.keystore.location", SSL_KEY_STORE_FILE_LOCATION)
      .option("kafka.ssl.keystore.password", SSL_KEY_STORE_PASSWORD)
      .option("kafka.ssl.truststore.password", SSL_TRUST_STORE_PASSWORD)
      .option("kafka.ssl.key.password", SSL_KEY_PASSWORD)
      .option("kafka.ssl.keystore.type", "JKS")
      .option("kafka.ssl.truststore.type", "JKS")
      .option("failOnDataLoss", "false")
      .load())

  raw_kafka_events.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

  @dlt.table(
      comment="the raw message from kafka topic",
      table_properties={"pipelines.reset.allowed": "false"}
  )
  def kafka_bronze():
      return raw_kafka_events

There was no error message, but when I later checked the kafka_bronze table, the key and value columns were still in binary format.
Approach 3: added a kafka_silver table:

  import dlt
  from pyspark.sql.functions import col

  raw_kafka_events = (spark.readStream
      .format("kafka")
      .option("subscribe", TOPIC)
      .option("kafka.bootstrap.servers", KAFKA_BROKER)
      .option("startingOffsets", "earliest")
      .option("kafka.security.protocol", "SSL")
      .option("kafka.ssl.truststore.location", SSL_TRUST_STORE_FILE_LOCATION)
      .option("kafka.ssl.keystore.location", SSL_KEY_STORE_FILE_LOCATION)
      .option("kafka.ssl.keystore.password", SSL_KEY_STORE_PASSWORD)
      .option("kafka.ssl.truststore.password", SSL_TRUST_STORE_PASSWORD)
      .option("kafka.ssl.key.password", SSL_KEY_PASSWORD)
      .option("kafka.ssl.keystore.type", "JKS")
      .option("kafka.ssl.truststore.type", "JKS")
      .option("failOnDataLoss", "false")
      .load())

  @dlt.table(
      comment="the raw message from kafka topic",
      table_properties={"pipelines.reset.allowed": "false"}
  )
  def kafka_bronze():
      return raw_kafka_events

  @dlt.table(comment="real schema for kafka payload",
             temporary=False)
  def kafka_silver():
      return (
          # kafka streams are (timestamp, value)
          # value contains the kafka payload
          dlt.read_stream("kafka_bronze")
          .select(col("key").cast("string"))
          .select(col("value").cast("string"))
      )

Error:

  Column 'value' does not exist.

How can I convert the key/value to strings after reading from the Kafka topic? I would prefer to dump the key/value as strings into the bronze table, but if that is not possible, dumping them into the silver table would also work.


knsnq2tg #1

First of all, it is recommended to define the raw_kafka_events variable inside the function, so that it stays local to that function.
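A minimal sketch of that structure (reusing the question's TOPIC and KAFKA_BROKER variables, with the SSL options omitted for brevity):

  import dlt

  @dlt.table(comment="the raw message from kafka topic")
  def kafka_bronze():
      # the stream definition lives inside the function,
      # so it stays local to this table definition
      return (spark.readStream
          .format("kafka")
          .option("subscribe", TOPIC)
          .option("kafka.bootstrap.servers", KAFKA_BROKER)
          .option("startingOffsets", "earliest")
          .load())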
In the second approach, your problem is that you only execute raw_kafka_events.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") without assigning the result to a variable, like this: raw_kafka_events = raw_kafka_events.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").
The second problem is that with an expression like CAST(key AS STRING), the resulting field gets a new name matching that expression. Change it to CAST(key AS STRING) as key and CAST(value AS STRING) as value; this should fix the error from the first approach.
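Applied to the second approach, both fixes together would look roughly like this sketch:

  # assign the result back: selectExpr returns a new DataFrame,
  # it does not modify raw_kafka_events in place
  raw_kafka_events = raw_kafka_events.selectExpr(
      "CAST(key AS STRING) AS key",
      "CAST(value AS STRING) AS value")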
In the third approach, you have chained select statements:

  def kafka_silver():
      return (
          # kafka streams are (timestamp, value)
          # value contains the kafka payload
          dlt.read_stream("kafka_bronze")
          .select(col("key").cast("string"))
          .select(col("value").cast("string"))
      )

But after the first select you get a DataFrame with only one column, key, so the second select cannot find value. You need to change the code to:

  dlt.read_stream("kafka_bronze") \
      .select(col("key").cast("string").alias("key"),
              col("value").cast("string").alias("value"))
