com.hazelcast.jet.pipeline.Pipeline.drawFrom()方法的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(8.2k)|赞(0)|评价(0)|浏览(269)

本文整理了Java中com.hazelcast.jet.pipeline.Pipeline.drawFrom()方法的一些代码示例,展示了Pipeline.drawFrom()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Pipeline.drawFrom()方法的具体详情如下:
包路径:com.hazelcast.jet.pipeline.Pipeline
类名称:Pipeline
方法名:drawFrom

Pipeline.drawFrom介绍

[英]Returns a pipeline stage that represents a bounded (batch) data source. It has no upstream stages and emits the data (typically coming from an outside source) to its downstream stages.
[中]返回表示有界(批处理)数据源的管道阶段。它没有上游级,将数据(通常来自外部源)发送到下游级。

代码示例

代码示例来源:origin: hazelcast/hazelcast-jet-demos

public static Pipeline build() {
  Pipeline p = Pipeline.create();
  // Palladium and Platinum only
  p.drawFrom(Sources.<String, Object>mapJournal(
      Constants.IMAP_NAME_PRECIOUS, JournalInitialPosition.START_FROM_OLDEST)
  ).map(e -> e.getKey() + "==" + e.getValue())
   .filter(str -> str.toLowerCase().startsWith("p"))
   .drainTo(Sinks.logger())
  ;
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pattern delimiter = Pattern.compile("\\W+");
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<Long, String>map(BOOK_LINES))
   .flatMap(e -> traverseArray(delimiter.split(e.getValue().toLowerCase())))
   .filter(word -> !word.isEmpty())
   .groupingKey(wholeItem())
   .aggregate(counting())
   .drainTo(Sinks.map(COUNTS));
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

public static Pipeline buildPipeline(String sourceName, String sinkName) {
  Pattern pattern = Pattern.compile("\\W+");
  Pipeline pipeline = Pipeline.create();
  pipeline.drawFrom(Sources.<Integer, String>map(sourceName))
      .flatMap(e -> Traversers.traverseArray(pattern.split(e.getValue().toLowerCase()))
                  .filter(w -> !w.isEmpty()))
      .groupingKey(wholeItem())
      .aggregate(counting())
      .drainTo(Sinks.map(sinkName));
  return pipeline;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

/**
 * This will take the contents of source map and writes it into the sink map.
 */
private static Pipeline mapSourceAndSink(String sourceMapName, String sinkMapName) {
  Pipeline pipeline = Pipeline.create();
  pipeline.drawFrom(Sources.map(sourceMapName))
      .drainTo(Sinks.map(sinkMapName));
  return pipeline;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline(String connectionUrl) {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.jdbc(connectionUrl,
      "SELECT * FROM " + TABLE_NAME,
      resultSet -> new User(resultSet.getInt(1), resultSet.getString(2))))
   .map(user -> Util.entry(user.getId(), user))
   .drainTo(Sinks.map(MAP_NAME));
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.files(getBooksPath()))
   .filter(line -> line.startsWith("The "))
   .drainTo(buildTopicSink());
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline(JobConf jobConfig) {
  Pipeline p = Pipeline.create();
  p.drawFrom(HdfsSources.<AvroWrapper<User>, NullWritable>hdfs(jobConfig))
   .filter(entry -> entry.getKey().datum().get(3).equals(Boolean.TRUE))
   .peek(entry -> entry.getKey().datum().toString())
   .drainTo(HdfsSinks.hdfs(jobConfig));
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(KafkaSources.kafka(brokerProperties(), TOPIC))
   .drainTo(Sinks.logger());
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<String, User>map(MAP_NAME))
   .map(Map.Entry::getValue)
   .drainTo(AvroSinks.files(DIRECTORY_NAME, AvroSink::schemaForUser, User.class));
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) {
    ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
    JetInstance jet = context.getBean(JetInstance.class);

    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(CustomSourceP.customSource())
        .drainTo(Sinks.logger());

    JobConfig jobConfig = new JobConfig()
        .addClass(AnnotationBasedConfigurationSample.class)
        .addClass(CustomSourceP.class);
    jet.newJob(pipeline, jobConfig).join();

    jet.shutdown();
  }
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

@RequestMapping("/submitJob")
public void submitJob() {
  Pipeline pipeline = Pipeline.create();
  pipeline.drawFrom(CustomSourceP.customSource())
      .drainTo(Sinks.logger());
  JobConfig jobConfig = new JobConfig()
      .addClass(SpringBootSample.class)
      .addClass(CustomSourceP.class);
  instance.newJob(pipeline, jobConfig).join();
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
      DistributedPredicate.alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
   .groupingKey(Trade::getTicker)
   .rollingAggregate(summingLong(Trade::getPrice))
   .drainTo(Sinks.map(VOLUME_MAP_NAME));
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<Entry<String, Integer>, Integer, Entry<String, Integer>>mapJournal(TRADES_MAP_NAME,
      DistributedPredicate.alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
   .groupingKey(Entry::getKey)
   .rollingAggregate(summingLong(Entry::getValue))
   .drainTo(Sinks.map(VOLUME_MAP_NAME));
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
      DistributedPredicate.alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
   .addTimestamps(Trade::getTime, 3000)
   .groupingKey(Trade::getTicker)
   .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
   .aggregate(counting(),
       (winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), key, result))
   .drainTo(Sinks.logger());
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline() {
  Pipeline p = Pipeline.create();
  p.drawFrom(AvroSources.filesBuilder(AvroSink.DIRECTORY_NAME, ReflectDatumReader<User>::new)
      //Both Jet members share the same local file system
      .sharedFileSystem(true)
      .build())
   .map(user -> Util.entry(user.getUsername(), user))
   .drainTo(Sinks.map(AvroSink.MAP_NAME));
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline aggregate() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
      mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
   .addTimestamps(pv -> pv.timestamp(), 100)
   .window(sliding(10, 1))
   .aggregate(counting())
   .drainTo(Sinks.logger());
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

private static Pipeline buildPipeline(String sourceDir, String targetDir) {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.files(sourceDir))
   .map(LogLine::parse)
   .filter((LogLine log) -> log.getResponseCode() >= 200 && log.getResponseCode() < 400)
   .flatMap(AccessLogAnalyzer::explodeSubPaths)
   .groupingKey(wholeItem())
   .aggregate(counting())
   .drainTo(Sinks.files(targetDir));
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline groupAndAggregate() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
      mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
   .addTimestamps(pv -> pv.timestamp(), 100)
   .window(sliding(10, 1))
   .groupingKey(pv -> pv.userId())
   .aggregate(toList())
   .drainTo(Sinks.logger());
  return p;
}

代码示例来源:origin: hazelcast/hazelcast-jet

private Job newJob() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.mapJournal(SOURCE_NAME, START_FROM_OLDEST))
      .withoutTimestamps()
      .drainTo(Sinks.list(SINK_NAME));
  return jet.newJob(p, new JobConfig().setName("job-infinite-pipeline"));
}

代码示例来源:origin: hazelcast/hazelcast-jet

@Test(timeout = 20000)
public void test() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.batchFromProcessor("source", preferLocalParallelismOne(CustomSourceP::new)))
   .drainTo(Sinks.fromProcessor("sink", preferLocalParallelismOne(CustomSinkP::new)));
  jetInstance.newJob(p).join();
}

相关文章