从客户端通过websocket订阅kafka主题

luaexgnf  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(370)

当服务器端的生产者将消息推送到特定的Kafka主题时,我们尝试使用浏览器中的js代码来收听Kafka主题。
在服务器端,kafka服务器和zookeeper分别在9092和2181端口上运行。

String topicName = "test"; 
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");     
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", 
     "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", 
             "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class",
        "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer <String, String>(props);
for(int i = 0; i < 10; i++){
    producer.send(new ProducerRecord<String, String>(topicName, 
       Integer.toString(i), Integer.toString(i) + i));
    }
producer.close();
}catch(Exception e){
    e.printStackTrace();
}

客户端代码段:

<!DOCTYPE html>
<html>
<head>My Page</head>
<script src="stomp.js"></script>
<script src="http://cdn.sockjs.org/sockjs-0.3.min.js"></script>
<script type="text/javascript">
console.log('Starting: ');
var socket = new SockJS('ws://localhost:9092');
client = Stomp.over(socket);
client.connect( "", "",
        function() {
            console.log('Connected: ');
            client.subscribe("/topic/test",
                function( message ) {
                    alert( message );
                }  
            );
        }
);
</script>
</html>

在客户端,当我们尝试通过ws连接时,只有start:console被打印,connected:not被打印,因为到kafka服务器的websocket连接没有成功。
由于Kafka不直接支持stomp,我们试图起诉sockjs。
有谁能帮我们实现这个功能吗。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题