我正在尝试将数据从一种格式转换为另一种格式(从一种模式转换为另一种模式)。
例子:
payload = {
'a' : 'a1',
'b' : 'b1'
}
我想把这个有效载荷转换成另一种形式
payload_transform = {
'a':{
'b' : 'b1'
}
'c' : 'a1'
}
考虑一下这些数据( payload
)是从Kafka来的,我想看看 payload_transform
在消费者中,随着转变
使用ksql有可能吗?
更新时间:
我们可以做一个层次:
payload = {
'a' : 'a1',
'b' : 'b1'
}
到
payload = {
'confluent' : 'a1',
'b' : 'b1'
}
我们可以添加条件吗?
例如:如果有效负载中存在“b”键,则生成
payload = {
'confluent' : 'a1',
'b' : 'b1'
}
否则:
payload = {
'kafka' : 'a1',
'b' : 'b1'
}
1条答案
按热度按时间rpppsulh1#
而ksql确实支持反嵌套json(使用
EXTRACTJSONFIELD
),目前(2018年3月/0.5版)不支持构建嵌套结构。它目前也不支持嵌套的avro。对更新问题的更新回复:
您可以重命名字段,只需使用sql
AS
条款:SELECT A AS NEW_COL, B FROM INPUT_STREAM
你能详细描述一下你想在这里做什么吗?在您给出的示例中,有条件地重命名字段没有意义。或许还可以尝试一下ksql,看看什么对您有效。