本文整理了 Java 中 com.hazelcast.jet.Util 类的一些代码示例,展示了 Util 类的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Util 类的具体详情如下:
包路径:com.hazelcast.jet.Util
类名称:Util
[英]Miscellaneous utility methods useful in DAG building logic.
[中]在DAG构建逻辑中有用的各种实用方法。
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Pairs a document ID with the product of its TF value and the IDF factor
 * {@code logDocCount - logDf}.
 *
 * @param logDf   value subtracted from {@code logDocCount} (declared outside
 *                this method) to obtain the IDF factor
 * @param docidTf entry holding the document ID as key and the TF as value
 * @return an entry of the document ID and {@code tf * idf}
 */
private Entry<Long, Double> tfidfEntry(double logDf, Entry<Long, Double> docidTf) {
    return entry(docidTf.getKey(), docidTf.getValue() * (logDocCount - logDf));
}
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a supplier of processors for
* {@link Sources#mapJournal(String, JournalInitialPosition)}.
* <p>
* Delegates to the {@code streamMapP} overload that additionally takes a
* predicate and a projection, supplying {@code mapPutEvents()} and
* {@code mapEventToEntry()} for them.
*
* @param mapName         name of the map, forwarded unchanged
* @param initialPos      initial journal position, forwarded unchanged
* @param eventTimePolicy event time policy, forwarded unchanged
* @return the supplier produced by the delegated overload
*/
@Nonnull
public static <K, V> ProcessorMetaSupplier streamMapP(
@Nonnull String mapName,
@Nonnull JournalInitialPosition initialPos,
@Nonnull EventTimePolicy<? super Entry<K, V>> eventTimePolicy
) {
return streamMapP(mapName, mapPutEvents(), mapEventToEntry(), initialPos, eventTimePolicy);
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a supplier of processors for
* {@link Sources#cacheJournal(String, JournalInitialPosition)}.
* <p>
* Delegates to the {@code streamCacheP} overload that additionally takes a
* predicate and a projection, supplying {@code cachePutEvents()} and
* {@code cacheEventToEntry()} for them.
*
* @param cacheName       name of the cache, forwarded unchanged
* @param initialPos      initial journal position, forwarded unchanged
* @param eventTimePolicy event time policy, forwarded unchanged
* @return the supplier produced by the delegated overload
*/
@Nonnull
public static <K, V> ProcessorMetaSupplier streamCacheP(
@Nonnull String cacheName,
@Nonnull JournalInitialPosition initialPos,
@Nonnull EventTimePolicy<? super Entry<K, V>> eventTimePolicy
) {
return streamCacheP(cacheName, cachePutEvents(), cacheEventToEntry(), initialPos, eventTimePolicy);
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that reads the new values of put events from the
 * {@code PAGE_VISIT} map journal (starting from the oldest entry), stamps
 * each item with {@code timestamp()}, applies a {@code sliding(10, 1)}
 * window, counts the items per window and drains the results to the logger
 * sink.
 *
 * @return the assembled pipeline
 */
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline aggregate() {
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(
                    PAGE_VISIT, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            // Deliberately a lambda, not a method reference: see the JDK bug linked above.
            .addTimestamps(visit -> visit.timestamp(), 100)
            .window(sliding(10, 1))
            .aggregate(counting())
            .drainTo(Sinks.logger());
    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline over the journal of the {@code "prices"} map.
 * <p>
 * Each put event is converted to a {@code PriceUpdateEvent} built from the
 * event key and the two components of the new value. The events are
 * timestamped with a lag of {@code LAG_SECONDS * 1000}, grouped by ticker,
 * counted in a sliding window of {@code WINDOW_SIZE_SECONDS * 1000} that
 * slides by {@code 1000}, and the counts are drained to the logger sink.
 *
 * @return the assembled pipeline
 */
private static Pipeline buildPipeline() {
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<PriceUpdateEvent, String, Tuple2<Integer, Long>>mapJournal(
                    "prices",
                    mapPutEvents(),
                    event -> new PriceUpdateEvent(
                            event.getKey(), event.getNewValue().f0(), event.getNewValue().f1()),
                    START_FROM_CURRENT))
            .addTimestamps(PriceUpdateEvent::timestamp, LAG_SECONDS * 1000)
            .setLocalParallelism(1)
            .groupingKey(PriceUpdateEvent::ticker)
            .window(WindowDefinition.sliding(WINDOW_SIZE_SECONDS * 1000, 1000))
            .aggregate(AggregateOperations.counting())
            .drainTo(Sinks.logger());
    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that co-groups three journal streams by user ID.
 * <p>
 * Page visits, payments and add-to-cart events are each read from their own
 * map journal (from the oldest entry), timestamped via {@code timestamp()}
 * and keyed by {@code userId()}. The page-visit stream is windowed with
 * {@code sliding(10, 1)} and aggregated together with the other two streams
 * using {@code aggregate3}, collecting each stream into a list. The result
 * is drained to the logger sink.
 *
 * @return the assembled pipeline
 */
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline coGroup() {
    Pipeline pipeline = Pipeline.create();

    // Lambdas below are intentional; see the JDK bug linked on the annotation.
    StreamStageWithKey<PageVisit, Integer> visitsByUser = pipeline
            .drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(
                    PAGE_VISIT, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(visit -> visit.timestamp(), 100)
            .groupingKey(visit -> visit.userId());

    StreamStageWithKey<Payment, Integer> paymentsByUser = pipeline
            .drawFrom(Sources.<Payment, Integer, Payment>mapJournal(
                    PAYMENT, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(payment -> payment.timestamp(), 100)
            .groupingKey(payment -> payment.userId());

    StreamStageWithKey<AddToCart, Integer> cartAddsByUser = pipeline
            .drawFrom(Sources.<AddToCart, Integer, AddToCart>mapJournal(
                    ADD_TO_CART, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(cartAdd -> cartAdd.timestamp(), 100)
            .groupingKey(cartAdd -> cartAdd.userId());

    StageWithKeyAndWindow<PageVisit, Integer> windowedVisits = visitsByUser.window(sliding(10, 1));

    // Tuple order follows the aggregate3 argument order: visits, cart adds, payments.
    StreamStage<TimestampedEntry<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>>> joined =
            windowedVisits.aggregate3(toList(), cartAddsByUser, toList(), paymentsByUser, toList());
    joined.drainTo(Sinks.logger());
    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Pairs a document ID with {@code tf * (logDocCount - logDf)}.
 *
 * @param logDocCount minuend of the IDF factor
 * @param logDf       subtrahend of the IDF factor
 * @param docIdTf     entry holding the document ID as key and the TF as value
 * @return an entry of the document ID and the product of TF and IDF
 */
private static Entry<String, Double> tfidfEntry(double logDocCount, double logDf, Entry<String, Long> docIdTf) {
    final String documentId = docIdTf.getKey();
    // The Long term frequency is unboxed and widened to double here.
    final double termFrequency = docIdTf.getValue();
    final double inverseDocFrequency = logDocCount - logDf;
    final double score = termFrequency * inverseDocFrequency;
    return entry(documentId, score);
}
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Convenience for {@link #mapJournal(String, DistributedPredicate,
* DistributedFunction, JournalInitialPosition)}
* which will pass only {@link EntryEventType#ADDED ADDED} and
* {@link EntryEventType#UPDATED UPDATED} events and will project the
* event's key and new value into a {@code Map.Entry}.
* <p>
* The predicate and projection passed to the delegated overload are
* {@code mapPutEvents()} and {@code mapEventToEntry()}.
*
* @param mapName    name of the map, forwarded unchanged
* @param initialPos initial journal position, forwarded unchanged
* @return the stream source produced by the delegated overload
*/
@Nonnull
public static <K, V> StreamSource<Entry<K, V>> mapJournal(
@Nonnull String mapName,
@Nonnull JournalInitialPosition initialPos
) {
return mapJournal(mapName, mapPutEvents(), mapEventToEntry(), initialPos);
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that reads the new values of put events from the
 * {@code PAGE_VISIT} map journal (starting from the oldest entry), stamps
 * each item with {@code timestamp()}, applies a {@code sliding(10, 1)}
 * window, groups by {@code userId()}, collects each group into a list and
 * drains the results to the logger sink.
 *
 * @return the assembled pipeline
 */
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline groupAndAggregate() {
    Pipeline pipeline = Pipeline.create();
    pipeline.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(
                    PAGE_VISIT, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            // Deliberately lambdas, not method references: see the JDK bug linked above.
            .addTimestamps(visit -> visit.timestamp(), 100)
            .window(sliding(10, 1))
            .groupingKey(visit -> visit.userId())
            .aggregate(toList())
            .drainTo(Sinks.logger());
    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Convenience for {@link #cacheJournal(String, DistributedPredicate,
* DistributedFunction, JournalInitialPosition)}
* which will pass only {@link CacheEventType#CREATED
* CREATED} and {@link CacheEventType#UPDATED UPDATED}
* events and will project the event's key and new value into a {@code
* Map.Entry}.
* <p>
* The predicate and projection passed to the delegated overload are
* {@code cachePutEvents()} and {@code cacheEventToEntry()}.
*
* @param cacheName  name of the cache, forwarded unchanged
* @param initialPos initial journal position, forwarded unchanged
* @return the stream source produced by the delegated overload
*/
@Nonnull
public static <K, V> StreamSource<Entry<K, V>> cacheJournal(
@Nonnull String cacheName,
@Nonnull JournalInitialPosition initialPos
) {
return cacheJournal(cacheName, cachePutEvents(), cacheEventToEntry(), initialPos);
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Splits a line on {@code ","} and returns an entry whose key is the first
 * field parsed as an {@code Integer} and whose value is the second field.
 * Any further fields are ignored.
 *
 * @param e the line to split
 * @return an entry of the parsed first field and the raw second field
 */
private static Map.Entry<Integer, String> splitLine(String e) {
    final String[] fields = e.split(",");
    final Integer key = Integer.valueOf(fields[0]);
    final String value = fields[1];
    return entry(key, value);
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a supplier of processors for
* {@link Sources#remoteMapJournal(String, ClientConfig, JournalInitialPosition)}.
* <p>
* Delegates to the {@code streamRemoteMapP} overload that additionally takes
* a predicate and a projection, supplying {@code mapPutEvents()} and
* {@code mapEventToEntry()} for them.
*
* @param mapName         name of the map, forwarded unchanged
* @param clientConfig    client configuration, forwarded unchanged
* @param initialPos      initial journal position, forwarded unchanged
* @param eventTimePolicy event time policy, forwarded unchanged
* @return the supplier produced by the delegated overload
*/
@Nonnull
public static <K, V> ProcessorMetaSupplier streamRemoteMapP(
@Nonnull String mapName,
@Nonnull ClientConfig clientConfig,
@Nonnull JournalInitialPosition initialPos,
@Nonnull EventTimePolicy<? super Entry<K, V>> eventTimePolicy
) {
return streamRemoteMapP(mapName, clientConfig, mapPutEvents(), mapEventToEntry(), initialPos,
eventTimePolicy);
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Builds a pipeline that co-groups three journal streams by user ID using
 * the window aggregate builder.
 * <p>
 * Page visits, add-to-cart events and payments are each read from their own
 * map journal (from the oldest entry), timestamped via {@code timestamp()}
 * and keyed by {@code userId()}. The page-visit stream is windowed with
 * {@code sliding(10, 1)}; the other two streams are added to the builder,
 * each collected into a list. The build function emits a
 * {@code TimestampedEntry} stamped with the window end and carrying a
 * {@code Tuple3} of the three lists. The result is drained to the logger
 * sink.
 *
 * @return the assembled pipeline
 */
@SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
private static Pipeline coGroupWithBuilder() {
    Pipeline pipeline = Pipeline.create();

    // Lambdas below are intentional; see the JDK bug linked on the annotation.
    StreamStageWithKey<PageVisit, Integer> visitsByUser = pipeline
            .drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(
                    PAGE_VISIT, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(visit -> visit.timestamp(), 100)
            .groupingKey(visit -> visit.userId());

    StreamStageWithKey<AddToCart, Integer> cartAddsByUser = pipeline
            .drawFrom(Sources.<AddToCart, Integer, AddToCart>mapJournal(
                    ADD_TO_CART, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(cartAdd -> cartAdd.timestamp(), 100)
            .groupingKey(cartAdd -> cartAdd.userId());

    StreamStageWithKey<Payment, Integer> paymentsByUser = pipeline
            .drawFrom(Sources.<Payment, Integer, Payment>mapJournal(
                    PAYMENT, mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
            .addTimestamps(payment -> payment.timestamp(), 100)
            .groupingKey(payment -> payment.userId());

    StageWithKeyAndWindow<PageVisit, Integer> windowedVisits = visitsByUser.window(sliding(10, 1));
    WindowGroupAggregateBuilder<Integer, List<PageVisit>> aggBuilder =
            windowedVisits.aggregateBuilder(toList());

    // Each tag identifies one stream's aggregated list in the build result.
    Tag<List<PageVisit>> visitsTag = aggBuilder.tag0();
    Tag<List<AddToCart>> cartAddsTag = aggBuilder.add(cartAddsByUser, toList());
    Tag<List<Payment>> paymentsTag = aggBuilder.add(paymentsByUser, toList());

    StreamStage<TimestampedEntry<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>>> joined =
            aggBuilder.build((start, end, userId, result) -> new TimestampedEntry<>(
                    end,
                    userId,
                    tuple3(result.get(visitsTag), result.get(cartAddsTag), result.get(paymentsTag))));
    joined.drainTo(Sinks.logger());
    return pipeline;
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Returns a supplier of processors for
* {@link Sources#remoteCacheJournal(String, ClientConfig, JournalInitialPosition)}.
* <p>
* Delegates to the {@code streamRemoteCacheP} overload that additionally
* takes a predicate and a projection, supplying {@code cachePutEvents()} and
* {@code cacheEventToEntry()} for them.
*
* @param cacheName       name of the cache, forwarded unchanged
* @param clientConfig    client configuration, forwarded unchanged
* @param initialPos      initial journal position, forwarded unchanged
* @param eventTimePolicy event time policy, forwarded unchanged
* @return the supplier produced by the delegated overload
*/
@Nonnull
public static <K, V> ProcessorMetaSupplier streamRemoteCacheP(
@Nonnull String cacheName,
@Nonnull ClientConfig clientConfig,
@Nonnull JournalInitialPosition initialPos,
@Nonnull EventTimePolicy<? super Entry<K, V>> eventTimePolicy
) {
return streamRemoteCacheP(
cacheName, clientConfig, cachePutEvents(), cacheEventToEntry(), initialPos, eventTimePolicy);
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Pairs the inner key of {@code tfEntry}'s key with the product of the
 * entry's value (the TF) and {@code idf}.
 *
 * @param tfEntry entry whose key is itself an entry (its key is used as the
 *                result key) and whose value is the TF
 * @param idf     factor the TF is multiplied by
 * @return an entry of the inner key and {@code tf * idf}
 */
private static Entry<Long, Double> tfidfEntry(Entry<Entry<Long, String>, Long> tfEntry, Double idf) {
    final Long resultKey = tfEntry.getKey().getKey();
    final double score = tfEntry.getValue() * idf;
    return entry(resultKey, score);
}
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Convenience for {@link #remoteMapJournal(String, ClientConfig,
* DistributedPredicate, DistributedFunction, JournalInitialPosition)}
* which will pass only {@link EntryEventType#ADDED ADDED}
* and {@link EntryEventType#UPDATED UPDATED} events and will
* project the event's key and new value into a {@code Map.Entry}.
* <p>
* The predicate and projection passed to the delegated overload are
* {@code mapPutEvents()} and {@code mapEventToEntry()}.
*
* @param mapName      name of the map, forwarded unchanged
* @param clientConfig client configuration, forwarded unchanged
* @param initialPos   initial journal position, forwarded unchanged
* @return the stream source produced by the delegated overload
*/
@Nonnull
public static <K, V> StreamSource<Entry<K, V>> remoteMapJournal(
@Nonnull String mapName,
@Nonnull ClientConfig clientConfig,
@Nonnull JournalInitialPosition initialPos
) {
return remoteMapJournal(mapName, clientConfig, mapPutEvents(), mapEventToEntry(), initialPos);
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Convenience for {@link #remoteCacheJournal(String, ClientConfig,
* DistributedPredicate, DistributedFunction, JournalInitialPosition)}
* which will pass only
* {@link CacheEventType#CREATED CREATED}
* and {@link CacheEventType#UPDATED UPDATED}
* events and will project the event's key and new value
* into a {@code Map.Entry}.
* <p>
* The predicate and projection passed to the delegated overload are
* {@code cachePutEvents()} and {@code cacheEventToEntry()}.
*
* @param cacheName    name of the cache, forwarded unchanged
* @param clientConfig client configuration, forwarded unchanged
* @param initialPos   initial journal position, forwarded unchanged
* @return the stream source produced by the delegated overload
*/
@Nonnull
public static <K, V> StreamSource<Entry<K, V>> remoteCacheJournal(
@Nonnull String cacheName,
@Nonnull ClientConfig clientConfig,
@Nonnull JournalInitialPosition initialPos
) {
return remoteCacheJournal(cacheName, clientConfig, cachePutEvents(), cacheEventToEntry(), initialPos);
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Pairs the inner key of {@code tfEntry}'s key with the product of the
 * entry's value (the TF) and {@code idf}.
 *
 * @param tfEntry entry whose key is itself an entry (its key is used as the
 *                result key) and whose value is the TF
 * @param idf     factor the TF is multiplied by
 * @return an entry of the inner key and {@code tf * idf}
 */
private static Entry<String, Double> tfidfEntry(Entry<Entry<String, String>, Long> tfEntry, Double idf) {
    return entry(tfEntry.getKey().getKey(), tfEntry.getValue() * idf);
}
}
代码示例来源:origin: hazelcast/hazelcast-jet
/**
* Convenience for {@link #mapJournal(IMap, DistributedPredicate,
* DistributedFunction, JournalInitialPosition)}
* which will pass only {@link EntryEventType#ADDED
* ADDED} and {@link EntryEventType#UPDATED UPDATED}
* events and will project the event's key and new value into a {@code
* Map.Entry}.
* <p>
* <strong>NOTE:</strong> Jet only remembers the name of the map you supply
* and acquires a map with that name on the local cluster. If you supply a
* map instance from another cluster, no error will be thrown to indicate
* this.
* <p>
* Only {@code map.getName()} is read from the supplied instance; the call is
* then forwarded to the name-based overload with {@code mapPutEvents()} and
* {@code mapEventToEntry()}.
*
* @param map        map whose name identifies the journal to read
* @param initialPos initial journal position, forwarded unchanged
* @return the stream source produced by the delegated overload
*/
@Nonnull
public static <K, V> StreamSource<Entry<K, V>> mapJournal(
@Nonnull IMap<? extends K, ? extends V> map,
@Nonnull JournalInitialPosition initialPos
) {
return mapJournal(map.getName(), mapPutEvents(), mapEventToEntry(), initialPos);
}
代码示例来源:origin: hazelcast/hazelcast-jet-code-samples
/**
 * Streams the lines of the classpath resource {@code books/<name>}, resolved
 * relative to {@code TfIdfJdkStreams}, lower-cased and each paired with
 * {@code name}.
 * <p>
 * The returned stream is backed by an open file ({@code Files.lines}); the
 * caller is responsible for closing it, e.g. with try-with-resources.
 *
 * @param name file name of the document inside the {@code books/} resource
 *             directory; also used as the key of every emitted entry
 * @return a stream of {@code (name, lowerCasedLine)} entries
 * @throws NullPointerException if the resource cannot be found
 * @throws RuntimeException     wrapping an {@code IOException} or
 *                              {@code URISyntaxException} raised while
 *                              opening the file
 */
static Stream<Entry<String, String>> docLines(String name) {
    final String resourcePath = "books/" + name;
    final java.net.URL resource = TfIdfJdkStreams.class.getResource(resourcePath);
    if (resource == null) {
        // getResource() reports a missing resource by returning null. Fail with a
        // message naming the resource instead of a bare NPE from toURI().
        throw new NullPointerException("Classpath resource not found: " + resourcePath);
    }
    try {
        return Files.lines(Paths.get(resource.toURI()))
                .map(String::toLowerCase)
                .map(line -> entry(name, line));
    } catch (IOException | URISyntaxException ex) {
        throw new RuntimeException(ex);
    }
}
内容来源于网络,如有侵权,请联系作者删除!