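I am trying to join data from two topics, persons and addresses, where one person can have multiple addresses. The data published to the topics looks like this: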
//person with id as key
{"id": "123", "name": "Tom Tester"}
//addresses with id as key
{"id": "321", "person_id": "123", "address": "Somestreet 12, 4321 Somewhere"}
{"id": "432", "person_id": "123", "address": "Otherstreet 12, 5432 Nowhere"}
After the join, I would like an aggregated output (to be indexed in Elasticsearch) that looks like this:
{
"id": "123",
"name": "Tom Tester",
"addresses": [
{
"id": "321",
"address": "Somestreet 12, 4321 Somewhere"
},
{
"id": "432",
"address": "Otherstreet 12, 5432 Nowhere"
}
]
}
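To make the target shape concrete, here is a minimal plain-Java sketch (no Kafka involved) that nests the addresses of one person under that person. The class name `PersonAggregationSketch` and the small `Person` and `Address` stand-ins are hypothetical, introduced only for illustration; they are not the classes from my application.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the desired output shape; not Kafka Streams code.
final class PersonAggregationSketch {

    // Hypothetical stand-in for the person payload.
    static final class Person {
        final String id;
        final String name;
        Person(String id, String name) { this.id = id; this.name = name; }
    }

    // Hypothetical stand-in for the address payload.
    static final class Address {
        final String id;
        final String personId;
        final String address;
        Address(String id, String personId, String address) {
            this.id = id;
            this.personId = personId;
            this.address = address;
        }
    }

    // Builds {id, name, addresses: [{id, address}, ...]} for one person,
    // keeping only the addresses whose person_id matches.
    static Map<String, Object> aggregate(Person person, List<Address> allAddresses) {
        List<Map<String, String>> nested = new ArrayList<>();
        for (Address a : allAddresses) {
            if (person.id.equals(a.personId)) {
                Map<String, String> entry = new LinkedHashMap<>();
                entry.put("id", a.id);
                entry.put("address", a.address);
                nested.add(entry);
            }
        }
        Map<String, Object> result = new LinkedHashMap<>();
        result.put("id", person.id);
        result.put("name", person.name);
        result.put("addresses", nested);
        return result;
    }

    public static void main(String[] args) {
        Person tom = new Person("123", "Tom Tester");
        List<Address> addresses = Arrays.asList(
                new Address("321", "123", "Somestreet 12, 4321 Somewhere"),
                new Address("432", "123", "Otherstreet 12, 5432 Nowhere"));
        System.out.println(aggregate(tom, addresses));
    }
}
```

In the real application this nesting has to happen incrementally, one record at a time, which is what the aggregation and join in the streams topology are meant to do.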
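Whenever the person or the address topic receives an update, the aggregated person should be updated as well. Currently, I only get updates of the aggregated person when addresses are published, but not when the person itself changes. Do you know what is wrong with this code?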
@SpringBootApplication
@EnableBinding(PersonAggregatorBinding.class)
public class KafkaStreamTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamTestApplication.class, args);
    }

    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamTestApplication.class);

    @StreamListener
    @SendTo("person-aggregation")
    public KStream<String, PersonAggregation> process(
            @Input("person-input") KTable<String, Person> personInput,
            @Input("address-input") KTable<String, Address> addressInput) {
        KTable<String, AddressAggregation> addressAggregate = addressInput.toStream()
                .peek((key, value) -> LOG.info("addr {}: {}", key, value))
                .groupBy((k, v) -> v.getPersonId(), Grouped.with(null, new AddressSerde()))
                .aggregate(
                        AddressAggregation::new,
                        (key, value, aggregation) -> {
                            aggregate(aggregation, value);
                            return aggregation;
                        }, Materialized.with(Serdes.String(), new AddressAggregationSerde()));
        addressAggregate.toStream()
                .peek((key, value) -> LOG.info("aggregated addr: {}", value));
        return personInput.toStream()
                .leftJoin(addressAggregate, this::join, Joined.with(Serdes.String(), new PersonSerde(), new AddressAggregationSerde()))
                .peek((key, value) -> LOG.info("aggregated person: {}", value));
    }

    private PersonAggregation join(Person person, AddressAggregation addrs) {
        return PersonAggregation.builder()
                .id(person.getId())
                .name(person.getName())
                .addresses(addrs)
                .build();
    }

    public void aggregate(AddressAggregation aggregation, Address address) {
        if (address != null) {
            aggregation.removeIf(it -> Objects.equals(it.getId(), address.getId()));
            if (address.isValid()) {
                aggregation.add(address);
            }
        }
    }
}
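One thing that may be worth checking is the kind of join. Because the code calls `personInput.toStream()` before `leftJoin`, the final step is a stream-table join. In Kafka Streams a stream-table join produces output only when a record arrives on the stream side; a table-table join produces output on updates from either side. The following is a toy in-memory model of that difference, not Kafka code. The class `JoinTriggerSketch` and its methods are hypothetical and written purely for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (no Kafka dependency) of which side triggers left-join output.
final class JoinTriggerSketch {
    private final Map<String, String> persons = new HashMap<>();   // latest person per key
    private final Map<String, String> addresses = new HashMap<>(); // latest address aggregate per key
    private final boolean tableTable; // true: table-table join, false: stream-table join
    final List<String> emitted = new ArrayList<>();

    JoinTriggerSketch(boolean tableTable) {
        this.tableTable = tableTable;
    }

    // A person record arrives: both join flavours emit a left-join result.
    void onPerson(String key, String name) {
        persons.put(key, name);
        emit(key);
    }

    // The address aggregate changes: only the table-table flavour re-emits,
    // and only if the left side (the person) is already known.
    void onAddresses(String key, String aggregate) {
        addresses.put(key, aggregate);
        if (tableTable && persons.containsKey(key)) {
            emit(key);
        }
    }

    private void emit(String key) {
        emitted.add(persons.get(key) + " -> " + addresses.get(key));
    }

    public static void main(String[] args) {
        for (boolean mode : new boolean[] {false, true}) {
            JoinTriggerSketch join = new JoinTriggerSketch(mode);
            join.onPerson("123", "Tom Tester");
            join.onAddresses("123", "[321]");
            join.onAddresses("123", "[321, 432]");
            System.out.println((mode ? "table-table: " : "stream-table: ") + join.emitted);
        }
    }
}
```

If updates from both topics are needed, joining the two `KTable`s directly (that is, without converting `personInput` to a stream first) and calling `toStream()` on the join result might be closer to the intended behaviour. I have not verified this against the Spring Cloud Stream binder setup shown above, so treat it as an assumption.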