目前我在同一个spring引导应用程序中配置producer和consumer,但是触发消息的spring云流没有通过kafka(我正在用kafka控制台consumer监视消息),但是consumer仍然收到消息(使用与producer相同的线程)。
如果我删除了应用程序中的consumerhandler(@streamlistener),生产者就会成功地将消息发送给kafka。
这个有什么配置吗?默认情况下,我需要spring云流向kafka发送消息。
生产者和消费者配置:
@Component
public interface NotificationProcessor {
String EMAIL_NOTIFICATION = "email-notification";
@Input(EMAIL_NOTIFICATION)
SubscribableChannel receiveEmail();
@Output(EMAIL_NOTIFICATION)
MessageChannel sendEmail();
}
以下是我的一些配置:
spring:
cloud:
stream:
kafka:
binder:
autoAddPartitions: true
brokers: ${KAFKA_BROKERS:localhost:9092}
auto-create-topics: true
configuration:
auto.offset.reset: latest
bindings:
email-notification:
group: ${EMAIL_GROUP:email-group-notification}
destination: ${EMAIL_TOPIC:email-notification}
contentType: application/json
producer:
partitionCount: 9
consumer:
partitioned: true
concurrency: 3
instance-count: 1
instance-index: 0
触发发送消息的api:
@RestController
@RequestMapping("/api")
public class TestResource {
private final Logger log = LoggerFactory.getLogger(TestResource.class);
private final NotificationProcessor notificationProcessor;
public TestResource(NotificationProcessor notificationProcessor) {
this.notificationProcessor = notificationProcessor;
}
@ApiOperation(value = "Test api")
@GetMapping(value = "/send-email2", produces = APPLICATION_JSON_VALUE)
public ResponseEntity<Boolean> test2() {
EmailMessage test = EmailMessage.builder()
.to(Arrays.asList(Receiver.builder().email("test@nomail.com").build())
).type(EContentType.JSON)
.build();
log.info("send email message to kafka");
notificationProcessor.sendEmail().send(MessageBuilder.withPayload(test).build());
return ResponseEntity.ok(Boolean.TRUE);
}
}
和使用者处理程序:
@EnableBinding(NotificationProcessor.class)
public class NotificationProducer {
private final Logger log = LoggerFactory.getLogger(NotificationProducer.class);
public NotificationProducer(){}
@StreamListener(NotificationProcessor.EMAIL_NOTIFICATION)
public void receiveEmail(@Payload Message<EmailMessage> message) {
log.info("Receive email message from kafka");
EmailMessage emailMessage = message.getPayload();
}
}
1条答案
按热度按时间bnlyeluc1#
从提供的信息中不清楚您将信息发送到哪里。什么频道?默认情况下,通道是内部的和直接的,因此如果您发送到订阅的同一个通道,那么您将完全绕过messagebroker(即kafka)。这可以解释这两种症状(没有代理和相同的线程)。
也就是说,基于注解的配置模型已被弃用。在过去的几年中,我们已经完全迁移到函数式编程模型,这个模型要简单得多,它的设计也帮助您不要考虑内部实现,比如通道,因为它们实际上是供内部使用的(代码和代理适配器之间的桥梁)。
还有一个新组件允许您将消息发送到专门为您拥有的场景设计的代理streambridge。
不管怎样,看看它,考虑重构你的应用程序。至少要确保发送到绑定到代理目的地的通道,并订阅绑定到同一目的地的另一个通道,从而确保发生到代理的往返。
最后但并非最不重要的一点,我仍然很困惑,为什么你需要发送给经纪人,然后在同一个应用程序订阅它?为什么要增加网络开销?