java—在kafka流媒体中用另一个对象替换json对象

q3qa4bjr  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(413)

我在Kafka河上工作。我面临以下问题:
关于我目前所做工作的详细信息:
我创建了以下主题、流和表:

  1. ./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bptcus
  2. ./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic address-elasticsearch-sink

为上述创建的主题创建了表和流。

  1. CREATE table CUSTOMER_SRC (customerId VARCHAR,name VARCHAR, age VARCHAR, address VARCHAR) WITH (KAFKA_TOPIC='bptcus', VALUE_FORMAT='JSON', KEY='customerId');
  2. CREATE stream ADDRESS_SRC (addressId VARCHAR, city VARCHAR, state VARCHAR) WITH (KAFKA_TOPIC='address-elasticsearch-sink', VALUE_FORMAT='JSON');

我可以看到如下数据:

  1. select * from customer_src;
  2. 1528743137610 | Parent-1528743137047 | Ron | 31 | [{"addressId":"1","city":"Fremont","state":"CA"},{"addressId":"2","city":"Dallas","state":"TX"}]
  3. select * from address_src;
  4. 1528743413826 | Parent-1528743137047 | 1 | Detroit | MI

通过连接上面创建的表和流来创建另一个流。

  1. CREATE stream CUST_ADDR_SRC as select c.name , c.age , c.address, a.rowkey, a.addressId , a.city , a.state from ADDRESS_SRC a left join CUSTOMER_SRC c on c.rowkey=a.rowkey;

我可以看到cust\u addr\u src流中的数据,如下所示:

  1. select * from cust_addr_src;
  2. 1528743413826 | Parent-1528743137047 | Ron | 31 | [{"addressId":"1","city":"Fremont","state":"CA"},{"addressId":"2","city":"Dallas","state":"TX"}] | Parent-1528743137047 | 1 | Detroit | MI

我的问题:
现在我想用addressid1(底特律)替换addressid1(fremont)。我该怎么做?
我还尝试将流输入输出到控制台,如票据中所述
将Kafka流输入输出到控制台?
这是我的密码:

  1. Properties config = new Properties();
  2. config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cusadd-application");
  3. config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.1.61.125:9092");
  4. config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "10.1.61.125:2181");
  5. config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  6. config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  7. KStreamBuilder builder = new KStreamBuilder();
  8. KStream<String, String> source = builder.stream("cust_addr_src");
  9. source.foreach(new ForeachAction<String, String>() {
  10. public void apply(String key, String value) {
  11. System.out.println("Stream key values are: " + key + ": " + value);
  12. }
  13. });

我看不到输出。
只是,我可以看到以下输出:
12:04:42.145[streamthread-1]debug org.apache.kafka.clients.consumer.internals.fetcher-将分区cust\u addr\u src-0的偏移量重置为最新偏移量。12:04:42.145[streamthread-1]debug org.apache.kafka.clients.networkclient-开始连接到hsharma-mbp15处的节点0。local:9092. 12:04:42.145[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为node-0.bytes的传感器-已发送12:04:42.145[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为node-0.bytes的传感器12:04:42.145[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为node-0.bytes的传感器node-0.latency 12:04:42.145[streamthread-1]debug org.apache.kafka.clients.networkclient-已完成到节点0的连接12:04:42.145[streamthread-1]debug org.apache.kafka.clients.consumer.internals.fetcher-已获取分区cust\u addr\u src-0的偏移量0 12:04:42.676[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为topic.cust\u addr\u src.bytes-fetched的传感器12:04:42.680[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为topic.cust\u addr\u src.records-fetched的传感器12:04:45.150[streamthread-1]debug org.apache.kafka.clients.consumer.internals.abstractcoordinator-已成功接收到组的心跳响应添加应用程序。
提前谢谢。

u5i3ibmn

u5i3ibmn1#

我看到两种方法:
字符串操作:address列当前是一个包含json对象的字符串。您可以只使用字符串操作函数来替换所需的位。虽然这看起来有点老套。
结构操作:切换CREATETABLE语句,使地址为 ARRAY<STRUCT<addressId STRING, city STRING, state>> 输入,而不是字符串。然后可以使用数组的元素和结构的字段来构建输出。

  1. ARRAY[
  2. STRUCT(
  3. addressId := address[0]->addressId,
  4. city := address_src->city,
  5. state := address[0]->state
  6. ),
  7. ... same for second element
  8. ]

上面将创建一个包含两个结构的数组,并设置新的city。
当然,这只有在数组中始终有两个元素时才有效。如果数量可变,则需要使用长窗口case语句根据数组的大小执行不同的操作。例如

  1. CASE
  2. WHEN ARRAY_LENGTH(address) = 1
  3. THEN ARRAY[STRUCT(addressId := address[0]->addressId, city := address_src->city, state := address[0]->state)]
  4. WHEN ARRAY_LENGTH(address) = 2
  5. THEN ARRAY(... with two elements...)
  6. WHEN ARRAY_LENGTH(address) = 3
  7. THEN ARRAY(... with three elements...)
  8. END

等。

展开查看全部

相关问题