从列表行键创建sparkDataframe

jhiyze9q  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(307)

我有一个表格或表格中的hbase行键列表 Array[Row] 想制造Spark DataFrame 在使用这些行键从hbase获取的行中。
我在想这样的事情:

def getDataFrameFromList(spark: SparkSession, rList : Array[Row]): DataFrame = {

  val conf = HBaseConfiguration.create()
  val mlRows : List[RDD[String]] = new ArrayList[RDD[String]]

  conf.set("hbase.zookeeper.quorum", "dev.server")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf.set("zookeeper.znode.parent","/hbase-unsecure")
  conf.set(TableInputFormat.INPUT_TABLE, "hbase_tbl1")

  rList.foreach( r => {
    var rStr = r.toString()
    conf.set(TableInputFormat.SCAN_ROW_START, rStr)
    conf.set(TableInputFormat.SCAN_ROW_STOP, rStr + "_")
    // read one row
    val recsRdd = readHBaseRdd(spark, conf)
    mlRows.append(recsRdd)
  })

  // This works, but it is only one row
  //val resourcesDf = spark.read.json(recsRdd) 

  var resourcesDf = <Code here to convert List[RDD[String]] to DataFrame>
  //resourcesDf
  spark.emptyDataFrame
}

我能做到 recsRdd.collect() 在for循环中,将其转换为字符串并将该json附加到 ArrayList[String 但我不确定是否有效,打电话 collect() 在这样的for循环中。 readHBaseRdd 正在使用 newAPIHadoopRDD 从hbase获取数据

def readHBaseRdd(spark: SparkSession, conf: Configuration) = {
    val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    hBaseRDD.map {
      case (_: ImmutableBytesWritable, value: Result) =>
          Bytes.toString(value.getValue(Bytes.toBytes("cf"),
                                            Bytes.toBytes("jsonCol")))
        }
    }
  }
tpxzln5u

tpxzln5u1#

使用 spark.union([mainRdd, recsRdd]) 而不是列表或RDD(mlrows)
为什么只从hbase读取一行?尽量保持最大的间隔。
总是避免打电话 collect() ,仅对调试/测试执行此操作。

相关问题