Reading files from a storage account with Databricks

b5buobof  posted on 2021-07-13  in  Spark

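I am working on a use case in which I read Event Hub events with Databricks. Specifically, every time a file is added to a storage account directory, an event is triggered (using Event Grid) and delivered through Event Hub to be processed with Databricks.
Each time a file is inserted, the JSON that arrives at the Event Hub has the following structure: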

{
   "topic":"/subscriptions/b87ed442-9d87-4e71-8784-f72e6da5b77e/resourceGroups/rsg/providers/Microsoft.Storage/storageAccounts/storage_account_name",
   "subject":"/blobServices/default/containers/container/blobs/path/to/file.xml",
   "eventType": "Microsoft.Storage.BlobCreated",
   "id":"02e27363-a01e-0000-7218-fb8871065026",
   "data":{
      "api": "PutBlob",
      "requestId":"02e27363-a01e-0000-7218-fb8871000000",
      "eTag":"0x8D8C92FA9FDAB6D",
      "contentType": "application/octet-stream",
      "contentLength":103809024,
      "blobType": "BlockBlob",
      "blobUrl":"https://storage_account_name.blob.core.windows.net/container/path/to/file.xml",
      "url":"https://storage_account_name.blob.core.windows.net/container/path/to/file.xml",
      "sequencer":"000000000000000000000000000027c30000000000005204",
      "storageDiagnostics":{
         "batchId":"00ea9f48-5f8c-42ee-8323-da91a026e701"
      }
   },
   "dataVersion":"",
   "metadataVersion": "1",
   "eventTime":"2021-02-04T17:09:42.5557255Z",
   "EventProcessedUtcTime":"2021-02-04T17:20:27.8325561Z",
   "PartitionId":0,
   "EventEnqueuedUtcTime":"2021-02-04T17:09:42.9170000Z"
}
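
As an illustration of how the url field can be pulled out of a payload like the one above, here is a minimal sketch in plain Python. The helper name extract_blob_urls is hypothetical, and the sketch assumes the message body may be either a single event object (as shown) or a JSON array of events.

```python
import json

def extract_blob_urls(body):
    """Return data.url for every BlobCreated event in a message body."""
    payload = json.loads(body)
    # Assumption: the body holds one event object or an array of events.
    events = payload if isinstance(payload, list) else [payload]
    return [
        event["data"]["url"]
        for event in events
        if event.get("eventType") == "Microsoft.Storage.BlobCreated"
    ]

# Trimmed-down sample with only the fields the helper looks at.
sample = json.dumps([{
    "eventType": "Microsoft.Storage.BlobCreated",
    "data": {
        "url": "https://storage_account_name.blob.core.windows.net/container/path/to/file.xml"
    },
}])
urls = extract_blob_urls(sample)
print(urls)
```

Filtering on eventType is a design choice of this sketch: it skips any other event types that might arrive on the same Event Hub.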

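In Databricks, I read this event with Structured Streaming and extract the url field of the JSON.
The idea is, for each batch, to create a PySpark DataFrame using something like the following: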

path = "<<URL value of the JSON>>"
spark.read.format("binaryFile").load(path)

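Is there any way to load the https:// URL directly, or should I implement some logic to mount the directory before reading the file?
I would like to know whether there is a faster approach.
Thank you very much!
PS: Here is the code in which I am trying to implement the whole solution: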

import json

from pyspark.sql.functions import udf

# Read the raw events from Event Hub as a streaming DataFrame.
stream_data = spark \
    .readStream \
    .format('eventhubs') \
    .options(**event_hub_conf) \
    .option('multiLine', True) \
    .option('mode', 'PERMISSIVE') \
    .load()

# The Event Hub body is binary; cast it to a string before parsing.
df = stream_data.withColumn("body", stream_data["body"].cast("string"))

def parse_json(array_str):
    # The body is parsed as a JSON array; take the blob URL of the first event.
    json_obj = json.loads(array_str)
    return json_obj[0]['data']['url']

extract_url = udf(parse_json)
url_df = df.withColumn("url", extract_url(df.body))

def getData(url):
    binary = spark.read.format("binaryFile").load(url)
    binary.show()

def loadData(batchDf, batchId):
    # Collect the URLs of this micro-batch on the driver and load each file.
    url_collect = batchDf.select("url").collect()
    for row in url_collect:
        getData(row["url"])

url_df.writeStream.foreachBatch(loadData).outputMode("append").start().awaitTermination()
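
On the question of loading the https:// URL directly: as far as I know, Spark resolves paths through Hadoop filesystem schemes, so a plain https:// blob URL is generally not readable as is. One option that avoids mounting is to rewrite the URL into a wasbs:// URI before loading it. The sketch below shows only that rewrite; the helper name blob_url_to_wasbs is hypothetical, and it assumes a standard *.blob.core.windows.net endpoint and that the cluster already has credentials configured for the storage account.

```python
from urllib.parse import urlparse

BLOB_SUFFIX = ".blob.core.windows.net"

def blob_url_to_wasbs(url):
    """Rewrite https://<account>.blob.core.windows.net/<container>/<path>
    as wasbs://<container>@<account>.blob.core.windows.net/<path>."""
    parsed = urlparse(url)
    if parsed.scheme != "https" or not parsed.netloc.endswith(BLOB_SUFFIX):
        raise ValueError(f"not an Azure Blob Storage URL: {url}")
    # The first path segment is the container; the rest is the blob path.
    container, _, blob_path = parsed.path.lstrip("/").partition("/")
    if not container or not blob_path:
        raise ValueError(f"URL has no container or blob path: {url}")
    return f"wasbs://{container}@{parsed.netloc}/{blob_path}"

wasbs_path = blob_url_to_wasbs(
    "https://storage_account_name.blob.core.windows.net/container/path/to/file.xml"
)
print(wasbs_path)

# Inside foreachBatch the result could then be passed to the reader
# (assumption: access to the account is configured on the cluster, e.g. via
# the fs.azure.account.key.<account>.blob.core.windows.net setting):
# spark.read.format("binaryFile").load(wasbs_path)
```

For accounts with hierarchical namespace (ADLS Gen2), the analogous form would be abfss://<container>@<account>.dfs.core.windows.net/<path>. Whether either route is faster than mounting is not something this sketch measures.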
