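I'm trying to read a stream from a Kafka topic using Spark Streaming in Python. I can read the messages and dump them into a bronze table with the default Kafka message schema, but I can't convert the key and value from binary to string. I tried the following approaches, and none of them worked:
Approach 1: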
raw_kafka_events = (spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", SSL_TRUST_STORE_FILE_LOCATION)
    .option("kafka.ssl.keystore.location", SSL_KEY_STORE_FILE_LOCATION)
    .option("kafka.ssl.keystore.password", SSL_KEY_STORE_PASSWORD)
    .option("kafka.ssl.truststore.password", SSL_TRUST_STORE_PASSWORD)
    .option("kafka.ssl.key.password", SSL_KEY_PASSWORD)
    .option("kafka.ssl.keystore.type", "JKS")
    .option("kafka.ssl.truststore.type", "JKS")
    .option("failOnDataLoss", "false")
    .load()).selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

@dlt.table(
    comment="the raw message from kafka topic",
    table_properties={"pipelines.reset.allowed":"false"}
)
def kafka_bronze():
    return raw_kafka_events
Error:
Failed to merge fields 'key' and 'key'. Failed to merge incompatible data types BinaryType and StringType
Approach 2:
raw_kafka_events = (spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", SSL_TRUST_STORE_FILE_LOCATION)
    .option("kafka.ssl.keystore.location", SSL_KEY_STORE_FILE_LOCATION)
    .option("kafka.ssl.keystore.password", SSL_KEY_STORE_PASSWORD)
    .option("kafka.ssl.truststore.password", SSL_TRUST_STORE_PASSWORD)
    .option("kafka.ssl.key.password", SSL_KEY_PASSWORD)
    .option("kafka.ssl.keystore.type", "JKS")
    .option("kafka.ssl.truststore.type", "JKS")
    .option("failOnDataLoss", "false")
    .load())

raw_kafka_events.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

@dlt.table(
    comment="the raw message from kafka topic",
    table_properties={"pipelines.reset.allowed":"false"}
)
def kafka_bronze():
    return raw_kafka_events
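There is no error message, but when I later checked the kafka_bronze table, the key and value columns were still in binary format.
Approach 3: added a kafka_silver table: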
raw_kafka_events = (spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", SSL_TRUST_STORE_FILE_LOCATION)
    .option("kafka.ssl.keystore.location", SSL_KEY_STORE_FILE_LOCATION)
    .option("kafka.ssl.keystore.password", SSL_KEY_STORE_PASSWORD)
    .option("kafka.ssl.truststore.password", SSL_TRUST_STORE_PASSWORD)
    .option("kafka.ssl.key.password", SSL_KEY_PASSWORD)
    .option("kafka.ssl.keystore.type", "JKS")
    .option("kafka.ssl.truststore.type", "JKS")
    .option("failOnDataLoss", "false")
    .load())

@dlt.table(
    comment="the raw message from kafka topic",
    table_properties={"pipelines.reset.allowed":"false"}
)
def kafka_bronze():
    return raw_kafka_events

@dlt.table(comment="real schema for kafka payload",
           temporary=False)
def kafka_silver():
    return (
        # kafka streams are (timestamp,value)
        # value contains the kafka payload
        dlt.read_stream("kafka_bronze")
        .select(col("key").cast("string"))
        .select(col("value").cast("string"))
    )
Error:
Column 'value' does not exist.
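After reading from the Kafka topic, how can I convert the key/value to strings? I would prefer to dump the key/value as strings into the bronze table, but if that isn't possible, I can dump them into the silver table instead.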
1 Answer
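First, it's recommended to define the raw_kafka_events variable inside the function, so that it is local to that function. In the second approach, your problem is that you just execute raw_kafka_events.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") without assigning the result to a variable. DataFrame transformations return a new DataFrame and leave the original unchanged, so the result has to be kept, like this:
raw_kafka_events = raw_kafka_events.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")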
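To illustrate both points, here is a minimal sketch that builds the stream inside a function and returns the result of selectExpr instead of discarding it. The helper name read_kafka_as_strings and its parameters are hypothetical, and the SSL options from the question are left out for brevity.

```python
def read_kafka_as_strings(spark, topic, broker):
    """Read a Kafka topic as a stream and cast key/value to strings.

    `spark` is an active SparkSession; `topic` and `broker` are
    hypothetical parameters standing in for TOPIC and KAFKA_BROKER.
    """
    # The variable is local to this function, as the answer recommends.
    raw_kafka_events = (
        spark.readStream
        .format("kafka")
        .option("subscribe", topic)
        .option("kafka.bootstrap.servers", broker)
        .option("startingOffsets", "earliest")
        .load()
    )
    # selectExpr returns a NEW DataFrame, so the result has to be kept.
    raw_kafka_events = raw_kafka_events.selectExpr(
        "CAST(key AS STRING)", "CAST(value AS STRING)"
    )
    return raw_kafka_events


# Assumed use inside a Delta Live Tables pipeline (not runnable outside one):
#
# @dlt.table(comment="the raw message from kafka topic")
# def kafka_bronze():
#     return read_kafka_as_strings(spark, TOPIC, KAFKA_BROKER)
```

Note that selecting only key and value drops the other Kafka columns (topic, partition, offset, timestamp), so add them to the selection if the bronze table should keep them.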
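The second problem is that when you use an expression like CAST(key AS STRING), the field can get a new name derived from that expression. Change it to CAST(key AS STRING) as key and CAST(value AS STRING) as value; this should fix the problem in the first approach.
In the third approach, you have chained select statements:
.select(col("key").cast("string"))
.select(col("value").cast("string"))
but after the first select you get a DataFrame with only one column, key, which is why the second select cannot find value. You need to change the code to: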
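A minimal sketch of such a change, assuming the goal is to keep both columns: cast them in a single select and alias each result explicitly. The helper name cast_key_value_to_string is hypothetical, and df["key"] is used instead of col("key") only to keep the snippet free of imports.

```python
def cast_key_value_to_string(df):
    """Return a DataFrame with `key` and `value` cast from binary to string.

    Both columns are selected in ONE select call, so neither is dropped,
    and each cast is aliased back to its original column name.
    """
    return df.select(
        df["key"].cast("string").alias("key"),
        df["value"].cast("string").alias("value"),
    )


# Assumed use in the silver table (requires a Delta Live Tables pipeline):
#
# @dlt.table(comment="real schema for kafka payload")
# def kafka_silver():
#     return cast_key_value_to_string(dlt.read_stream("kafka_bronze"))
```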