我收到Kafka的数据格式,其中null是键。
null,val1,val2,val3,val4,val5,val6,val7,...val23
null,val1,val2,val3,val4,val5,val6,val7,...val23
null,val1,val2,val3,val4,val5,val6,val7,...val23
现在,我已经Map了这些值,以删除空键,并使用以下代码形成新的键和值对。
val topics = Array("kafka-topic")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
streamingContext.checkpoint("hdfs:///hdfs/location")
val record= stream.map(record=>record.value().toString)
val rdds=record.transform
{
pps=>pps.flatMap(_.split(","))
}
val ppds= rdds.transform
` `{
pair=>pair.map(vals=>
(vals(2).toString(),Set(vals(1).toLong,vals(2),vals(3),vals(4),val(5),val(6),val(7)....val(23)
}
其中vals(2)一个字符串将是键,其余22个值将是值。
我现在正试图在20秒的时间窗口内获得每个键的所有值的平均值,并不断地将计算出的每个键的平均值推送到数据存储(hbase)中。在批处理模式中,我知道有aggregatebykey()方法允许您这样做。
在流媒体模式下,如何实现这一点?
还有一种可能是某些值是字符串。如何跳过字符串值,只计算数值类型的平均值,同时不断地将更新推送到hbase?
2条答案
按热度按时间q43xntqr1#
使用reducebykeyandwindow,
上面的示例将用于计算窗口期间的字数,而不是像上面那样使用简单的加法函数,您可以编写更复杂的聚合函数,并将其与reducebykeyandwindow一起使用
了解更多信息
https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20spark%20streaming/10%20window%20aggregations.html
jbose2ul2#
你可以这样使用: