java—在kafka流媒体中用另一个对象替换json对象

q3qa4bjr  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(361)

我在Kafka河上工作。我面临以下问题:
关于我目前所做工作的详细信息:
我创建了以下主题、流和表:

./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bptcus

./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic address-elasticsearch-sink

为上述创建的主题创建了表和流。

CREATE table CUSTOMER_SRC  (customerId VARCHAR,name VARCHAR, age VARCHAR, address VARCHAR) WITH (KAFKA_TOPIC='bptcus', VALUE_FORMAT='JSON', KEY='customerId');

CREATE stream ADDRESS_SRC (addressId VARCHAR, city VARCHAR, state VARCHAR) WITH (KAFKA_TOPIC='address-elasticsearch-sink', VALUE_FORMAT='JSON');

我可以看到如下数据:

select * from customer_src;  
1528743137610 | Parent-1528743137047 | Ron | 31 | [{"addressId":"1","city":"Fremont","state":"CA"},{"addressId":"2","city":"Dallas","state":"TX"}] 

select * from address_src;  
1528743413826 | Parent-1528743137047 | 1 | Detroit | MI

通过连接上面创建的表和流来创建另一个流。

CREATE stream CUST_ADDR_SRC as select c.name , c.age , c.address, a.rowkey, a.addressId , a.city , a.state  from  ADDRESS_SRC a left join CUSTOMER_SRC c  on c.rowkey=a.rowkey;

我可以看到cust\u addr\u src流中的数据,如下所示:

select * from cust_addr_src;  

1528743413826 | Parent-1528743137047 | Ron | 31 | [{"addressId":"1","city":"Fremont","state":"CA"},{"addressId":"2","city":"Dallas","state":"TX"}] | Parent-1528743137047 | 1 | Detroit | MI

我的问题:
现在我想用addressid1(底特律)替换addressid1(fremont)。我该怎么做?
我还尝试将流输入输出到控制台,如票据中所述
将Kafka流输入输出到控制台?
这是我的密码:

Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cusadd-application");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.1.61.125:9092");
    config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "10.1.61.125:2181");
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> source = builder.stream("cust_addr_src");
    source.foreach(new ForeachAction<String, String>() {
        public void apply(String key, String value) {
            System.out.println("Stream key values are: " + key + ": " + value);
        }
     });

我看不到输出。
只是,我可以看到以下输出:
12:04:42.145[streamthread-1]debug org.apache.kafka.clients.consumer.internals.fetcher-将分区cust\u addr\u src-0的偏移量重置为最新偏移量。12:04:42.145[streamthread-1]debug org.apache.kafka.clients.networkclient-开始连接到hsharma-mbp15处的节点0。local:9092. 12:04:42.145[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为node-0.bytes的传感器-已发送12:04:42.145[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为node-0.bytes的传感器12:04:42.145[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为node-0.bytes的传感器node-0.latency 12:04:42.145[streamthread-1]debug org.apache.kafka.clients.networkclient-已完成到节点0的连接12:04:42.145[streamthread-1]debug org.apache.kafka.clients.consumer.internals.fetcher-已获取分区cust\u addr\u src-0的偏移量0 12:04:42.676[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为topic.cust\u addr\u src.bytes-fetched的传感器12:04:42.680[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为topic.cust\u addr\u src.records-fetched的传感器12:04:45.150[streamthread-1]debug org.apache.kafka.clients.consumer.internals.abstractcoordinator-已成功接收到组的心跳响应添加应用程序。
提前谢谢。

u5i3ibmn

u5i3ibmn1#

我看到两种方法:
字符串操作:address列当前是一个包含json对象的字符串。您可以只使用字符串操作函数来替换所需的位。虽然这看起来有点老套。
结构操作:切换CREATETABLE语句,使地址为 ARRAY<STRUCT<addressId STRING, city STRING, state>> 输入,而不是字符串。然后可以使用数组的元素和结构的字段来构建输出。

ARRAY[
  STRUCT(
    addressId := address[0]->addressId,
    city := address_src->city,
    state := address[0]->state
  ),
  ... same for second element
]

上面将创建一个包含两个结构的数组,并设置新的city。
当然,这只有在数组中始终有两个元素时才有效。如果数量可变,则需要使用长窗口case语句根据数组的大小执行不同的操作。例如

CASE 
   WHEN ARRAY_LENGTH(address) = 1 
    THEN ARRAY[STRUCT(addressId := address[0]->addressId, city := address_src->city, state := address[0]->state)]
   WHEN ARRAY_LENGTH(address) = 2
     THEN ARRAY(... with two elements...)
   WHEN ARRAY_LENGTH(address) = 3
     THEN ARRAY(... with three elements...)
END

等。

相关问题