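We have an application that sends messages from a Spring Boot server, say on port 8080, and we need to use Kafka to deliver those messages to another server running on port 8090. How can the server on port 8090 consume these messages? Can you tell me what I am doing wrong?
Producer, running on port 8080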
package com.xerox.pps.Kafka.services;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(Source.class)
public class KafkaProducer {

    private Source mySource;

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    String kafkaTopic = "java_in_use_topic";

    public void send(String message) {
        logger.info(String.format("#### -> Producing message -> %s", message));
        kafkaTemplate.send(kafkaTopic, message);
    }

    @KafkaListener(topics = "hello_world_topic1", groupId = "java_in_use_topic-0")
    public void consume(String message) throws IOException {
        logger.info(String.format("#### ->Recived message -> %s", message));
    }
}
Consumer, running on port 8090
package com.xerox.pps.Kafka.services;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(Sink.class)
public class KafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(topics = "java_in_use_topic", groupId = "java_in_use_topic-0")
    @StreamListener(target = Sink.INPUT)
    public void consume(String message) throws IOException {
        logger.info(String.format("#### -> Consumed message -> %s", message));
        kafkaTemplate.send("hello_world_topic1", message + " Response");
    }
}
application.properties file
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.default-binder=kafka
spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.bindings.input.group=java_in_use_topic-0
spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.input.destination=test
spring.cloud.stream.bindings.input.content-type=text/plain
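
Two things stand out in the setup above. The ports 8080 and 8090 are only the HTTP ports of the two Spring Boot applications; both applications talk to the same Kafka broker, which the configuration places at localhost:9092. Also, the Spring Cloud Stream input binding points at the destination test, while the producer sends to java_in_use_topic, and the consumer method carries both @KafkaListener and @StreamListener. As a minimal sketch, assuming the consumer application relies on spring-kafka with @KafkaListener alone (no @EnableBinding or @StreamListener), its properties could look like the following. The server.port, group-id and auto-offset-reset values are assumptions for illustration, not settings confirmed by the question.

```properties
# HTTP port of the consumer application (assumed from the question);
# this is unrelated to the Kafka broker port.
server.port=8090

# Both applications connect to the same broker.
spring.kafka.bootstrap-servers=localhost:9092

# Default consumer group when @KafkaListener does not set groupId (assumed name).
spring.kafka.consumer.group-id=java_in_use_topic-0
# Start from the earliest offset when the group has no committed offset (assumption).
spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```

With a configuration along these lines, the producer application would keep the same broker settings and differ only in server.port=8080.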