我想从配置单元中读取一个表并写入kafka producer(批处理作业)。
目前,我正在阅读表格 Dataset<Row>
在我的java类中,并尝试转换为json,以便使用kafkaproducer编写json消息。
Dataset<Row> data = spark.sql("select * from tablename limit 5");
List<Row> rows = data.collectAsList();
for(Row row: rows) {
List<String> stringList = new ArrayList<String>(Arrays.asList(row.schema().fieldNames()));
Seq<String> row_seq = JavaConverters.asScalaIteratorConverter(stringList.iterator()).asScala().toSeq();
Map map = (Map) row.getValuesMap(row_seq);
JSONObject json = new JSONObject();
json.putAll( map);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(SPARK_CONF.get("topic.name"), json.toString());
producer.send(record);
我有一个例外
1条答案
按热度按时间fxnxkyjh1#
你一写信
collectAsList();
,您不再使用spark,只使用原始的kafka java api。我的建议是使用spark结构化流Kafka集成,你可以这样做
下面是一个示例,您需要至少用两列构成一个Dataframe,因为kafka接受键和值。
至于把数据转换成json,
collectToList()
这是错误的。不要将数据拉入单个节点。你可以用
data.map()
将数据集从一种格式转换为另一种格式。例如,您可以将一行Map为json格式的字符串。