Spark and high availability in a Kerberized Hadoop environment: Spark SQL can only read data after a write job

Asked by of1yzvn4 on 2021-05-31 in Hadoop

We have been running a Kerberized Hadoop environment (HDP 3.1.4, Spark 2.3.2, and Ambari 2.7.4), and so far everything has worked smoothly.
Now we have enabled NameNode high availability and run into the following problem: when we want to read data with Spark SQL, we first have to write some (other) data. If we do not write something before the read, it fails.
Here is our scenario:

  $ kinit -kt /etc/security/keytabs/user.keytab user
  $ spark-shell

Run a read request -> the first read request of every session fails!

  scala> spark.sql("SELECT * FROM pm.simulation_uci_hydraulic_sensor").show
  Hive Session ID = cbb6b6e2-a048-41e0-8e77-c2b2a7f52dbe
  [Stage 0:> (0 + 1) / 1]20/04/22 15:04:53 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, had-data6.my-company.de, executor 2): java.io.IOException: DestHost:destPort had-job.my-company.de:8020 , LocalHost:localPort had-data6.my-company.de/192.168.178.123:0. Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:831)
  at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:806)
  at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1502)
  at org.apache.hadoop.ipc.Client.call(Client.java:1444)
  at org.apache.hadoop.ipc.Client.call(Client.java:1354)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
  at com.sun.proxy.$Proxy13.getBlockLocations(Unknown Source)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:317)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
  at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
  at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
  at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
  at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source)
  at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:862)
  at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:851)
  at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:840)
  at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1004)
  at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:320)
  at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:316)
  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:328)
  at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:899)
  at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:522)
  at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:364)
  at org.apache.orc.OrcFile.createReader(OrcFile.java:251)
  [...]

Run a write job -> this works!

  scala> val primitiveDS = Seq(1, 2, 3).toDS()
  primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]
  scala> primitiveDS.write.saveAsTable("pm.todelete3")
  20/04/22 15:05:07 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.

Now run the same read again -> it works (within the same session)!?

  scala> spark.sql("SELECT * FROM pm.simulation_uci_hydraulic_sensor").show
  +--------+--------+--------------------+------+
  |instance|sensorId| ts| value|
  +--------+--------+--------------------+------+
  | 21| PS6|2020-04-18 17:19:...| 8.799|
  | 21| EPS1|2020-04-18 17:19:...|2515.6|
  | 21| PS3|2020-04-18 17:19:...| 2.187|
  +--------+--------+--------------------+------+

When starting a new spark-shell session, the same behaviour occurs!
Can anyone help with this issue? Thank you!

Answer by vyswwuz2:

We found the answer to the problem: the table properties point to the "old" NameNode location, because the tables were created before high availability was activated in the Hadoop cluster.
You can look up the table information by running the following:

  $ spark-shell
  scala> spark.sql("DESCRIBE EXTENDED db.table").show(false)

This displays the table information, as in my example:

  +----------------------------+---------------------------------------------------------------------------------------------+-------+
  |col_name |data_type |comment|
  +----------------------------+---------------------------------------------------------------------------------------------+-------+
  |instance |int |null |
  |sensorId |string |null |
  |ts |timestamp |null |
  |value |double |null |
  | | | |
  |# Detailed Table Information| | |
  |Database |simulation | |
  |Table |uci_hydraulic_sensor_1 | |
  |Created By |Spark 2.3.2.3.1.4.0-315 | |
  |Type |EXTERNAL | |
  |Provider |parquet | |
  |Statistics |244762020 bytes | |
  |Location |hdfs://had-job.mycompany.de:8020/projects/pm/simulation/uci_hydraulic_sensor_1 <== This is important!
  |Serde Library |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe | |
  |InputFormat |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | |
  |OutputFormat |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat | |
  +----------------------------+---------------------------------------------------------------------------------------------+-------+
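If you only need the Location row instead of scanning the whole DESCRIBE output, you can filter for it directly. This is just a convenience sketch, not part of the original answer; simulation.uci_hydraulic_sensor_1 is the example table from above and should be replaced by the table you want to inspect:

  // Sketch: pull out only the storage location from DESCRIBE FORMATTED (Spark 2.x).
  import spark.implicits._
  val location = spark.sql("DESCRIBE FORMATTED simulation.uci_hydraulic_sensor_1")
    .filter($"col_name" === "Location")
    .select("data_type")
    .collect()
    .headOption
    .map(_.getString(0))
  // Prints e.g. Some(hdfs://had-job.mycompany.de:8020/projects/pm/simulation/uci_hydraulic_sensor_1)
  println(location)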

To point the table location at the HA cluster service name instead, run the following SQL:

  $ spark-shell
  scala> spark.sql("ALTER TABLE simulation.uci_hydraulic_sensor_1 SET LOCATION 'hdfs://my-ha-name/projects/pm/simulation/uci_hydraulic_sensor_1'")

In subsequent Spark sessions, the table read works fine!
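If you are unsure what the HA nameservice is called, you can read it from the Hadoop configuration already loaded in the Spark session (a quick check, not part of the original answer):

  // Prints the configured HDFS nameservice(s), e.g. "my-ha-name".
  println(spark.sparkContext.hadoopConfiguration.get("dfs.nameservices"))
  // After enabling HA, fs.defaultFS should also point at the nameservice rather than a single host.
  println(spark.sparkContext.hadoopConfiguration.get("fs.defaultFS"))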
