Azure Data Facory pipeline将Batch Service作为链接服务运行自定义Activity,其中包含Python代码。该转换是为本地运行而构建的,我想让它在Azure上运行,将blob(csv文件)保存到Azure存储(Az Data Lake)中。
转换在块上运行,并在for循环中执行。
for i, in_chunk in enumerate(pd.read_csv(source, chunksize=6000, sep=";")
# transformation happens and spits out out_chunk parameter
# holding the 6000 rows out of the entire file
kwargs = {'mode':'a', 'header': False} if i>0 else {}
out_chunk.to_csv(cdl_file, sep="|", index=False, **kwargs)
字符串
在此之后,我尝试了不同的方法,因为它是写在这个问题和答案例如:我原来的问题,另一个问题
在上面的问题和答案中编写的解决方案没有抛出错误,但它没有存储整个文件,只存储指定为块的6000行。
我做错了什么?我不明白这应该如何处理。
编辑:根据JayashankarGS的要求,我添加了我尝试的代码和发生的事情的截图。
def transform_data(v, outputType, eventType, fileName, config, source, conn_string, dl, encoding, in_dtypes = None, chunkSize=10000, in_sep = ";"):
folderName = 'temp'
containerName = 'input'
outputBlobName = folderName + "/" + fileName
inputBlobPath = containerName + "/" + outputBlobName
blob = BlobClient.from_connection_string(conn_str=conn_string, container_name=containerName, blob_name=outputBlobName)
credential = {'connection_string': conn_string}
accountName = conn_string.split("AccountName=")[1].split(";")[0]
adls_path = 'abfs://' + containerName + '@' + accountName + '.dfs.core.windows.net/' + outputBlobName
template = pd.DataFrame(columns = v.headers[outputType])
transformationSchema = config[outputType + "_out_transformations"]
logging.info('Reading data chunk...')
for i, in_chunk in enumerate(pd.read_csv(source,
chunksize = chunkSize,
sep=in_sep,
encoding = 'unicode_escape',
dtype=in_dtypes)):
logging.info('Handle duplicates and missing fields...')
in_chunk.fillna('', inplace=True)
out_chunk = template.copy()
out_chunk.fillna('', inplace=True)
out_chunk.drop_duplicates(subset=[config["composite_key"]["key_partA"], config["composite_key"]["key_partB"], config["composite_key"]["key_partC"]],inplace=True)
logging.info('Start data transformation for schema: ' + outputType)
for name, spec in transformationSchema.items():
out_chunk[name] = transform_column(in_chunk, spec)
kwargs = {'mode': 'a', 'header': False}
logging.info('Transformation was successful for ' + outputType)
dateTime = time.strftime("%Y%m%d%H%M%S")
if not os.path.exists(containerName + "/" + folderName + "/"):
os.mkdir(containerName)
os.mkdir(containerName + "/" + folderName + "/")
print(f"Uploading chunk: {len(out_chunk)}")
logging.info('Trying to store transformed file in Azure Storage...')
out_chunk.to_csv(adls_path, storage_options=credential, sep=dl, index=False, **kwargs)
型
这样做的结果是生成并存储在Azure存储中的两个文件。正如您在Azure Data Factory上使用Batch Service运行此操作的结果中所看到的,它将按照给定的批处理大小处理10000行,然后尝试对第二个文件执行相同的操作。“未找到文件”错误来自转换后的一个验证程序步骤(忽略该警告!)。
x1c 0d1x的数据
1条答案
按热度按时间shyt4zoc1#
原因是有不同的blob类型。
在您的代码中:
字符串
对于第一次迭代,您没有传递
mode
;默认情况下,这会使文件类型为Block blob。x1c 0d1x的数据
写入存储时,BlobType应该相同。
因此,只需从一开始就给予模式:
kwargs = {'mode': 'a', 'header': False}
下面是我的数据统计:
的
现在上传一个大小为500的块:
型
输出:
和
的
再次,获取文件:
备注:第一次运行输出文件时,请确保存储帐户中不存在输出文件,如果存在,则应为Append blob类型。