为什么我要创建我自己的richsink函数而不是仅仅打开和关闭我的postgresql连接?

mrphzbgm  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(366)

我想知道为什么我真的需要创建我自己的richsinkfunction或者使用jdbcoutputformat来连接数据库,而不是仅仅创建我的连接,使用我的sinkfunction中的传统postgresql驱动程序执行查询并关闭连接?
我发现许多文章告诉你,但没有解释为什么?有什么区别?
使用jdbcoutputformat的代码示例,

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
     .setDrivername("org.postgresql.Driver")
     .setDBUrl("jdbc:postgresql://localhost:1234/test?user=xxx&password=xxx")
     .setQuery(query)
     .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }) //set the types
     .finish();

实现自己的richsink函数的代码示例,

public class RichCaseSink extends RichSinkFunction<Case> {

  private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
      + "VALUES (?, ?) "
      + "ON CONFLICT (caseid) DO UPDATE SET "
      + "  tracehash=?";

  private PreparedStatement statement;

  @Override
  public void invoke(Case aCase) throws Exception {

    statement.setString(1, aCase.getId());
    statement.setString(2, aCase.getTraceHash());
    statement.setString(3, aCase.getTraceHash());
    statement.addBatch();
    statement.executeBatch();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    Class.forName("org.postgresql.Driver");
    Connection connection =
        DriverManager.getConnection("jdbc:postgresql://localhost:5432/casedb?user=signavio&password=signavio");

    statement = connection.prepareStatement(UPSERT_CASE);
  }

}

为什么我不能只使用postgresql驱动程序?

public class Storable implements SinkFunction<Activity>{

    @Override
    public void invoke(Activity activity) throws Exception {
        Class.forName("org.postgresql.Driver");
        try(Connection connection =
            DriverManager.getConnection("jdbc:postgresql://localhost:5432/casedb?user=signavio&password=signavio")){

        statement = connection.prepareStatement(UPSERT_CASE);

        //Perform the query

        //close connection...
        }
    }

}

有人知道flink最佳实践的技术答案吗?richsinkfunction的实现或jdbcoutputformat的使用有什么特别之处吗?
先谢谢你。

mwngjboj

mwngjboj1#

你可以用你自己的 SinkFunction 只需使用 invoke() 方法来打开连接并写入数据,它应该可以正常工作。但在大多数情况下,它的性能会非常非常差。
第一个例子和第二个例子之间的实际区别是 RichSinkFunction 您正在使用 open() 方法打开连接并准备语句。这个 open() 方法仅在函数初始化时调用一次。在第二个示例中,您将打开到数据库的连接,并在 invoke() 方法,为输入的每个元素调用 DataStream 。实际上,您将为流中的每个元素打开一个新连接。
创建一个数据库连接是一件昂贵的事情,而且它肯定会有可怕的性能缺陷。

相关问题