将spark数据集转换为json并写入kafka producer

ubbxdtey  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(785)

我想从配置单元中读取一个表并写入kafka producer(批处理作业)。
目前,我正在阅读表格 Dataset<Row> 在我的java类中,并尝试转换为json,以便使用kafkaproducer编写json消息。

Dataset<Row> data = spark.sql("select * from tablename limit 5");
List<Row> rows = data.collectAsList();
for(Row row: rows) {
        List<String> stringList = new ArrayList<String>(Arrays.asList(row.schema().fieldNames())); 
        Seq<String> row_seq = JavaConverters.asScalaIteratorConverter(stringList.iterator()).asScala().toSeq();
        Map map = (Map) row.getValuesMap(row_seq);
        JSONObject json = new JSONObject();
        json.putAll( map);

        ProducerRecord<String, String> record = new ProducerRecord<String, String>(SPARK_CONF.get("topic.name"), json.toString());
        producer.send(record);

我有一个例外

fxnxkyjh

fxnxkyjh1#

你一写信 collectAsList(); ,您不再使用spark,只使用原始的kafka java api。
我的建议是使用spark结构化流Kafka集成,你可以这样做
下面是一个示例,您需要至少用两列构成一个Dataframe,因为kafka接受键和值。

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic_name")
  .save()

至于把数据转换成json, collectToList() 这是错误的。不要将数据拉入单个节点。
你可以用 data.map() 将数据集从一种格式转换为另一种格式。
例如,您可以将一行Map为json格式的字符串。

row -> "{\"f0\":" + row.get(0) + "}"

相关问题