如何使用spark连接到ibmcos(云对象存储),如何解决“scheme:cos没有文件系统”

t5zmwmid  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(628)

我正在尝试使用spark创建到ibmcos(云对象存储)的连接。spark版本=2.4.4,scala版本=2.11.12。
我用正确的凭证在本地运行它,但我发现以下错误-“没有scheme:cos的文件系统”
我正在与错误日志共享代码片段。有人能帮我解决这个问题吗。
提前谢谢!
代码段:

import com.ibm.ibmos2spark.CloudObjectStorage
import org.apache.spark.sql.SparkSession

object CosConnection extends App{
  var credentials = scala.collection.mutable.HashMap[String, String](
      "endPoint"->"ENDPOINT",
      "accessKey"->"ACCESSKEY",
      "secretKey"->"SECRETKEY"
  )
  var bucketName = "FOO"
  var objectname = "xyz.csv"

  var configurationName = "softlayer_cos" 

  val spark = SparkSession
    .builder()
    .appName("Connect IBM COS")
    .master("local")
    .getOrCreate()

  spark.sparkContext.hadoopConfiguration.set("fs.stocator.scheme.list", "cos")
  spark.sparkContext.hadoopConfiguration.set("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
  spark.sparkContext.hadoopConfiguration.set("fs.stocator.cos.scheme", "cos")

  var cos = new CloudObjectStorage(spark.sparkContext, credentials, configurationName=configurationName)

  var dfData1 = spark.
    read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
    option("header", "true").
    option("inferSchema", "true").
    load(cos.url(bucketName, objectname))

  dfData1.printSchema()
  dfData1.show(5,0)
}

错误:

Exception in thread "main" java.io.IOException: No FileSystem for scheme: cos
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
k3fezbri

k3fezbri1#

你必须把 .config("spark.hadoop.fs.stocator.scheme.list", "cos") 以及其他一些人 fs.cos... 配置。
下面是一个python端到端代码片段示例。转换成scala应该很简单:

from pyspark.sql import SparkSession

stocator_jar = '/path/to/stocator-1.1.2-SNAPSHOT-IBM-SDK.jar'
cos_instance_name = '<myCosIntanceName>'
bucket_name = '<bucketName>'
s3_region = '<region>'
cos_iam_api_key = '*******'
iam_servicce_id = 'crn:v1:bluemix:public:iam-identity::<****************>'

spark_builder = (
    SparkSession
        .builder
        .appName('test_app'))

spark_builder.config('spark.driver.extraClassPath', stocator_jar)
spark_builder.config('spark.executor.extraClassPath', stocator_jar)
spark_builder.config(f"fs.cos.{cos_instance_name}.iam.api.key", cos_iam_api_key)
spark_builder.config(f"fs.cos.{cos_instance_name}.endpoint", f"s3.{s3_region}.cloud-object-storage.appdomain.cloud")
spark_builder.config(f"fs.cos.{cos_instance_name}.iam.service.id", iam_servicce_id)
spark_builder.config("spark.hadoop.fs.stocator.scheme.list", "cos")
spark_builder.config("spark.hadoop.fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
spark_builder.config("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
spark_builder.config("fs.stocator.cos.scheme", "cos")

spark_sess = spark_builder.getOrCreate()

dataset = spark_sess.range(1, 10)
dataset = dataset.withColumnRenamed('id', 'user_idx')

dataset.repartition(1).write.csv(
    f'cos://{bucket_name}.{cos_instance_name}/test.csv',
    mode='overwrite',
    header=True)

spark_sess.stop()
print('done!')
osh3o9ms

osh3o9ms2#

这个问题是通过用spark版本=2.4.4,scala版本=2.11.12Mapstocator依赖项来解决的

// https://mvnrepository.com/artifact/com.ibm.stocator/stocator
libraryDependencies += "com.ibm.stocator" % "stocator" % "1.0.24"

一定要有 stocator-1.0.24-jar-with-dependencies.jar 在生成包时在外部库中
还要确保将端点作为 s3.us.cloud-object-storage.appdomain.cloud 相反 https://s3.us.cloud-object-storage.appdomain.cloud 您可以手动构建stocator jar并包括 target/stocator-1.0.24-SNAPSHOT-IBM-SDK.jar jar到类路径(如果需要)-

git clone https://github.com/SparkTC/stocator
cd stocator
git fetch
git checkout -b 1.0.24-ibm-sdk origin/1.0.24-ibm-sdk
mvn clean install –DskipTests
yruzcnhs

yruzcnhs3#

我在Windows10上使用的是spark版本2.4.5和scala版本2.11.12。我已经在环境变量中为这两者添加了classpath。
启动spark shell的命令(打开命令提示符并粘贴到命令下方): spark-shell --packages com.ibm.stocator:stocator:1.0.36 如果你得到下面的细节,这意味着你已经成功地喷枪Spark壳。

您还可以在浏览器上检查它,如命令提示符中所示,例如--spark context webuihttp://localhost:4040(在您的情况下,端口可能会更改)。
在scala中设置配置信息(我的cos位置是us east):

sc.hadoopConfiguration.set("fs.stocator.scheme.list", "cos")
sc.hadoopConfiguration.set("fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
sc.hadoopConfiguration.set("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
sc.hadoopConfiguration.set("fs.stocator.cos.scheme", "cos")
sc.hadoopConfiguration.set("fs.cos.mycos.access.key", "your access key")
sc.hadoopConfiguration.set("fs.cos.mycos.secret.key", "your secret key")
sc.hadoopConfiguration.set("fs.cos.mycos.endpoint", "https://s3.us-east.cloud-object-storage.appdomain.cloud")

从清单文件获取对象列表:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val cosContent = sqlContext.read.text("cos://someBucketName.mycos/someFile.mf")
cosContent.show(false)


或者,您可以从Parquet文件中读取数据,如下所示:

val event1 = sqlContext.read.parquet("cos://someBucketName.mycos/parquetDirectoryName/")
 event1.printSchema()
 event1.count()
 event1.show(false)

相关问题