本文整理了Java中org.apache.kafka.streams.kstream.Materialized.as()
方法的一些代码示例,展示了Materialized.as()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Materialized.as()
方法的具体详情如下:
包路径:org.apache.kafka.streams.kstream.Materialized
类名称:Materialized
方法名:as
[英]Materialize a StateStore with the given name.
[中]用给定的名称具体化StateStore。
代码示例来源:origin: confluentinc/ksql
@Test
public void shouldCreateHoppingWindowAggregate() {
final KGroupedStream stream = EasyMock.createNiceMock(KGroupedStream.class);
final TimeWindowedKStream windowedKStream = EasyMock.createNiceMock(TimeWindowedKStream.class);
final UdafAggregator aggregator = EasyMock.createNiceMock(UdafAggregator.class);
final HoppingWindowExpression windowExpression = new HoppingWindowExpression(10, TimeUnit.SECONDS, 4, TimeUnit.MILLISECONDS);
final Initializer initializer = () -> 0;
final Materialized<String, GenericRow, WindowStore<Bytes, byte[]>> store = Materialized.as("store");
EasyMock.expect(stream.windowedBy(TimeWindows.of(Duration.ofMillis(10000L)).advanceBy(Duration.ofMillis(4L)))).andReturn(windowedKStream);
EasyMock.expect(windowedKStream.aggregate(same(initializer), same(aggregator), same(store))).andReturn(null);
EasyMock.replay(stream, windowedKStream);
windowExpression.applyAggregate(stream, initializer, aggregator, store);
EasyMock.verify(stream, windowedKStream);
}
代码示例来源:origin: confluentinc/ksql
@Test
public void shouldCreateTumblingWindowAggregate() {
final KGroupedStream stream = EasyMock.createNiceMock(KGroupedStream.class);
final TimeWindowedKStream windowedKStream = EasyMock.createNiceMock(TimeWindowedKStream.class);
final UdafAggregator aggregator = EasyMock.createNiceMock(UdafAggregator.class);
final TumblingWindowExpression windowExpression = new TumblingWindowExpression(10, TimeUnit.SECONDS);
final Initializer initializer = () -> 0;
final Materialized<String, GenericRow, WindowStore<Bytes, byte[]>> store = Materialized.as("store");
EasyMock.expect(stream.windowedBy(TimeWindows.of(Duration.ofMillis(10000L)))).andReturn(windowedKStream);
EasyMock.expect(windowedKStream.aggregate(same(initializer), same(aggregator), same(store))).andReturn(null);
EasyMock.replay(stream, windowedKStream);
windowExpression.applyAggregate(stream, initializer, aggregator, store);
EasyMock.verify(stream, windowedKStream);
}
代码示例来源:origin: confluentinc/kafka-streams-examples
static KafkaStreams createStreams(final Properties streamsConfiguration) {
final Serde<String> stringSerde = Serdes.String();
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String>
textLines = builder.stream(TEXT_LINES_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
final KGroupedStream<String, String> groupedByWord = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
// Create a State Store for with the all time word count
groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-count")
.withValueSerde(Serdes.Long()));
// Create a Windowed State Store that contains the word count for every
// 1 minute
groupedByWord.windowedBy(TimeWindows.of(60000))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-word-count")
.withValueSerde(Serdes.Long()));
return new KafkaStreams(builder.build(), streamsConfiguration);
}
代码示例来源:origin: confluentinc/kafka-streams-examples
/**
* Create a table of orders which we can query. When the table is updated
* we check to see if there is an outstanding HTTP GET request waiting to be
* fulfilled.
*/
private StreamsBuilder createOrdersMaterializedView() {
StreamsBuilder builder = new StreamsBuilder();
builder.table(ORDERS.name(), Consumed.with(ORDERS.keySerde(), ORDERS.valueSerde()), Materialized.as(ORDERS_STORE_NAME))
.toStream().foreach(this::maybeCompleteLongPollGet);
return builder;
}
代码示例来源:origin: confluentinc/kafka-streams-examples
.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as(PLAY_EVENTS_PER_SESSION)
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()))
代码示例来源:origin: confluentinc/kafka-streams-examples
builder.globalTable(CUSTOMER_TOPIC, Materialized.<Long, Customer, KeyValueStore<Bytes, byte[]>>as(CUSTOMER_STORE)
.withKeySerde(Serdes.Long())
.withValueSerde(customerSerde));
builder.globalTable(PRODUCT_TOPIC, Materialized.<Long, Product, KeyValueStore<Bytes, byte[]>>as(PRODUCT_STORE)
.withKeySerde(Serdes.Long())
.withValueSerde(productSerde));
代码示例来源:origin: confluentinc/kafka-streams-examples
builder.table(SONG_FEED, Materialized.<Long, Song, KeyValueStore<Bytes, byte[]>>as(ALL_SONGS)
.withKeySerde(Serdes.Long())
.withValueSerde(valueSongSerde));
.count(Materialized.<Song, Long, KeyValueStore<Bytes, byte[]>>as(SONG_PLAY_COUNT_STORE)
.withKeySerde(valueSongSerde)
.withValueSerde(Serdes.Long()));
return aggregate;
},
Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_BY_GENRE_STORE)
.withKeySerde(Serdes.String())
.withValueSerde(topFiveSerde)
return aggregate;
},
Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_STORE)
.withKeySerde(Serdes.String())
.withValueSerde(topFiveSerde)
代码示例来源:origin: confluentinc/kafka-streams-examples
() -> Long.MIN_VALUE,
(aggKey, value, aggregate) -> Math.max(value, aggregate),
Materialized.as(maxStore)
);
() -> Long.MIN_VALUE,
(aggKey, value, aggregate) -> Math.max(value, aggregate),
Materialized.as(maxWindowStore));
代码示例来源:origin: confluentinc/kafka-streams-examples
userRegions.join(userLastLogins,
(regionValue, lastLoginValue) -> regionValue + "/" + lastLoginValue,
Materialized.as(storeName))
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
代码示例来源:origin: spring-projects/spring-kafka
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
stream.mapValues((ValueMapper<String, String>) String::toUpperCase)
.mapValues(Foo::new)
.through(FOOS, Produced.with(Serdes.Integer(), new JsonSerde<Foo>() {
}))
.mapValues(Foo::getName)
.groupByKey()
.windowedBy(TimeWindows.of(1000))
.reduce((value1, value2) -> value1 + value2, Materialized.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40).to(streamingTopic2);
stream.print(Printed.toSysOut());
return stream;
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka
private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> getMaterialized(String storeName, Serde<K> k, Serde<V> v) {
return Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName)
.withKeySerde(k)
.withValueSerde(v);
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-samples
@StreamListener("binding2")
@SendTo("singleOutput")
public KStream<?, WordCount> process(KStream<Object, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(timeWindows)
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-samples
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
}
代码示例来源:origin: rayokota/kafka-graphs
public KGraph<K, VV, EV> filterOnEdges(Predicate<Edge<K>, EV> edgeFilter) {
KTable<Edge<K>, EV> filteredEdges = edges
.filter(edgeFilter, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));
return new KGraph<>(vertices, filteredEdges, serialized);
}
代码示例来源:origin: rayokota/kafka-graphs
public KTable<K, Long> outDegrees() {
return vertices.leftJoin(edgesGroupedBySource(), new CountNeighborsLeftJoin<>(),
Materialized.<K, Long, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(keySerde()).withValueSerde(Serdes.Long()));
}
代码示例来源:origin: rayokota/kafka-graphs
public <NV> KGraph<K, NV, EV> mapVertices(ValueMapperWithKey<K, VV, NV> mapper, Serde<NV> newVertexValueSerde) {
KTable<K, NV> mappedVertices = vertices.mapValues(mapper, Materialized.<K, NV, KeyValueStore<Bytes, byte[]>>as(
generateStoreName()).withKeySerde(keySerde()).withValueSerde(newVertexValueSerde));
return new KGraph<>(mappedVertices, edges,
GraphSerialized.with(keySerde(), newVertexValueSerde, edgeValueSerde()));
}
代码示例来源:origin: rayokota/kafka-graphs
public <NV> KGraph<K, VV, NV> mapEdges(ValueMapperWithKey<Edge<K>, EV, NV> mapper, Serde<NV> newEdgeValueSerde) {
KTable<Edge<K>, NV> mappedEdges = edges.mapValues(mapper, Materialized.<Edge<K>, NV, KeyValueStore<Bytes, byte[]>>as(
generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(newEdgeValueSerde));
return new KGraph<>(vertices, mappedEdges,
GraphSerialized.with(keySerde(), vertexValueSerde(), newEdgeValueSerde));
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-samples
@StreamListener("input")
@SendTo("output")
public KStream<Integer, Long> process(KStream<Object, Product> input) {
return input
.filter((key, product) -> productIds().contains(product.getId()))
.map((key, value) -> new KeyValue<>(value.id, value))
.groupByKey(Serialized.with(Serdes.Integer(), new JsonSerde<>(Product.class)))
.count(Materialized.<Integer, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
.withKeySerde(Serdes.Integer())
.withValueSerde(Serdes.Long()))
.toStream();
}
代码示例来源:origin: rayokota/kafka-graphs
public KGraph<K, VV, EV> undirected() {
KTable<Edge<K>, EV> undirectedEdges = edges
.toStream()
.flatMap(new RegularAndReversedEdgesMap<>())
.groupByKey(Serialized.with(new KryoSerde<>(), serialized.edgeValueSerde()))
.reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
.withKeySerde(new KryoSerde<>()).withValueSerde(serialized.edgeValueSerde()));
return new KGraph<>(vertices, undirectedEdges, serialized);
}
代码示例来源:origin: rayokota/kafka-graphs
public KGraph<K, VV, EV> filterOnVertices(Predicate<K, VV> vertexFilter) {
KTable<K, VV> filteredVertices = vertices.filter(vertexFilter);
KTable<Edge<K>, EV> remainingEdges = edgesBySource()
.join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
.map((k, edge) -> new KeyValue<>(edge.target(), edge))
.join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
.map((k, edge) -> new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()))
.groupByKey(Serialized.with(new KryoSerde<>(), edgeValueSerde()))
.reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));
return new KGraph<>(filteredVertices, remainingEdges, serialized);
}
内容来源于网络,如有侵权,请联系作者删除!