无法使用spark批处理在kafka中推送消息

2admgd59  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(243)
sparkSession = SparkSession.builder().appName("TDAP DATA SHARING 
     API").config("spark.ui.port", "6787")
                .getOrCreate();
        Dataset<Row> parquetFileDF = sparkSession.read().parquet(
                "/test/cob_date=2019-04-18/");
        parquetFileDF.createOrReplaceTempView("ParquetTable");
        Dataset<Row> parkSQL = sparkSession.sql("select * from ParquetTable where m_date<>'NULL' limit 10");

        System.out.println("printing the scema of parkSQL");
        parkSQL.printSchema();
        System.out.println("*record count in dataframe*" + parkSQL.count());

        System.out.println("printing the scema of df");
        Dataset<Row> df = parkSQL.selectExpr("struct(*) AS value");
        Dataset<Row> df2 = df.selectExpr("CAST(value AS STRING)");
        df2.printSchema();
        df2.show();
        System.out.println("*record count in dataframe*" + df2.count());

        df2.write().format("kafka").option("kafka.bootstrap.servers", "10.91.134.19:9093")
                .option("kafka.key.deserializer", "org.apache.kafka.common.serialization.StringDeserialize")
                .option("kafka.value.deserializer", "org.apache.kafka.common.serialization.StringDeserialize")
                .option("kafka.ssl.keystore.location", "/hivestage/tmmart/tmmart/sparkJar/kafka.keystore.jks")
                .option("kafka.ssl.keystore.password", "x@123")
                .option("kafka.ssl.key.password", "x@123")
                .option("kafka.ssl.truststore.location", "/x/tmmart/tmmart/sparkJar/kafka.truststore.jks")
                .option("kafka.ssl.truststore.password", "x@UAT").option("kafka.ssl.keystore.type", "JKS")
                .option("kafka.ssl.protocol", "SSL").option("kafka.sasl.mechanism", "SASL")
                .option("kafka.ssl.endpoint.identification.algorithm", "")
                .option("kafka.request.timeout.ms", "300000").option("topic", "dbs_itt_tdap_tswb_load_perf").save();

我无法在Kafka的主题中传递任何信息。这是一个错误
org.apache.kafka.common.errors.timeoutexception:在60000毫秒后更新元数据失败。
谁能告诉我,我错在哪里?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题