How to configure a Spark Streaming Scala application to read from HBase on Hadoop + YARN

4dc9hkyq, posted 2021-05-29 in Hadoop

I have Spark and HBase running on Hadoop + YARN, and I want to read from and write to HBase from a Scala application built with sbt. I cannot get the HBase Scala application to compile.

The Scala application:

/usr/local/sparkapps/HBaseWordCount/src/main/scala/com/mydomain/spark/hbasewordcount/HbaseWordCount.scala

package com.mydomain.spark.hbasewordcount

import org.apache.spark._
import org.apache.spark.streaming._

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

object HBaseScalaWordCount {
    def main(args: Array[String]) {

        val name = "Example of read from HBase table"

        lazy val sparkConf = new SparkConf().setAppName(name)
        lazy val ssc = new StreamingContext(sparkConf, Seconds(1))
        implicit val config = HBaseConfig() // Assumes hbase-site.xml is on classpath

        val columns = Map(
            "cf1" -> Set("col1", "col2"),
            "cf2" -> Set("col3")
        )

        ssc.hbase[String]("testtable", columns)
        .map({ case (k, v) =>
          val cf1 = v("cf1")
          val col1 = cf1("col1")
          val col2 = cf1("col2")
          val col3 = v("cf2")("col3")

          List(k, col1, col2, col3) mkString "\t"
        })
        .saveAsTextFile("file:/home/hduser/hbasetest-output")
    }
}

The sbt build file:

/usr/local/sparkapps/HBaseWordCount/HBaseWordCount.sbt

name := "HBaseScalaWordCount"

version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided",
  "org.apache.hbase" % "hbase-common" % "1.2.1" % "provided",
  "org.apache.hbase" % "hbase-client" % "1.2.1" % "provided",
  "org.apache.hbase" % "hbase-server" % "1.2.1" % "provided",
  "eu.unicredit" %% "hbase-rdd" % "0.7.1"
)

Running sbt package:

/usr/local/sparkapps/HBaseWordCount$ sbt package

[info] Set current project to HBaseScalaWordCount (in build file:/usr/local/sparkapps/HBaseWordCount/)
[info] Compiling 1 Scala source to /usr/local/sparkapps/HBaseWordCount/target/scala-2.10/classes...
[error] /usr/local/sparkapps/HBaseWordCount/src/main/scala/com/mydomain/spark/hbasewordcount/HbaseWordCount.scala:29: not found: value HBaseConfig
[error]         implicit val config = HBaseConfig() // Assumes hbase-site.xml is on classpath
[error]                               ^
[error] /usr/local/sparkapps/HBaseWordCount/src/main/scala/com/mydomain/spark/hbasewordcount/HbaseWordCount.scala:36: value hbase is not a member of org.apache.spark.streaming.StreamingContext
[error]         ssc.hbase[String]("testtable", columns)
[error]             ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 9 s, completed Apr 14, 2016 4:11:40 PM

HBase works fine on Hadoop, but I don't know how to configure the classpath for Spark, for example in /usr/local/spark/conf/spark-defaults.conf, which actually doesn't exist; I only have spark-defaults.conf.template.
spark-env.sh:

/usr/local/spark/conf/spark-env.sh

export SPARK_MASTER_IP=localhost
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=800m
export SPARK_WORKER_INSTANCES=1

spark-defaults.conf:

doesn't exist

HBase lib path:

/usr/local/hbase/hbase-1.1.3/lib/

hbase-site.xml:

/usr/local/hbase/hbase-1.1.3/conf/hbase-site.xml 

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>

  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://localhost:9000/hbase</value>
  </property>

  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>

  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>localhost</value>
  </property>

  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>

  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>

  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/home/hduser/hbase/zookeeper</value>
  </property>

</configuration>

Answer #1, by esyap4oy:

First of all, sbt cannot find the class HBaseConfig. That's because you imported org.apache.hadoop.hbase.HBaseConfiguration, but the class you need is unicredit.spark.hbase.HBaseConfig.
Your second problem is

value hbase is not a member of org.apache.spark.streaming.StreamingContext

which means sbt cannot find an hbase method on StreamingContext. I see you are using hbase-rdd to add HBase support to Spark. If you check that project's README, you have to add an import line for its implicits, so add this at the top of your file:

import unicredit.spark.hbase._

Implicits are a nice addition to Scala that make it possible to extend the functionality of classes from other packages. With the implicits imported, the hbase method should be available on your SparkContext instance.
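
As a side note, here is a tiny, purely hypothetical illustration of the mechanism (it has nothing to do with hbase-rdd itself): an implicit class can attach a new method to an existing type, which is exactly how the library attaches hbase to SparkContext.

object ImplicitDemo {
  // Hypothetical example: this implicit class "adds" a tabJoined method to List[String],
  // just like hbase-rdd's implicits add an hbase method to SparkContext.
  implicit class RichStringList(val xs: List[String]) extends AnyVal {
    def tabJoined: String = xs mkString "\t"
  }

  def main(args: Array[String]) {
    // Once the implicit is in scope, the method appears to exist on List[String].
    println(List("rowkey", "col1", "col2").tabJoined)
  }
}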
Note, however, that you don't have a SparkContext, only a StreamingContext, so create one first. There is also no need to make them lazy.
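
Putting it together, a minimal sketch of what the corrected application could look like (untested; it assumes hbase-rdd 0.7.x, where the hbase method is added to SparkContext and returns pairs of row key and a Map of column family to column values; the StreamingContext is left out because nothing in this snippet actually streams):

package com.mydomain.spark.hbasewordcount

import org.apache.spark.{SparkConf, SparkContext}

// hbase-rdd: brings HBaseConfig and the implicit hbase methods into scope
import unicredit.spark.hbase._

object HBaseScalaWordCount {
  def main(args: Array[String]) {
    val name = "Example of read from HBase table"

    val sparkConf = new SparkConf().setAppName(name)
    val sc = new SparkContext(sparkConf)    // hbase-rdd extends SparkContext, not StreamingContext
    implicit val config = HBaseConfig()     // assumes hbase-site.xml is on the classpath

    val columns = Map(
      "cf1" -> Set("col1", "col2"),
      "cf2" -> Set("col3")
    )

    // RDD of (rowKey, Map[columnFamily, Map[column, value]])
    sc.hbase[String]("testtable", columns)
      .map({ case (k, v) =>
        val cf1 = v("cf1")
        val col1 = cf1("col1")
        val col2 = cf1("col2")
        val col3 = v("cf2")("col3")

        List(k, col1, col2, col3) mkString "\t"
      })
      .saveAsTextFile("file:/home/hduser/hbasetest-output")
  }
}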
