Spark Structured Streaming cannot receive Kafka messages

ehxuflar  posted on 2021-06-07  in  Kafka

I am testing Spark Structured Streaming with Kafka. I have a single Kafka broker (0.10.1) on host28, with the default partition count num.partitions=1. My producer:

bin/kafka-console-producer.sh --broker-list host28:6667 --topic test

When I use either of these consumers:

bin/kafka-console-consumer.sh --zookeeper host26:2181,host27:2181,host28:2181 --topic test --from-beginning

bin/kafka-console-consumer.sh --bootstrap-server host8:6667 --topic test --from-beginning --partition 0

I can receive the Kafka messages.
But when I use

bin/kafka-console-consumer.sh --bootstrap-server host28:6667 --topic test --from-beginning

or Spark Structured Streaming, no messages are received:

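import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.sql.streaming.StreamingQuery;

public class Main {
    private static final String APP_NAME = "test_streaming_from_kafka";
    private static final String KAFKA_HOST = "host28:6667";
    private static final String KAFKA_SUBSCRIBE = "test";

    public static void main(String[] args) throws Exception {

        SparkSession spark = SparkSession
                .builder()
                .appName(APP_NAME)
                .getOrCreate();

        // Kafka source that subscribes to the topic by name.
        DataStreamReader reader = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", KAFKA_HOST)
                .option("subscribe", KAFKA_SUBSCRIBE);

        // Cast the binary key/value to strings and print each micro-batch to the console.
        StreamingQuery query = reader.load()
                .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "CAST(partition AS STRING)", "CAST(offset AS STRING)")
                .writeStream()
                .format("console")
                .start();

        // Block until the streaming query stops or fails.
        query.awaitTermination();
    }
}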

mftmpeh8  #1

Solved!
I changed the Spark log level from INFO to DEBUG, and then I found:
18/08/17 21:12:07 DEBUG AbstractCoordinator: Received group coordinator response ClientResponse(receivedTimeMs=1534511527794, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@3d2afb1b, request=RequestSend(header={api_key=10,api_version=0,correlation_id=117,client_id=consumer-1}, body={group_id=spark-kafka-source-f7b2afd9-e1c6-4d16-b299-6d629599cdc8-42875004-driver-0}), createdTimeMs=1534511527794, sendTimeMs=1534511527794), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
18/08/17 21:12:07 DEBUG AbstractCoordinator: Group coordinator lookup for group spark-kafka-source-f7b2afd9-e1c6-4d16-b299-6d629599cdc8-42875004-driver-0 failed: The group coordinator is not available.
Googling "The group coordinator is not available" led me to this.
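For anyone scanning DEBUG output for the same symptom, here is a minimal sketch that pulls the error_code field out of a coordinator response line and names it. The class CoordinatorErrorCheck and its methods are hypothetical helpers, not part of Spark or Kafka. The code-to-name mapping covers only the coordinator-related entries as I understand the Kafka protocol error table; older releases such as 0.10.x used GROUP_-prefixed names for the same numbers, so treat the labels as an assumption.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading Kafka consumer DEBUG lines; not a Spark or Kafka API.
class CoordinatorErrorCheck {
    // Matches the error_code field as it appears inside responseBody={...}.
    private static final Pattern ERROR_CODE = Pattern.compile("error_code=(-?\\d+)");

    // Returns the first error_code found in the log line, or empty if there is none.
    static OptionalInt extractErrorCode(String logLine) {
        Matcher m = ERROR_CODE.matcher(logLine);
        return m.find() ? OptionalInt.of(Integer.parseInt(m.group(1))) : OptionalInt.empty();
    }

    // Names only the coordinator-related codes (assumed from the Kafka protocol error table).
    static String describe(int code) {
        switch (code) {
            case 0:  return "NONE";
            case 14: return "COORDINATOR_LOAD_IN_PROGRESS";
            case 15: return "COORDINATOR_NOT_AVAILABLE";
            case 16: return "NOT_COORDINATOR";
            default: return "UNKNOWN(" + code + ")";
        }
    }

    public static void main(String[] args) {
        // Tail of the DEBUG line quoted in the answer.
        String line = "responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}";
        extractErrorCode(line).ifPresent(c -> System.out.println(c + " -> " + describe(c)));
    }
}
```

Run against the line from the answer, this reports code 15, which matches the "group coordinator is not available" message in the second log line.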
