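This article collects code examples for the Java class com.hazelcast.jet.pipeline.Pipeline that show how the Pipeline class is used in practice.
The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms. They were extracted from selected projects and should serve as a useful reference. Details of the Pipeline class:
Fully qualified name: com.hazelcast.jet.pipeline.Pipeline
Class name: Pipeline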
Models a distributed computation job using an analogy with a system of interconnected water pipes. The basic element is a stage which can be attached to one or more other stages. A stage accepts the data coming from its upstream stages, transforms it, and directs the resulting data to its downstream stages.
The Pipeline object is a container of all the stages defined on a pipeline: the source stages obtained directly from it by calling #drawFrom(BatchSource) as well as all the stages attached (directly or indirectly) to them.
Note that there is no simple one-to-one correspondence between pipeline stages and Core API's DAG vertices. Some stages map to several vertices (e.g., grouping and co-grouping are implemented as a cascade of two vertices) and some stages may be merged with others into a single vertex (e.g., a cascade of map/filter/flatMap stages can be fused into one vertex).
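The fusion remark above can be pictured with a plain-JDK toy model. This is not Jet code, and the class FusedStages and its methods are hypothetical names for this sketch: a map, a filter and a flatMap stage are composed into one function that turns a single input item into zero or more output items, which is roughly what running several stateless stages inside one vertex amounts to.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy model, NOT the Jet API: each stateless stage is a plain JDK function,
// and "fusing" them means composing them into one per-item function.
class FusedStages {

    // Compose map -> filter -> flatMap into a single item-to-items function,
    // so one pass over the input performs all three stages.
    static <A, B, C> Function<A, List<C>> fuse(
            Function<A, B> map,
            Predicate<B> filter,
            Function<B, List<C>> flatMap) {
        return item -> {
            B mapped = map.apply(item);
            if (!filter.test(mapped)) {
                return Collections.emptyList(); // dropped by the filter stage
            }
            return flatMap.apply(mapped);
        };
    }

    // Run the fused function over a finite batch, collecting all outputs in order.
    static <A, C> List<C> run(List<A> input, Function<A, List<C>> fused) {
        List<C> out = new ArrayList<>();
        for (A item : input) {
            out.addAll(fused.apply(item));
        }
        return out;
    }
}
```

With toLowerCase as the map, a non-empty check as the filter and a split on spaces as the flatMap, the input ["Hello World", "", "Jet"] comes out as ["hello", "world", "jet"]; the empty line never reaches the flatMap step.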
Code example source: hazelcast/hazelcast-jet-demos
public static Pipeline build(String bootstrapServers) {
    // Kafka consumer configuration
    Properties properties = new Properties();
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Write every record of the Kafka topic into an IMap
    Pipeline pipeline = Pipeline.create();
    pipeline
            .drawFrom(KafkaSources.kafka(properties, Constants.TOPIC_NAME_PRECIOUS))
            .drainTo(Sinks.map(Constants.IMAP_NAME_PRECIOUS));
    return pipeline;
}
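The ConsumerConfig constants in the example above resolve to ordinary Kafka property names. The sketch below builds the same settings without the Kafka client on the classpath (the class name KafkaConsumerProps is hypothetical). In plain Kafka consumer semantics, auto.offset.reset only applies when the consumer group has no committed offset, so pairing a fresh random group.id with "earliest" means each run starts reading from the beginning of the topic.

```java
import java.util.Properties;
import java.util.UUID;

// Sketch without the Kafka client: the string keys below are the values of the
// ConsumerConfig constants used in the example (hypothetical helper class).
class KafkaConsumerProps {

    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // A new random group id has no committed offsets...
        props.setProperty("group.id", UUID.randomUUID().toString());
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer", STRING_DESERIALIZER);
        props.setProperty("value.deserializer", STRING_DESERIALIZER);
        // ...so the consumer falls back to this policy and starts at the oldest offset.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```

Two calls to build produce different group ids, which is why every run of the demo behaves like a brand-new consumer.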
Code example source: hazelcast/hazelcast-jet
/**
 * Creates and returns an executable job based on the supplied pipeline.
 * Jet will asynchronously start executing the job.
 */
@Nonnull
default Job newJob(@Nonnull Pipeline pipeline) {
    return newJob(pipeline.toDag());
}
Code example source: hazelcast/hazelcast-jet-code-samples
/**
 * Takes the contents of the source map and writes them into the sink map.
 */
private static Pipeline mapSourceAndSink(String sourceMapName, String sinkMapName) {
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.map(sourceMapName))
            .drainTo(Sinks.map(sinkMapName));
    return pipeline;
}
Code example source: hazelcast/hazelcast-jet
/**
 * Creates and returns a Jet job based on the supplied pipeline and job
 * configuration. Jet will asynchronously start executing the job.
 *
 * <p>If the name in the JobConfig is non-null, Jet checks if there is an
 * active job with an equal name, in which case it throws {@link
 * JobAlreadyExistsException}. A job is active if it is running,
 * suspended or waiting to be run; that is, it has not completed or failed.
 * Thus there can be at most one active job with a given name at a time, and
 * you can reuse the job name after the previous job has completed.
 *
 * <p>See also {@link #newJobIfAbsent}.
 *
 * @throws JobAlreadyExistsException if there is an active job with
 *         an equal name
 */
@Nonnull
default Job newJob(@Nonnull Pipeline pipeline, @Nonnull JobConfig config) {
    return newJob(pipeline.toDag(), config);
}
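The naming rule in the Javadoc above can be modeled with a small in-memory registry. This is a hypothetical sketch, not Jet's implementation: ToyJobRegistry and ToyJob are invented names, IllegalStateException stands in for JobAlreadyExistsException, and jobs never actually execute.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of the newJob naming rule; NOT Jet's implementation.
// IllegalStateException stands in for JobAlreadyExistsException.
class ToyJobRegistry {

    enum Status { WAITING, RUNNING, SUSPENDED, COMPLETED, FAILED }

    static final class ToyJob {
        final String name;
        Status status = Status.WAITING;

        ToyJob(String name) {
            this.name = name;
        }

        // Active = waiting, running or suspended, i.e. not completed or failed.
        boolean isActive() {
            return status != Status.COMPLETED && status != Status.FAILED;
        }
    }

    private final Map<String, ToyJob> jobsByName = new HashMap<>();

    ToyJob newJob(String name) {
        if (name != null) {
            ToyJob existing = jobsByName.get(name);
            if (existing != null && existing.isActive()) {
                throw new IllegalStateException("Active job with this name exists: " + name);
            }
        }
        ToyJob job = new ToyJob(name);
        if (name != null) {
            // Replaces a finished job of the same name, so names become reusable.
            jobsByName.put(name, job);
        }
        return job;
    }
}
```

Submitting "etl" twice while the first job is still waiting is rejected; once the first job is marked COMPLETED, the name can be used again. Unnamed jobs skip the check entirely.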
Code example source: hazelcast/hazelcast-jet-demos
public static Pipeline build() {
    Pipeline p = Pipeline.create();
    // Keep only entries whose key starts with "p" or "P" (Palladium and Platinum)
    p.drawFrom(Sources.<String, Object>mapJournal(
            Constants.IMAP_NAME_PRECIOUS, JournalInitialPosition.START_FROM_OLDEST))
            .map(e -> e.getKey() + "==" + e.getValue())
            .filter(str -> str.toLowerCase().startsWith("p"))
            .drainTo(Sinks.logger());
    return p;
}
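The map and filter stages above can be tried on a finite snapshot of entries with java.util.stream. This sketch replaces the unbounded map journal with an ordinary Map; the class PreciousFilter and the sample metal names used with it are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-JDK sketch of the map -> filter stages, run on a finite Map snapshot
// instead of a map journal (hypothetical helper class, NOT Jet code).
class PreciousFilter {

    static List<String> keepKeysStartingWithP(Map<String, Object> prices) {
        return prices.entrySet().stream()
                .map(e -> e.getKey() + "==" + e.getValue())       // same format as the map stage
                .filter(str -> str.toLowerCase().startsWith("p")) // same predicate as the filter stage
                .collect(Collectors.toList());
    }
}
```

For the entries Gold=1, Palladium=2, Platinum=3, Silver=4 this keeps "Palladium==2" and "Platinum==3". The predicate inspects the whole "key==value" string, so it keeps every key starting with "p" or "P"; the "Palladium and Platinum" reading holds only if those are the only such keys.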
Code example source: hazelcast/hazelcast-jet
/**
 * Creates and returns a Jet job based on the supplied pipeline and job
 * configuration. Jet will asynchronously start executing the job.
 *
 * <p>If the name in the JobConfig is non-null, Jet checks if there is an
 * active job with an equal name. If there is, it joins that job instead
 * of submitting a new one. A job is active if it is running, suspended or
 * waiting to be run; that is, it has not completed or failed. In other
 * words, this method ensures that the job with this name is running and is
 * not running multiple times in parallel.
 *
 * <p>This method is useful for microservice deployments where each package
 * contains a Jet member and the job, and you want the job to run only once.
 * However, if the job is a batch job that completes very quickly, it can
 * still execute multiple times, because the job name can be reused after a
 * previous execution has completed.
 *
 * <p>If the job name is null, a new job is always submitted.
 *
 * <p>See also {@link #newJob}.
 */
@Nonnull
default Job newJobIfAbsent(@Nonnull Pipeline pipeline, @Nonnull JobConfig config) {
    return newJobIfAbsent(pipeline.toDag(), config);
}
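The difference from newJob is that a name clash returns the existing active job rather than failing. A hypothetical in-memory sketch of that rule (ToyIfAbsentRegistry is an invented name; this is not Jet's implementation and nothing actually runs):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of the newJobIfAbsent rule; NOT Jet's implementation.
class ToyIfAbsentRegistry {

    static final class ToyJob {
        final String name;
        boolean finished; // true once the job has completed or failed

        ToyJob(String name) {
            this.name = name;
        }
    }

    private final Map<String, ToyJob> jobsByName = new HashMap<>();

    ToyJob newJobIfAbsent(String name) {
        if (name == null) {
            return new ToyJob(null); // unnamed: always a new submission
        }
        ToyJob existing = jobsByName.get(name);
        if (existing != null && !existing.finished) {
            return existing; // join the active job instead of starting a second one
        }
        ToyJob job = new ToyJob(name);
        jobsByName.put(name, job);
        return job;
    }
}
```

Marking the first job as finished before the next call mimics the quick batch job case from the Javadoc: the second call starts a fresh run under the same name.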
Code example source: hazelcast/hazelcast-jet-code-samples
private Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();
    // Read from both Kafka topics "t1" and "t2" and write the records into an IMap
    p.drawFrom(KafkaSources.kafka(brokerProperties(), "t1", "t2"))
            .drainTo(Sinks.map(SINK_NAME));
    return p;
}
Code example source: hazelcast/hazelcast-jet-code-samples
private static Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();
    // Write the User values of the IMap to Avro files in DIRECTORY_NAME
    p.drawFrom(Sources.<String, User>map(MAP_NAME))
            .map(Map.Entry::getValue)
            .drainTo(AvroSinks.files(DIRECTORY_NAME, AvroSink::schemaForUser, User.class));
    return p;
}
Code example source: hazelcast/hazelcast-jet-code-samples
private Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();
    // Log every record read from the Kafka topic
    p.drawFrom(KafkaSources.kafka(brokerProperties(), TOPIC))
            .drainTo(Sinks.logger());
    return p;
}
Code example source: hazelcast/hazelcast-jet-code-samples
private static Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();
    // Send every line of the book files that starts with "The " to the sink from buildTopicSink()
    p.drawFrom(Sources.files(getBooksPath()))
            .filter(line -> line.startsWith("The "))
            .drainTo(buildTopicSink());
    return p;
}
Code example source: hazelcast/hazelcast-jet-code-samples
private static Pipeline buildPipeline(String connectionUrl) {
    Pipeline p = Pipeline.create();
    // Map each row (column 1 = id, column 2 = name) to a User, then key it by id
    p.drawFrom(Sources.jdbc(connectionUrl,
            "SELECT * FROM " + TABLE_NAME,
            resultSet -> new User(resultSet.getInt(1), resultSet.getString(2))))
            .map(user -> Util.entry(user.getId(), user))
            .drainTo(Sinks.map(MAP_NAME));
    return p;
}
Code example source: hazelcast/hazelcast-jet-code-samples
/**
 * Takes the contents of the source map, converts the values to strings and
 * drains them into the sink map with an update function that appends
 * {@code -even} to the value already stored under an even key and
 * {@code -odd} to the value already stored under an odd key.
 */
private static Pipeline mapWithUpdating(String sourceMapName, String sinkMapName) {
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<Integer, Integer>map(sourceMapName))
            .map(e -> entry(e.getKey(), String.valueOf(e.getValue())))
            .drainTo(
                    Sinks.mapWithUpdating(
                            sinkMapName,
                            (oldValue, item) ->
                                    item.getKey() % 2 == 0
                                            ? oldValue + "-even"
                                            : oldValue + "-odd"
                    )
            );
    return pipeline;
}
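To see what the update function does with each item, here is a plain-JDK analogy built on Map.compute. It assumes the Jet sink passes the current value (null when the key is absent) to the function and removes the key when the function returns null, which is exactly what Map.compute does; the class UpdatingSinkModel is hypothetical.

```java
import java.util.Map;

// Plain-JDK analogy, NOT Jet code: Map.compute hands the remapping function the
// current value (null if the key is absent) and removes the key if it returns null.
class UpdatingSinkModel {

    // Apply the same update function as the example to every (key, value) item.
    static void drain(Map<Integer, String> sink, Map<Integer, String> items) {
        for (Map.Entry<Integer, String> item : items.entrySet()) {
            sink.compute(item.getKey(), (key, oldValue) ->
                    item.getKey() % 2 == 0 ? oldValue + "-even" : oldValue + "-odd");
        }
    }
}
```

With a sink map {1=a, 2=b} and source items for keys 1, 2 and 3, the result in this model is {1=a-odd, 2=b-even, 3=null-odd}: the incoming string value is never used, and a key missing from the sink map gets a "null" prefix.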
Code example source: hazelcast/hazelcast-jet-code-samples
private static Pipeline buildPipeline(String connectionUrl) {
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<Integer, User>map(MAP_NAME))
            .map(Map.Entry::getValue)
            .drainTo(Sinks.jdbc("INSERT INTO " + TABLE_NAME + "(id, name) VALUES(?, ?)",
                    connectionUrl,
                    (stmt, user) -> {
                        // Bind the values from the stream item to a PreparedStatement
                        // created from the above query.
                        stmt.setInt(1, user.getId());
                        stmt.setString(2, user.getName());
                    }));
    return p;
}
Code example source: hazelcast/hazelcast-jet-code-samples
/**
 * Takes the contents of the source map, maps every key to the single key
 * {@code sum} and writes the entries into the sink map using a merge function
 * that adds the new value to the old value.
 */
private static Pipeline mapWithMerging(String sourceMapName, String sinkMapName) {
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<Integer, Integer>map(sourceMapName))
            .map(e -> entry("sum", e.getValue()))
            .drainTo(
                    Sinks.mapWithMerging(
                            sinkMapName,
                            (oldValue, newValue) -> oldValue + newValue
                    )
            );
    return pipeline;
}
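Assuming the mapWithMerging sink stores the value as-is for an absent key and otherwise applies the merge function to the old and new values, it behaves like Map.merge in the JDK. A sketch under that assumption (MergingSinkModel is a hypothetical name, not Jet code):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-JDK analogy, NOT Jet code: Map.merge stores the value when the key is
// absent and otherwise combines the old and new values with the merge function.
class MergingSinkModel {

    static Map<String, Integer> sumAll(Map<Integer, Integer> source) {
        Map<String, Integer> sink = new HashMap<>();
        for (Integer value : source.values()) {
            // Like map(e -> entry("sum", e.getValue())): every item targets the key "sum".
            sink.merge("sum", value, (oldValue, newValue) -> oldValue + newValue);
        }
        return sink;
    }
}
```

Source values 10, 20 and 30 all land under the key sum, leaving a single entry sum=60.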
Code example source: hazelcast/hazelcast-jet-code-samples
private static Pipeline buildPipeline() {
    Pattern delimiter = Pattern.compile("\\W+");
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<Long, String>map(BOOK_LINES))
            // Split each line into lowercase words and drop empty tokens
            .flatMap(e -> traverseArray(delimiter.split(e.getValue().toLowerCase())))
            .filter(word -> !word.isEmpty())
            // Count the occurrences of each distinct word
            .groupingKey(wholeItem())
            .aggregate(counting())
            .drainTo(Sinks.map(COUNTS));
    return p;
}
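The same word-count logic (lowercase, split on \W+, drop empty tokens, group by the whole word, count) can be reproduced on a plain Map with java.util.stream, which is handy for checking expected counts in a unit test. WordCountModel is a hypothetical name; Collectors.groupingBy with Collectors.counting plays the role of groupingKey(wholeItem()).aggregate(counting()).

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Plain-JDK sketch of the word-count pipeline above (NOT Jet code).
class WordCountModel {

    private static final Pattern DELIMITER = Pattern.compile("\\W+");

    static Map<String, Long> count(Map<Long, String> bookLines) {
        return bookLines.values().stream()
                // flatMap stage: one line -> many lowercase tokens
                .flatMap(line -> Arrays.stream(DELIMITER.split(line.toLowerCase())))
                // filter stage: a leading delimiter produces an empty first token
                .filter(word -> !word.isEmpty())
                // groupingKey(wholeItem()) + aggregate(counting())
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }
}
```

The lines "The cat, the hat." and "A cat!" give the=2, cat=2, hat=1 and a=1.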
Code example source: hazelcast/hazelcast-jet-code-samples
public static Pipeline buildPipeline(String sourceName, String sinkName) {
    Pattern pattern = Pattern.compile("\\W+");
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<Integer, String>map(sourceName))
            // Split each line into lowercase words; empty tokens are dropped inside the traverser
            .flatMap(e -> Traversers.traverseArray(pattern.split(e.getValue().toLowerCase()))
                                    .filter(w -> !w.isEmpty()))
            .groupingKey(wholeItem())
            .aggregate(counting())
            .drainTo(Sinks.map(sinkName));
    return pipeline;
}
Code example source: hazelcast/hazelcast-jet-code-samples
private static Pipeline buildPipeline(JobConf jobConfig) {
    Pipeline p = Pipeline.create();
    p.drawFrom(HdfsSources.<AvroWrapper<User>, NullWritable>hdfs(jobConfig))
            // Keep only records whose Avro field at index 3 is TRUE
            .filter(entry -> entry.getKey().datum().get(3).equals(Boolean.TRUE))
            // Log the string form of each record as it passes through
            .peek(entry -> entry.getKey().datum().toString())
            .drainTo(HdfsSinks.hdfs(jobConfig));
    return p;
}
Code example source: hazelcast/hazelcast-jet-code-samples
/**
 * Takes the contents of the source map and applies an entry processor to
 * the sink map that increments the value under each source key by 5.
 */
private static Pipeline mapWithEntryProcessor(String sourceMapName, String sinkMapName) {
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<Integer, Integer>map(sourceMapName))
            .drainTo(
                    Sinks.mapWithEntryProcessor(
                            sinkMapName,
                            entryKey(),
                            item -> new IncrementEntryProcessor(5)
                    )
            );
    return pipeline;
}
Code example source: hazelcast/hazelcast-jet-code-samples
@RequestMapping("/submitJob")
public void submitJob() {
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(CustomSourceP.customSource())
            .drainTo(Sinks.logger());

    // Ship the classes the job needs to the cluster members
    JobConfig jobConfig = new JobConfig()
            .addClass(SpringBootSample.class)
            .addClass(CustomSourceP.class);
    // Submit the job and block until it completes
    instance.newJob(pipeline, jobConfig).join();
}
Code example source: hazelcast/hazelcast-jet-code-samples
private static Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();
    // Stream the new values of the trades map, starting from the current journal position
    p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
            DistributedPredicate.alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
            .groupingKey(Trade::getTicker)
            .rollingAggregate(summingLong(Trade::getPrice))
            .drainTo(Sinks.map(VOLUME_MAP_NAME));
    return p;
}
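A rolling aggregate emits an updated result for the item's key after every input item, instead of a single final result per key. A plain-JDK sketch of groupingKey(Trade::getTicker).rollingAggregate(summingLong(Trade::getPrice)) over a finite list (RollingSumModel and its minimal Trade class are hypothetical; the real Trade type comes from the sample project):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-JDK model of a rolling per-key sum (NOT Jet code).
class RollingSumModel {

    // Hypothetical minimal trade; fields mirror the getters used in the example.
    static final class Trade {
        final String ticker;
        final long price;

        Trade(String ticker, long price) {
            this.ticker = ticker;
            this.price = price;
        }
    }

    // Emit (ticker, running total) after every single trade.
    static List<Map.Entry<String, Long>> rollingSum(List<Trade> trades) {
        Map<String, Long> totals = new HashMap<>();
        List<Map.Entry<String, Long>> emitted = new ArrayList<>();
        for (Trade t : trades) {
            long total = totals.merge(t.ticker, t.price, Long::sum);
            emitted.add(new AbstractMap.SimpleImmutableEntry<>(t.ticker, total));
        }
        return emitted;
    }
}
```

Trades AAPL 10, GOOG 5, AAPL 7 emit (AAPL, 10), (GOOG, 5), (AAPL, 17). Because Sinks.map overwrites the value for each key, VOLUME_MAP_NAME ends up holding the latest running total per ticker.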