flink statefun到flink表api的连接

t2a7ltrp  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(424)

我们感兴趣的是从新的有状态函数?连接到常规的flink流应用程序,理想情况下使用表api。我们的想法是从statefun查询在flink注册的表,这可能吗?正确的方法是什么?
到目前为止,我的想法是在一些主函数中初始化我的表流,并注册一个有状态函数提供程序来连接到表:

@AutoService(StatefulFunctionModule.class)
public class Module implements StatefulFunctionModule {

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // ingest a DataStream from an external source
    DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

    // SQL query with an inlined (unregistered) table
    Table myTable = tableEnv.fromDataStream(ds, "user, product, amount");
    tableEnv.createTemporaryView("my_table", myTable);

    TableFunctionProvider tableProvider = new TableFunctionProvider();
    binder.bindFunctionProvider(FnEnrichmentCallback.TYPE, tableProvider);

    //continue registering my other messages
    //...
  }
}

有状态函数提供程序将返回 FnTableQuery 它只需在收到消息时查询表:

public class TableFunctionProvider implements StatefulFunctionProvider {

  @Override
  public StatefulFunction functionOfType(FunctionType type) {
    return new FnTableQuery();
  }
}

然后,query function对象将作为每个已建立进程的参与者进行操作,并在调用时简单地查询表:

public class FnTableQuery extends StatefulMatchFunction {

  static final FunctionType TYPE = new FunctionType(Identifiers.NAMESPACE, "my-table");

  private Table myTable;

  @Override
  public void configure(MatchBinder binder) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    myTable = tableEnv.from("my_table");

    binder
        .otherwise(this::catchAll);
  }

  private void catchAll(Context context, Object message) {
    context.send(FnEnrichmentCallback.TYPE, myTable.select("max(amount)").toString(), message);
  }
}

如果这种方法不合理,我提前道歉,因为我不知道是否:
flink和statefun应用程序可以在源/汇领域之外协同工作,特别是因为这个特定函数是无状态的,并且表是有状态的
我们可以像这样查询flink表,我只将它们作为中间对象来查询,以发送到接收器或数据流
在module.configure中初始化一些东西是有意义的,如果有状态函数提供程序和它的匹配函数对每个并行工作进程都调用一次

ddarikpa

ddarikpa1#

apache flink社区确实考虑过在将来支持flink数据流作为statefun入口/出口。
这意味着您可以获取使用flink table api/flink cep/datastream api等的结果流,并使用流中的事件调用函数。

相关问题