我一直在尝试让入站subscribablechannel和出站messagechannel在我的spring boot应用程序中工作。
我已经成功地设置了Kafka频道,并成功地进行了测试。
此外,我还创建了一个基本的spring引导应用程序,用于测试从通道添加和接收内容。
我遇到的问题是,当我将等效代码放在它所属的应用程序中时,消息似乎永远不会被发送或接收。通过调试很难确定发生了什么,但唯一不同的是频道名。在工作impl中,频道名称类似于非工作应用程序中的application.channellocalhost:8080/channel.
我想知道是否有一些spring引导配置阻止或改变了通道的创建到一个不同的通道源?
有人有类似的问题吗?
应用程序.yml
spring:
datasource:
url: jdbc:h2:mem:dpemail;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
platform: h2
username: hello
password:
driverClassName: org.h2.Driver
jpa:
properties:
hibernate:
show_sql: true
use_sql_comments: true
format_sql: true
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
email-in:
destination: email
contentType: application/json
email-out:
destination: email
contentType: application/json
电子邮件
public class Email {
private long timestamp;
private String message;
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
绑定配置
@EnableBinding(EmailQueues.class)
public class EmailQueueConfiguration {
}
接口
public interface EmailQueues {
String INPUT = "email-in";
String OUTPUT = "email-out";
@Input(INPUT)
SubscribableChannel inboundEmails();
@Output(OUTPUT)
MessageChannel outboundEmails();
}
控制器
@RestController
@RequestMapping("/queue")
public class EmailQueueController {
private EmailQueues emailQueues;
@Autowired
public EmailQueueController(EmailQueues emailQueues) {
this.emailQueues = emailQueues;
}
@RequestMapping(value = "sendEmail", method = POST)
@ResponseStatus(ACCEPTED)
public void sendToQueue() {
MessageChannel messageChannel = emailQueues.outboundEmails();
Email email = new Email();
email.setMessage("hello world: " + System.currentTimeMillis());
email.setTimestamp(System.currentTimeMillis());
messageChannel.send(MessageBuilder.withPayload(email).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());
}
@StreamListener(EmailQueues.INPUT)
public void handleEmail(@Payload Email email) {
System.out.println("received: " + email.getMessage());
}
}
我不确定是否有一个继承的配置项目使用springcloud,springcloudsleuth可能会阻止它工作,但即使我删除了它,仍然没有。但与我的应用程序不同的是,我从未看到配置consumeconfig,例如:
o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 100
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = consumer-2
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
(这个配置是我在运行上述代码时在我的基本spring引导应用程序中看到的,代码可以从kafka通道进行读写)。。。。
我假设我正在使用的一个库中有一些over-spring引导配置创建一个不同类型的通道,但我找不到该配置是什么。
2条答案
按热度按时间shstlldc1#
您发布的内容包含许多不相关的配置,因此很难确定是否有任何东西妨碍了您。另外,当你说“..似乎消息永远不会被发送或接收..”时,日志中是否有异常?另外,请说明您使用的Kafka版本以及SpringCloudStream。现在,我确实尝试根据您的代码复制它(在清理了一点只留下相关部分之后),并且能够成功地发送/接收。
我的Kafka版本是0.11和SpringCloudStream2.0.0。以下是相关代码:
ymdaylpp2#
好的,经过大量的调试。。。我发现有些东西正在创建一个测试支持绑定器(怎么还不知道),所以很明显,这是用来不影响将消息添加到真实通道的。
添加后
Kafka频道配置已经运行,消息正在添加。。很想知道到底是什么在设置这个测试支持绑定器。。我最终会找到那个笨蛋的。