I want to convert a workbook object (<openpyxl.workbook.workbook.Workbook object 0x00000150265A6B80>) into an Output stream using python.
this is my python code.
def main(inputblob: func.InputStream, outputblob: func.Out[BytesIO]):
logging.info(f"Python blob trigger function processed blob \n"
f"Name: {inputblob.name}\n"
f"file: {inputblob}\n"
f"Blob Size: {inputblob.length} bytes")
...some code goes here...
today = date.today()
date_today = today.strftime("%m-%d-%y")
outputfile = "{}_COMPLETE_{}".format(date_today,input_file_name)
wb.save(outputfile)
output = BytesIO(wb)
outputblob.set(output)
This gives an error.
Executed 'Functions.BlobTrigger' (Failed, Id=291563d2-407c-4540-9471-9f49cbc74857, Duration=77564ms) [2023-03-30T06:42:20.778Z] System.Private.CoreLib: Exception while executing function: Functions.BlobTrigger. System.Private.CoreLib: Result: Failure Exception: TypeError: a bytes-like object is required, not 'Workbook' Stack: File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.9/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 452, in _handle__invocation_request call_result = await self._loop.run_in_executor( File "C:\Users\Asuni\AppData\Local\Programs\Python\Python39\lib\concurrent\futures\thread.py", line 58, in run result = self.fn(*self.args, *self.kwargs) File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.9/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 718, in run_sync_func return ExtensionManager.get_sync_invocation_wrapper(context, File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.9/WINDOWS/X64\azure_functions_worker\extension.py", line 215, in *raw_invocation_wrapper result = function(**args) File "D:\Automization of Invoice Compare\Service Projects-1\Customers\InvoiceCompare\BlobTriggerinit.py", line 368, in main output = BytesIO(wb)
I tried various things like converting my workbook into BytesIO, StringIO objects.But any of those did not work.
Can someone please help me?
PS: My full code
import logging
import azure.functions as func
import pandas as pd
import os,io
from azure.data.tables import TableServiceClient
#from azure.storage.blob import BlobServiceClient, BlobClient,
ContainerClient
from openpyxl import Workbook
from openpyxl.styles import PatternFill, Border, Side, Alignment,
Protection, Font
from datetime import date
from io import BytesIO,StringIO
def main(inputblob: func.InputStream, outputblob:
func.Out[StringIO]):
logging.info(f"Python blob trigger function processed blob \n"
f"Name: {inputblob.name}\n"
f"file: {inputblob}\n"
f"Blob Size: {inputblob.length} bytes")
# Set up the connection to the Azure Table storage account
connection_string =
'DefaultEndpointsProtocol=http;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;'
table_service_client =
TableServiceClient.from_connection_string(connection_string)
# Create the BlobServiceClient object
#blob_service_client =
BlobServiceClient.from_connection_string(connection_string)
# Get a reference to the table
table_name = 'runtimedocument'
table_client_document =
table_service_client.get_table_client(table_name)
# Get a reference to the blob container
#container_name = 'cloudops-resources/inputs'
# Get a reference to the blob
#blob_client_instance =
blob_service_client.get_blob_client(container_name, inputblob,
snapshot=None)
# Instantiate a ContainerClient
#container_client =
blob_service_client.get_container_client(container_name)
# Query the entities in runtimedocument table
INOs_from_runtimedocument =
list(table_client_document.list_entities(select=["InvoiceNo"]))
Status_from_runtimedocument =
list(table_client_document.list_entities(select=["Status"]))
INOs_from_runtimedocument_list = []
for x in range(len(INOs_from_runtimedocument)):
INOs_from_runtimedocument_list.append(INOs_from_runtimedocument[x]
["InvoiceNo"])
Status_from_runtimedocument_list = []
for y in range(len(Status_from_runtimedocument)):
Status_from_runtimedocument_list.append(Status_from_runtimedocument[y]["Status"])
# Query the entities in runtimeedi table
runtimeedi = ['100302308549','100302308550','100302308551','100302308552','100302308553','100302308554','100302308555','100302308556',
'100302308557','100302308558','100302308559','100302308560']
# Query the entities in runtimeemail table
runtimeemail = ['100302308552','100302308553','100302308554','100302308555','100302308556','100302308557','100302308558','100302308559',
'100302308560','100302308561','100302308562','100302308563']
# Query the entities in runtimefile table
runtimefile = ['100302308557','100302308558','100302308559','100302308560','100302308561','100302308562','100302308563','100302308564',
'100302308565','100302308566','100302308567','100302308568']
# Query the entities in runtimeprint table
runtimelocalprint = ['100302308567','100302308568','100302308569','100302308570','100302308571','100302308572','100302308573','100302308574',
'100302308575','100302308576','100302308577','100302308578']
# List the blobs in the container
'''blob_list = container_client.list_blobs(prefix="inputs/")
for blob in blob_list:
input_file_name = (blob.name[7:])'''
#blobstring = blob_service_client.get_blob_to_text("cloudops-
resources/inputs",inputblob).content
#Reading the input xlsx file from customer
#df0 = pd.read_excel(StringIO(blobstring))
#book = xlrd.open_workbook(file_contents=inputblob.read())
#df0 = openpyxl.load_workbook(inputblob)
df0 = pd.read_excel(inputblob.read())
logging.info(f"{df0}")
#Opening a new workbook and a worksheet
wb = Workbook()
sheet = wb.active
#Defining column width
sheet.column_dimensions['A'].width = 20
sheet.column_dimensions['B'].width = 15
sheet.column_dimensions['C'].width = 10
sheet.column_dimensions['D'].width = 10
sheet.column_dimensions['E'].width = 10
sheet.column_dimensions['F'].width = 10
sheet.column_dimensions['G'].width = 10
sheet.column_dimensions['H'].width = 22
sheet.column_dimensions['I'].width = 19
sheet.column_dimensions['J'].width = 19
sheet.column_dimensions['K'].width = 19
sheet.column_dimensions['L'].width = 19
sheet.column_dimensions['M'].width = 19
# Add a header format.
font1 = Font(name='Calibri',size=11,bold=True)
font2 = Font(name='Calibri',size=11,bold=False)
fill = PatternFill(start_color='ECC4A0',
end_color='ECC4A0',
fill_type='solid')
border = Border(
left=Side(border_style='thin', color='00000000'),
right=Side(border_style='thin', color='00000000'),
top=Side(border_style='thin', color='00000000'),
bottom=Side(border_style='thin', color='00000000'))
alignment=Alignment(horizontal='center',
vertical='center',
wrap_text=True,
shrink_to_fit=True)
protection = Protection(locked=True,
hidden=False)
#Adding column headers
hA = sheet['A1']
hB = sheet['B1']
hC = sheet['C1']
hD = sheet['D1']
hE = sheet['E1']
hF = sheet['F1']
hG = sheet['G1']
hH = sheet['H1']
hI = sheet['I1']
hJ = sheet['J1']
hK = sheet['K1']
hL = sheet['L1']
hM = sheet['M1']
hA.value = "InvNo"
hA.font = font1
hA.border = border
hA.alignment = alignment
hA.protection = protection
hB.value = "Payer"
hB.font = font1
hB.border = border
hB.alignment = alignment
hB.protection = protection
hC.value = "Document"
hC.font = font1
hC.border = border
hC.alignment = alignment
hC.protection = protection
hD.value = "EDI"
hD.font = font1
hD.border = border
hD.alignment = alignment
hD.protection = protection
hE.value = "E-mail"
hE.font = font1
hE.border = border
hE.alignment = alignment
hE.protection = protection
hF.value = "File"
hF.font = font1
hF.border = border
hF.alignment = alignment
hF.protection = protection
hG.value = "Print"
hG.font = font1
hG.border = border
hG.alignment = alignment
hG.protection = protection
hH.value = "Check Invoice"
hH.font = font1
hH.border = border
hH.alignment = alignment
hH.protection = protection
hI.value = "runtimedocument status"
hI.font = font2
hI.fill = fill
hI.border = border
hI.alignment = alignment
hI.protection = protection
hJ.value = "runtimeedi status"
hJ.font = font2
hJ.fill = fill
hJ.border = border
hJ.alignment = alignment
hJ.protection = protection
hK.value = "runtimeemail status"
hK.font = font2
hK.fill = fill
hK.border = border
hK.alignment = alignment
hK.protection = protection
hL.value = "runtimefile status"
hL.font = font2
hL.fill = fill
hL.border = border
hL.alignment = alignment
hL.protection = protection
hM.value = "runtimelocalprint status"
hM.font = font2
hM.fill = fill
hM.border = border
hM.alignment = alignment
hM.protection = protection
greenfill = PatternFill(start_color='C0FED5' , end_color='C0FED5',
fill_type='solid')
yellowfill = PatternFill(start_color='FDF4BB' , end_color='FDF4BB',
fill_type='solid')
redfill = PatternFill(start_color='CD362F' , end_color='CD362F',
fill_type='solid')
pinkfill = PatternFill(start_color='FEB9CF' , end_color='FEB9CF',
fill_type='solid')
#Iterating through every row
invoice_no_list = []
payer_list = []
length = (len(df0))
for i in range(len(df0)):
div = df0.at[i, 'Div']
inv = df0.at[i, 'Inv no']
div_str = str(div)
inv_str = str(inv)
#concatanating division and invoice no from the input file
Invoice_No = div_str + inv_str
invoice_no_list.append(Invoice_No)
cellNo_InvoiceNo = "A"+str(i+2)
A = sheet[cellNo_InvoiceNo]
A.value = Invoice_No
#Slicing the " ' " in the payer column
payer = df0.at[i,'Payer']
payer_modified = payer[1:]
payer_list.append(payer_modified)
cellNo_payer = "B"+str(i+2)
B = sheet[cellNo_payer]
B.value = payer_modified
#Checking whether the Invoice nos are in runtimedocument
cellNo_document_existence = "C"+str(i+2)
C = sheet[cellNo_document_existence]
C.border = border
if Invoice_No in INOs_from_runtimedocument_list:
C.value = "Exists"
C.fill = greenfill
else:
C.value = "N/A"
C.fill = yellowfill
#Finding Index Positions of duplicate invoice numbers in
runtimedocument
def find_indices(INOs_from_runtimedocument_list, Invoice_No):
indices = []
for idx, value in
enumerate(INOs_from_runtimedocument_list):
if value == Invoice_No:
indices.append(idx)
return indices
duplicates = find_indices(INOs_from_runtimedocument_list,
Invoice_No)
cellNo_status_document = "I"+str(i+2)
I = sheet[cellNo_status_document]
I.border = border
I.alignment = alignment
#printing error status of relevant invoice numbers in
runtimedocument
for d in duplicates:
if(Status_from_runtimedocument_list[d]=="error"):
I.value = "error"
I.fill = pinkfill
else:
pass
#printing ok status in the remaining cells in runtimedocument
if (i <= length):
if (I.value == None):
I.value = "ok"
I.fill = greenfill
else:
pass
#Checking whether the Invoice nos are in runtimeedi
cellNo_edi_existence = "D"+str(i+2)
D = sheet[cellNo_edi_existence]
D.border = border
if Invoice_No in runtimeedi:
D.value = "Exists"
D.fill = greenfill
else:
D.value = "N/A"
D.fill = yellowfill
#Checking whether the Invoice nos are in runtimeemail
cellNo_email_existence = "E"+str(i+2)
E = sheet[cellNo_email_existence]
E.border = border
if Invoice_No in runtimeemail:
E.value = "Exists"
E.fill = greenfill
else:
E.value = "N/A"
E.fill = yellowfill
#Checking whether the Invoice nos are in runtimefile
cellNo_file_existence = "F"+str(i+2)
F = sheet[cellNo_file_existence]
F.border = border
if Invoice_No in runtimefile:
F.value = "Exists"
F.fill = greenfill
else:
F.value = "N/A"
F.fill = yellowfill
#Checking whether the Invoice nos are in runtimelocalprint
cellNo_localprint_existence = "G"+str(i+2)
G = sheet[cellNo_localprint_existence]
G.border = border
if Invoice_No in runtimelocalprint:
G.value = "Exists"
G.fill = greenfill
else:
G.value = "N/A"
G.fill = yellowfill
if (i<=length):
cellNo_Check_Invoice = "H"+str(i+2)
H = sheet[cellNo_Check_Invoice]
H.border = border
if(D.value == "N/A" and E.value == "N/A" and G.value ==
"N/A"):
H.value = "Investigation needed"
H.fill = redfill
elif(D.value == "Exists" and E.value == "N/A" and G.value
== "N/A" and I.value == "error"):
H.value = "Investigation needed"
H.fill = redfill
elif(D.value == "Exists" and E.value == "N/A" and G.value
== "N/A" and I.value == "ok"):
H.value = "Investigation not needed"
H.fill = greenfill
else:
pass
#Saving the file
#input = os.path.basename(inputblob.name)
today = date.today()
date_today = today.strftime("%m-%d-%y")
outputfile = "{}_COMPLETE_{}".format(date_today,inputblob.name)
wb.save(outputfile)
#Converting output file into a StringIO object
output = StringIO.__str__(wb)
out = StringIO(output)
outputblob.set(out)
'''logging.info(f"Name: {outputblob.name}\n"
f"file: {outputblob}\n"
f"Blob Size: {outputblob.length} bytes")'''
Here, I have converted the workbook object into StringIO.I want to place this output file in a blob container in azure storage explorer. I want the file as an Output stream.
My current code for saving the file in the blob container.
#Saving the file
input = os.path.basename(inputblob.name)
today = date.today()
date_today = today.strftime("%m-%d-%y")
outputfile = "{}_COMPLETE_{}".format(date_today,input)
cc = "http://127.0.0.1:10000/devstoreaccount1/cloudops-
resources/outputs/"+ outputfile
wk = openpyxl.load_workbook(cc)
o = io.BytesIO()
wk.save(o)
data= o.getvalue()
print(data)
but it gives this error.
Exception: OSError: [Errno 22] Invalid argument: 'http://127.0.0.1:10000/devstoreaccount1/cloudops-resources/outputs/03-30-23_COMPLETE_Input_file456.xlsx'
I tried this as well.
#Saving the file
input = os.path.basename(inputblob.name)
today = date.today()
date_today = today.strftime("%m-%d-%y")
outputfile = "{}_COMPLETE_{}".format(date_today,input)
wb.save("template.xlsx")
with NamedTemporaryFile() as outputfile:
wb.save(outputfile.name)
outputfile.seek(0)
stream = outputfile.read()
outputblob.set(data)
But, it gives this error.
Exception: PermissionError: [Errno 13] Permission denied: 'C:\Users\Asu\AppData\Local\Temp\tmp06vytpds'
1条答案
按热度按时间w6lpcovy1#
如何在python中将工作簿对象转换为输出流
我已经在我的环境中复制并获得了如下预期结果,我遵循了Document:
你可以保留outputfile作为你的输出文件。
输出:
这里你需要修改的代码是:
输出:
在我的例子中,我从location硬编码了输出文件xlsx。
在您的代码中,您直接给出输出文件,首先您需要使用
openpyxl.load_workbook(outputfile)
加载到工作簿中