Usage of the org.apache.kafka.streams.kstream.Materialized.as() method, with code examples


This article collects Java code examples of the org.apache.kafka.streams.kstream.Materialized.as() method and shows how it is used in practice. The examples are drawn from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as solid references. Details of Materialized.as() follow:
Package: org.apache.kafka.streams.kstream.Materialized
Class: Materialized
Method: as

About Materialized.as

Materialize a StateStore with the given name.
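
Besides the single-argument as(String storeName) form used throughout the examples below, Materialized also offers as(...) overloads that take a KeyValueBytesStoreSupplier, WindowBytesStoreSupplier, or SessionBytesStoreSupplier for custom store implementations. Before the examples, here is a minimal sketch of typical usage of the String form (topic and store names are illustrative, not taken from the projects quoted below):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class MaterializedAsSketch {
  public static void main(String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();
    // Count records per key and materialize the result into a state store
    // named "counts-store"; naming the store makes it queryable later.
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
            .withValueSerde(Serdes.Long()));
    System.out.println(builder.build().describe());
  }
}

Naming the store also keeps the name of its backing changelog topic stable, since Kafka Streams derives internal topic names from the store name.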

Code examples

Example source: confluentinc/ksql

@Test
public void shouldCreateHoppingWindowAggregate() {
 final KGroupedStream stream = EasyMock.createNiceMock(KGroupedStream.class);
 final TimeWindowedKStream windowedKStream = EasyMock.createNiceMock(TimeWindowedKStream.class);
 final UdafAggregator aggregator = EasyMock.createNiceMock(UdafAggregator.class);
 final HoppingWindowExpression windowExpression = new HoppingWindowExpression(10, TimeUnit.SECONDS, 4, TimeUnit.MILLISECONDS);
 final Initializer initializer = () -> 0;
 final Materialized<String, GenericRow, WindowStore<Bytes, byte[]>> store = Materialized.as("store");
 EasyMock.expect(stream.windowedBy(TimeWindows.of(Duration.ofMillis(10000L)).advanceBy(Duration.ofMillis(4L)))).andReturn(windowedKStream);
 EasyMock.expect(windowedKStream.aggregate(same(initializer), same(aggregator), same(store))).andReturn(null);
 EasyMock.replay(stream, windowedKStream);
 windowExpression.applyAggregate(stream, initializer, aggregator, store);
 EasyMock.verify(stream, windowedKStream);
}

Example source: confluentinc/ksql

@Test
public void shouldCreateTumblingWindowAggregate() {
 final KGroupedStream stream = EasyMock.createNiceMock(KGroupedStream.class);
 final TimeWindowedKStream windowedKStream = EasyMock.createNiceMock(TimeWindowedKStream.class);
 final UdafAggregator aggregator = EasyMock.createNiceMock(UdafAggregator.class);
 final TumblingWindowExpression windowExpression = new TumblingWindowExpression(10, TimeUnit.SECONDS);
 final Initializer initializer = () -> 0;
 final Materialized<String, GenericRow, WindowStore<Bytes, byte[]>> store = Materialized.as("store");
 EasyMock.expect(stream.windowedBy(TimeWindows.of(Duration.ofMillis(10000L)))).andReturn(windowedKStream);
 EasyMock.expect(windowedKStream.aggregate(same(initializer), same(aggregator), same(store))).andReturn(null);
 EasyMock.replay(stream, windowedKStream);
 windowExpression.applyAggregate(stream, initializer, aggregator, store);
 EasyMock.verify(stream, windowedKStream);
}

Example source: confluentinc/kafka-streams-examples

static KafkaStreams createStreams(final Properties streamsConfiguration) {
 final Serde<String> stringSerde = Serdes.String();
 StreamsBuilder builder = new StreamsBuilder();
 KStream<String, String>
   textLines = builder.stream(TEXT_LINES_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
 final KGroupedStream<String, String> groupedByWord = textLines
   .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
   .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
 // Create a state store with the all-time word count
 groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-count")
   .withValueSerde(Serdes.Long()));
 // Create a windowed state store with the word count per 1-minute window
 groupedByWord.windowedBy(TimeWindows.of(60000))
   .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-word-count")
     .withValueSerde(Serdes.Long()));
 return new KafkaStreams(builder.build(), streamsConfiguration);
}
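
The store names registered above ("word-count" and "windowed-word-count") are what make the stores reachable through interactive queries. A minimal sketch of a lookup against the all-time count, using the two-argument KafkaStreams#store form that matches the API vintage of this example (the class and method names are invented for illustration; the streams instance is assumed to be running):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class WordCountLookup {
  // Fetches the all-time count for a word from the store named in
  // Materialized.as("word-count") above; returns null for unseen words.
  static Long lookupWordCount(KafkaStreams streams, String word) {
    final ReadOnlyKeyValueStore<String, Long> store =
        streams.store("word-count", QueryableStoreTypes.keyValueStore());
    return store.get(word);
  }
}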

Example source: confluentinc/kafka-streams-examples

/**
 * Create a table of orders which we can query. When the table is updated
 * we check to see if there is an outstanding HTTP GET request waiting to be
 * fulfilled.
 */
private StreamsBuilder createOrdersMaterializedView() {
 StreamsBuilder builder = new StreamsBuilder();
 builder.table(ORDERS.name(), Consumed.with(ORDERS.keySerde(), ORDERS.valueSerde()), Materialized.as(ORDERS_STORE_NAME))
   .toStream().foreach(this::maybeCompleteLongPollGet);
 return builder;
}

Example source: confluentinc/kafka-streams-examples

.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as(PLAY_EVENTS_PER_SESSION)
  .withKeySerde(Serdes.String())
  .withValueSerde(Serdes.Long()))

Example source: confluentinc/kafka-streams-examples

builder.globalTable(CUSTOMER_TOPIC, Materialized.<Long, Customer, KeyValueStore<Bytes, byte[]>>as(CUSTOMER_STORE)
    .withKeySerde(Serdes.Long())
    .withValueSerde(customerSerde));
builder.globalTable(PRODUCT_TOPIC, Materialized.<Long, Product, KeyValueStore<Bytes, byte[]>>as(PRODUCT_STORE)
    .withKeySerde(Serdes.Long())
    .withValueSerde(productSerde));

Example source: confluentinc/kafka-streams-examples

builder.table(SONG_FEED, Materialized.<Long, Song, KeyValueStore<Bytes, byte[]>>as(ALL_SONGS)
  .withKeySerde(Serdes.Long())
  .withValueSerde(valueSongSerde));
// ...
  .count(Materialized.<Song, Long, KeyValueStore<Bytes, byte[]>>as(SONG_PLAY_COUNT_STORE)
    .withKeySerde(valueSongSerde)
    .withValueSerde(Serdes.Long()));
// ...
    return aggregate;
  },
  Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_BY_GENRE_STORE)
    .withKeySerde(Serdes.String())
    .withValueSerde(topFiveSerde));
// ...
    return aggregate;
  },
  Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_STORE)
    .withKeySerde(Serdes.String())
    .withValueSerde(topFiveSerde));

Example source: confluentinc/kafka-streams-examples

// aggregate() arguments: initializer, aggregator, and the materialized max store
() -> Long.MIN_VALUE,
  (aggKey, value, aggregate) -> Math.max(value, aggregate),
  Materialized.as(maxStore)
);
// ...and the windowed variant
() -> Long.MIN_VALUE,
  (aggKey, value, aggregate) -> Math.max(value, aggregate),
  Materialized.as(maxWindowStore));

Example source: confluentinc/kafka-streams-examples

userRegions.join(userLastLogins,
  (regionValue, lastLoginValue) -> regionValue + "/" + lastLoginValue,
  Materialized.as(storeName))
  .toStream()
  .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

Example source: spring-projects/spring-kafka

@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
  KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
  stream.mapValues((ValueMapper<String, String>) String::toUpperCase)
      .mapValues(Foo::new)
      .through(FOOS, Produced.with(Serdes.Integer(), new JsonSerde<Foo>() {
      }))
      .mapValues(Foo::getName)
      .groupByKey()
      .windowedBy(TimeWindows.of(1000))
      .reduce((value1, value2) -> value1 + value2, Materialized.as("windowStore"))
      .toStream()
      .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
      .filter((i, s) -> s.length() > 40).to(streamingTopic2);
  stream.print(Printed.toSysOut());
  return stream;
}

Example source: spring-cloud/spring-cloud-stream-binder-kafka

private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> getMaterialized(String storeName, Serde<K> k, Serde<V> v) {
  return Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName)
      .withKeySerde(k)
      .withValueSerde(v);
}
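
A helper like this centralizes the serde wiring for materialized stores. A hypothetical call site, assuming it sits in the same class as the helper above (topic and store names invented for illustration):

StreamsBuilder builder = new StreamsBuilder();
// Count events per integer key, materializing into a store built by the helper.
KTable<Integer, Long> counts = builder
    .stream("events", Consumed.with(Serdes.Integer(), Serdes.Integer()))
    .groupByKey()
    .count(getMaterialized("event-counts", Serdes.Integer(), Serdes.Long()));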

Example source: spring-cloud/spring-cloud-stream-samples

@StreamListener("binding2")
@SendTo("singleOutput")
public KStream<?, WordCount> process(KStream<Object, String> input) {
  return input
      .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
      .map((key, value) -> new KeyValue<>(value, value))
      .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
      .windowedBy(timeWindows)
      .count(Materialized.as("WordCounts-1"))
      .toStream()
      .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}

Example source: spring-cloud/spring-cloud-stream-samples

@StreamListener("input")
  @SendTo("output")
  public KStream<?, WordCount> process(KStream<Object, String> input) {
    return input
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(5000))
        .count(Materialized.as("WordCounts-1"))
        .toStream()
        .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
  }
}

Example source: rayokota/kafka-graphs

public KGraph<K, VV, EV> filterOnEdges(Predicate<Edge<K>, EV> edgeFilter) {
  KTable<Edge<K>, EV> filteredEdges = edges
    .filter(edgeFilter, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));
  return new KGraph<>(vertices, filteredEdges, serialized);
}

Example source: rayokota/kafka-graphs

public KTable<K, Long> outDegrees() {
  return vertices.leftJoin(edgesGroupedBySource(), new CountNeighborsLeftJoin<>(),
    Materialized.<K, Long, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(keySerde()).withValueSerde(Serdes.Long()));
}

Example source: rayokota/kafka-graphs

public <NV> KGraph<K, NV, EV> mapVertices(ValueMapperWithKey<K, VV, NV> mapper, Serde<NV> newVertexValueSerde) {
  KTable<K, NV> mappedVertices = vertices.mapValues(mapper, Materialized.<K, NV, KeyValueStore<Bytes, byte[]>>as(
    generateStoreName()).withKeySerde(keySerde()).withValueSerde(newVertexValueSerde));
  return new KGraph<>(mappedVertices, edges,
    GraphSerialized.with(keySerde(), newVertexValueSerde, edgeValueSerde()));
}

Example source: rayokota/kafka-graphs

public <NV> KGraph<K, VV, NV> mapEdges(ValueMapperWithKey<Edge<K>, EV, NV> mapper, Serde<NV> newEdgeValueSerde) {
  KTable<Edge<K>, NV> mappedEdges = edges.mapValues(mapper, Materialized.<Edge<K>, NV, KeyValueStore<Bytes, byte[]>>as(
    generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(newEdgeValueSerde));
  return new KGraph<>(vertices, mappedEdges,
    GraphSerialized.with(keySerde(), vertexValueSerde(), newEdgeValueSerde));
}

Example source: spring-cloud/spring-cloud-stream-samples

@StreamListener("input")
@SendTo("output")
public KStream<Integer, Long> process(KStream<Object, Product> input) {
  return input
      .filter((key, product) -> productIds().contains(product.getId()))
      .map((key, value) -> new KeyValue<>(value.id, value))
      .groupByKey(Serialized.with(Serdes.Integer(), new JsonSerde<>(Product.class)))
      .count(Materialized.<Integer, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
        .withKeySerde(Serdes.Integer())
        .withValueSerde(Serdes.Long()))
      .toStream();
}

Example source: rayokota/kafka-graphs

public KGraph<K, VV, EV> undirected() {
  KTable<Edge<K>, EV> undirectedEdges = edges
    .toStream()
    .flatMap(new RegularAndReversedEdgesMap<>())
    .groupByKey(Serialized.with(new KryoSerde<>(), serialized.edgeValueSerde()))
    .reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
      .withKeySerde(new KryoSerde<>()).withValueSerde(serialized.edgeValueSerde()));
  return new KGraph<>(vertices, undirectedEdges, serialized);
}

Example source: rayokota/kafka-graphs

public KGraph<K, VV, EV> filterOnVertices(Predicate<K, VV> vertexFilter) {
  KTable<K, VV> filteredVertices = vertices.filter(vertexFilter);
  KTable<Edge<K>, EV> remainingEdges = edgesBySource()
    .join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
    .map((k, edge) -> new KeyValue<>(edge.target(), edge))
    .join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
    .map((k, edge) -> new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()))
    .groupByKey(Serialized.with(new KryoSerde<>(), edgeValueSerde()))
    .reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));
  return new KGraph<>(filteredVertices, remainingEdges, serialized);
}
