一个流中几个不同字段的平均值

dwthyt8l  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(328)

我还没有选择一个流媒体框架,但是我现在正在和flink玩。但是,我愿意使用束流、Spark流,无论我发现什么都符合我的用例。您将如何等效于以下sql:

SELECT a,b,c, avg(d), avg(e), ..., avg(z)
FROM whatever
GROUP BY a,b,c,d,e, ..., z

对Flink来说,平均值的计算似乎是通过一个聚合函数来完成的https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/aggregatefunction.java#l61
但我不明白你是怎么做这个“秤”的。对于单个字段的平均值来说,这似乎是很多样板。如果我有几个不同的流和不同的领域,我需要平均?
flink、beam、结构化流媒体等是否能让这变得更容易?
顺便说一句,有没有一个简单的方法来模仿postgres的这个漂亮的count过滤器语法,

SELECT
  COUNT(*) AS unfiltered,
  COUNT(*) FILTER (WHERE some_condition) AS filtered
FROM whatever
uqxowvwt

uqxowvwt1#

一般来说,在flink jobs中,我会将已定义的用户函数创建为单独的类,然后我可以将这些类应用于我喜欢的任何字段。flink还有一个SQLAPI,我不太熟悉,但这里有一个基于我在这里找到的代码的示例(https://gist.github.com/mustafaakin/457859b8bf703c64029071c1139b593d):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment table = TableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");
DataStream<Tuple3<String, Double, Time>> dataset = text.map(...);

table.registerDataStream("dataset", dataset, "p1, p2, p3");
String query = "SELECT p1, AVG(p2) AS avgp2 FROM dataset GROUP p1";
Table tableResult = table.sql(query);

// print to System.out
table.toAppendStream(tableResult, Row.class).print();

env.execute();

我还将研究ApacheIgnite,用于使用sql查询进行数据流传输。我自己从来没用过,但我听过很多好话。

相关问题