apache beam textio读取文件,行号为

ohfgkhjo  于 2021-07-12  发布在  Java
关注(0)|答案(0)|浏览(235)

是否可以从textio.read()获取行号?如果不是的话,您能指出一个从java中的textio继承的自定义i/o的例子吗
我的问题是,我需要读取一个大的csv文件(150gb+),并在30分钟内生成一个行号。
使用以下代码 FileIO 并且打开一个文件然后放序列号,这个数据流作业不能做伸缩,它只阻塞了一个worker,在数据流上花费的时间大约是5个小时 n1-standard-2 ```
@Slf4j
public class ApplyRowIndexFn extends DoFn<FileIO.ReadableFile, KV<Integer, String>> {

  1. protected final PCollectionView<TransformSideInput> transformFnInput;
  2. public ApplyRowIndexFn(PCollectionView<TransformSideInput> transformFnInput) {
  3. this.transformFnInput = transformFnInput;
  4. }
  5. @ProcessElement
  6. public void processElement(ProcessContext context) {
  7. TransformSideInput input = context.sideInput(transformFnInput);
  8. String header = input.getHeader();
  9. try (Reader reader = Channels.newReader(
  10. FileSystems.open(context.element().getMetadata().resourceId()), "UTF-8");
  11. BufferedReader buffer = new BufferedReader(reader)) {
  12. String row;
  13. int counter = 1;
  14. while ((row = buffer.readLine()) != null) {
  15. if (!row.equals(header)) {
  16. context.output(KV.of(counter, row));
  17. counter = counter + 1;
  18. }
  19. }
  20. } catch (IOException e) {
  21. e.printStackTrace();
  22. }
  23. }

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题