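I am facing the following problem while working with Spring Cloud Stream, Kafka, Debezium and PostgreSQL. In short: PostgreSQL has a table person(id, name). Debezium captures the change events and publishes them to the Kafka topic (person). I have verified that the messages arrive in the topic; a message looks like this: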
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":true,"name":"dbserver1.datafeeds.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":true,"name":"dbserver1.datafeeds.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_usec"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"boolean","optional":true,"field":"last_snapshot_record"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.datafeeds.person.Envelope"},"payload":{"before":null,"after":{"id":123,"name":"Muhammad Sufyian"},"source":{"version":"0.9.0.Alpha1","name":"dbserver1","ts_usec":946684800000000000,"txId":350418,"lsn":9647073220,"snapshot":false,"last_snapshot_record":null},"op":"c","ts_ms":1534254709618}}
On the consumer side I am using Spring Cloud Stream, and the message class looks like this:
public class Person {
    private Integer id;
    private String name;

    public Person() {
    }

    public Person(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Person{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}
The channel class looks like this:
@EnableBinding(Processor.class)
public class Channel {
    @StreamListener(Processor.INPUT)
    public void notify(@Payload Person person) {
        System.out.println("Print");
        System.out.println("Id:" + person.getId());
        System.out.println("Name:" + person.getName());
    }
}
The main class looks like this:
@SpringBootApplication
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class})
public class PartnerHotelChangedProcessorApplication {
    public static void main(String[] args) {
        SpringApplication.run(PartnerHotelChangedProcessorApplication.class, args);
    }
}
application.properties looks like this:
spring.cloud.stream.kafka.binder.brokers=<list-of-kafka-brokers>
spring.cloud.stream.bindings.input.destination=dbserver1.datafeeds.person
spring.cloud.stream.bindings.input.content-type=application/json
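The payload received through @StreamListener is empty (id and name are both null). I suspect something is wrong with the message conversion here.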
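One thing the sample message suggests: id and name sit under payload.after, not at the top level, so binding the whole message to Person would leave both fields unset if the JSON mapper silently skips unknown properties. A minimal sketch of classes that mirror the envelope is below. The names `DebeziumEnvelopeSketch`, `ChangeEvent`, `Payload` and `currentRow` are hypothetical, and the sketch assumes a JSON mapper (for example Jackson, configured to ignore unknown properties such as "schema" and "source") populates them through the setters. The `main` method builds the event by hand only to stay free of dependencies.

```java
// Plain holder classes mirroring the Debezium envelope from the sample message.
// Assumption: a JSON mapper fills them through the setters; nothing here
// depends on Spring, Kafka or Jackson.
class DebeziumEnvelopeSketch {

    /** Row image; same fields as the Person class in the question. */
    public static class Person {
        private Integer id;
        private String name;
        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    /** The "payload" object: before/after row images plus the operation code. */
    public static class Payload {
        private Person before;
        private Person after;
        private String op;
        public Person getBefore() { return before; }
        public void setBefore(Person before) { this.before = before; }
        public Person getAfter() { return after; }
        public void setAfter(Person after) { this.after = after; }
        public String getOp() { return op; }
        public void setOp(String op) { this.op = op; }
    }

    /** Top-level message; only "payload" is mapped, "schema" is left out. */
    public static class ChangeEvent {
        private Payload payload;
        public Payload getPayload() { return payload; }
        public void setPayload(Payload payload) { this.payload = payload; }
    }

    /** Returns the row state after the change, or null if the event carries none. */
    public static Person currentRow(ChangeEvent event) {
        if (event == null || event.getPayload() == null) {
            return null;
        }
        return event.getPayload().getAfter();
    }

    public static void main(String[] args) {
        // Hand-built equivalent of the sample message: op "c", before is null.
        Person after = new Person();
        after.setId(123);
        after.setName("Muhammad Sufyian");
        Payload payload = new Payload();
        payload.setAfter(after);
        payload.setOp("c");
        ChangeEvent event = new ChangeEvent();
        event.setPayload(payload);

        Person row = currentRow(event);
        System.out.println("Id:" + row.getId());
        System.out.println("Name:" + row.getName());
    }
}
```

Under these assumptions the listener would take the envelope type as its @Payload parameter and call something like `currentRow` to reach the Person data, checking for null before use.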
Any pointers would be appreciated.
Stack trace:
2018-08-16 15:23:36.102 INFO 23525 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] Discovered group coordinator kafka-02.hotel02.pro06.eu.idealo.com:9092 (id: 2147481645 rack: null)
2018-08-16 15:23:36.104 INFO 23525 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] Revoking previously assigned partitions []
2018-08-16 15:23:36.105 INFO 23525 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : partitions revoked: []
2018-08-16 15:23:36.105 INFO 23525 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] (Re-)joining group
2018-08-16 15:23:39.168 INFO 23525 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] Successfully joined group with generation 1
2018-08-16 15:23:39.170 INFO 23525 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] Setting newly assigned partitions [dbserver1.datafeeds.person-0]
2018-08-16 15:23:39.232 INFO 23525 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : partitions assigned: [dbserver1.datafeeds.person-0]
2018-08-16 15:23:59.003 INFO 23525 --- [a9-3b8495d0ab90] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] Group coordinator kafka-02.hotel02.pro06.eu.idealo.com:9092 (id: 2147481645 rack: null) is unavailable or invalid, will attempt rediscovery
[B@25c28bf5
Print
2018-08-16 15:24:00.512 INFO 23525 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] Discovered group coordinator kafka-02.hotel02.pro06.eu.idealo.com:9092 (id: 2147481645 rack: null)
2018-08-16 15:24:00.538 ERROR 23525 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=anonymous.44607cf2-3162-459a-8ca9-3b8495d0ab90] Offset commit failed on partition dbserver1.datafeeds.person-0 at offset 58: The coordinator is not aware of this member.
2018-08-16 15:24:00.555 ERROR 23525 --- [container-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null