spring云流消息处理

gzjq41n4  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(212)

在使用spring cloud stream、kafka、debezium和postgresql时,我面临以下问题:突出显示:postgresql中有一个表person(id,name),debezium获取更改事件并抛出到kafka topic(person),验证的消息在topic中,消息的形式如下

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":true,"name":"dbserver1.datafeeds.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":true,"name":"dbserver1.datafeeds.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_usec"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"boolean","optional":true,"field":"last_snapshot_record"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.datafeeds.person.Envelope"},"payload":{"before":null,"after":{"id":123,"name":"Muhammad Sufyian"},"source":{"version":"0.9.0.Alpha1","name":"dbserver1","ts_usec":946684800000000000,"txId":350418,"lsn":9647073220,"snapshot":false,"last_snapshot_record":null},"op":"c","ts_ms":1534254709618}}

在消费者方面,我使用的是springcloudstream,消息类如下所示

public class Person {

    private Integer id;
    private String name;

    public Person() {
    }

    public Person(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Person{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}

Channel class looks like this :

@enablebinding(processor.class)公共类通道{

@StreamListener(Processor.INPUT)
public void notify(@Payload Person person){
        System.out.println("Print");
        System.out.println("Id:"+person.getId());
        System.out.println("Name:"+person.getName());
}

}
主类是这样的

@SpringBootApplication
@EnableAutoConfiguration(exclude={DataSourceAutoConfiguration.class})
public class PartnerHotelChangedProcessorApplication {

    public static void main(String[] args) {
        SpringApplication.run(PartnerHotelChangedProcessorApplication.class, args);
    }

}

application.properties如下所示:

spring.cloud.stream.kafka.binder.brokers=<list-of-kafka-brokers>
spring.cloud.stream.bindings.input.destination=dbserver1.datafeeds.person
spring.cloud.stream.bindings.input.content-type=application/json

通过@streamlistner获取的负载为空,我怀疑这里的消息转换有问题。
任何方向都是值得赞赏的
堆栈跟踪:

2018-08-16 15:23:36.102  INFO 23525 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] Discovered group coordinator kafka-02.hotel02.pro06.eu.idealo.com:9092 (id: 2147481645 rack: null)
2018-08-16 15:23:36.104  INFO 23525 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] Revoking previously assigned partitions []
2018-08-16 15:23:36.105  INFO 23525 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions revoked: []
2018-08-16 15:23:36.105  INFO 23525 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] (Re-)joining group
2018-08-16 15:23:39.168  INFO 23525 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] Successfully joined group with generation 1
2018-08-16 15:23:39.170  INFO 23525 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] Setting newly assigned partitions [dbserver1.datafeeds.person-0]
2018-08-16 15:23:39.232  INFO 23525 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [dbserver1.datafeeds.person-0]
2018-08-16 15:23:59.003  INFO 23525 --- [a9-3b8495d0ab90] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] Group coordinator kafka-02.hotel02.pro06.eu.idealo.com:9092 (id: 2147481645 rack: null) is unavailable or invalid, will attempt rediscovery
[B@25c28bf5
Print
2018-08-16 15:24:00.512  INFO 23525 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] Discovered group coordinator kafka-02.hotel02.pro06.eu.idealo.com:9092 (id: 2147481645 rack: null)
2018-08-16 15:24:00.538 ERROR 23525 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] Offset commit failed on partition dbserver1.datafeeds.person-0 at offset 58: The coordinator is not aware of this member.
2018-08-16 15:24:00.555 ERROR 23525 --- [container-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题