本文整理了Java中org.apache.kafka.streams.kstream.Materialized.withValueSerde()
方法的一些代码示例,展示了Materialized.withValueSerde()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Materialized.withValueSerde()
方法的具体详情如下:
包路径:org.apache.kafka.streams.kstream.Materialized
类名称:Materialized
方法名:withValueSerde
[英]Set the valueSerde the materialized StateStore will use.
[中]设置物化状态存储将使用的值。
代码示例来源:origin: confluentinc/kafka-streams-examples
static KafkaStreams createStreams(final Properties streamsConfiguration) {
final Serde<String> stringSerde = Serdes.String();
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String>
textLines = builder.stream(TEXT_LINES_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
final KGroupedStream<String, String> groupedByWord = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
// Create a State Store for with the all time word count
groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-count")
.withValueSerde(Serdes.Long()));
// Create a Windowed State Store that contains the word count for every
// 1 minute
groupedByWord.windowedBy(TimeWindows.of(60000))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-word-count")
.withValueSerde(Serdes.Long()));
return new KafkaStreams(builder.build(), streamsConfiguration);
}
代码示例来源:origin: confluentinc/kafka-streams-examples
.withValueSerde(Serdes.Long()))
代码示例来源:origin: confluentinc/kafka-streams-examples
builder.globalTable(CUSTOMER_TOPIC, Materialized.<Long, Customer, KeyValueStore<Bytes, byte[]>>as(CUSTOMER_STORE)
.withKeySerde(Serdes.Long())
.withValueSerde(customerSerde));
builder.globalTable(PRODUCT_TOPIC, Materialized.<Long, Product, KeyValueStore<Bytes, byte[]>>as(PRODUCT_STORE)
.withKeySerde(Serdes.Long())
.withValueSerde(productSerde));
代码示例来源:origin: confluentinc/kafka-streams-examples
builder.table(SONG_FEED, Materialized.<Long, Song, KeyValueStore<Bytes, byte[]>>as(ALL_SONGS)
.withKeySerde(Serdes.Long())
.withValueSerde(valueSongSerde));
.count(Materialized.<Song, Long, KeyValueStore<Bytes, byte[]>>as(SONG_PLAY_COUNT_STORE)
.withKeySerde(valueSongSerde)
.withValueSerde(Serdes.Long()));
Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_BY_GENRE_STORE)
.withKeySerde(Serdes.String())
.withValueSerde(topFiveSerde)
);
Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_STORE)
.withKeySerde(Serdes.String())
.withValueSerde(topFiveSerde)
);
代码示例来源:origin: io.quicksign/kafka-encryption-core
/**
* Apply the keySerde and valueSerde of the pair to a {@link Materialized}
*
* @param materialized
* @param <S>
* @return
*/
public <S extends StateStore> Materialized<K, V, S> applyTo(Materialized<K, V, S> materialized) {
return materialized.withKeySerde(keySerde).withValueSerde(valueSerde);
}
}
代码示例来源:origin: Quicksign/kafka-encryption
/**
* Apply the keySerde and valueSerde of the pair to a {@link Materialized}
*
* @param materialized
* @param <S>
* @return
*/
public <S extends StateStore> Materialized<K, V, S> applyTo(Materialized<K, V, S> materialized) {
return materialized.withKeySerde(keySerde).withValueSerde(valueSerde);
}
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka
private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> getMaterialized(String storeName, Serde<K> k, Serde<V> v) {
return Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName)
.withKeySerde(k)
.withValueSerde(v);
}
代码示例来源:origin: org.apache.kafka/kafka-streams
/**
* Materialize a {@link StateStore} with the provided key and value {@link Serde}s.
* An internal name will be used for the store.
*
* @param keySerde the key {@link Serde} to use. If the {@link Serde} is null, then the default key
* serde from configs will be used
* @param valueSerde the value {@link Serde} to use. If the {@link Serde} is null, then the default value
* serde from configs will be used
* @param <K> key type
* @param <V> value type
* @param <S> store type
* @return a new {@link Materialized} instance with the given key and value serdes
*/
public static <K, V, S extends StateStore> Materialized<K, V, S> with(final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new Materialized<K, V, S>((String) null).withKeySerde(keySerde).withValueSerde(valueSerde);
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-samples
@StreamListener("input2")
public void process(KStream<Object, DomainEvent> input) {
ObjectMapper mapper = new ObjectMapper();
Serde<DomainEvent> domainEventSerde = new JsonSerde<>( DomainEvent.class, mapper );
input
.groupBy(
(s, domainEvent) -> domainEvent.boardUuid,
Serialized.with(null, domainEventSerde))
.aggregate(
String::new,
(s, domainEvent, board) -> board.concat(domainEvent.eventType),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots").withKeySerde(Serdes.String()).
withValueSerde(Serdes.String())
);
}
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-samples
@StreamListener("input")
public void process(KStream<Object, DomainEvent> input) {
ObjectMapper mapper = new ObjectMapper();
Serde<DomainEvent> domainEventSerde = new JsonSerde<>( DomainEvent.class, mapper );
input
.groupBy(
(s, domainEvent) -> domainEvent.boardUuid,
Serialized.with(null, domainEventSerde))
.aggregate(
String::new,
(s, domainEvent, board) -> board.concat(domainEvent.eventType),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots").withKeySerde(Serdes.String()).
withValueSerde(Serdes.String())
);
}
}
代码示例来源:origin: rayokota/kafka-graphs
public KGraph<K, VV, EV> filterOnEdges(Predicate<Edge<K>, EV> edgeFilter) {
KTable<Edge<K>, EV> filteredEdges = edges
.filter(edgeFilter, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));
return new KGraph<>(vertices, filteredEdges, serialized);
}
代码示例来源:origin: rayokota/kafka-graphs
public KTable<K, Long> outDegrees() {
return vertices.leftJoin(edgesGroupedBySource(), new CountNeighborsLeftJoin<>(),
Materialized.<K, Long, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(keySerde()).withValueSerde(Serdes.Long()));
}
代码示例来源:origin: rayokota/kafka-graphs
public KTable<K, Long> inDegrees() {
return vertices.leftJoin(edgesGroupedByTarget(), new CountNeighborsLeftJoin<>(),
Materialized.<K, Long, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(keySerde()).withValueSerde(Serdes.Long()));
}
代码示例来源:origin: rayokota/kafka-graphs
public <NV> KGraph<K, NV, EV> mapVertices(ValueMapperWithKey<K, VV, NV> mapper, Serde<NV> newVertexValueSerde) {
KTable<K, NV> mappedVertices = vertices.mapValues(mapper, Materialized.<K, NV, KeyValueStore<Bytes, byte[]>>as(
generateStoreName()).withKeySerde(keySerde()).withValueSerde(newVertexValueSerde));
return new KGraph<>(mappedVertices, edges,
GraphSerialized.with(keySerde(), newVertexValueSerde, edgeValueSerde()));
}
代码示例来源:origin: rayokota/kafka-graphs
public <NV> KGraph<K, VV, NV> mapEdges(ValueMapperWithKey<Edge<K>, EV, NV> mapper, Serde<NV> newEdgeValueSerde) {
KTable<Edge<K>, NV> mappedEdges = edges.mapValues(mapper, Materialized.<Edge<K>, NV, KeyValueStore<Bytes, byte[]>>as(
generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(newEdgeValueSerde));
return new KGraph<>(vertices, mappedEdges,
GraphSerialized.with(keySerde(), vertexValueSerde(), newEdgeValueSerde));
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-samples
@StreamListener("input")
@SendTo("output")
public KStream<Integer, Long> process(KStream<Object, Product> input) {
return input
.filter((key, product) -> productIds().contains(product.getId()))
.map((key, value) -> new KeyValue<>(value.id, value))
.groupByKey(Serialized.with(Serdes.Integer(), new JsonSerde<>(Product.class)))
.count(Materialized.<Integer, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
.withKeySerde(Serdes.Integer())
.withValueSerde(Serdes.Long()))
.toStream();
}
代码示例来源:origin: rayokota/kafka-graphs
public KGraph<K, VV, EV> undirected() {
KTable<Edge<K>, EV> undirectedEdges = edges
.toStream()
.flatMap(new RegularAndReversedEdgesMap<>())
.groupByKey(Serialized.with(new KryoSerde<>(), serialized.edgeValueSerde()))
.reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
.withKeySerde(new KryoSerde<>()).withValueSerde(serialized.edgeValueSerde()));
return new KGraph<>(vertices, undirectedEdges, serialized);
}
代码示例来源:origin: rayokota/kafka-graphs
public <T> KGraph<K, VV, EV> joinWithEdgesOnSource(KTable<K, T> inputDataSet,
final EdgeJoinFunction<EV, T> edgeJoinFunction) {
KTable<Edge<K>, EV> resultedEdges = edgesGroupedBySource()
.leftJoin(inputDataSet,
new ApplyLeftJoinToEdgeValuesOnEitherSourceOrTarget<>(edgeJoinFunction),
Materialized.with(keySerde(), new KryoSerde<>()))
.toStream()
.flatMap((k, edgeWithValues) -> {
List<KeyValue<Edge<K>, EV>> edges = new ArrayList<>();
for (EdgeWithValue<K, EV> edge : edgeWithValues) {
edges.add(new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()));
}
return edges;
})
.groupByKey(Serialized.with(new KryoSerde<>(), edgeValueSerde()))
.<EV>reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(
generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));
return new KGraph<>(this.vertices, resultedEdges, serialized);
}
代码示例来源:origin: rayokota/kafka-graphs
public KGraph<K, VV, EV> filterOnVertices(Predicate<K, VV> vertexFilter) {
KTable<K, VV> filteredVertices = vertices.filter(vertexFilter);
KTable<Edge<K>, EV> remainingEdges = edgesBySource()
.join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
.map((k, edge) -> new KeyValue<>(edge.target(), edge))
.join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
.map((k, edge) -> new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()))
.groupByKey(Serialized.with(new KryoSerde<>(), edgeValueSerde()))
.reduce((v1, v2) -> v2, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));
return new KGraph<>(filteredVertices, remainingEdges, serialized);
}
代码示例来源:origin: rayokota/kafka-graphs
public KGraph<K, VV, EV> subgraph(Predicate<K, VV> vertexFilter, Predicate<Edge<K>, EV> edgeFilter) {
KTable<K, VV> filteredVertices = vertices.filter(vertexFilter);
KTable<Edge<K>, EV> remainingEdges = edgesBySource()
.join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
.map((k, edge) -> new KeyValue<>(edge.target(), edge))
.join(filteredVertices, (e, v) -> e, Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
.map((k, edge) -> new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()))
.groupByKey(Serialized.with(new KryoSerde<>(), edgeValueSerde()))
.reduce((v1, v2) -> v2, Materialized.with(new KryoSerde<>(), edgeValueSerde()));
KTable<Edge<K>, EV> filteredEdges = remainingEdges
.filter(edgeFilter, Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName()).withKeySerde(new KryoSerde<>()).withValueSerde(edgeValueSerde()));
return new KGraph<>(filteredVertices, filteredEdges, serialized);
}
内容来源于网络,如有侵权,请联系作者删除!