neo4j flink接收器中的会话管理

nwlqm0z1  于 2021-06-25  发布在  Flink
关注(0)|答案(0)|浏览(443)

我正在用apache flink和neo4j(社区版)开发一个数据分析应用程序。
在这个应用程序中,flink sink必须在neo4j中保存/更新关系。
neo4j会话管理的最佳方式是什么?为什么?
首次实施:

public class MySink extends RichSinkFunction<Link> {

  private DbConfiguration dbconfig;

  private Driver driver;

  @Override
  public void open(Configuration parameters) throws Exception {
    this.driver = Neo4JManager.open(this.dbconfig);
  }

  @Override
  public void close() throws Exception {
    this.driver.close();
  }

  @Override
  public void invoke(Link link) throws Exception {
    Session session = this.driver.session();

    Neo4JManager.saveLink(session, link);

    session.close();
  }
}

第二次实施:

public class MySink extends RichSinkFunction<Link> {

  private DbConfiguration dbconfig;

  private Driver driver;

  private Session session;

  @Override
  public void open(Configuration parameters) throws Exception {
    this.driver = Neo4JManager.open(this.dbconfig);
    this.session = driver.session();
  }

  @Override
  public void close() throws Exception {
    this.session.close();
    this.driver.close();
  }

  @Override
  public void invoke(Link link) throws Exception {
    Neo4JManager.saveLink(this.session, link);
  }
}

在这两种实现中,都使用了以下功能:

public class Neo4JManager {
  public static Driver open(DbConfiguration dbconf) {
    AuthToken auth = AuthTokens.basic(dbconf.getUsername(), dbconf.getPassword());
    Config config = Config.build().withEncryptionLevel(Config.EncryptionLevel.NONE ).toConfig();
    return GraphDatabase.driver(dbconf.getHostname(), auth, config);
  }

  public static void saveLink(Session session, Link link) {
    Value params = parameters("x", link.x, "y", link.y);
    session.run('CREATE (Person {id:{x}}-[FOLLOWS]->(Person {id:{y}}))'
  }
}

谢谢您。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题