我在Kafka河上工作。我面临以下问题:
关于我目前所做工作的详细信息:
我创建了以下主题、流和表:
./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bptcus
./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic address-elasticsearch-sink
为上述创建的主题创建了表和流。
CREATE table CUSTOMER_SRC (customerId VARCHAR,name VARCHAR, age VARCHAR, address VARCHAR) WITH (KAFKA_TOPIC='bptcus', VALUE_FORMAT='JSON', KEY='customerId');
CREATE stream ADDRESS_SRC (addressId VARCHAR, city VARCHAR, state VARCHAR) WITH (KAFKA_TOPIC='address-elasticsearch-sink', VALUE_FORMAT='JSON');
我可以看到如下数据:
select * from customer_src;
1528743137610 | Parent-1528743137047 | Ron | 31 | [{"addressId":"1","city":"Fremont","state":"CA"},{"addressId":"2","city":"Dallas","state":"TX"}]
select * from address_src;
1528743413826 | Parent-1528743137047 | 1 | Detroit | MI
通过连接上面创建的表和流来创建另一个流。
CREATE stream CUST_ADDR_SRC as select c.name , c.age , c.address, a.rowkey, a.addressId , a.city , a.state from ADDRESS_SRC a left join CUSTOMER_SRC c on c.rowkey=a.rowkey;
我可以看到cust\u addr\u src流中的数据,如下所示:
select * from cust_addr_src;
1528743413826 | Parent-1528743137047 | Ron | 31 | [{"addressId":"1","city":"Fremont","state":"CA"},{"addressId":"2","city":"Dallas","state":"TX"}] | Parent-1528743137047 | 1 | Detroit | MI
我的问题:
现在我想用addressid1(底特律)替换addressid1(fremont)。我该怎么做?
我还尝试将流输入输出到控制台,如票据中所述
将Kafka流输入输出到控制台?
这是我的密码:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cusadd-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.1.61.125:9092");
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "10.1.61.125:2181");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("cust_addr_src");
source.foreach(new ForeachAction<String, String>() {
public void apply(String key, String value) {
System.out.println("Stream key values are: " + key + ": " + value);
}
});
我看不到输出。
只是,我可以看到以下输出:
12:04:42.145[streamthread-1]debug org.apache.kafka.clients.consumer.internals.fetcher-将分区cust\u addr\u src-0的偏移量重置为最新偏移量。12:04:42.145[streamthread-1]debug org.apache.kafka.clients.networkclient-开始连接到hsharma-mbp15处的节点0。local:9092. 12:04:42.145[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为node-0.bytes的传感器-已发送12:04:42.145[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为node-0.bytes的传感器12:04:42.145[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为node-0.bytes的传感器node-0.latency 12:04:42.145[streamthread-1]debug org.apache.kafka.clients.networkclient-已完成到节点0的连接12:04:42.145[streamthread-1]debug org.apache.kafka.clients.consumer.internals.fetcher-已获取分区cust\u addr\u src-0的偏移量0 12:04:42.676[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为topic.cust\u addr\u src.bytes-fetched的传感器12:04:42.680[streamthread-1]debug org.apache.kafka.common.metrics.metrics-已添加名为topic.cust\u addr\u src.records-fetched的传感器12:04:45.150[streamthread-1]debug org.apache.kafka.clients.consumer.internals.abstractcoordinator-已成功接收到组的心跳响应添加应用程序。
提前谢谢。
1条答案
按热度按时间u5i3ibmn1#
我看到两种方法:
字符串操作:address列当前是一个包含json对象的字符串。您可以只使用字符串操作函数来替换所需的位。虽然这看起来有点老套。
结构操作:切换CREATETABLE语句,使地址为
ARRAY<STRUCT<addressId STRING, city STRING, state>>
输入,而不是字符串。然后可以使用数组的元素和结构的字段来构建输出。上面将创建一个包含两个结构的数组,并设置新的city。
当然,这只有在数组中始终有两个元素时才有效。如果数量可变,则需要使用长窗口case语句根据数组的大小执行不同的操作。例如
等。