com.hazelcast.jet.Util类的使用及代码示例

x33g5p2x  于2022-02-01 转载在 其他  
字(11.3k)|赞(0)|评价(0)|浏览(218)

本文整理了Java中com.hazelcast.jet.Util类的一些代码示例,展示了Util类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Util类的具体详情如下:
包路径:com.hazelcast.jet.Util
类名称:Util

Util介绍

[英]Miscellaneous utility methods useful in DAG building logic.
[中]在DAG构建逻辑中有用的各种实用方法。

代码示例

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

  1. private Entry<Long, Double> tfidfEntry(double logDf, Entry<Long, Double> docidTf) {
  2. Long docId = docidTf.getKey();
  3. double tf = docidTf.getValue();
  4. double idf = logDocCount - logDf;
  5. return entry(docId, tf * idf);
  6. }
  7. }

代码示例来源:origin: hazelcast/hazelcast-jet

  1. /**
  2. * Returns a supplier of processors for
  3. * {@link Sources#mapJournal(String, JournalInitialPosition)} )}.
  4. */
  5. @Nonnull
  6. public static <K, V> ProcessorMetaSupplier streamMapP(
  7. @Nonnull String mapName,
  8. @Nonnull JournalInitialPosition initialPos,
  9. @Nonnull EventTimePolicy<? super Entry<K, V>> eventTimePolicy
  10. ) {
  11. return streamMapP(mapName, mapPutEvents(), mapEventToEntry(), initialPos, eventTimePolicy);
  12. }

代码示例来源:origin: hazelcast/hazelcast-jet

  1. /**
  2. * Returns a supplier of processors for
  3. * {@link Sources#cacheJournal(String, JournalInitialPosition)}.
  4. */
  5. @Nonnull
  6. public static <K, V> ProcessorMetaSupplier streamCacheP(
  7. @Nonnull String cacheName,
  8. @Nonnull JournalInitialPosition initialPos,
  9. @Nonnull EventTimePolicy<? super Entry<K, V>> eventTimePolicy
  10. ) {
  11. return streamCacheP(cacheName, cachePutEvents(), cacheEventToEntry(), initialPos, eventTimePolicy);
  12. }

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

  1. @SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
  2. private static Pipeline aggregate() {
  3. Pipeline p = Pipeline.create();
  4. p.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
  5. mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
  6. .addTimestamps(pv -> pv.timestamp(), 100)
  7. .window(sliding(10, 1))
  8. .aggregate(counting())
  9. .drainTo(Sinks.logger());
  10. return p;
  11. }

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

  1. private static Pipeline buildPipeline() {
  2. Pipeline p = Pipeline.create();
  3. p.drawFrom(Sources.<PriceUpdateEvent, String, Tuple2<Integer, Long>>mapJournal(
  4. "prices",
  5. mapPutEvents(),
  6. e -> new PriceUpdateEvent(e.getKey(), e.getNewValue().f0(), e.getNewValue().f1()),
  7. START_FROM_CURRENT
  8. ))
  9. .addTimestamps(PriceUpdateEvent::timestamp, LAG_SECONDS * 1000)
  10. .setLocalParallelism(1)
  11. .groupingKey(PriceUpdateEvent::ticker)
  12. .window(WindowDefinition.sliding(WINDOW_SIZE_SECONDS * 1000, 1000))
  13. .aggregate(AggregateOperations.counting())
  14. .drainTo(Sinks.logger());
  15. return p;
  16. }

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

  1. @SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
  2. private static Pipeline coGroup() {
  3. Pipeline p = Pipeline.create();
  4. StreamStageWithKey<PageVisit, Integer> pageVisits = p
  5. .drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
  6. mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
  7. .addTimestamps(pv -> pv.timestamp(), 100)
  8. .groupingKey(pv -> pv.userId());
  9. StreamStageWithKey<Payment, Integer> payments = p
  10. .drawFrom(Sources.<Payment, Integer, Payment>mapJournal(PAYMENT,
  11. mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
  12. .addTimestamps(pm -> pm.timestamp(), 100)
  13. .groupingKey(pm -> pm.userId());
  14. StreamStageWithKey<AddToCart, Integer> addToCarts = p
  15. .drawFrom(Sources.<AddToCart, Integer, AddToCart>mapJournal(ADD_TO_CART,
  16. mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
  17. .addTimestamps(atc -> atc.timestamp(), 100)
  18. .groupingKey(atc -> atc.userId());
  19. StageWithKeyAndWindow<PageVisit, Integer> windowStage = pageVisits.window(sliding(10, 1));
  20. StreamStage<TimestampedEntry<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>>> coGrouped =
  21. windowStage.aggregate3(toList(), addToCarts, toList(), payments, toList());
  22. coGrouped.drainTo(Sinks.logger());
  23. return p;
  24. }

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

  1. private static Entry<String, Double> tfidfEntry(double logDocCount, double logDf, Entry<String, Long> docIdTf) {
  2. String docId = docIdTf.getKey();
  3. double tf = docIdTf.getValue();
  4. double idf = logDocCount - logDf;
  5. return entry(docId, tf * idf);
  6. }
  7. }

代码示例来源:origin: hazelcast/hazelcast-jet

  1. /**
  2. * Convenience for {@link #mapJournal(String, DistributedPredicate,
  3. * DistributedFunction, JournalInitialPosition)}
  4. * which will pass only {@link EntryEventType#ADDED ADDED} and
  5. * {@link EntryEventType#UPDATED UPDATED} events and will project the
  6. * event's key and new value into a {@code Map.Entry}.
  7. */
  8. @Nonnull
  9. public static <K, V> StreamSource<Entry<K, V>> mapJournal(
  10. @Nonnull String mapName,
  11. @Nonnull JournalInitialPosition initialPos
  12. ) {
  13. return mapJournal(mapName, mapPutEvents(), mapEventToEntry(), initialPos);
  14. }

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

  1. @SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
  2. private static Pipeline groupAndAggregate() {
  3. Pipeline p = Pipeline.create();
  4. p.drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
  5. mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
  6. .addTimestamps(pv -> pv.timestamp(), 100)
  7. .window(sliding(10, 1))
  8. .groupingKey(pv -> pv.userId())
  9. .aggregate(toList())
  10. .drainTo(Sinks.logger());
  11. return p;
  12. }

代码示例来源:origin: hazelcast/hazelcast-jet

  1. /**
  2. * Convenience for {@link #cacheJournal(String, DistributedPredicate,
  3. * DistributedFunction, JournalInitialPosition)}
  4. * which will pass only {@link CacheEventType#CREATED
  5. * CREATED} and {@link CacheEventType#UPDATED UPDATED}
  6. * events and will project the event's key and new value into a {@code
  7. * Map.Entry}.
  8. */
  9. @Nonnull
  10. public static <K, V> StreamSource<Entry<K, V>> cacheJournal(
  11. @Nonnull String cacheName,
  12. @Nonnull JournalInitialPosition initialPos
  13. ) {
  14. return cacheJournal(cacheName, cachePutEvents(), cacheEventToEntry(), initialPos);
  15. }

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

  1. private static Map.Entry<Integer, String> splitLine(String e) {
  2. String[] split = e.split(",");
  3. return entry(Integer.valueOf(split[0]), split[1]);
  4. }

代码示例来源:origin: hazelcast/hazelcast-jet

  1. /**
  2. * Returns a supplier of processors for
  3. * {@link Sources#remoteMapJournal(String, ClientConfig, JournalInitialPosition)}.
  4. */
  5. @Nonnull
  6. public static <K, V> ProcessorMetaSupplier streamRemoteMapP(
  7. @Nonnull String mapName,
  8. @Nonnull ClientConfig clientConfig,
  9. @Nonnull JournalInitialPosition initialPos,
  10. @Nonnull EventTimePolicy<? super Entry<K, V>> eventTimePolicy
  11. ) {
  12. return streamRemoteMapP(mapName, clientConfig, mapPutEvents(), mapEventToEntry(), initialPos,
  13. eventTimePolicy);
  14. }

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

  1. @SuppressWarnings("Convert2MethodRef") // https://bugs.openjdk.java.net/browse/JDK-8154236
  2. private static Pipeline coGroupWithBuilder() {
  3. Pipeline p = Pipeline.create();
  4. StreamStageWithKey<PageVisit, Integer> pageVisits = p
  5. .drawFrom(Sources.<PageVisit, Integer, PageVisit>mapJournal(PAGE_VISIT,
  6. mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
  7. .addTimestamps(pv -> pv.timestamp(), 100)
  8. .groupingKey(pv -> pv.userId());
  9. StreamStageWithKey<AddToCart, Integer> addToCarts = p
  10. .drawFrom(Sources.<AddToCart, Integer, AddToCart>mapJournal(ADD_TO_CART,
  11. mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
  12. .addTimestamps(atc -> atc.timestamp(), 100)
  13. .groupingKey(atc -> atc.userId());
  14. StreamStageWithKey<Payment, Integer> payments = p
  15. .drawFrom(Sources.<Payment, Integer, Payment>mapJournal(PAYMENT,
  16. mapPutEvents(), mapEventNewValue(), START_FROM_OLDEST))
  17. .addTimestamps(pm -> pm.timestamp(), 100)
  18. .groupingKey(pm -> pm.userId());
  19. StageWithKeyAndWindow<PageVisit, Integer> windowStage = pageVisits.window(sliding(10, 1));
  20. WindowGroupAggregateBuilder<Integer, List<PageVisit>> builder = windowStage.aggregateBuilder(toList());
  21. Tag<List<PageVisit>> pageVisitTag = builder.tag0();
  22. Tag<List<AddToCart>> addToCartTag = builder.add(addToCarts, toList());
  23. Tag<List<Payment>> paymentTag = builder.add(payments, toList());
  24. StreamStage<TimestampedEntry<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>>> coGrouped =
  25. builder.build((winStart, winEnd, key, r) -> new TimestampedEntry<>(
  26. winEnd, key, tuple3(r.get(pageVisitTag), r.get(addToCartTag), r.get(paymentTag))));
  27. coGrouped.drainTo(Sinks.logger());
  28. return p;
  29. }

代码示例来源:origin: hazelcast/hazelcast-jet

  1. /**
  2. * Returns a supplier of processors for
  3. * {@link Sources#remoteCacheJournal(String, ClientConfig, JournalInitialPosition)}.
  4. */
  5. @Nonnull
  6. public static <K, V> ProcessorMetaSupplier streamRemoteCacheP(
  7. @Nonnull String cacheName,
  8. @Nonnull ClientConfig clientConfig,
  9. @Nonnull JournalInitialPosition initialPos,
  10. @Nonnull EventTimePolicy<? super Entry<K, V>> eventTimePolicy
  11. ) {
  12. return streamRemoteCacheP(
  13. cacheName, clientConfig, cachePutEvents(), cacheEventToEntry(), initialPos, eventTimePolicy);
  14. }

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

  1. private static Entry<Long, Double> tfidfEntry(Entry<Entry<Long, String>, Long> tfEntry, Double idf) {
  2. final Long tf = tfEntry.getValue();
  3. return entry(tfEntry.getKey().getKey(), tf * idf);
  4. }
  5. }

代码示例来源:origin: hazelcast/hazelcast-jet

  1. /**
  2. * Convenience for {@link #remoteMapJournal(String, ClientConfig,
  3. * DistributedPredicate, DistributedFunction, JournalInitialPosition)}
  4. * which will pass only {@link EntryEventType#ADDED ADDED}
  5. * and {@link EntryEventType#UPDATED UPDATED} events and will
  6. * project the event's key and new value into a {@code Map.Entry}.
  7. */
  8. @Nonnull
  9. public static <K, V> StreamSource<Entry<K, V>> remoteMapJournal(
  10. @Nonnull String mapName,
  11. @Nonnull ClientConfig clientConfig,
  12. @Nonnull JournalInitialPosition initialPos
  13. ) {
  14. return remoteMapJournal(mapName, clientConfig, mapPutEvents(), mapEventToEntry(), initialPos);
  15. }

代码示例来源:origin: hazelcast/hazelcast-jet

  1. /**
  2. * Convenience for {@link #remoteCacheJournal(String, ClientConfig,
  3. * DistributedPredicate, DistributedFunction, JournalInitialPosition)}
  4. * which will pass only
  5. * {@link CacheEventType#CREATED CREATED}
  6. * and {@link CacheEventType#UPDATED UPDATED}
  7. * events and will project the event's key and new value
  8. * into a {@code Map.Entry}.
  9. */
  10. @Nonnull
  11. public static <K, V> StreamSource<Entry<K, V>> remoteCacheJournal(
  12. @Nonnull String cacheName,
  13. @Nonnull ClientConfig clientConfig,
  14. @Nonnull JournalInitialPosition initialPos
  15. ) {
  16. return remoteCacheJournal(cacheName, clientConfig, cachePutEvents(), cacheEventToEntry(), initialPos);
  17. }

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

  1. private static Entry<String, Double> tfidfEntry(Entry<Entry<String, String>, Long> tfEntry, Double idf) {
  2. final Long tf = tfEntry.getValue();
  3. return entry(tfEntry.getKey().getKey(), tf * idf);
  4. }
  5. }

代码示例来源:origin: hazelcast/hazelcast-jet

  1. /**
  2. * Convenience for {@link #mapJournal(IMap, DistributedPredicate,
  3. * DistributedFunction, JournalInitialPosition)}
  4. * which will pass only {@link EntryEventType#ADDED
  5. * ADDED} and {@link EntryEventType#UPDATED UPDATED}
  6. * events and will project the event's key and new value into a {@code
  7. * Map.Entry}.
  8. * <p>
  9. * <strong>NOTE:</strong> Jet only remembers the name of the map you supply
  10. * and acquires a map with that name on the local cluster. If you supply a
  11. * map instance from another cluster, no error will be thrown to indicate
  12. * this.
  13. */
  14. @Nonnull
  15. public static <K, V> StreamSource<Entry<K, V>> mapJournal(
  16. @Nonnull IMap<? extends K, ? extends V> map,
  17. @Nonnull JournalInitialPosition initialPos
  18. ) {
  19. return mapJournal(map.getName(), mapPutEvents(), mapEventToEntry(), initialPos);
  20. }

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

  1. static Stream<Entry<String, String>> docLines(String name) {
  2. try {
  3. return Files.lines(Paths.get(TfIdfJdkStreams.class.getResource("books/" + name).toURI()))
  4. .map(String::toLowerCase)
  5. .map(line -> entry(name, line));
  6. } catch (IOException | URISyntaxException e) {
  7. throw new RuntimeException(e);
  8. }
  9. }

相关文章