如何在flink流媒体作业中读写hbase

cwdobuhd  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(379)

如果我们必须在流应用程序中读写hbase,我们怎么能做到呢。我们通过open方法打开一个连接进行写入,我们如何打开一个连接进行读取。

object test {

    if (args.length != 11) {
      //print args
      System.exit(1)
    }

    val Array() = args
    println("Parameters Passed " + ...);

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", metadataBrokerList)
    properties.setProperty("zookeeper.connect", zkQuorum)
    properties.setProperty("group.id", group)

    val messageStream = env.addSource(new FlinkKafkaConsumer08[String](topics, new SimpleStringSchema(), properties))

    messageStream.map { x => getheader(x) }

    def getheader(a: String) {

        //Get header and parse and split the headers
                if (metadata not available hit HBASE) { //Device Level send(Just JSON)

            //How to read from HBASE here .

                      } 
                      //If the resultset is not available in Map fetch from phoenix
                      else {
                          //fetch from cache
                      }
     }

    }
   messageStream.writeUsingOutputFormat(new HBaseOutputFormat());
   env.execute()

}

现在在方法内部 getheader 如果我想从hbase里面读 if(metadata not available hit HBASE) 我怎么能这么做。我不想在这里打开连接,我的想法是为一个线程维护一个连接并重用它,就像flink使用hbase sink with open()方法或spark使用foreachpartition所做的那样。我尝试过这个,但是我不能将streamexecutionenvironment传递给方法。我怎么能做到这一点,有人能提供一个片段吗?

8hhllhi2

8hhllhi21#

您希望从流式用户函数读取/写入apachehbase。您链接的hbasereadexample做了一些不同的事情:它将hbase表读入数据集(flink的批处理抽象)。在用户函数中使用此代码意味着从flink程序中启动flink程序。
对于您的用例,您需要在用户函数中直接创建一个hbase客户机并与之交互。最好的方法是使用 RichFlatMapFunction 并在 open() 方法。
flink的下一个版本(1.2.0)将支持用户函数中的异步i/o操作,这将显著提高应用程序的吞吐量。

相关问题