Usage and code examples of the org.apache.beam.sdk.io.FileSystems.setDefaultPipelineOptions() method


This article collects code examples of the Java method org.apache.beam.sdk.io.FileSystems.setDefaultPipelineOptions() and shows how it is used in practice. The examples are extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as a useful reference. Details of the method:
Package: org.apache.beam.sdk.io
Class: FileSystems
Method: setDefaultPipelineOptions

About FileSystems.setDefaultPipelineOptions

Sets the default configuration in workers.

It will be used in FileSystemRegistrar for all schemes.

This is expected only to be used by runners after Pipeline.run, or in tests.
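As a quick orientation before the real-world examples, here is a minimal, hypothetical sketch of the call pattern (the class name FileSystemsSetup is illustrative and not taken from any of the projects below):

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FileSystemsSetup {
 public static void main(String[] args) {
  // Parse pipeline options (credentials, temp locations, ...) from the command line.
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  // Register the standard file systems with these options so that the
  // FileSystems utilities can resolve URIs such as gs:// for all schemes.
  FileSystems.setDefaultPipelineOptions(options);
 }
}

After this call, the static FileSystems utilities (match, open, create) resolve paths using the registered file systems, which is why the runners and tests below invoke it before any file access.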

Code examples

Example source: GoogleCloudPlatform/java-docs-samples

public static void runAvroToCsv(SampleOptions options)
  throws IOException, IllegalArgumentException {
 FileSystems.setDefaultPipelineOptions(options);
 // Get Avro Schema
 String schemaJson = getSchema(options.getAvroSchema());
 Schema schema = new Schema.Parser().parse(schemaJson);
 // Check schema field types before starting the Dataflow job
 checkFieldTypes(schema);
 // Create the Pipeline object with the options we defined above.
 Pipeline pipeline = Pipeline.create(options);
 // Convert Avro To CSV
 pipeline.apply("Read Avro files",
   AvroIO.readGenericRecords(schemaJson).from(options.getInputFile()))
   .apply("Convert Avro to CSV formatted data",
     ParDo.of(new ConvertAvroToCsv(schemaJson, options.getCsvDelimiter())))
   .apply("Write CSV formatted data", TextIO.write().to(options.getOutput())
     .withSuffix(".csv"));
 // Run the pipeline.
 pipeline.run().waitUntilFinish();
}

Example source: GoogleCloudPlatform/java-docs-samples

public static void runCsvToAvro(SampleOptions options)
  throws IOException, IllegalArgumentException {
 FileSystems.setDefaultPipelineOptions(options);
 // Get Avro Schema
 String schemaJson = getSchema(options.getAvroSchema());
 Schema schema = new Schema.Parser().parse(schemaJson);
 // Check schema field types before starting the Dataflow job
 checkFieldTypes(schema);
 // Create the Pipeline object with the options we defined above.
 Pipeline pipeline = Pipeline.create(options);
 // Convert CSV to Avro
 pipeline.apply("Read CSV files", TextIO.read().from(options.getInputFile()))
   .apply("Convert CSV to Avro formatted data",
     ParDo.of(new ConvertCsvToAvro(schemaJson, options.getCsvDelimiter())))
   .setCoder(AvroCoder.of(GenericRecord.class, schema))
   .apply("Write Avro formatted data", AvroIO.writeGenericRecords(schemaJson)
     .to(options.getOutput()).withCodec(CodecFactory.snappyCodec()).withSuffix(".avro"));
 // Run the pipeline.
 pipeline.run().waitUntilFinish();
}

Example source: com.spotify/scio-core

/**
 * Create a new {@link RemoteFileUtil} instance.
 */
public static RemoteFileUtil create(PipelineOptions options) {
 FileSystems.setDefaultPipelineOptions(options);
 return new RemoteFileUtil();
}

Example source: org.apache.beam/beam-runners-core-construction-java

public SerializablePipelineOptions(PipelineOptions options) {
 this.serializedPipelineOptions = serializeToJson(options);
 this.options = options;
 // Register the file systems with these options as a side effect of construction.
 FileSystems.setDefaultPipelineOptions(options);
}

Example source: org.apache.beam/beam-runners-core-construction-java

private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
 is.defaultReadObject();
 this.options = deserializeFromJson(serializedPipelineOptions);
 // TODO https://issues.apache.org/jira/browse/BEAM-2712: remove this call.
 FileSystems.setDefaultPipelineOptions(options);
}

Example source: org.apache.beam/beam-runners-flink_2.11

public static void main(String[] args) throws Exception {
 //TODO: Expose the fileSystem related options.
 // Register standard file systems.
 FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
 fromParams(args).run();
}

Example source: org.apache.beam/beam-runners-flink_2.10

@Override
public void setup(
  StreamTask<?, ?> containingTask,
  StreamConfig config,
  Output<StreamRecord<WindowedValue<OutputT>>> output) {
 // make sure that FileSystems is initialized correctly
 FlinkPipelineOptions options =
   serializedOptions.get().as(FlinkPipelineOptions.class);
 FileSystems.setDefaultPipelineOptions(options);
 super.setup(containingTask, config, output);
}

Example source: org.apache.beam/beam-runners-flink

public static void main(String[] args) throws Exception {
 //TODO: Expose the fileSystem related options.
 // Register standard file systems.
 FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
 fromParams(args).run();
}

Example source: org.apache.beam/beam-runners-flink

@Override
public void setup(
  StreamTask<?, ?> containingTask,
  StreamConfig config,
  Output<StreamRecord<WindowedValue<OutputT>>> output) {
 // make sure that FileSystems is initialized correctly
 FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class);
 FileSystems.setDefaultPipelineOptions(options);
 super.setup(containingTask, config, output);
}

Example source: org.apache.beam/beam-runners-flink_2.11

@Override
public void setup(
  StreamTask<?, ?> containingTask,
  StreamConfig config,
  Output<StreamRecord<WindowedValue<OutputT>>> output) {
 // make sure that FileSystems is initialized correctly
 FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class);
 FileSystems.setDefaultPipelineOptions(options);
 super.setup(containingTask, config, output);
}

Example source: spotify/dbeam

Optional<String> readPassword(DBeamPipelineOptions options) throws IOException {
 // Ensure FileSystems can resolve the scheme of the password file paths below.
 FileSystems.setDefaultPipelineOptions(options);
 if (options.getPasswordFileKmsEncrypted() != null) {
  LOGGER.info("Decrypting password using KMS...");
  return Optional.of(kmsDecrypter.decrypt(readFromFile(options.getPasswordFileKmsEncrypted())));
 } else if (options.getPasswordFile() != null) {
  // Plain-text password file.
  return Optional.of(readFromFile(options.getPasswordFile()));
 } else {
  // Fall back to the password passed directly as an option.
  return Optional.ofNullable(options.getPassword());
 }
}

Example source: GoogleCloudPlatform/cloud-bigtable-client

public static void main(String[] args) throws Exception {
 // Parse and validate the table-creation options from the command-line args.
 PipelineOptionsFactory.register(CreateTableOpts.class);
 CreateTableOpts opts = PipelineOptionsFactory
   .fromArgs(args).withValidation()
   .as(CreateTableOpts.class);
 // Register standard file systems before touching any file paths.
 FileSystems.setDefaultPipelineOptions(opts);
 createTable(opts);
}

Example source: org.apache.beam/beam-sdks-java-extensions-google-cloud-platform-core

@Test
@Ignore("https://issues.apache.org/jira/browse/BEAM-4143")
public void testResourceIdTester() throws Exception {
 FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
 ResourceIdTester.runResourceIdBattery(toResourceIdentifier("gs://bucket/foo/"));
}

Example source: org.apache.beam/beam-runners-google-cloud-dataflow-java

private static DataflowPipelineOptions buildPipelineOptions() throws IOException {
 GcsUtil mockGcsUtil = mock(GcsUtil.class);
 when(mockGcsUtil.expand(any(GcsPath.class)))
   .then(invocation -> ImmutableList.of((GcsPath) invocation.getArguments()[0]));
 when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
 DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
 options.setRunner(DataflowRunner.class);
 options.setGcpCredential(new TestCredential());
 options.setJobName("some-job-name");
 options.setProject("some-project");
 options.setRegion("some-region");
 options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString());
 options.setFilesToStage(new ArrayList<>());
 options.setDataflowClient(buildMockDataflow(new IsValidCreateRequest()));
 options.setGcsUtil(mockGcsUtil);
 // Enable the FileSystems API to know about gs:// URIs in this test.
 FileSystems.setDefaultPipelineOptions(options);
 return options;
}

Example source: org.apache.beam/beam-runners-google-cloud-dataflow-java

@Before
public void setUp() {
 MockitoAnnotations.initMocks(this);
 GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
 pipelineOptions.setGcsUtil(mockGcsUtil);
 FileSystems.setDefaultPipelineOptions(pipelineOptions);
 createOptions = StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build();
 defaultPackageUtil = PackageUtil.withDefaultThreadPool();
}

Example source: org.apache.beam/beam-runners-google-cloud-dataflow-java

@Test
public void testDefaultToGcpTempLocation() {
 DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
 FileSystems.setDefaultPipelineOptions(options);
 options.setPathValidatorClass(NoopPathValidator.class);
 options.setTempLocation("gs://temp_location/");
 options.setGcpTempLocation("gs://gcp_temp_location/");
 assertEquals("gs://gcp_temp_location/staging/", options.getStagingLocation());
}

Example source: org.apache.beam/beam-runners-google-cloud-dataflow-java

@Test
public void testDefaultToTempLocation() {
 DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
 FileSystems.setDefaultPipelineOptions(options);
 options.setPathValidatorClass(NoopPathValidator.class);
 options.setTempLocation("gs://temp_location/");
 assertEquals("gs://temp_location/", options.getGcpTempLocation());
 assertEquals("gs://temp_location/staging/", options.getStagingLocation());
}

Example source: org.apache.beam/beam-runners-google-cloud-dataflow-java

private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
 options.setStableUniqueNames(CheckEnabled.ERROR);
 options.setRunner(DataflowRunner.class);
 Pipeline p = Pipeline.create(options);
 p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
   .apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
 // Enable the FileSystems API to know about gs:// URIs in this test.
 FileSystems.setDefaultPipelineOptions(options);
 return p;
}

Example source: org.apache.beam/beam-runners-google-cloud-dataflow-java

private DataflowPipelineOptions buildPipelineOptions() throws IOException {
 DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
 options.setRunner(DataflowRunner.class);
 options.setProject(PROJECT_ID);
 options.setTempLocation(VALID_TEMP_BUCKET);
 options.setRegion(REGION_ID);
 // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
 options.setFilesToStage(new ArrayList<>());
 options.setDataflowClient(buildMockDataflow());
 options.setGcsUtil(mockGcsUtil);
 options.setGcpCredential(new TestCredential());
 // Configure the FileSystem registrar to use these options.
 FileSystems.setDefaultPipelineOptions(options);
 return options;
}

Example source: org.apache.beam/beam-runners-google-cloud-dataflow-java

@Test
public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
 DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
 FileSystems.setDefaultPipelineOptions(options);
 options.setRunner(DataflowRunner.class);
 options.setProject("foo-12345");
 options.setGcpTempLocation(VALID_TEMP_BUCKET);
 options.setGcsUtil(mockGcsUtil);
 options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1);
 thrown.expect(IllegalArgumentException.class);
 thrown.expectMessage("Number of worker harness threads");
 thrown.expectMessage("Please make sure the value is non-negative.");
 DataflowRunner.fromOptions(options);
}
