Hadoop question: Scala code to retrieve data from HBase in spark-shell

vuktfyat · posted 2021-05-29 in Hadoop

We are trying to run a simple piece of Scala code in spark-shell to retrieve data from HBase. The Hadoop environment is Kerberos-enabled, and we have made sure to run kinit.
Steps used to invoke spark-shell:

MASTER=yarn-client

DRIVER_CLASSPATH="/opt/cloudera/parcels/CDH/lib/hbase/lib/*"
DRIVER_LIBRARY_PATH="/opt/cloudera/parcels/CDH/lib/hadoop/lib/native"

spark-shell --driver-class-path "$DRIVER_CLASSPATH" --driver-library-path "$DRIVER_LIBRARY_PATH" --driver-memory 10G --executor-memory 15G --executor-cores 8 --num-executors 3 --master $MASTER

Code:

import org.apache.hadoop.fs._
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util._
import org.apache.spark._

val hc = HBaseConfiguration.create
hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml"))

hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/core-site.xml"))

hc.set(TableInputFormat.INPUT_TABLE, "poc-customers")
val rdd = sc.newAPIHadoopRDD(hc, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

rdd.count

Below is the error:

org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the location
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:308)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:149)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:57)
        at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
        at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:293)
        at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:268)
        at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:140)
        at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:135)
        at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:888)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.restart(TableRecordReaderImpl.java:90)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.initialize(TableRecordReaderImpl.java:167)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReader.initialize(TableRecordReader.java:134)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase$1.initialize(TableInputFormatBase.java:200)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not set up IO Streams to <management-node-server-hostname>/10.118.114.40:60020
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:773)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:881)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:850)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1184)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
        at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:31865)
        at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1580)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1294)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1126)
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:299)
        ... 23 more
Caused by: java.lang.RuntimeException: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$1.run(RpcClientImpl.java:673)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.handleSaslConnectionFailure(RpcClientImpl.java:631)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:739)
        ... 33 more
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
        at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:154)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:731)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:728)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)
        ... 33 more
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
        at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
        at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:121)
        at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
        at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:223)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
        ... 42 more

Please note:
We can invoke the hbase shell from the same session and scan records from the same table.
We are able to run a word count on an HDFS file from the same spark-shell session.
We are able to execute the code above in local mode.
We can perform other operations from the same spark-shell session, such as: a. val admin = new HBaseAdmin(hc) b. print(admin.isTableAvailable("poc-customers"))
Looking for help to resolve this issue.

r7knjye2 · Answer #1

I work on the same project as the OP. We did not use Samson Scharfrichter's answer directly, but it gave us confidence that this kind of solution was possible. Here is what worked for us:
We are now using the scan RDD from SparkOnHBase (https://github.com/cloudera-labs/sparkonhbase), but with the changes from https://github.com/cloudera-labs/sparkonhbase/pull/7 applied. Since that pull request is still open, its changes can also be applied via subclassing:

import com.cloudera.spark.hbase.{HBaseContext, HBaseScanRDD}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD

// Subclass of SparkOnHBase's HBaseScanRDD: broadcast the driver's job credentials
// so they can be re-added on the executors (see the pull request above)
class MyHBaseScanRDD (sc: SparkContext,
    @transient tableName: String,
    @transient scan: Scan,
    configBroadcast: Broadcast[SerializableWritable[Configuration]]) extends HBaseScanRDD(sc, tableName, scan, configBroadcast) {
  val jobCredentialBroadcast = sc.broadcast(new SerializableWritable(jobTransient.getCredentials))

  // Runs on the executor before talking to HBase: merge the current user's
  // credentials with the broadcast job credentials
  override def addCreds {
    val creds = SparkHadoopUtil.get.getCurrentUserCredentials
    @transient val ugi = UserGroupInformation.getCurrentUser
    ugi.addCredentials(creds)
    ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
    ugi.addCredentials(jobCredentialBroadcast.value.value)
  }
}

// HBaseContext variant whose scan RDD is the credential-aware subclass above
class MyHBaseContext (sc: SparkContext,
    @transient config: Configuration,
    val tmpHdfsConfigFile: String = null) extends HBaseContext(sc, config, tmpHdfsConfigFile) {
  def myHBaseScanRDD(tableName: String, scan: Scan): RDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = {
    new MyHBaseScanRDD(sc, tableName, scan, broadcastedConf)
  }
}

val hc = HBaseConfiguration.create
val scan = new Scan
val hbaseContext = new MyHBaseContext(sc, hc)
val rdd = hbaseContext.myHBaseScanRDD("tableName", scan)
rdd.count

It looks like these changes have since been merged into HBase's hbase-spark module, the successor to SparkOnHBase. Versioning issues prevented us from using the newer HBase libraries, but I would suggest that anyone facing this problem try that route first.
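For reference, here is a rough sketch of what the equivalent call looks like through the hbase-spark module's HBaseContext. The class and method names are as we understand them and should be checked against the HBase version you actually ship; spark-shell also needs the hbase-spark jar on its classpath.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.spark.HBaseContext

// Build an HBaseContext from the SparkContext and the HBase configuration
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)

// hbaseRDD returns an RDD of (ImmutableBytesWritable, Result) pairs for the scan
val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf("poc-customers"), new Scan())
scanRdd.count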

bzzcjhmw · Answer #2

When the Spark "driver" asks YARN to spawn its "executors" somewhere in the cluster, it authenticates with its local Kerberos TGT -- the one you created with kinit. YARN then issues a global delegation token, shared by all executors, for accessing HDFS and YARN.
Alas, HBase does not support that delegation token. Each executor has to re-authenticate to ZooKeeper, and then to the actual HBase RegionServer, with a local TGT.
In a perfect world, you would just need to insert two properties into spark-defaults.conf, namely spark.yarn.principal and spark.yarn.keytab (creating a keytab to store your password is done with the ktutil utility).
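For illustration, those two entries in spark-defaults.conf would look something like this (the principal and keytab path are placeholders):

# spark-defaults.conf (illustrative values only)
spark.yarn.principal   poc_user@EXAMPLE.COM
spark.yarn.keytab      /home/poc_user/poc_user.keytab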
Alas, that feature was built for long-running streaming jobs that need to renew their HDFS delegation token (typically every 7 days), not for initial HBase authentication. Now, the release notes for Spark 1.6 show a lot of bug fixes related to YARN and Kerberos, so maybe that feature now works out of the box for HBase as well. I wouldn't bet on it.
So what is the workaround?
In the Java code run by the driver, state that the keytab file must be shipped with addFile(); in the Java code run by the executors, explicitly create a Hadoop UserGroupInformation that explicitly obtains its own Kerberos TGT from that keytab before connecting to HBase.
Note that when used this way, the UGI keeps its TGT private -- it does not show up in the ticket cache, so other processes on the same machine cannot reuse it (and, on the other hand, a kinit from another process will not tamper with it).
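A minimal sketch of that workaround follows. The principal, keytab path, input RDD (someInputRdd) and table name are placeholders taken from or invented for this question, and the HBase client calls (ConnectionFactory, Table) are just one way to read data inside the partition.

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkFiles

// Driver side: ship the keytab to every executor's working directory
sc.addFile("/home/poc_user/poc_user.keytab")   // path is an assumption

someInputRdd.foreachPartition { partition =>
  // Executor side: obtain a private TGT from the keytab before touching HBase
  val keytabPath = SparkFiles.get("poc_user.keytab")
  val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI("poc_user@EXAMPLE.COM", keytabPath)
  ugi.doAs(new PrivilegedExceptionAction[Unit] {
    override def run(): Unit = {
      val conf = HBaseConfiguration.create()
      val connection = ConnectionFactory.createConnection(conf)
      val table = connection.getTable(TableName.valueOf("poc-customers"))
      val scanner = table.getScanner(new Scan())
      try {
        // ... consume the partition and/or the scanner here ...
      } finally {
        scanner.close(); table.close(); connection.close()
      }
    }
  })
}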

lrpiutwd · Answer #3

The root cause of your problem is GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt). The Cloudera troubleshooting guide suggests a solution for this problem:
Description: A user must have a valid Kerberos ticket in order to interact with a secure Hadoop cluster. Running any Hadoop command (such as hadoop fs -ls) will fail if you do not have a valid Kerberos ticket in your credentials cache. If you do not have a valid ticket, you will receive an error such as:
11/01/04 12:08:12 WARN ipc.Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] Bad connection to FS. command aborted. exception: Call to nn-host/10.0.0.2:8020 failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
Solution: You can examine the Kerberos tickets currently in your credentials cache by running the klist command. You can obtain a ticket by running the kinit command and either specifying a keytab file containing credentials or entering the password for your principal.
You can try the suggested solution.
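For example (the principal and keytab path below are placeholders):

# Check the tickets currently in the credentials cache
klist

# Obtain a ticket from a keytab ...
kinit -kt /home/poc_user/poc_user.keytab poc_user@EXAMPLE.COM

# ... or by entering the principal's password
kinit poc_user@EXAMPLE.COM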
