boot微服务从不同的服务器使用kafka消息

nhaq1z21  于 2021-06-05  发布在  Kafka
关注(0)|答案(0)|浏览(273)

我们有一个应用程序从一个spring引导服务器发送消息,比如说服务器端口号是8080,我们需要使用kafka将此消息发送到另一个服务器,kafka的端口号是8090。如何在8090端口服务器上使用这些服务。你能告诉我我做错了什么吗
生产商在8080端口上运行

package com.xerox.pps.Kafka.services;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(Source.class)
public class KafkaProducer {

private Source mySource;

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    String kafkaTopic = "java_in_use_topic";

    public void send(String message) {
        logger.info(String.format("#### -> Producing message -> %s", message));
        kafkaTemplate.send(kafkaTopic, message);
    }
    @KafkaListener(topics = "hello_world_topic1", groupId = "java_in_use_topic-0")
    public void consume(String message) throws IOException {
        logger.info(String.format("#### ->Recived message -> %s", message));
    }

}

8090上运行的耗电元件

package com.xerox.pps.Kafka.services;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(Sink.class)
public class KafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @Autowired private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(topics = "java_in_use_topic", groupId = "java_in_use_topic-0")
    @StreamListener(target = Sink.INPUT)

    public void consume(String message) throws IOException {
          logger.info(String.format("#### -> Consumed message -> %s", message));
          kafkaTemplate.send("hello_world_topic1", message+" Response"); 
    }

}
  • application.properties文件*
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# spring.kafka.producer.bootstrap-servers=localhost:9092

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.cloud.stream.default-binder=kafka
spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.bindings.input.group=java_in_use_topic-0
spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.input.destination=test
spring.cloud.stream.bindings.input.content-type=text/plain

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题