我使用Apache Flink进行批处理模式的文件处理。最初,我将CSV文件读入自定义对象DataSet readCsvData。随后,我对数据执行了各种验证,并更新了readCsvData中的isValid标志。例如,将isValid设置为true表示有效记录,将false表示无效记录。
现在,我的目标是使用一个条件语句来确定是否所有的CSV文件记录都是有效的。
例如:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
String fileName = myFile.csv
DataSet<MyObject> readCsvData = readAndProcessCsvData(env, fileName);
DataSet<MyObject> validateFile = validateFile(readCsvData);
if(toCheckIsValidFlag){
//********** process further ***********
generateAckFile();
}
else
{
generateNckFile();
}
env.execute();
字符串
而MyObject是
public class MyObject{
private FileDetails fileDetails;
private String fileName;
public boolean isValid ;
public String invalidReason;
}
型
我在执行环境中操作(批处理模式),限制使用.collect()或.count()等急切函数。直接设置该标志是不可行的,因为自定义代码在作业运行后执行。如果不使用.collect()或collector()等运算符,则无法从DataSet检索MyObject。因此,我正在寻找一个方法来检查readCsvData中的所有记录是否都有效(readCsvData.isValid == true)。根据这个条件,我打算继续进行进一步的转换并接收数据,或者将所有数据定向到文件接收器。
使用老化器功能时,显示错误。
例外情况:
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count].
型
1条答案
按热度按时间xxe27gdn1#
因为你需要处理所有的记录来决定你的下游操作将做什么,你有两个选择。
1.将其作为两个作业运行,其中第一个作业有一个无效记录数的计数器,并将所有结果写入临时存储器(理想情况下采用比CSV更有效的格式,例如Parquet)。然后第二个作业可以检查计数器并决定如何处理数据。
1.缓冲所有数据,这(使用数据集API)意味着默认情况下使用内存。然后,您可以根据所有记录的有效标志推迟决定要做什么。但这显然会对任何一个文件的大小产生内存限制。
请注意,如果您只需要分离数据,则可以使用DataStream API和bucket将有效记录和无效记录写入不同的位置。