Error returning a Spark RDD from a function called inside map

c86crjj0 · posted 2021-06-09 in HBase
Answers (1) | Views (405)

I have a collection of row-key prefixes for an HBase table (shown below), and I want to write a fetchData function that returns an RDD of the rows matching a given prefix. The goal is to take the union of the RDDs returned by fetchData for every element of the plants collection. The relevant parts of the code are given below. My problem is that the code gives a compile error on the return type of fetchData:

println("PartB: " + hBaseRDD.getNumPartitions)

error: value getNumPartitions is not a member of Option[org.apache.spark.rdd.RDD[it.nerdammer.spark.test.sys.Record]]

I am compiling with Maven, using Scala 2.11.8 and Spark 2.2.0.

import it.nerdammer.spark.hbase._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object sys {
  case class systems( rowkey: String, iacp: Option[String], temp: Option[String])

  val spark = SparkSession.builder().appName("myApp").config("spark.executor.cores",4).getOrCreate()
  import spark.implicits._

  type Record = (String, Option[String], Option[String])

  def fetchData(plant: String): RDD[Record] = {
    val start_index = plant
    val end_index = plant + "z"
    //The below command works fine if I run it in main function, but to get multiple rows from hbase, I am using it in a separate function
    spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)

  }

  def main(args: Array[String]) {
    //the below elements in the collection are prefix of relevant rowkeys in hbase table ("test_table") 
    val plants = Vector("a8","cu","aw","fx")
    val hBaseRDD = plants.map( pp => fetchData(pp))
    println("Part: "+ hBaseRDD.getNumPartitions)
    /*
      rest of the code
    */
  }

}
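To see why `getNumPartitions` fails here: `plants.map(pp => fetchData(pp))` produces a `Vector` of RDDs, one per prefix, not a single RDD. The same collection shape can be sketched with plain Scala, using a hypothetical `fetchData` that returns a `Seq` (standing in for an RDD) and `++` standing in for `RDD.union`:

```scala
// Hypothetical stand-in for fetchData: each prefix yields a plain Seq of
// records instead of an RDD, so the collection shapes can be inspected locally.
def fetchData(plant: String): Seq[String] = Seq(plant + "-row1", plant + "-row2")

val plants = Vector("a8", "cu", "aw", "fx")

// Mapping over a Vector yields Vector[Seq[String]] -- one element per prefix --
// just as plants.map(fetchData) yields Vector[RDD[Record]] in the Spark code,
// which is why getNumPartitions is not available on the result.
val perPlant: Vector[Seq[String]] = plants.map(fetchData)

// reduceOption folds the per-prefix collections into one (here with ++ standing
// in for RDD.union); it returns None on an empty input instead of throwing.
val all: Option[Seq[String]] = perPlant.reduceOption(_ ++ _)

println(all.map(_.size)) // Some(8): two rows for each of the four prefixes
```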

Below is a working version of the code. The problem with it is that I use a for loop, so I have to request the data for each row-key prefix in the plants vector from HBase on every iteration, instead of fetching all the data first and then running the rest of the code on it.

import it.nerdammer.spark.hbase._
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object sys {
  case class systems( rowkey: String, iacp: Option[String], temp: Option[String])

  def main(args: Array[String]) {

    val spark = SparkSession.builder().appName("myApp").config("spark.executor.cores",4).getOrCreate()
    import spark.implicits._

    type Record = (String, Option[String], Option[String])
    val plants = Vector("a8","cu","aw","fx")

    for (plant <- plants){
      val start_index = plant
      val end_index = plant + "z"
      val hBaseRDD = spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)
      println("Part: "+ hBaseRDD.getNumPartitions)
      /*
        rest of the code
      */
    }
  }
}

After more effort, this is where I am stuck now. How can I convert the type to the required one?

scala>   def fetchData(plant: String) = {
     |     val start_index = plant
     |     val end_index = plant + "~"
     |     val x1 = spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)
     |     x1
     |   }

Defining the function in the REPL and running it:

scala> val hBaseRDD = plants.map( pp => fetchData(pp)).reduceOption(_ union _)
<console>:39: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, Option[String], Option[String])]
 required: it.nerdammer.spark.hbase.HBaseReaderBuilder[(String, Option[String], Option[String])]
       val hBaseRDD = plants.map( pp => fetchData(pp)).reduceOption(_ union _)
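The "found HBaseReaderBuilder / required RDD" mismatch suggests that, without a return type annotation, the inferred result of fetchData is the connector's builder type, so the implicit conversion from builder to RDD never fires; annotating the return type gives the compiler an expected type to convert to. A minimal pure-Scala sketch of that mechanism, with hypothetical `Builder`/`Result` types standing in for `HBaseReaderBuilder` and `RDD`:

```scala
import scala.language.implicitConversions

// Hypothetical stand-ins for HBaseReaderBuilder[T] and RDD[T].
case class Builder(table: String)
case class Result(table: String)

// Analogue of the connector's implicit conversion from builder to RDD.
implicit def builderToResult(b: Builder): Result = Result(b.table)

// No annotation: the inferred type is Builder, so the conversion never applies.
def fetchRaw(table: String) = Builder(table)

// Explicit return type: the compiler inserts builderToResult automatically.
def fetchConverted(table: String): Result = Builder(table)

val raw: Builder = fetchRaw("test_table")
val converted: Result = fetchConverted("test_table")
println(converted) // Result(test_table)
```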

Thanks in advance!

v1l68za41#

The type of hBaseRDD is Vector[_], not RDD[_], so you cannot call the method getNumPartitions on it. If I understand correctly, you want to union the fetched RDDs. You can do that with plants.map( pp => fetchData(pp)).reduceOption(_ union _) (I recommend reduceOption because it does not fail on an empty list, but you can use reduce if you are confident the list is not empty).
Also, the return type of fetchData is RDD[U], but I could not find the definition of U. That is probably why the compiler infers Vector[Nothing] instead of Vector[RDD[Record]]. To avoid subsequent errors, you should also change RDD[U] to RDD[Record].
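Putting the two fixes together — an explicit RDD[Record] return type on fetchData so the builder is converted, and reduceOption(_ union _) to combine the per-prefix RDDs — a sketch of the corrected program might look like this (assuming the same it.nerdammer connector, table, and column family as in the question; it needs a live Spark/HBase setup to actually run):

```scala
import it.nerdammer.spark.hbase._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object sys {
  type Record = (String, Option[String], Option[String])

  val spark = SparkSession.builder().appName("myApp").config("spark.executor.cores", 4).getOrCreate()

  // The explicit RDD[Record] return type lets the compiler apply the connector's
  // implicit conversion from HBaseReaderBuilder[Record] to RDD[Record].
  def fetchData(plant: String): RDD[Record] = {
    spark.sparkContext.hbaseTable[Record]("test_table")
      .select("iacp", "temp")
      .inColumnFamily("pp")
      .withStartRow(plant)
      .withStopRow(plant + "z")
  }

  def main(args: Array[String]): Unit = {
    val plants = Vector("a8", "cu", "aw", "fx")

    // reduceOption(_ union _) folds Vector[RDD[Record]] into Option[RDD[Record]];
    // it yields None (instead of throwing) if plants were empty.
    val hBaseRDD: Option[RDD[Record]] = plants.map(fetchData).reduceOption(_ union _)

    // getNumPartitions lives on the RDD inside the Option, not on the Option itself.
    hBaseRDD.foreach(rdd => println("Part: " + rdd.getNumPartitions))
  }
}
```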
