This article collects code examples of the Java method org.apache.beam.sdk.io.FileSystems.open() and shows how FileSystems.open() is used in practice. The examples are drawn from selected open-source projects on GitHub, Stack Overflow, Maven, and similar platforms, so they should serve as useful references. Details of the FileSystems.open() method:
Package path: org.apache.beam.sdk.io.FileSystems
Class name: FileSystems
Method name: open
Returns a read channel for the given ResourceId.
The resource is not expanded; it is used verbatim.
If seeking is supported, this returns a java.nio.channels.SeekableByteChannel.
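The seekable-channel behavior described above mirrors the JDK's own channel API. As a self-contained sketch of the same read pattern (using the standard library's Files.newByteChannel in place of FileSystems.open, since running Beam itself would require the SDK on the classpath; the class name is illustrative):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ChannelReadSketch {

    // Reads a whole file through a byte channel, seeking back to the start
    // first. Decodes each chunk as UTF-8; fine for ASCII content, though a
    // multi-byte character could in principle be split across chunks.
    public static String readAll(Path path) throws IOException {
        try (SeekableByteChannel channel = Files.newByteChannel(path)) {
            channel.position(0); // local-file channels support seeking
            ByteBuffer buffer = ByteBuffer.allocate(4096);
            StringBuilder sb = new StringBuilder();
            while (channel.read(buffer) != -1) {
                buffer.flip();
                sb.append(StandardCharsets.UTF_8.decode(buffer));
                buffer.clear();
            }
            return sb.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("sketch", ".txt");
        Files.write(tmp, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(readAll(tmp)); // prints "hello"
        Files.delete(tmp);
    }
}
```

With Beam, the only change would be obtaining the channel from FileSystems.open(resourceId) and checking `instanceof SeekableByteChannel` before seeking, as several of the examples below do.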
Code example source: GoogleCloudPlatform/java-docs-samples

public static String getSchema(String schemaPath) throws IOException {
  ReadableByteChannel chan =
      FileSystems.open(FileSystems.matchNewResource(schemaPath, false));
  try (InputStream stream = Channels.newInputStream(chan)) {
    BufferedReader streamReader =
        new BufferedReader(new InputStreamReader(stream, "UTF-8"));
    StringBuilder dataBuilder = new StringBuilder();
    String line;
    while ((line = streamReader.readLine()) != null) {
      dataBuilder.append(line);
    }
    return dataBuilder.toString();
  }
}
Code example source: GoogleCloudPlatform/DataflowTemplates

/** Utility method to copy binary (jar file) data from source to dest. */
private static void copy(ResourceId source, ResourceId dest) throws IOException {
  try (ReadableByteChannel rbc = FileSystems.open(source)) {
    try (WritableByteChannel wbc = FileSystems.create(dest, MimeTypes.BINARY)) {
      ByteStreams.copy(rbc, wbc);
    }
  }
}
Code example source: GoogleCloudPlatform/DataflowTemplates

@Override
public String apply(String input) {
  ResourceId sourceResourceId = FileSystems.matchNewResource(input, false);
  String schema;
  try (ReadableByteChannel rbc = FileSystems.open(sourceResourceId)) {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
      try (WritableByteChannel wbc = Channels.newChannel(baos)) {
        ByteStreams.copy(rbc, wbc);
        schema = baos.toString(Charsets.UTF_8.name());
        LOG.info("Extracted schema: " + schema);
      }
    }
  } catch (IOException e) {
    LOG.error("Error extracting schema: " + e.getMessage());
    throw new RuntimeException(e);
  }
  return schema;
}
Code example source: com.spotify/scio-core

private static void copyToLocal(Metadata src, Path dst) throws IOException {
  FileChannel dstCh = FileChannel.open(
      dst, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
  ReadableByteChannel srcCh = FileSystems.open(src.resourceId());
  long srcSize = src.sizeBytes();
  long copied = 0;
  do {
    copied += dstCh.transferFrom(srcCh, copied, srcSize - copied);
  } while (copied < srcSize);
  dstCh.close();
  srcCh.close();
  Preconditions.checkState(copied == srcSize);
}
Code example source: org.apache.beam/beam-examples-java

public static String copyFile(ResourceId sourceFile, ResourceId destinationFile)
    throws IOException {
  try (WritableByteChannel writeChannel = FileSystems.create(destinationFile, "text/plain")) {
    try (ReadableByteChannel readChannel = FileSystems.open(sourceFile)) {
      final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);
      while (readChannel.read(buffer) != -1) {
        buffer.flip();
        writeChannel.write(buffer);
        buffer.compact();
      }
      buffer.flip();
      while (buffer.hasRemaining()) {
        writeChannel.write(buffer);
      }
    }
  }
  return destinationFile.toString();
}
Code example source: org.apache.beam/beam-sdks-java-core

/**
 * Returns a {@link ReadableByteChannel} reading the data from this file, potentially
 * decompressing it using {@link #getCompression}.
 */
public ReadableByteChannel open() throws IOException {
  return compression.readDecompressed(FileSystems.open(metadata.resourceId()));
}
Code example source: GoogleCloudPlatform/DataflowTemplates

/**
 * Parses a JSON file and returns a JSONObject containing the necessary source, sink, and schema
 * information.
 *
 * @param pathToJSON the JSON file location so we can download and parse it
 * @return the parsed JSONObject
 */
public JSONObject parseSchema(String pathToJSON) throws Exception {
  try {
    ReadableByteChannel readableByteChannel =
        FileSystems.open(FileSystems.matchNewResource(pathToJSON, false));
    String json = new String(
        StreamUtils.getBytesWithoutClosing(Channels.newInputStream(readableByteChannel)));
    return new JSONObject(json);
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
Code example source: org.apache.beam/beam-sdks-java-core

/**
 * Reads all the lines of all the files.
 *
 * <p>Not suitable for use except in testing of small data, since the data size may be far more
 * than can be reasonably processed serially, in-memory, by a single thread.
 */
@VisibleForTesting
List<String> readLines(Collection<Metadata> files) throws IOException {
  List<String> allLines = Lists.newArrayList();
  int i = 1;
  for (Metadata file : files) {
    try (Reader reader =
        Channels.newReader(FileSystems.open(file.resourceId()), StandardCharsets.UTF_8.name())) {
      List<String> lines = CharStreams.readLines(reader);
      allLines.addAll(lines);
      LOG.debug("[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file);
    }
    i++;
  }
  return allLines;
}
Code example source: spotify/dbeam

String readFromFile(String passwordFile) throws IOException {
  MatchResult.Metadata m = FileSystems.matchSingleFileSpec(passwordFile);
  LOGGER.info("Reading password from file: {}", m.resourceId().toString());
  InputStream inputStream = Channels.newInputStream(FileSystems.open(m.resourceId()));
  return CharStreams.toString(new InputStreamReader(inputStream, Charsets.UTF_8));
}
Code example source: GoogleCloudPlatform/DataflowTemplates

private static Export readManifest(ResourceId fileResource) {
  Export.Builder result = Export.newBuilder();
  try (InputStream stream = Channels.newInputStream(FileSystems.open(fileResource))) {
    Reader reader = new InputStreamReader(stream);
    JsonFormat.parser().merge(reader, result);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  return result.build();
}
Code example source: GoogleCloudPlatform/DataflowTemplates

try (Reader reader =
    Channels.newReader(
        FileSystems.open(resourceId), StandardCharsets.UTF_8.name())) {
  return CharStreams.toString(reader);
} catch (IOException e) {
Code example source: GoogleCloudPlatform/DataflowTemplates

try (InputStream stream = Channels.newInputStream(FileSystems.open(resourceId))) {
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  byte[] magic = new byte[DataFileConstants.MAGIC.length];
Code example source: org.apache.beam/beam-sdks-java-io-tika

@Setup
public void setup() throws Exception {
  if (spec.getTikaConfigPath() != null) {
    ResourceId configResource =
        FileSystems.matchSingleFileSpec(spec.getTikaConfigPath().get()).resourceId();
    tikaConfig = new TikaConfig(Channels.newInputStream(FileSystems.open(configResource)));
  }
}
Code example source: GoogleCloudPlatform/DataflowTemplates

    compression.readDecompressed(FileSystems.open(inputFile))) {
  try (WritableByteChannel writerChannel = FileSystems.create(tempFile, MimeTypes.TEXT)) {
    ByteStreams.copy(readerChannel, writerChannel);
Code example source: GoogleCloudPlatform/DataflowTemplates

@ProcessElement
public void processElement(ProcessContext c) {
  try {
    KV<String, String> kv = c.element();
    String filePath = GcsUtil.joinPath(importDirectory.get(), kv.getValue());
    MatchResult match = FileSystems.match(filePath, EmptyMatchTreatment.DISALLOW);
    ResourceId resourceId = match.metadata().get(0).resourceId();
    TableManifest.Builder builder = TableManifest.newBuilder();
    try (InputStream stream = Channels.newInputStream(FileSystems.open(resourceId))) {
      Reader reader = new InputStreamReader(stream);
      JsonFormat.parser().merge(reader, builder);
    }
    c.output(KV.of(kv.getKey(), builder.build()));
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
Code example source: GoogleCloudPlatform/DataflowTemplates

@ProcessElement
public void processElement(ProcessContext context) {
  ResourceId inputFile = context.element().resourceId();
  Compression compression = compressionValue.get();
  // Add the compression extension to the output filename. Example: demo.txt -> demo.txt.gz
  String outputFilename = inputFile.getFilename() + compression.getSuggestedSuffix();
  // Resolve the necessary resources to perform the transfer.
  ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
  ResourceId outputFile =
      outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_FILE);
  ResourceId tempFile =
      outputDir.resolve("temp-" + outputFilename, StandardResolveOptions.RESOLVE_FILE);
  // Perform the copy of the compressed channel to the destination.
  try (ReadableByteChannel readerChannel = FileSystems.open(inputFile)) {
    try (WritableByteChannel writerChannel =
        compression.writeCompressed(FileSystems.create(tempFile, MimeTypes.BINARY))) {
      // Execute the copy to the temporary file.
      ByteStreams.copy(readerChannel, writerChannel);
    }
    // Rename the temporary file to the output file.
    FileSystems.rename(ImmutableList.of(tempFile), ImmutableList.of(outputFile));
    // Output the path to the compressed file.
    context.output(outputFile.toString());
  } catch (IOException e) {
    LOG.error("Error occurred during compression of {}", inputFile.toString(), e);
    context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), e.getMessage()));
  }
}
Code example source: org.apache.beam/beam-sdks-java-core

@Override
protected final boolean startImpl() throws IOException {
  FileBasedSource<T> source = getCurrentSource();
  this.channel = FileSystems.open(source.getSingleFileMetadata().resourceId());
  if (channel instanceof SeekableByteChannel) {
    SeekableByteChannel seekChannel = (SeekableByteChannel) channel;
    seekChannel.position(source.getStartOffset());
  } else {
    // Channel is not seekable. Must not be a subrange.
    checkArgument(
        source.mode != Mode.SINGLE_FILE_OR_SUBRANGE,
        "Subrange-based sources must only be defined for file types that support seekable "
            + " read channels");
    checkArgument(
        source.getStartOffset() == 0,
        "Start offset %s is not zero but channel for reading the file is not seekable.",
        source.getStartOffset());
  }
  startReading(channel);
  // Advance once to load the first record.
  return advanceImpl();
}
This content was collected from the internet; if it infringes your rights, please contact the author for removal.