scala—如何将列表中的数据作为记录发送给kafka

vlju58qv  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(405)

我是新来scala的,我有一个这样的清单

newlist = List([Date,Open,High,Low,Close,Volume,Name], [2012-08-13T00:00:00.000Z,92.29,92.59,91.74,92.4,2075391.0,MMM], [2012-08-14T00:00:00.000Z,92.36,92.5,92.01,92.3,1843476.0,MMM], [2012-08-15T00:00:00.000Z,92.0,92.74,91.94,92.54,1983395.0,MMM], [2012-08-16T00:00:00.000Z,92.75,93.87,92.21,93.74,3395145.0,MMM], [2012-08-17T00:00:00.000Z,93.93,94.3,93.59,94.24,3069513.0,MMM], [2012-08-20T00:00:00.000Z,94.0,94.17,93.55,93.89,1640008.0,MMM])

我使用下面的代码将列表中的数据发送给kafka

val today = Calendar.getInstance.getTime
      val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val key = UUID.randomUUID().toString().split("-")(0)

      val value = formatter.format(today) + "," + newList

      val data = new ProducerRecord[String, String](topic, key, value)

      println(data.value())
      producer.send(data)

我的输出如下:

2020-05-12 14:56:41,List([Date,Open,High,Low,Close,Volume,Name], [2012-08-13T00:00:00.000Z,92.29,92.59,91.74,92.4,2075391.0,MMM], [2012-08-14T00:00:00.000Z,92.36,92.5,92.01,92.3,1843476.0,MMM], [2012-08-15T00:00:00.000Z,92.0,92.74,91.94,92.54,1983395.0,MMM], [2012-08-16T00:00:00.000Z,92.75,93.87,92.21,93.74,3395145.0,MMM], [2012-08-17T00:00:00.000Z,93.93,94.3,93.59,94.24,3069513.0,MMM], [2012-08-20T00:00:00.000Z,94.0,94.17,93.55,93.89,1640008.0,MMM])
2020-05-12 14:56:42,List([Date,Open,High,Low,Close,Volume,Name], [2012-08-13T00:00:00.000Z,92.29,92.59,91.74,92.4,2075391.0,MMM], [2012-08-14T00:00:00.000Z,92.36,92.5,92.01,92.3,1843476.0,MMM], [2012-08-15T00:00:00.000Z,92.0,92.74,91.94,92.54,1983395.0,MMM], [2012-08-16T00:00:00.000Z,92.75,93.87,92.21,93.74,3395145.0,MMM], [2012-08-17T00:00:00.000Z,93.93,94.3,93.59,94.24,3069513.0,MMM], [2012-08-20T00:00:00.000Z,94.0,94.17,93.55,93.89,1640008.0,MMM])

但我希望我的输出是这样的(列表中的每个值都是行):

2020-05-12 14:56:41,Date,Open,High,Low,Close,Volume,Name
2020-05-12 14:56:42,2012-08-13T00:00:00.000Z,92.29,92.59,91.74,92.4,2075391.0,MMM
2020-05-12 14:56:43,2012-08-14T00:00:00.000Z,92.36,92.5,92.01,92.3,1843476.0,MMM

我们怎么做?请忽略我的错误

ncecgwcz

ncecgwcz1#

producerrecord是单个数据的示例,如果您要发送数据列表,请多次调用“send”方法

newlist.map(row => 
    producer.send(new ProducerRecord[String, String](topic, key, row.mkString))

相关问题