pyspark 如何将hdfs中的数据保存到amazon s3

lbsnaicq  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(185)

我正在处理webarchives,并提取一些数据,最初我用来存储这些数据作为txt在我的hdfs,但由于它是巨大的大小,我将不得不存储在亚马逊s3桶输出,我如何才能实现这一点?我试图使用s3a连接器,但它抛出了一个错误,说凭据是错误的。txt文件的大小是TB,无论如何,我可以存储在hdfs中,因为我是做之前,并上传到s3,然后从hdfs删除,或任何其他有效的方法这样做?

  1. for bucket in buckets[4:5]:
  2. filenames = get_bucket_warcs(bucket)
  3. print("==================================================")
  4. print(f"bucket: {bucket}, filenames: {len(filenames)}")
  5. print("==================================================")
  6. jsonld_count = sc.accumulator(0)
  7. records_count = sc.accumulator(0)
  8. exceptions_count = sc.accumulator(0)
  9. rdd_filenames = sc.parallelize(filenames, len(filenames))
  10. rdd_jsonld = rdd_filenames.flatMap(lambda f: get_jsonld_records(bucket, f))
  11. rdd_jsonld.saveAsTextFile(f"{hdfs_path}/webarchive-jsonld-{bucket}")
  12. print(f"records processed: {records_count.value}", f"jsonld: {jsonld_count.value}", f"exceptions: {exceptions_count.value}")
  13. sc.stop()

字符串
这是我的代码,我想保存rdd_jsonld在亚马逊s3桶.

6jjcrrmo

6jjcrrmo1#

如果s3 a连接器报告凭证错误,则说明您没有设置凭证,或者您将客户端配置为与错误的公共/私有S3存储进行通信。
查找s3连接器(hadoop s3 a或EMR s3)的在线文档并阅读它,特别是关于身份验证和故障排除的部分。

相关问题