ksql中的转换数据

ruarlubt  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(307)

我正在尝试将数据从一种格式转换为另一种格式(从一种模式转换为另一种模式)。
例子:

payload = {
    'a' : 'a1',
    'b' : 'b1'
}

我想把这个有效载荷转换成另一种形式

payload_transform = {
   'a':{
      'b' : 'b1'
    }
    'c' : 'a1'
}

考虑一下这些数据( payload )是从Kafka来的,我想看看 payload_transform 在消费者中,随着转变
使用ksql有可能吗?
更新时间:
我们可以做一个层次:

payload = {
    'a' : 'a1',
    'b' : 'b1'
}

payload = {
    'confluent' : 'a1',
    'b' : 'b1'
}

我们可以添加条件吗?
例如:如果有效负载中存在“b”键,则生成

payload = {
        'confluent' : 'a1',
        'b' : 'b1'
    }

否则:

payload = {
        'kafka' : 'a1',
        'b' : 'b1'
    }
rpppsulh

rpppsulh1#

而ksql确实支持反嵌套json(使用 EXTRACTJSONFIELD ),目前(2018年3月/0.5版)不支持构建嵌套结构。它目前也不支持嵌套的avro。
对更新问题的更新回复:
您可以重命名字段,只需使用sql AS 条款: SELECT A AS NEW_COL, B FROM INPUT_STREAM 你能详细描述一下你想在这里做什么吗?在您给出的示例中,有条件地重命名字段没有意义。或许还可以尝试一下ksql,看看什么对您有效。

相关问题