Spring Cloud Stream Kafka: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers

Posted by wqnecbli on 2021-06-07 in Kafka

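I am trying to use the Spring Cloud Stream Kafka binder to send and receive messages against a local ZooKeeper and Kafka server. However, when starting the Spring MVC server, I see the following exception: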

Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
... 28 common frames omitted
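
As background for the trace above: the message is raised when something calls send() on a subscribable channel that has no handler attached to it. The plain-Java sketch below only models that behaviour. UnicastChannel and NoSubscribersException are hypothetical stand-ins invented for illustration, not Spring Integration classes, and the real dispatcher is more elaborate (it supports load balancing and failover across handlers).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Simplified model only; these are NOT Spring Integration's real classes.
class DispatcherSketch {

    /** Hypothetical exception carrying the same message as the stack trace. */
    static class NoSubscribersException extends RuntimeException {
        NoSubscribersException() {
            super("Dispatcher has no subscribers");
        }
    }

    /** Hands each message to one subscribed handler and fails if none is registered. */
    static class UnicastChannel {
        private final List<Consumer<String>> handlers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> handler) {
            handlers.add(handler);
        }

        boolean send(String payload) {
            if (handlers.isEmpty()) {
                // Nothing is listening on this channel, so the send cannot be dispatched.
                throw new NoSubscribersException();
            }
            // Simplification: always deliver to the first handler.
            handlers.get(0).accept(payload);
            return true;
        }
    }

    public static void main(String[] args) {
        UnicastChannel channel = new UnicastChannel();
        try {
            channel.send("Hello"); // no handler subscribed yet
        } catch (NoSubscribersException e) {
            System.out.println("send failed: " + e.getMessage());
        }
        channel.subscribe(p -> System.out.println("delivered: " + p));
        channel.send("Hello"); // now reaches the subscribed handler
    }
}
```

In a Spring Cloud Stream application the binder is normally what subscribes a handler to each bound output channel, so this error usually suggests that the channel being sent to was never bound, or was bound under a different name.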

The server is very simple: one class and one Spring application properties file:

package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.util.Map;

@RestController
@EnableBinding(ProducerChannels.class)
@SpringBootApplication
public class ProducerApplication {

  private final MessageChannel consumer;

  public ProducerApplication(ProducerChannels channels){
      this.consumer = channels.consumer();
  }

  @PostMapping("/greet/{name}")
  public void publish(@RequestBody Map<String, String> name){
      String greeting = "Hello, " + name + "!";

      Message<String> msg = MessageBuilder.withPayload(greeting).build();

      consumer.send(msg);
  }

  public static void main(String[] args) {
      SpringApplication.run(ProducerApplication.class, args);
  }
}

interface ProducerChannels {

  @Output
  MessageChannel consumer();
}

spring.cloud.stream.kafka.bindings.consumer.destination=consumer
server.port=8080

What else do I need to do to configure Spring Cloud Stream?

Answer #1 (ffx8fchx)

What version are you using? I just pasted your code into a Boot 1.4.2 application (1.1.0.RELEASE of the stream starter) and it works fine for me.

Answer #2 (9jyewag0)

consumer is a special keyword, so you should not use it as a channel name. I think it will work if you change it as follows.

public interface ProducerChannels {

    String OUTPUT = "example-topic";

    @Output(OUTPUT)
    MessageChannel exampleTopic();
}

application.yml:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        example-topic:
          destination: example-topic
          contentType: application/json

spring.cloud.stream.bindings.exampleTopic.destination= example-topic
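
Note that the YAML above keys the binding as example-topic, while the single property line uses exampleTopic. In Spring Cloud Stream the binding name comes from the value of @Output, and falls back to the method name when no value is given, so which key applies depends on how the interface is annotated. The sketch below models that rule in plain Java. The Output annotation, both interfaces, and the helper methods here are hypothetical stand-ins written for illustration, not the framework's own code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class BindingNameSketch {

    // Hypothetical stand-in for Spring Cloud Stream's @Output annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Output {
        String value() default "";
    }

    // Explicit name, as in the answer's interface.
    interface NamedChannels {
        @Output("example-topic")
        Object exampleTopic();
    }

    // No explicit name: the method name is used instead.
    interface DefaultChannels {
        @Output
        Object exampleTopic();
    }

    /** Resolves the binding name: annotation value if set, otherwise the method name. */
    static String bindingName(Class<?> type, String methodName) throws NoSuchMethodException {
        Method method = type.getDeclaredMethod(methodName);
        Output output = method.getAnnotation(Output.class);
        if (output == null) {
            throw new IllegalArgumentException(methodName + " is not annotated with @Output");
        }
        return output.value().isEmpty() ? method.getName() : output.value();
    }

    /** Builds the property key that sets the destination for a binding. */
    static String destinationKey(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) throws Exception {
        // prints spring.cloud.stream.bindings.example-topic.destination
        System.out.println(destinationKey(bindingName(NamedChannels.class, "exampleTopic")));
        // prints spring.cloud.stream.bindings.exampleTopic.destination
        System.out.println(destinationKey(bindingName(DefaultChannels.class, "exampleTopic")));
    }
}
```

With the interface from this answer, which passes OUTPUT to @Output, the example-topic key is the one that matches; the exampleTopic key would only match if the annotation carried no value.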
