How to read/write data from/to Azurite using Spark?

1bqhqjot posted on 2021-05-16 in Spark

I tried to read/write Parquet files from/to Azurite using Spark, as follows:

import com.holdenkarau.spark.testing.DatasetSuiteBase
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.scalatest.WordSpec

class SimpleAzuriteSpec extends WordSpec with DatasetSuiteBase {
  val AzuriteHost = "localhost"
  val AzuritePort = 10000
  val AzuriteAccountName = "devstoreaccount1"
  val AzuriteAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
  val AzuriteContainer = "container1"
  val AzuriteDirectory = "dir1"
  val AzuritePath = s"wasb://$AzuriteContainer@$AzuriteAccountName.blob.core.windows.net/$AzuriteDirectory/"

  override final def conf: SparkConf = {
    val cfg = super.conf
    val settings =
      Map(
        s"spark.hadoop.fs.azure.storage.emulator.account.name" -> AzuriteAccountName,
        s"spark.hadoop.fs.azure.account.key.${AzuriteAccountName}.blob.core.windows.net" -> AzuriteAccountKey
      )
    settings.foreach { case (k, v) =>
      cfg.set(k, v)
    }
    cfg
  }

  "Spark" must {
    "write to/read from Azurite" in {
      import spark.implicits._
      val xs = List(Rec(1, "Alice"), Rec(2, "Bob"))
      val inputDs = spark.createDataset(xs)

      inputDs.write
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .save(AzuritePath)

      val ds = spark.read
        .format("parquet")
        .load(AzuritePath)
        .as[Rec]

      ds.show(truncate = false)

      val actual = ds.collect().toList.sortBy(_.id)
      assert(actual == xs)
    }
  }
}

case class Rec(id: Int, name: String)
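The conf override only sets spark.hadoop.* keys, so it helps to see the property names hadoop-azure actually receives. Spark copies every spark.hadoop.* entry of the SparkConf into the Hadoop Configuration with that prefix removed. The stdlib-only sketch below imitates that step and splits the wasb:// path into container and host. This makes it possible to compare three things side by side: the emulator setting, the account-key property name, and the host the path targets. HadoopKeysFromSparkConf is a hypothetical name, not a Spark API.

```scala
import java.net.URI

// Hypothetical helper (stdlib only): imitates how Spark copies "spark.hadoop.*"
// SparkConf entries into the Hadoop Configuration, prefix removed, and splits a
// wasb:// path into container and host.
object HadoopKeysFromSparkConf {
  private val Prefix = "spark.hadoop."

  // Keep only "spark.hadoop.*" entries and strip the prefix.
  def toHadoopConf(sparkSettings: Map[String, String]): Map[String, String] =
    sparkSettings.collect {
      case (k, v) if k.startsWith(Prefix) => k.stripPrefix(Prefix) -> v
    }

  // A wasb URI has the form wasb://<container>@<host>/<path>;
  // java.net.URI exposes the container as user info.
  def containerAndHost(path: String): (String, String) = {
    val uri = new URI(path)
    (uri.getUserInfo, uri.getHost)
  }

  def main(args: Array[String]): Unit = {
    val account = "devstoreaccount1"
    val settings = Map(
      "spark.hadoop.fs.azure.storage.emulator.account.name" -> account,
      s"spark.hadoop.fs.azure.account.key.$account.blob.core.windows.net" -> "<key>"
    )
    // Property names and values as they would appear on the Hadoop side.
    toHadoopConf(settings).foreach { case (k, v) => println(s"$k = $v") }
    // Container and host of the path used by the test.
    println(containerAndHost(s"wasb://container1@$account.blob.core.windows.net/dir1/"))
  }
}
```

With the question's values, the account-key property ends in devstoreaccount1.blob.core.windows.net, which is the same host the path targets. The emulator property, by contrast, holds only devstoreaccount1. The question does not say whether hadoop-azure compares the emulator property with the bare account name or with the full host. That is worth checking in the hadoop-azure 2.10.0 source.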

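I have tried Azurite 3.9.0 and Azurite 2.7.0 (both running in Docker). I can access Azurite using az (also running in Docker).
The test above runs on the Docker host, and Azurite is reachable from the Docker host.
I am using Spark 2.4.5, Hadoop 2.10.0 and the following dependency: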

libraryDependencies += "org.apache.hadoop" % "hadoop-azure" % "2.10.0"

When using az, this connection string works:

AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite-3.9.0:10000/devstoreaccount1;QueueEndpoint=http://azurite-3.9.0:10001/devstoreaccount1;"
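Each setting the question asks about appears as a field of this connection string:

* AccountName and AccountKey are the credentials.
* BlobEndpoint carries the protocol, host and port.

A small stdlib-only sketch that splits the string, and then the blob endpoint, into those pieces may make the mapping easier to see. ConnectionStringParts and its helpers are hypothetical names, not part of any Azure SDK.

```scala
// Hypothetical helper (stdlib only): splits an Azure Storage connection string
// ("Key=Value;Key=Value;...") into a Map and breaks BlobEndpoint into
// protocol, host, port and path.
object ConnectionStringParts {

  // Simplified endpoint pattern: http(s)://host[:port][/path] only.
  private val Endpoint = """^(https?)://([^:/]+)(?::(\d+))?(/.*)?$""".r

  final case class BlobEndpoint(protocol: String, host: String, port: Option[Int], path: String)

  def parse(connectionString: String): Map[String, String] =
    connectionString
      .split(";")
      .iterator
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { pair =>
        // Split on the first '=' only: account keys end in "==".
        val idx = pair.indexOf('=')
        require(idx > 0, s"malformed entry: $pair")
        pair.substring(0, idx) -> pair.substring(idx + 1)
      }
      .toMap

  // None if BlobEndpoint is missing or does not match the simple pattern.
  def blobEndpoint(parts: Map[String, String]): Option[BlobEndpoint] =
    parts.get("BlobEndpoint").collect {
      case Endpoint(protocol, host, port, path) =>
        BlobEndpoint(protocol, host, Option(port).map(_.toInt), Option(path).getOrElse("/"))
    }

  def main(args: Array[String]): Unit = {
    val cs = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;" +
      "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;" +
      "BlobEndpoint=http://azurite-3.9.0:10000/devstoreaccount1;" +
      "QueueEndpoint=http://azurite-3.9.0:10001/devstoreaccount1;"
    val parts = parse(cs)
    println(parts("AccountName")) // devstoreaccount1
    println(blobEndpoint(parts))  // Some(BlobEndpoint(http,azurite-3.9.0,Some(10000),/devstoreaccount1))
  }
}
```

The connection string carries an explicit host (azurite-3.9.0) and port (10000). The Spark test, by contrast, has only the wasb:// path and the two Hadoop properties in which to express them.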

But I don't know how to configure this in Spark.
My question: how do I configure the host, port, credentials, etc. (in the path or in the SparkConf)?
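One configuration worth trying is offered here as an untested assumption, not a confirmed answer. In hadoop-azure's WASB driver, the storage-emulator check appears to compare fs.azure.storage.emulator.account.name with the full host of the wasb:// URI. On a match, the driver appears to connect to the fixed development-storage endpoint (http://127.0.0.1:10000/devstoreaccount1) rather than to the host named in the path. If that holds, two things follow: the host and port cannot be set through the path, and the property needs the full host. The sketch below writes those settings as a plain Map; EmulatorSettingsSketch is a hypothetical name.

```scala
// Hypothetical settings sketch. ASSUMPTION (not confirmed by the question):
// hadoop-azure treats the account as the storage emulator only when
// fs.azure.storage.emulator.account.name equals the full host of the wasb:// URI,
// and then talks to the fixed endpoint http://127.0.0.1:10000/devstoreaccount1.
object EmulatorSettingsSketch {
  val AccountName = "devstoreaccount1"
  val Container = "container1"
  val Directory = "dir1"

  // Host part of the wasb:// authority.
  val AccountHost = s"$AccountName.blob.core.windows.net"

  val Path = s"wasb://$Container@$AccountHost/$Directory/"

  def settings(accountKey: String): Map[String, String] = Map(
    // Full host here, instead of the bare account name used in the question.
    "spark.hadoop.fs.azure.storage.emulator.account.name" -> AccountHost,
    s"spark.hadoop.fs.azure.account.key.$AccountHost" -> accountKey
  )
}
```

In the test, this map could replace settings in the conf override, for example via cfg.setAll(EmulatorSettingsSketch.settings(AzuriteAccountKey)). Because the endpoint is assumed to be fixed at 127.0.0.1:10000, Azurite's port 10000 would have to be published on the machine that runs the test. That appears consistent with the setup described above.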
