How to connect Spark and Hive using PySpark?

Posted by xuo3flqw on 2021-06-27 in Hive

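I am trying to use pyspark remotely. It gives an error saying that it cannot connect to the Hive metastore client.
I have read many answers on SO and other sources. They were mostly about configuration, but none of them explained why I am unable to connect remotely. I read the docs and noticed that Spark can be connected to Hive without changing any configuration files. Note: I have port-forwarded the machine where Hive is running and made it available at localhost:10000. I even used the same approach for Presto and was able to run queries on Hive.
The code is: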

    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession, HiveContext
    SparkContext.setSystemProperty("hive.metastore.uris", "thrift://localhost:9083")
    sparkSession = (SparkSession
        .builder
        .appName('example-pyspark-read-and-write-from-hive')
        .enableHiveSupport()
        .getOrCreate())
    data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
    df = sparkSession.createDataFrame(data)
    df.write.saveAsTable('example')

I expected the output to be a confirmation that the table was saved, but instead I get this error.
An abridged version of the error:

    During handling of the above exception, another exception occurred:
    Traceback (most recent call last):
      File "<stdin>", line 2, in <module>
      File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 775, in saveAsTable
        self._jwrite.saveAsTable(name)
      File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
      File "/usr/local/spark/python/pyspark/sql/utils.py", line 69, in deco
        raise AnalysisException(s.split(': ', 1)[1], stackTrace)
    pyspark.sql.utils.AnalysisException: 'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'

I have run the following command:

    ssh -i ~/.ssh/id_rsa_sc -L 9000:A.B.C.D:8080 -L 9083:E.F.G.H:9083 -L 10000:E.F.G.H:10000 ubuntu@I.J.K.l
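
The same tunnel can be assembled from Python, for example to launch it with `subprocess`. This is a minimal sketch assuming the OpenSSH `ssh` client; the helper name `ssh_tunnel_args` is hypothetical, and the `-N` flag (forward ports only, run no remote command) is an addition that is not in the original command. The host placeholders are kept exactly as in the post.

```python
import os
import shlex
from typing import List, Tuple

# (local_port, remote_host, remote_port), mirroring each -L in the command above
Forward = Tuple[int, str, int]


def ssh_tunnel_args(key_file: str, target: str, forwards: List[Forward]) -> List[str]:
    """Build an argv list for `ssh -N -i KEY -L ... TARGET` (hypothetical helper)."""
    # -N: only forward ports, do not run a remote command
    args = ["ssh", "-N", "-i", key_file]
    for local_port, remote_host, remote_port in forwards:
        # localhost:<local_port> is forwarded to <remote_host>:<remote_port>,
        # where remote_host is resolved and connected to from the SSH server
        args += ["-L", f"{local_port}:{remote_host}:{remote_port}"]
    args.append(target)
    return args


if __name__ == "__main__":
    argv = ssh_tunnel_args(
        os.path.expanduser("~/.ssh/id_rsa_sc"),
        "ubuntu@I.J.K.l",
        [(9000, "A.B.C.D", 8080), (9083, "E.F.G.H", 9083), (10000, "E.F.G.H", 10000)],
    )
    # Printed rather than executed; pass argv to subprocess.run() to open the tunnel.
    print(shlex.join(argv))
```

Because each `-L` destination is reached from the SSH server, only these three ports become reachable locally; any other host or port that the remote services refer to is not tunneled.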

When I check ports 10000 and 9083 with:

    aviral@versinator:~/testing-spark-hive$ nc -zv localhost 10000
    Connection to localhost 10000 port [tcp/webmin] succeeded!
    aviral@versinator:~/testing-spark-hive$ nc -zv localhost 9083
    Connection to localhost 9083 port [tcp/*] succeeded!
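
For reference, the `nc -zv` check can be reproduced in Python with a plain TCP connect. This is a sketch; `port_open` is a hypothetical helper name. The comments reflect the usual Hive defaults: 10000 for HiveServer2 (JDBC clients such as beeline) and 9083 for the metastore Thrift service, which is what Spark's Hive support talks to.

```python
import socket


def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds (like `nc -zv`)."""
    try:
        # create_connection resolves the name and tries each address in turn
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures
        return False


if __name__ == "__main__":
    # 10000: HiveServer2 (default), 9083: Hive metastore Thrift service (default)
    for port in (10000, 9083):
        print(port, "open" if port_open("localhost", port) else "closed")
```

An open local port only shows that the tunnel's local end is listening. `ssh -L` accepts the local connection before it tries to reach the remote side, so this check (like `nc -zv`) does not prove that the metastore itself is reachable.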

When I run the script, I get the following error:

    Caused by: java.net.UnknownHostException: ip-172-16-1-101.ap-south-1.compute.internal
        ... 45 more
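
The thread does not say which component tried to contact `ip-172-16-1-101.ap-south-1.compute.internal`. One plausible reading, which is an assumption and is not confirmed in the post, is that the remote side hands back an EC2-internal hostname (for example inside a storage location) that the local machine cannot resolve, since only the forwarded ports are tunneled. A quick way to check whether a name resolves locally, with `resolves` as a hypothetical helper:

```python
import socket


def resolves(hostname: str) -> bool:
    """Return True if the local resolver can map hostname to an address."""
    try:
        socket.getaddrinfo(hostname, None)
        return True
    except socket.gaierror:
        # Roughly the situation Java reports as java.net.UnknownHostException
        return False


if __name__ == "__main__":
    name = "ip-172-16-1-101.ap-south-1.compute.internal"
    print(name, "resolves" if resolves(name) else "does NOT resolve")
```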

Answer #1 (wswtfjt7):

The key is to set the Hive metastore URI on the builder while the Spark session itself is being created.

    sparkSession = (SparkSession
        .builder
        .appName('example-pyspark-read-and-write-from-hive')
        # Pass the URI as a plain key/value pair: if conf= is also given,
        # PySpark's Builder.config() ignores key and value.
        .config("hive.metastore.uris", "thrift://localhost:9083")
        .enableHiveSupport()
        .getOrCreate()
        )
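
Why the metastore URI is best passed as a plain key/value pair: in PySpark's `SparkSession.Builder.config(key=None, value=None, conf=None)`, when `conf` is given, only the entries of that `SparkConf` are copied and `key`/`value` are not used, so a call like `.config("hive.metastore.uris", "thrift://localhost:9083", conf=SparkConf())` does not record the URI. The classes below are a simplified, hypothetical model of that branch written for illustration; `BuilderModel` and `FakeConf` are not PySpark classes.

```python
from typing import Dict, List, Optional, Tuple


class FakeConf:
    """Hypothetical stand-in for SparkConf: holds (key, value) pairs."""

    def __init__(self, pairs: Optional[Dict[str, str]] = None) -> None:
        self._pairs = dict(pairs or {})

    def getAll(self) -> List[Tuple[str, str]]:
        return list(self._pairs.items())


class BuilderModel:
    """Simplified model of the branching in PySpark's Builder.config()."""

    def __init__(self) -> None:
        self.options: Dict[str, str] = {}

    def config(self, key=None, value=None, conf=None) -> "BuilderModel":
        if conf is None:
            # Plain key/value call: the option is recorded
            self.options[key] = str(value)
        else:
            # conf given: only its entries are copied; key/value are dropped
            for k, v in conf.getAll():
                self.options[k] = v
        return self


if __name__ == "__main__":
    uri = ("hive.metastore.uris", "thrift://localhost:9083")
    print(BuilderModel().config(*uri, conf=FakeConf()).options)  # {}
    print(BuilderModel().config(*uri).options)  # {'hive.metastore.uris': 'thrift://localhost:9083'}
```

If a `SparkConf` is needed anyway, one option is to set the URI on it first with `SparkConf().set("hive.metastore.uris", "thrift://localhost:9083")` and pass only `conf=`.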

Note that the Spark configuration files do not need to be changed; even serverless services such as AWS Glue can connect this way.
Full code:

    from pyspark.sql import SparkSession

    # Java equivalent from the original answer, kept for reference:
    # SparkSession ss = SparkSession
    #     .builder()
    #     .appName(" Hive example")
    #     .config("hive.metastore.uris", "thrift://localhost:9083")
    #     .enableHiveSupport()
    #     .getOrCreate();

    # Set the metastore URI on the builder, before the session is created.
    # Do not also pass conf=SparkConf(): Builder.config() then ignores key/value.
    sparkSession = (SparkSession
        .builder
        .appName('example-pyspark-read-and-write-from-hive')
        .config("hive.metastore.uris", "thrift://localhost:9083")
        .enableHiveSupport()
        .getOrCreate()
        )

    data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
    df = sparkSession.createDataFrame(data)

    # Write into Hive (left commented out in the original answer; the SELECT
    # below only works once the table 'example' exists)
    # df.write.saveAsTable('example')

    df_load = sparkSession.sql('SELECT * FROM example')
    # show() prints the rows itself and returns None, so it is not wrapped in print()
    df_load.show()