与Spring CloudKafka流结一对多关系

tf7tbtn2  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(281)

我正在尝试连接两个主题的数据,一个人和地址,其中一个人可以有多个地址。发布到主题中的数据如下所示:

//person with id as key
{"id": "123", "name": "Tom Tester"}

//addresses with id as key
{"id": "321", "person_id": "123", "address": "Somestreet 12, 4321 Somewhere"}
{"id": "432", "person_id": "123", "address": "Otherstreet 12, 5432 Nowhere"}

在连接之后,我希望有一个聚合输出(在elasticsearch中被索引),它应该如下所示:

{
  "id": "123",
  "name": "Tom Tester",
  "addresses": [
    {
      "id": "321",
      "address": "Somestreet 12, 4321 Somewhere"
    },
    {
      "id": "432",
      "address": "Otherstreet 12, 5432 Nowhere"
    }
  ]
}

每当person或address主题得到更新时,聚合的person也应该被更新。目前,我实现了只在地址发布时获取聚合人员的更新,而不在人员本身更改时获取更新。你知道这个代码有什么问题吗?

@SpringBootApplication
@EnableBinding(PersonAggregatorBinding.class)
public class KafkaStreamTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamTestApplication.class, args);
    }

    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamTestApplication.class);

    @StreamListener
    @SendTo("person-aggregation")
    public KStream<String, PersonAggregation> process(
            @Input("person-input") KTable<String, Person> personInput,
            @Input("address-input") KTable<String, Address> addressInput) {
        KTable<String, AddressAggregation> addressAggregate = addressInput.toStream()
                .peek((key, value) -> LOG.info("addr {}: {}", key, value))
                .groupBy((k, v) -> v.getPersonId(), Grouped.with(null, new AddressSerde()))
                .aggregate(
                        AddressAggregation::new,
                        (key, value, aggregation) -> {
                            aggregate(aggregation, value);
                            return aggregation;
                        }, Materialized.with(Serdes.String(), new AddressAggregationSerde()));

        addressAggregate.toStream()
                .peek((key, value) -> LOG.info("aggregated addr: {}", value));

        return personInput.toStream()
                .leftJoin(addressAggregate, this::join, Joined.with(Serdes.String(), new PersonSerde(), new AddressAggregationSerde()))
                .peek((key, value) -> LOG.info("aggregated person: {}", value));
    }

    private PersonAggregation join(Person person, AddressAggregation addrs) {
        return PersonAggregation.builder()
                .id(person.getId())
                .name(person.getName())
                .addresses(addrs)
                .build();
    }

    public void aggregate(AddressAggregation aggregation, Address address) {
        if(address != null){
            aggregation.removeIf(it -> Objects.equals(it.getId(), address.getId()));
            if(address.isValid()) {
                aggregation.add(address);
            }
        }
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题