spark bigtable-hbase客户端没有在pyspark中关闭?

ev7lccsx  于 2021-07-13  发布在  Hbase
关注(0)|答案(1)|浏览(581)

我正在尝试执行一个pyspark语句,该语句在python for循环中写入bigtable,这将导致以下错误(使用dataproc提交的作业)。任何客户没有正确关闭(如这里所建议的),如果是,有什么办法在pyspark中这样做?
请注意,每次使用新的dataproc作业手动重新执行脚本都可以正常工作,因此作业本身是正确的。
谢谢你的支持!
Pypark脚本

  1. from pyspark import SparkContext
  2. from pyspark.sql import SQLContext
  3. import json
  4. sc = SparkContext()
  5. sqlc = SQLContext(sc)
  6. def create_df(n_start,n_stop):
  7. # Data
  8. row_1 = ['a']+['{}'.format(i) for i in range(n_start,n_stop)]
  9. row_2 = ['b']+['{}'.format(i) for i in range(n_start,n_stop)]
  10. # Spark schema
  11. ls = [row_1,row_2]
  12. schema = ['col0'] + ['col{}'.format(i) for i in range(n_start,n_stop)]
  13. # Catalog
  14. first_col = {"col0":{"cf":"rowkey", "col":"key", "type":"string"}}
  15. other_cols = {"col{}".format(i):{"cf":"cf", "col":"col{}".format(i), "type":"string"} for i in range(n_start,n_stop)}
  16. first_col.update(other_cols)
  17. columns = first_col
  18. d_catalogue = {}
  19. d_catalogue["table"] = {"namespace":"default", "name":"testtable"}
  20. d_catalogue["rowkey"] = "key"
  21. d_catalogue["columns"] = columns
  22. catalog = json.dumps(d_catalogue)
  23. # Dataframe
  24. df = sc.parallelize(ls, numSlices=1000).toDF(schema=schema)
  25. return df,catalog
  26. for i in range(0,2):
  27. N_step = 100
  28. N_start = 1
  29. N_stop = N_start+N_step
  30. data_source_format = "org.apache.spark.sql.execution.datasources.hbase"
  31. df,catalog = create_df(N_start,N_stop)
  32. df.write\
  33. .options(catalog=catalog,newTable= "5")\
  34. .format(data_source_format)\
  35. .save()
  36. N_start += N_step
  37. N_stop += N_step

dataproc作业

  1. gcloud dataproc jobs submit pyspark <my_script>.py \
  2. --cluster $SPARK_CLUSTER \
  3. --jars <path_to_jar>/bigtable-dataproc-spark-shc-assembly-0.1.jar \
  4. --region=us-east1

错误

  1. ...
  2. ERROR com.google.bigtable.repackaged.io.grpc.internal.ManagedChannelOrphanWrapper: *~*~*~ Channel ManagedChannelImpl{logId=41, target=bigtable.googleapis.com:443} was not shutdown properly!!! ~*~*~*
  3. Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
  4. ...
kiz8lqtg

kiz8lqtg1#

如果您没有使用最新版本,请尝试更新它。它看起来类似于最近修复的这个问题。我可以想象错误信息仍然会出现,但是现在完成的工作意味着支持团队仍在处理它,希望他们能在下一个版本中修复它。

相关问题