This article collects Java code examples for the org.apache.beam.sdk.io.FileSystems.setDefaultPipelineOptions() method and shows how it is used in practice. The examples were extracted from selected open-source projects on platforms such as GitHub, Stack Overflow, and Maven, so they make useful references. Details of FileSystems.setDefaultPipelineOptions() are as follows:
Package path: org.apache.beam.sdk.io.FileSystems
Class name: FileSystems
Method name: setDefaultPipelineOptions
Javadoc: Sets the default configuration in workers. It will be used in FileSystemRegistrar for all schemes. This is expected only to be used by runners after Pipeline.run, or in tests.
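Before the project examples, here is a minimal sketch of the typical call pattern. The class name, the gs:// path, and the use of matchNewResource are illustrative assumptions, not taken from any of the projects below:

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FileSystemsSetupSketch {
  public static void main(String[] args) {
    // Build PipelineOptions; filesystem-specific options passed on the
    // command line are carried along to the registered file systems.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

    // Register every FileSystem implementation found on the classpath with
    // these options. This must happen before any FileSystems.match*/create calls.
    FileSystems.setDefaultPipelineOptions(options);

    // Scheme-based lookups now resolve. Resolving gs:// paths additionally
    // assumes beam-sdks-java-extensions-google-cloud-platform-core is on the
    // classpath; the bucket name is a placeholder.
    ResourceId dir = FileSystems.matchNewResource("gs://example-bucket/output/", true /* isDirectory */);
    System.out.println(dir);
  }
}

All of the real-world examples that follow reduce to this same pattern: obtain PipelineOptions from somewhere, then call FileSystems.setDefaultPipelineOptions(options) before the FileSystems API (or an IO that uses it) is touched in the current JVM.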
Code example from: GoogleCloudPlatform/java-docs-samples
public static void runAvroToCsv(SampleOptions options)
    throws IOException, IllegalArgumentException {
  FileSystems.setDefaultPipelineOptions(options);
  // Get Avro Schema
  String schemaJson = getSchema(options.getAvroSchema());
  Schema schema = new Schema.Parser().parse(schemaJson);
  // Check schema field types before starting the Dataflow job
  checkFieldTypes(schema);
  // Create the Pipeline object with the options we defined above.
  Pipeline pipeline = Pipeline.create(options);
  // Convert Avro To CSV
  pipeline.apply("Read Avro files",
      AvroIO.readGenericRecords(schemaJson).from(options.getInputFile()))
      .apply("Convert Avro to CSV formatted data",
          ParDo.of(new ConvertAvroToCsv(schemaJson, options.getCsvDelimiter())))
      .apply("Write CSV formatted data", TextIO.write().to(options.getOutput())
          .withSuffix(".csv"));
  // Run the pipeline.
  pipeline.run().waitUntilFinish();
}
Code example from: GoogleCloudPlatform/java-docs-samples
public static void runCsvToAvro(SampleOptions options)
    throws IOException, IllegalArgumentException {
  FileSystems.setDefaultPipelineOptions(options);
  // Get Avro Schema
  String schemaJson = getSchema(options.getAvroSchema());
  Schema schema = new Schema.Parser().parse(schemaJson);
  // Check schema field types before starting the Dataflow job
  checkFieldTypes(schema);
  // Create the Pipeline object with the options we defined above.
  Pipeline pipeline = Pipeline.create(options);
  // Convert CSV to Avro
  pipeline.apply("Read CSV files", TextIO.read().from(options.getInputFile()))
      .apply("Convert CSV to Avro formatted data",
          ParDo.of(new ConvertCsvToAvro(schemaJson, options.getCsvDelimiter())))
      .setCoder(AvroCoder.of(GenericRecord.class, schema))
      .apply("Write Avro formatted data", AvroIO.writeGenericRecords(schemaJson)
          .to(options.getOutput()).withCodec(CodecFactory.snappyCodec()).withSuffix(".avro"));
  // Run the pipeline.
  pipeline.run().waitUntilFinish();
}
Code example from: com.spotify/scio-core
/**
 * Create a new {@link RemoteFileUtil} instance.
 */
public static RemoteFileUtil create(PipelineOptions options) {
  FileSystems.setDefaultPipelineOptions(options);
  return new RemoteFileUtil();
}
Code example from: org.apache.beam/beam-runners-core-construction-java
public SerializablePipelineOptions(PipelineOptions options) {
  this.serializedPipelineOptions = serializeToJson(options);
  this.options = options;
  FileSystems.setDefaultPipelineOptions(options);
}
Code example from: org.apache.beam/beam-runners-core-construction-java
private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
  is.defaultReadObject();
  this.options = deserializeFromJson(serializedPipelineOptions);
  // TODO https://issues.apache.org/jira/browse/BEAM-2712: remove this call.
  FileSystems.setDefaultPipelineOptions(options);
}
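Note the pattern in the two beam-runners-core-construction-java snippets: because FileSystems registration is static, per-JVM state, SerializablePipelineOptions calls setDefaultPipelineOptions() both at construction time and again in readObject(), i.e. whenever the options are deserialized on a worker JVM. This ensures scheme-based file system lookups work on workers before any user code touches a file system.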
Code example from: org.apache.beam/beam-runners-flink_2.11
public static void main(String[] args) throws Exception {
  // TODO: Expose the fileSystem related options.
  // Register standard file systems.
  FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
  fromParams(args).run();
}
Code example from: org.apache.beam/beam-runners-flink_2.10
@Override
public void setup(
    StreamTask<?, ?> containingTask,
    StreamConfig config,
    Output<StreamRecord<WindowedValue<OutputT>>> output) {
  // make sure that FileSystems is initialized correctly
  FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class);
  FileSystems.setDefaultPipelineOptions(options);
  super.setup(containingTask, config, output);
}
Code example from: org.apache.beam/beam-runners-flink and org.apache.beam/beam-runners-flink_2.11. The same main() and setup(...) snippets shown above appear unchanged in these artifacts as well, apart from line wrapping.
Code example from: spotify/dbeam
Optional<String> readPassword(DBeamPipelineOptions options) throws IOException {
  FileSystems.setDefaultPipelineOptions(options);
  if (options.getPasswordFileKmsEncrypted() != null) {
    LOGGER.info("Decrypting password using KMS...");
    return Optional.of(kmsDecrypter.decrypt(readFromFile(options.getPasswordFileKmsEncrypted())));
  } else if (options.getPasswordFile() != null) {
    return Optional.of(readFromFile(options.getPasswordFile()));
  } else {
    return Optional.ofNullable(options.getPassword());
  }
}
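In the spotify/dbeam example the registration happens at the top of readPassword(), presumably because the password file options may point at remote paths (for example gs:// objects) that readFromFile(...) can only resolve once the file systems are registered.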
Code example from: GoogleCloudPlatform/cloud-bigtable-client
public static void main(String[] args) throws Exception {
  PipelineOptionsFactory.register(CreateTableOpts.class);
  CreateTableOpts opts = PipelineOptionsFactory
      .fromArgs(args).withValidation()
      .as(CreateTableOpts.class);
  FileSystems.setDefaultPipelineOptions(opts);
  createTable(opts);
}
Code example from: org.apache.beam/beam-sdks-java-extensions-google-cloud-platform-core
@Test
@Ignore("https://issues.apache.org/jira/browse/BEAM-4143")
public void testResourceIdTester() throws Exception {
  FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
  ResourceIdTester.runResourceIdBattery(toResourceIdentifier("gs://bucket/foo/"));
}
Code example from: org.apache.beam/beam-runners-google-cloud-dataflow-java
private static DataflowPipelineOptions buildPipelineOptions() throws IOException {
  GcsUtil mockGcsUtil = mock(GcsUtil.class);
  when(mockGcsUtil.expand(any(GcsPath.class)))
      .then(invocation -> ImmutableList.of((GcsPath) invocation.getArguments()[0]));
  when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  options.setRunner(DataflowRunner.class);
  options.setGcpCredential(new TestCredential());
  options.setJobName("some-job-name");
  options.setProject("some-project");
  options.setRegion("some-region");
  options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString());
  options.setFilesToStage(new ArrayList<>());
  options.setDataflowClient(buildMockDataflow(new IsValidCreateRequest()));
  options.setGcsUtil(mockGcsUtil);
  // Enable the FileSystems API to know about gs:// URIs in this test.
  FileSystems.setDefaultPipelineOptions(options);
  return options;
}
Code example from: org.apache.beam/beam-runners-google-cloud-dataflow-java
@Before
public void setUp() {
  MockitoAnnotations.initMocks(this);
  GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
  pipelineOptions.setGcsUtil(mockGcsUtil);
  FileSystems.setDefaultPipelineOptions(pipelineOptions);
  createOptions = StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build();
  defaultPackageUtil = PackageUtil.withDefaultThreadPool();
}
Code example from: org.apache.beam/beam-runners-google-cloud-dataflow-java
@Test
public void testDefaultToGcpTempLocation() {
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  FileSystems.setDefaultPipelineOptions(options);
  options.setPathValidatorClass(NoopPathValidator.class);
  options.setTempLocation("gs://temp_location/");
  options.setGcpTempLocation("gs://gcp_temp_location/");
  assertEquals("gs://gcp_temp_location/staging/", options.getStagingLocation());
}
Code example from: org.apache.beam/beam-runners-google-cloud-dataflow-java
@Test
public void testDefaultToTempLocation() {
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  FileSystems.setDefaultPipelineOptions(options);
  options.setPathValidatorClass(NoopPathValidator.class);
  options.setTempLocation("gs://temp_location/");
  assertEquals("gs://temp_location/", options.getGcpTempLocation());
  assertEquals("gs://temp_location/staging/", options.getStagingLocation());
}
Code example from: org.apache.beam/beam-runners-google-cloud-dataflow-java
private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
  options.setStableUniqueNames(CheckEnabled.ERROR);
  options.setRunner(DataflowRunner.class);
  Pipeline p = Pipeline.create(options);
  p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
      .apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
  // Enable the FileSystems API to know about gs:// URIs in this test.
  FileSystems.setDefaultPipelineOptions(options);
  return p;
}
Code example from: org.apache.beam/beam-runners-google-cloud-dataflow-java
private DataflowPipelineOptions buildPipelineOptions() throws IOException {
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  options.setRunner(DataflowRunner.class);
  options.setProject(PROJECT_ID);
  options.setTempLocation(VALID_TEMP_BUCKET);
  options.setRegion(REGION_ID);
  // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
  options.setFilesToStage(new ArrayList<>());
  options.setDataflowClient(buildMockDataflow());
  options.setGcsUtil(mockGcsUtil);
  options.setGcpCredential(new TestCredential());
  // Configure the FileSystem registrar to use these options.
  FileSystems.setDefaultPipelineOptions(options);
  return options;
}
Code example from: org.apache.beam/beam-runners-google-cloud-dataflow-java
@Test
public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  FileSystems.setDefaultPipelineOptions(options);
  options.setRunner(DataflowRunner.class);
  options.setProject("foo-12345");
  options.setGcpTempLocation(VALID_TEMP_BUCKET);
  options.setGcsUtil(mockGcsUtil);
  options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1);
  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage("Number of worker harness threads");
  thrown.expectMessage("Please make sure the value is non-negative.");
  DataflowRunner.fromOptions(options);
}