Usage and code examples for the org.apache.beam.sdk.io.FileSystems.open() method

Reposted by x33g5p2x on 2022-01-19, category: Other

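This article collects code examples for the Java method org.apache.beam.sdk.io.FileSystems.open() and shows how FileSystems.open() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should be useful as references. The details of FileSystems.open() are as follows:
Package: org.apache.beam.sdk.io
Class name: FileSystems
Method name: open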

About FileSystems.open

Returns a read channel for the given ResourceId.

The resource is not expanded; it is used verbatim.

If seeking is supported, then this returns a java.nio.channels.SeekableByteChannel.
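
Because the returned channel is only sometimes seekable, callers that need to start at an offset usually check the runtime type first. The following is a minimal sketch of that check using only java.nio; the class name SeekableReadSketch and the method skipTo are hypothetical and not part of Beam, and the fallback of reading and discarding bytes is an assumption about what a caller might want rather than something Beam does.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;

/** Hypothetical helper; works on any channel, e.g. one returned by FileSystems.open(). */
final class SeekableReadSketch {

  private SeekableReadSketch() {}

  /**
   * Moves the channel forward to startOffset (measured from the channel's current start).
   * Seekable channels are repositioned directly; other channels are advanced by reading
   * and discarding bytes. Returns the offset actually reached, which is smaller than
   * startOffset if a non-seekable channel ends early.
   */
  static long skipTo(ReadableByteChannel channel, long startOffset) throws IOException {
    if (channel instanceof SeekableByteChannel) {
      // Seeking is supported: jump straight to the requested position.
      ((SeekableByteChannel) channel).position(startOffset);
      return startOffset;
    }
    // Assumption: for a non-seekable (blocking) channel, discard bytes up to the offset.
    ByteBuffer scratch = ByteBuffer.allocate(8192);
    long skipped = 0;
    while (skipped < startOffset) {
      scratch.clear();
      scratch.limit((int) Math.min(scratch.capacity(), startOffset - skipped));
      int n = channel.read(scratch);
      if (n < 0) {
        break; // end of data reached before the requested offset
      }
      skipped += n;
    }
    return skipped;
  }
}
```

In a pipeline, the channel passed to skipTo would typically come from FileSystems.open(resourceId). Whether discarding bytes is acceptable depends on the file system; the Beam reader shown near the end of this page instead rejects non-zero offsets on non-seekable channels.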

Code examples

Code example source: GoogleCloudPlatform/java-docs-samples

public static String getSchema(String schemaPath) throws IOException {
 // matchNewResource(..., false) treats schemaPath as a file, not a directory.
 ReadableByteChannel chan = FileSystems.open(FileSystems.matchNewResource(
   schemaPath, false));
 // Closing the stream also closes the underlying channel.
 try (InputStream stream = Channels.newInputStream(chan)) {
  BufferedReader streamReader = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
  StringBuilder dataBuilder = new StringBuilder();
  String line;
  // readLine() strips line terminators, so the result is one unbroken string.
  while ((line = streamReader.readLine()) != null) {
   dataBuilder.append(line);
  }
  return dataBuilder.toString();
 }
}
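
The getSchema example above joins the lines it reads without any separator, which is harmless for JSON but alters other text. As a hedged alternative, the sketch below reads a whole channel into a string verbatim using only java.nio. The names ChannelTextSketch and readUtf8 are hypothetical, and the code assumes the content is UTF-8 and small enough to hold in memory.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper; not part of Beam. */
final class ChannelTextSketch {

  private ChannelTextSketch() {}

  /**
   * Reads every remaining byte from the channel, closes it, and decodes the bytes as UTF-8.
   * Line terminators are preserved because the bytes are never split into lines.
   */
  static String readUtf8(ReadableByteChannel channel) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    ByteBuffer buffer = ByteBuffer.allocate(8192);
    // try-with-resources closes the channel even if a read fails.
    try (ReadableByteChannel in = channel) {
      while (in.read(buffer) != -1) {
        buffer.flip();
        bytes.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
        buffer.clear();
      }
    }
    // Decode once at the end so multi-byte characters are never cut at a buffer boundary.
    return new String(bytes.toByteArray(), StandardCharsets.UTF_8);
  }
}
```

With Beam on the classpath, a call might look like readUtf8(FileSystems.open(FileSystems.matchNewResource(schemaPath, false))), mirroring the calls used in the surrounding examples.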

Code example source: GoogleCloudPlatform/DataflowTemplates

/** utility method to copy binary (jar file) data from source to dest. */
 private static void copy(ResourceId source, ResourceId dest) throws IOException {
  try (ReadableByteChannel rbc = FileSystems.open(source)) {
   try (WritableByteChannel wbc = FileSystems.create(dest, MimeTypes.BINARY)) {
    ByteStreams.copy(rbc, wbc);
   }
  }
 }

Code example source: GoogleCloudPlatform/DataflowTemplates

@Override
 public String apply(String input) {
  ResourceId sourceResourceId = FileSystems.matchNewResource(input, false);
  String schema;
  try (ReadableByteChannel rbc = FileSystems.open(sourceResourceId)) {
   try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
    try (WritableByteChannel wbc = Channels.newChannel(baos)) {
     ByteStreams.copy(rbc, wbc);
     schema = baos.toString(Charsets.UTF_8.name());
     LOG.info("Extracted schema: " + schema);
    }
   }
  } catch (IOException e) {
   LOG.error("Error extracting schema: " + e.getMessage());
   throw new RuntimeException(e);
  }
  return schema;
 }

Code example source: com.spotify/scio-core

private static void copyToLocal(Metadata src, Path dst) throws IOException {
 long srcSize = src.sizeBytes();
 long copied = 0;
 // try-with-resources closes both channels even if the transfer fails.
 try (FileChannel dstCh = FileChannel.open(
     dst, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
   ReadableByteChannel srcCh = FileSystems.open(src.resourceId())) {
  // transferFrom may move fewer bytes than requested, so loop until all are copied.
  do {
   copied += dstCh.transferFrom(srcCh, copied, srcSize - copied);
  } while (copied < srcSize);
 }
 Preconditions.checkState(copied == srcSize);
}

Code example source: org.apache.beam/beam-examples-java

public static String copyFile(ResourceId sourceFile, ResourceId destinationFile)
  throws IOException {
 try (WritableByteChannel writeChannel = FileSystems.create(destinationFile, "text/plain")) {
  try (ReadableByteChannel readChannel = FileSystems.open(sourceFile)) {
   final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);
   while (readChannel.read(buffer) != -1) {
    buffer.flip();
    writeChannel.write(buffer);
    buffer.compact();
   }
   buffer.flip();
   while (buffer.hasRemaining()) {
    writeChannel.write(buffer);
   }
  }
 }
 return destinationFile.toString();
}

Code example source: org.apache.beam/beam-sdks-java-core

/**
 * Returns a {@link ReadableByteChannel} reading the data from this file, potentially
 * decompressing it using {@link #getCompression}.
 */
public ReadableByteChannel open() throws IOException {
 return compression.readDecompressed(FileSystems.open(metadata.resourceId()));
}

Code example source: GoogleCloudPlatform/DataflowTemplates

/**
 * Parses a JSON file and returns a JSONObject containing the necessary source, sink, and schema
 * information.
 *
 * @param pathToJSON the JSON file location so we can download and parse it
 * @return the parsed JSONObject
 */
public JSONObject parseSchema(String pathToJSON) throws Exception {
 // try-with-resources closes the channel, which getBytesWithoutClosing leaves open.
 try (ReadableByteChannel readableByteChannel =
   FileSystems.open(FileSystems.matchNewResource(pathToJSON, false))) {
  String json = new String(
    StreamUtils.getBytesWithoutClosing(Channels.newInputStream(readableByteChannel)));
  return new JSONObject(json);
 } catch (Exception e) {
  throw new RuntimeException(e);
 }
}

Code example source: org.apache.beam/beam-sdks-java-core

/**
 * Reads all the lines of all the files.
 *
 * <p>Not suitable for use except in testing of small data, since the data size may be far more
 * than can be reasonably processed serially, in-memory, by a single thread.
 */
@VisibleForTesting
List<String> readLines(Collection<Metadata> files) throws IOException {
 List<String> allLines = Lists.newArrayList();
 int i = 1;
 for (Metadata file : files) {
  try (Reader reader =
    Channels.newReader(FileSystems.open(file.resourceId()), StandardCharsets.UTF_8.name())) {
   List<String> lines = CharStreams.readLines(reader);
   allLines.addAll(lines);
   LOG.debug("[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file);
  }
  i++;
 }
 return allLines;
}

Code example source: spotify/dbeam

String readFromFile(String passwordFile) throws IOException {
 MatchResult.Metadata m = FileSystems.matchSingleFileSpec(passwordFile);
 LOGGER.info("Reading password from file: {}", m.resourceId().toString());
 // Close the stream (and the underlying channel) once the content has been read.
 try (InputStream inputStream = Channels.newInputStream(FileSystems.open(m.resourceId()))) {
  return CharStreams.toString(new InputStreamReader(inputStream, Charsets.UTF_8));
 }
}

Code example source: GoogleCloudPlatform/DataflowTemplates

private static Export readManifest(ResourceId fileResource) {
  Export.Builder result = Export.newBuilder();
  try (InputStream stream = Channels.newInputStream(FileSystems.open(fileResource))) {
   Reader reader = new InputStreamReader(stream);
   JsonFormat.parser().merge(reader, result);
  } catch (IOException e) {
   throw new RuntimeException(e);
  }
  return result.build();
 }

Code example source: GoogleCloudPlatform/DataflowTemplates

try (Reader reader =
  Channels.newReader(
    FileSystems.open(resourceId), StandardCharsets.UTF_8.name())) {
 return CharStreams.toString(reader);
} catch (IOException e) {

Code example source: GoogleCloudPlatform/DataflowTemplates

try (InputStream stream = Channels.newInputStream(FileSystems.open(resourceId))) {
 BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
 byte[] magic = new byte[DataFileConstants.MAGIC.length];

Code example source: org.apache.beam/beam-sdks-java-io-tika

@Setup
public void setup() throws Exception {
 if (spec.getTikaConfigPath() != null) {
  ResourceId configResource =
    FileSystems.matchSingleFileSpec(spec.getTikaConfigPath().get()).resourceId();
  tikaConfig = new TikaConfig(Channels.newInputStream(FileSystems.open(configResource)));
 }
}

Code example source: GoogleCloudPlatform/DataflowTemplates

compression.readDecompressed(FileSystems.open(inputFile))) {
try (WritableByteChannel writerChannel = FileSystems.create(tempFile, MimeTypes.TEXT)) {
 ByteStreams.copy(readerChannel, writerChannel);

Code example source: GoogleCloudPlatform/DataflowTemplates

@ProcessElement
 public void processElement(ProcessContext c) {
  try {
   KV<String, String> kv = c.element();
   String filePath = GcsUtil.joinPath(importDirectory.get(), kv.getValue());
   MatchResult match = FileSystems.match(filePath, EmptyMatchTreatment.DISALLOW);
   ResourceId resourceId = match.metadata().get(0).resourceId();
   TableManifest.Builder builder = TableManifest.newBuilder();
   try (InputStream stream =
     Channels.newInputStream(FileSystems.open(resourceId))) {
    Reader reader = new InputStreamReader(stream);
    JsonFormat.parser().merge(reader, builder);
   }
   c.output(KV.of(kv.getKey(), builder.build()));
  } catch (IOException e) {
   throw new RuntimeException(e);
  }
 }

Code example source: GoogleCloudPlatform/DataflowTemplates

@ProcessElement
 public void processElement(ProcessContext context) {
  ResourceId inputFile = context.element().resourceId();
  Compression compression = compressionValue.get();
  // Add the compression extension to the output filename. Example: demo.txt -> demo.txt.gz
  String outputFilename = inputFile.getFilename() + compression.getSuggestedSuffix();
  // Resolve the necessary resources to perform the transfer
  ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
  ResourceId outputFile =
    outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_FILE);
  ResourceId tempFile =
    outputDir.resolve("temp-" + outputFilename, StandardResolveOptions.RESOLVE_FILE);
  // Perform the copy of the compressed channel to the destination.
  try (ReadableByteChannel readerChannel = FileSystems.open(inputFile)) {
   try (WritableByteChannel writerChannel =
     compression.writeCompressed(FileSystems.create(tempFile, MimeTypes.BINARY))) {
    // Execute the copy to the temporary file
    ByteStreams.copy(readerChannel, writerChannel);
   }
   // Rename the temporary file to the output file
   FileSystems.rename(ImmutableList.of(tempFile), ImmutableList.of(outputFile));
   // Output the path to the compressed file
   context.output(outputFile.toString());
  } catch (IOException e) {
   LOG.error("Error occurred during compression of {}", inputFile.toString(), e);
   context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), e.getMessage()));
  }
 }

Code example source: org.apache.beam/beam-sdks-java-core

@Override
protected final boolean startImpl() throws IOException {
 FileBasedSource<T> source = getCurrentSource();
 this.channel = FileSystems.open(source.getSingleFileMetadata().resourceId());
 if (channel instanceof SeekableByteChannel) {
  SeekableByteChannel seekChannel = (SeekableByteChannel) channel;
  seekChannel.position(source.getStartOffset());
 } else {
  // Channel is not seekable. Must not be a subrange.
  checkArgument(
    source.mode != Mode.SINGLE_FILE_OR_SUBRANGE,
    "Subrange-based sources must only be defined for file types that support seekable "
      + " read channels");
  checkArgument(
    source.getStartOffset() == 0,
    "Start offset %s is not zero but channel for reading the file is not seekable.",
    source.getStartOffset());
 }
 startReading(channel);
 // Advance once to load the first record.
 return advanceImpl();
}
