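I am using the Flink DataStream API. I have a set of racks, and I want to compute the average temperature grouped by rack ID. My window is 40 seconds long and slides every 10 seconds. Below is my code, which computes the sum of temperatures per rackId every 10 seconds, but what I now want is the average temperature: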
static Properties properties = new Properties();

public static Properties getProperties()
{
    properties.setProperty("bootstrap.servers", "54.164.200.104:9092");
    properties.setProperty("zookeeper.connect", "54.164.200.104:2181");
    //properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder");
    //properties.setProperty("group.id", "akshay");
    properties.setProperty("auto.offset.reset", "earliest");
    return properties;
}

@SuppressWarnings("rawtypes")
public static void main(String[] args) throws Exception
{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    Properties props = Program.getProperties();
    DataStream<TemperatureEvent> dstream = env
        .addSource(new FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new TemperatureEventSchema(), props))
        .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
    DataStream<TemperatureEvent> ds1 = dstream
        .keyBy("rackId")
        .timeWindow(Time.seconds(40), Time.seconds(10))
        .sum("temperature");
    env.execute("Temperature Consumer");
}
How can I compute the average temperature for the example above?
1 Answer
As far as I know, you need to write the average function yourself. You can find an example here:
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/groupedprocessingtimewindowexample.java
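In your case, you would replace
.sum("temperature");
with something like .apply(new Avg());
and implement the Avg class as a window function. Note: if the function can be called on an empty window (for example, when using a custom trigger), you need to check for that before accessing elements.head.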
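For illustration, the core of such an Avg class might look like the plain-Java sketch below. It only shows the averaging step over one window's elements, including the empty-window check mentioned above. The names Reading and AvgSketch are hypothetical stand-ins (Reading models only the rackId and temperature fields that the question's TemperatureEvent appears to have). In an actual Flink job, I would expect this loop to sit inside the apply method of a WindowFunction, with the result emitted through the Collector instead of being returned.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalDouble;

// Hypothetical stand-in for the question's TemperatureEvent; only the two
// fields referenced by keyBy("rackId") and sum("temperature") are modelled.
class Reading {
    final String rackId;
    final double temperature;

    Reading(String rackId, double temperature) {
        this.rackId = rackId;
        this.temperature = temperature;
    }
}

// Hypothetical helper; not part of Flink.
class AvgSketch {
    // Averages the temperatures of the elements of a single window.
    // Returns an empty OptionalDouble for an empty window, so the caller
    // never divides by zero or reads a first element that does not exist.
    static OptionalDouble average(Iterable<Reading> elements) {
        double sum = 0.0;
        long count = 0;
        for (Reading r : elements) {
            sum += r.temperature;
            count++;
        }
        return count == 0 ? OptionalDouble.empty() : OptionalDouble.of(sum / count);
    }

    public static void main(String[] args) {
        // Three readings for one rack, as they might arrive in one 40-second window.
        List<Reading> window = Arrays.asList(
            new Reading("rack1", 20.0),
            new Reading("rack1", 30.0),
            new Reading("rack1", 25.0));
        System.out.println(average(window).getAsDouble());

        // An empty window yields no value rather than an error.
        List<Reading> empty = Collections.emptyList();
        System.out.println(average(empty).isPresent());
    }
}
```

Because the window is keyed by rackId, every element passed to one call belongs to the same rack, so the key does not need to be checked inside the loop.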