本文整理了 Java 中 com.hazelcast.jet.pipeline.Pipeline.drawFrom() 方法的一些代码示例,展示了 Pipeline.drawFrom() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Pipeline.drawFrom() 方法的具体详情如下:
包路径:com.hazelcast.jet.pipeline.Pipeline
类名称:Pipeline
方法名:drawFrom
[英]Returns a pipeline stage that represents a bounded (batch) data source. It has no upstream stages and emits the data (typically coming from an outside source) to its downstream stages.
[中]返回一个表示有界(批处理)数据源的管道阶段。它没有上游阶段,并将数据(通常来自外部数据源)发送到其下游阶段。
代码示例来源:origin: hazelcast/hazelcast-jet-demos
/**
 * Builds a pipeline that reads the event journal of the map named
 * {@code Constants.IMAP_NAME_PRECIOUS} from its oldest entry, renders each
 * entry as {@code key==value}, keeps the lines that start with "p"
 * (case-insensitively) and writes them to the logger sink.
 *
 * @return the assembled pipeline
 */
public static Pipeline build() {
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(Sources.<String, Object>mapJournal(
                    Constants.IMAP_NAME_PRECIOUS,
                    JournalInitialPosition.START_FROM_OLDEST))
            .map(entry -> entry.getKey() + "==" + entry.getValue())
            // Palladium and Platinum only
            .filter(line -> line.toLowerCase().startsWith("p"))
            .drainTo(Sinks.logger());

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a word-count pipeline: reads the values of the {@code BOOK_LINES}
 * map, lower-cases each one, splits it on runs of non-word characters,
 * drops empty tokens, counts occurrences per distinct token and writes
 * the result to the {@code COUNTS} map.
 *
 * @return the assembled pipeline
 */
private static Pipeline buildPipeline() {
    Pattern nonWord = Pattern.compile("\\W+");
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(Sources.<Long, String>map(BOOK_LINES))
            .flatMap(entry -> traverseArray(nonWord.split(entry.getValue().toLowerCase())))
            .filter(token -> !token.isEmpty())
            .groupingKey(wholeItem())
            .aggregate(counting())
            .drainTo(Sinks.map(COUNTS));

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a word-count pipeline between two maps.
 * <p>
 * Each value of the source map is lower-cased and split on runs of
 * non-word characters; empty tokens are removed inside the flat-mapping
 * step, then occurrences are counted per distinct token and written to
 * the sink map.
 *
 * @param sourceName name of the map to read from
 * @param sinkName   name of the map to write the counts to
 * @return the assembled pipeline
 */
public static Pipeline buildPipeline(String sourceName, String sinkName) {
    Pattern nonWord = Pattern.compile("\\W+");
    Pipeline p = Pipeline.create();

    p.drawFrom(Sources.<Integer, String>map(sourceName))
     .flatMap(entry -> {
         String[] tokens = nonWord.split(entry.getValue().toLowerCase());
         return Traversers.traverseArray(tokens).filter(token -> !token.isEmpty());
     })
     .groupingKey(wholeItem())
     .aggregate(counting())
     .drainTo(Sinks.map(sinkName));

    return p;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that takes the contents of the source map and writes
 * them into the sink map without any intermediate transformation.
 *
 * @param sourceMapName name of the map to read from
 * @param sinkMapName   name of the map to write to
 * @return the assembled pipeline
 */
private static Pipeline mapSourceAndSink(String sourceMapName, String sinkMapName) {
    Pipeline p = Pipeline.create();

    p.drawFrom(Sources.map(sourceMapName))
     .drainTo(Sinks.map(sinkMapName));

    return p;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that selects every row of {@code TABLE_NAME} over JDBC,
 * maps each row to a {@code User} built from its first (int) and second
 * (string) columns, keys the user by its id and writes the entries to the
 * {@code MAP_NAME} map.
 *
 * @param connectionUrl JDBC connection URL passed to the source
 * @return the assembled pipeline
 */
private static Pipeline buildPipeline(String connectionUrl) {
    String query = "SELECT * FROM " + TABLE_NAME;
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(Sources.jdbc(
                    connectionUrl,
                    query,
                    row -> new User(row.getInt(1), row.getString(2))))
            .map(u -> Util.entry(u.getId(), u))
            .drainTo(Sinks.map(MAP_NAME));

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that reads lines from the files under
 * {@code getBooksPath()}, keeps only the lines beginning with "The "
 * and sends them to the sink returned by {@code buildTopicSink()}.
 *
 * @return the assembled pipeline
 */
private static Pipeline buildPipeline() {
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(Sources.files(getBooksPath()))
            .filter(text -> text.startsWith("The "))
            .drainTo(buildTopicSink());

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that reads Avro-wrapped {@code User} records from HDFS,
 * keeps the records whose field at index 3 equals {@code Boolean.TRUE},
 * peeks at each surviving record using its datum's string form, and
 * writes the records back to HDFS.
 *
 * @param jobConfig Hadoop job configuration used by both the source and the sink
 * @return the assembled pipeline
 */
private static Pipeline buildPipeline(JobConf jobConfig) {
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(HdfsSources.<AvroWrapper<User>, NullWritable>hdfs(jobConfig))
            .filter(record -> record.getKey().datum().get(3).equals(Boolean.TRUE))
            .peek(record -> record.getKey().datum().toString())
            .drainTo(HdfsSinks.hdfs(jobConfig));

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that reads from the Kafka topic {@code TOPIC}, using
 * the properties returned by {@code brokerProperties()}, and writes every
 * item to the logger sink.
 *
 * @return the assembled pipeline
 */
private Pipeline buildPipeline() {
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(KafkaSources.kafka(brokerProperties(), TOPIC))
            .drainTo(Sinks.logger());

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that reads the {@code MAP_NAME} map, keeps only the
 * {@code User} value of each entry and writes the users as Avro files
 * under {@code DIRECTORY_NAME}, using the schema supplied by
 * {@code AvroSink::schemaForUser}.
 *
 * @return the assembled pipeline
 */
private static Pipeline buildPipeline() {
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(Sources.<String, User>map(MAP_NAME))
            .map(entry -> entry.getValue())
            .drainTo(AvroSinks.files(DIRECTORY_NAME, AvroSink::schemaForUser, User.class));

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Creates the Spring application context from {@code AppConfig}, obtains
 * the {@code JetInstance} bean, runs a job that drains the custom source
 * into the logger sink, waits for it to finish and shuts the instance down.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
    JetInstance jet = context.getBean(JetInstance.class);
    try {
        Pipeline pipeline = Pipeline.create();
        pipeline.drawFrom(CustomSourceP.customSource())
                .drainTo(Sinks.logger());

        // The job needs these classes on its classpath to run the custom source.
        JobConfig jobConfig = new JobConfig()
                .addClass(AnnotationBasedConfigurationSample.class)
                .addClass(CustomSourceP.class);

        jet.newJob(pipeline, jobConfig).join();
    } finally {
        // join() rethrows a job failure; shut down in finally so the Jet
        // instance is not left running when the job (or its setup) fails.
        jet.shutdown();
    }
}
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Handles {@code /submitJob}: submits a job that drains the custom source
 * into the logger sink and blocks until that job completes.
 */
@RequestMapping("/submitJob")
public void submitJob() {
    Pipeline p = Pipeline.create();
    p.drawFrom(CustomSourceP.customSource())
     .drainTo(Sinks.logger());

    // Classes the job needs in order to run the custom source.
    JobConfig config = new JobConfig()
            .addClass(SpringBootSample.class)
            .addClass(CustomSourceP.class);

    instance.newJob(p, config).join();
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that reads new values from the event journal of the
 * {@code TRADES_MAP_NAME} map (starting from the current position, with an
 * always-true predicate), groups trades by ticker, keeps a rolling sum of
 * the trade price per ticker and writes it to the {@code VOLUME_MAP_NAME} map.
 *
 * @return the assembled pipeline
 */
private static Pipeline buildPipeline() {
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(Sources.<Trade, Integer, Trade>mapJournal(
                    TRADES_MAP_NAME,
                    DistributedPredicate.alwaysTrue(),
                    EventJournalMapEvent::getNewValue,
                    START_FROM_CURRENT))
            .groupingKey(Trade::getTicker)
            .rollingAggregate(summingLong(Trade::getPrice))
            .drainTo(Sinks.map(VOLUME_MAP_NAME));

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that reads new values (string/integer entries) from
 * the event journal of the {@code TRADES_MAP_NAME} map, starting from the
 * current position, groups them by entry key, keeps a rolling sum of the
 * entry values per key and writes it to the {@code VOLUME_MAP_NAME} map.
 *
 * @return the assembled pipeline
 */
private static Pipeline buildPipeline() {
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(Sources.<Entry<String, Integer>, Integer, Entry<String, Integer>>mapJournal(
                    TRADES_MAP_NAME,
                    DistributedPredicate.alwaysTrue(),
                    EventJournalMapEvent::getNewValue,
                    START_FROM_CURRENT))
            .groupingKey(Entry::getKey)
            .rollingAggregate(summingLong(Entry::getValue))
            .drainTo(Sinks.map(VOLUME_MAP_NAME));

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that reads new {@code Trade} values from the event
 * journal of the {@code TRADES_MAP_NAME} map, timestamps them with
 * {@code Trade::getTime}, groups them by ticker, counts trades per ticker
 * in a sliding window and logs one formatted line per window result.
 *
 * @return the assembled pipeline
 */
private static Pipeline buildPipeline() {
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(Sources.<Trade, Integer, Trade>mapJournal(
                    TRADES_MAP_NAME,
                    DistributedPredicate.alwaysTrue(),
                    EventJournalMapEvent::getNewValue,
                    START_FROM_CURRENT))
            .addTimestamps(Trade::getTime, 3000)
            .groupingKey(Trade::getTicker)
            .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
            .aggregate(
                    counting(),
                    // Output line: window end time, ticker, count.
                    (start, end, ticker, count) ->
                            String.format("%s %5s %4d", toLocalTime(end), ticker, count))
            .drainTo(Sinks.logger());

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that reads {@code User} records from the Avro files
 * under {@code AvroSink.DIRECTORY_NAME}, keys each user by username and
 * writes the entries to the {@code AvroSink.MAP_NAME} map.
 *
 * @return the assembled pipeline
 */
private static Pipeline buildPipeline() {
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(AvroSources.filesBuilder(AvroSink.DIRECTORY_NAME, ReflectDatumReader<User>::new)
                    // Both Jet members share the same local file system
                    .sharedFileSystem(true)
                    .build())
            .map(u -> Util.entry(u.getUsername(), u))
            .drainTo(Sinks.map(AvroSink.MAP_NAME));

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that reads {@code PageVisit} values from the event
 * journal of the {@code PAGE_VISIT} map (oldest first), timestamps them
 * with the visit's own timestamp, applies the {@code sliding(10, 1)}
 * window, counts the visits per window and logs the results.
 *
 * @return the assembled pipeline
 */
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline aggregate() {
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(
                    PAGE_VISIT,
                    mapPutEvents(),
                    mapEventNewValue(),
                    START_FROM_OLDEST))
            // Deliberately a lambda, not a method reference (see the JDK bug above).
            .addTimestamps(visit -> visit.timestamp(), 100)
            .window(sliding(10, 1))
            .aggregate(counting())
            .drainTo(Sinks.logger());

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds an access-log analysis pipeline: reads lines from the files in
 * {@code sourceDir}, parses each into a {@code LogLine}, keeps the lines
 * whose response code is in the range [200, 400), expands each via
 * {@code AccessLogAnalyzer::explodeSubPaths}, counts occurrences per
 * distinct item and writes the counts to files in {@code targetDir}.
 *
 * @param sourceDir directory to read the log files from
 * @param targetDir directory to write the results to
 * @return the assembled pipeline
 */
private static Pipeline buildPipeline(String sourceDir, String targetDir) {
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(Sources.files(sourceDir))
            .map(LogLine::parse)
            .filter((LogLine parsed) ->
                    parsed.getResponseCode() >= 200 && parsed.getResponseCode() < 400)
            .flatMap(AccessLogAnalyzer::explodeSubPaths)
            .groupingKey(wholeItem())
            .aggregate(counting())
            .drainTo(Sinks.files(targetDir));

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that reads {@code PageVisit} values from the event
 * journal of the {@code PAGE_VISIT} map (oldest first), timestamps them
 * with the visit's own timestamp, applies the {@code sliding(10, 1)}
 * window, groups the visits by user id, collects each group into a list
 * and logs the results.
 *
 * @return the assembled pipeline
 */
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline groupAndAggregate() {
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(
                    PAGE_VISIT,
                    mapPutEvents(),
                    mapEventNewValue(),
                    START_FROM_OLDEST))
            // Deliberately lambdas, not method references (see the JDK bug above).
            .addTimestamps(visit -> visit.timestamp(), 100)
            .window(sliding(10, 1))
            .groupingKey(visit -> visit.userId())
            .aggregate(toList())
            .drainTo(Sinks.logger());

    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
 * Submits a job named "job-infinite-pipeline" that reads the event journal
 * of the {@code SOURCE_NAME} map from its oldest entry, without timestamps,
 * and writes the events to the {@code SINK_NAME} list.
 *
 * @return the submitted job
 */
private Job newJob() {
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.mapJournal(SOURCE_NAME, START_FROM_OLDEST))
            .withoutTimestamps()
            .drainTo(Sinks.list(SINK_NAME));

    JobConfig config = new JobConfig().setName("job-infinite-pipeline");
    return jet.newJob(pipeline, config);
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
 * Runs a job that connects a batch source backed by {@code CustomSourceP}
 * to a sink backed by {@code CustomSinkP}, both wrapped with
 * {@code preferLocalParallelismOne}, and waits for it to complete.
 */
@Test(timeout = 20000)
public void test() {
    Pipeline pipeline = Pipeline.create();

    pipeline
            .drawFrom(Sources.batchFromProcessor(
                    "source", preferLocalParallelismOne(CustomSourceP::new)))
            .drainTo(Sinks.fromProcessor(
                    "sink", preferLocalParallelismOne(CustomSinkP::new)));

    jetInstance.newJob(pipeline).join();
}
内容来源于网络,如有侵权,请联系作者删除!