pyspark 将表单识别器数据加载到 Dataframe 中

a0x5cqrl  于 2022-11-21  发布在  Spark
关注(0)|答案(1)|浏览(193)

我正在使用表单识别器阅读PDF文件。将其存储在“result”变量/对象中。根据表单识别器文档中为Azure Databricks/pyspark提供的语法,我的输出如下所示。相反,我需要将输出放入 Dataframe 中。每个表都放入单独的 Dataframe 中。请对语法提出建议。提前感谢。

with open(formUrl, "rb") as f:
  poller = document_analysis_client.begin_analyze_document("prebuilt-layout", document =f)
  result = poller.result()

     for table_idx, table in enumerate(result.tables):
         print(
             "Table # {} has {} rows and {} columns".format(
             table_idx, table.row_count, table.column_count
             )
         )
                
         for cell in table.cells:
             print(
                 "...Cell[{}][{}] has content '{}'".format(
                 cell.row_index,
                 cell.column_index,
                 cell.content.encode("utf-8"),
                 )
             )

输出量

Table # 0 has 3 rows and 7 columns
     ...Cell[0][0] has content 'b'BIOMARKER''
     ...Cell[0][1] has content 'b'METHOD|''
     ...Cell[0][2] has content 'b'ANALYTE''
     ...Cell[0][3] has content 'b'RESULT''
     ...Cell[0][4] has content 'b'THERAPY ASSOCIATION''
     ...Cell[0][6] has content 'b'BIOMARKER LEVELE''
     ...Cell[1][0] has content 'b'''
     ...Cell[1][1] has content 'b'IHC''
     ...Cell[1][2] has content 'b'Protein''
     ...Cell[1][3] has content 'b'Negative | 0''
     ...Cell[1][4] has content 'b'LACK OF BENEFIT''
     ...Cell[1][5] has content 'b'alectinib, brigatinib''
     ...Cell[1][6] has content 'b'Level 1''
     ...Cell[2][0] has content 'b'ALK''
     ...Cell[2][1] has content 'b'Seq''
     ...Cell[2][2] has content 'b'RNA-Tumor''
     ...Cell[2][3] has content 'b'Fusion Not Detected''
     ...Cell[2][5] has content 'b'ceritinib''
     ...Cell[2][6] has content 'b'Level 1''
     ...Cell[3][1] has content 'b'''
     ...Cell[3][2] has content 'b'''
     ...Cell[3][3] has content 'b'''
     ...Cell[3][5] has content 'b'crizotinib''
     ...Cell[3][6] has content 'b'Level 1''
 Table # 1 has 3 rows and 4 columns
 ...Cell[0][0] has content 'b'''
 ...Cell[0][1] has content 'b'''
 ...Cell[0][2] has content 'b'''
 ...Cell[0][3] has content 'b'''
 ...Cell[1][0] has content 'b'NTRK1/2/3''
 ...Cell[1][1] has content 'b'Seq''
 ...Cell[1][2] has content 'b'RNA-Tumor''
 ...Cell[1][3] has content 'b'Fusion Not Detected''
 ...Cell[2][0] has content 'b'Tumor Mutational Burden''
 ...Cell[2][1] has content 'b'Seq''
 ...Cell[2][2] has content 'b'DNA-Tumor''
 ...Cell[2][3] has content 'b'High | 19 Mutations/ Mb''
axr492tv

axr492tv1#

我尝试使用azure表单识别器读取PDF文档,并使用azure数据库将其转换为 Dataframe ,以下是详细步骤

  • 〉浏览所需的发票pdf文件,并在分析识别器后单击分析

  • 〉我创建了一个存储帐户和两个容器输入(存储输入发票),freg(存储输出csv)

  • 〉创建Azure数据库笔记本

1.软件包安装

%pip install azure.storage.blob
%pip install azure.ai.formrecognizer

2.连接到Azure存储容器

from azure.storage.blob import ContainerClient

container_url = "https://formrecognizerdemo070621.blob.core.windows.net/pdf-raw"
container = ContainerClient.from_container_url(container_url)

3.启用认知服务

在代码中,我们提供所创建的表单识别器的键值和端点,而不是cognitiveServicesEndpoint、cognitiveServicesKey
x1c4d 1x指令集

import requests
from azure.ai.formrecognizer import FormRecognizerClient
from azure.core.credentials import AzureKeyCredential

endpoint = dbutils.secrets.get(scope="formrec",key="cognitiveServicesEndpoint")
key = dbutils.secrets.get(scope="formrec",key="cognitiveServicesKey")

form_recognizer_client = FormRecognizerClient(endpoint=endpoint, credential=AzureKeyCredential(key))

4.将文件发送到认知服务并转换为Dataframe

import pandas as pd

field_list = ["InvoiceId", "VendorName", "VendorAddress", "CustomerName", "CustomerAddress", "CustomerAddressRecipient", "InvoiceDate", "InvoiceTotal", "DueDate"]
df = pd.DataFrame(columns=field_list)

for blob in container.list_blobs():
  blob_url = container_url + "/" + blob.name
  poller = form_recognizer_client.begin_recognize_invoices_from_url(invoice_url=blob_url)
  invoices = poller.result()
  print("Scanning " + blob.name + "...")
  
  for idx, invoice in enumerate(invoices):
      single_df = pd.DataFrame(columns=field_list)

      for field in field_list:
        entry = invoice.fields.get(field)
        
        if entry:
          single_df[field] = [entry.value]
          
      single_df['FileName'] = blob.name
      df = df.append(single_df)

df = df.reset_index(drop=True)
df

5.将结果的 Dataframe 上载到Azure

由于安全问题,我使用了相关的替换名称来代替实际名称StorageAccountName =最初创建的存储帐户名称,OutputContainer =为存储输出文件创建的约束器,formreckey =表单识别器的密钥

account_name = "StorageAccountName"
account_key = "fs.azure.account.key." + account_name + ".blob.core.windows.net"

 dbutils.fs.mount(
    source = "wasbs://OutputContainer@StorageAccount.blob.core.windows.net",
    mount_point = "/mnt/OutputContainer",
    extra_configs = {account_key: dbutils.secrets.get(scope = "formrec", key = "formreckey")} )

df.to_csv(r"/dbfs/mnt/OutputContainer/output.csv", index=False)


指令集

相关问题