org.apache.kafka.streams.kstream.Materialized.withKeySerde()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(11.6k)|赞(0)|评价(0)|浏览(98)

本文整理了Java中org.apache.kafka.streams.kstream.Materialized.withKeySerde()方法的一些代码示例,展示了Materialized.withKeySerde()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Materialized.withKeySerde()方法的具体详情如下:
包路径:org.apache.kafka.streams.kstream.Materialized
类名称:Materialized
方法名:withKeySerde

Materialized.withKeySerde介绍

[英]Set the keySerde that the materialized StateStore will use.
[中]设置物化(materialized)的StateStore将使用的键序列化/反序列化器(keySerde)。

代码示例

代码示例来源:origin: confluentinc/kafka-streams-examples

.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()))

代码示例来源:origin: confluentinc/kafka-streams-examples

// Register CUSTOMER_TOPIC as a global table backed by the CUSTOMER_STORE key-value store,
// using a Long key serde and customerSerde for the values.
customers =
builder.globalTable(CUSTOMER_TOPIC, Materialized.<Long, Customer, KeyValueStore<Bytes, byte[]>>as(CUSTOMER_STORE)
    .withKeySerde(Serdes.Long())
    .withValueSerde(customerSerde));
// Same pattern for products: PRODUCT_TOPIC materialized into PRODUCT_STORE with
// a Long key serde and productSerde for the values.
products =
builder.globalTable(PRODUCT_TOPIC, Materialized.<Long, Product, KeyValueStore<Bytes, byte[]>>as(PRODUCT_STORE)
    .withKeySerde(Serdes.Long())
    .withValueSerde(productSerde));

代码示例来源:origin: confluentinc/kafka-streams-examples

// Read SONG_FEED as a table materialized in the ALL_SONGS key-value store,
// with a Long key serde and valueSongSerde for the Song values.
songTable =
builder.table(SONG_FEED, Materialized.<Long, Song, KeyValueStore<Bytes, byte[]>>as(ALL_SONGS)
  .withKeySerde(Serdes.Long())
  .withValueSerde(valueSongSerde));
                            Serialized.with(keySongSerde, valueSongSerde))
  .count(Materialized.<Song, Long, KeyValueStore<Bytes, byte[]>>as(SONG_PLAY_COUNT_STORE)
          .withKeySerde(valueSongSerde)
          .withValueSerde(Serdes.Long()));
  },
  Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_BY_GENRE_STORE)
    .withKeySerde(Serdes.String())
    .withValueSerde(topFiveSerde)
);
  },
  Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_STORE)
    .withKeySerde(Serdes.String())
    .withValueSerde(topFiveSerde)
);

代码示例来源:origin: io.quicksign/kafka-encryption-core

/**
   * Configures the given {@link Materialized} with this pair's key serde and value serde.
   *
   * @param materialized the materialization to configure
   * @param <S> the type of the underlying {@link StateStore}
   * @return the result of applying {@code keySerde} and then {@code valueSerde} to
   *         {@code materialized}
   */
  public <S extends StateStore> Materialized<K, V, S> applyTo(Materialized<K, V, S> materialized) {
    // Apply the key serde first, then the value serde, in the same order as before.
    final Materialized<K, V, S> withKey = materialized.withKeySerde(keySerde);
    return withKey.withValueSerde(valueSerde);
  }
}

代码示例来源:origin: Quicksign/kafka-encryption

/**
   * Configures the given {@link Materialized} with this pair's key serde and value serde.
   *
   * @param materialized the materialization to configure
   * @param <S> the type of the underlying {@link StateStore}
   * @return the result of applying {@code keySerde} and then {@code valueSerde} to
   *         {@code materialized}
   */
  public <S extends StateStore> Materialized<K, V, S> applyTo(Materialized<K, V, S> materialized) {
    // Apply the key serde first, then the value serde, in the same order as before.
    final Materialized<K, V, S> withKey = materialized.withKeySerde(keySerde);
    return withKey.withValueSerde(valueSerde);
  }
}

代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka

/**
 * Builds a {@link Materialized} for the key-value store with the given name and serdes.
 *
 * @param storeName name passed to {@code Materialized.as}
 * @param k         serde applied via {@code withKeySerde}
 * @param v         serde applied via {@code withValueSerde}
 * @param <K>       key type
 * @param <V>       value type
 * @return the configured {@link Materialized}
 */
private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> getMaterialized(String storeName, Serde<K> k, Serde<V> v) {
  final Materialized<K, V, KeyValueStore<Bytes, byte[]>> named =
      Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName);
  return named.withKeySerde(k).withValueSerde(v);
}

代码示例来源:origin: org.apache.kafka/kafka-streams

/**
 * Creates a {@link Materialized} for a {@link StateStore} that uses the supplied key and value
 * {@link Serde}s. No store name is supplied, so an internal name will be used for the store.
 *
 * @param keySerde      key {@link Serde}; when null, the default key serde from configs is used
 * @param valueSerde    value {@link Serde}; when null, the default value serde from configs is
 *                      used
 * @param <K>           key type
 * @param <V>           value type
 * @param <S>           store type
 * @return a new {@link Materialized} instance with the given key and value serdes
 */
public static <K, V, S extends StateStore> Materialized<K, V, S> with(final Serde<K> keySerde,
                                   final Serde<V> valueSerde) {
  // The (String) cast selects the store-name constructor; null means "no explicit name".
  final Materialized<K, V, S> unnamed = new Materialized<K, V, S>((String) null);
  return unnamed.withKeySerde(keySerde).withValueSerde(valueSerde);
}

代码示例来源:origin: spring-cloud/spring-cloud-stream-samples

/**
   * Groups incoming domain events by {@code boardUuid} and aggregates each group into a single
   * string by appending every event's {@code eventType}. The aggregate is materialized in the
   * "test-events-snapshots" store with String key and value serdes.
   *
   * @param input stream of domain events bound to "input2"
   */
@StreamListener("input2")
  public void process(KStream<Object, DomainEvent> input) {
    final Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class, new ObjectMapper());
    final Materialized<String, String, KeyValueStore<Bytes, byte[]>> snapshotStore =
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String());

    // The key serde is passed as null; presumably the configured default key serde is used -- confirm.
    input
        .groupBy((key, event) -> event.boardUuid, Serialized.with(null, domainEventSerde))
        .aggregate(
            String::new,
            (boardUuid, event, snapshot) -> snapshot.concat(event.eventType),
            snapshotStore);
  }
}

代码示例来源:origin: spring-cloud/spring-cloud-stream-samples

/**
   * Groups incoming domain events by {@code boardUuid} and aggregates each group into a single
   * string by appending every event's {@code eventType}. The aggregate is materialized in the
   * "test-events-snapshots" store with String key and value serdes.
   *
   * @param input stream of domain events bound to "input"
   */
@StreamListener("input")
  public void process(KStream<Object, DomainEvent> input) {
    final Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class, new ObjectMapper());
    final Materialized<String, String, KeyValueStore<Bytes, byte[]>> snapshotStore =
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String());

    // The key serde is passed as null; presumably the configured default key serde is used -- confirm.
    input
        .groupBy((key, event) -> event.boardUuid, Serialized.with(null, domainEventSerde))
        .aggregate(
            String::new,
            (boardUuid, event, snapshot) -> snapshot.concat(event.eventType),
            snapshotStore);
  }
}

代码示例来源:origin: rayokota/kafka-graphs

/**
 * Returns a graph that keeps the current vertices and restricts the edges with
 * {@code edgeFilter}.
 *
 * @param edgeFilter predicate handed to the edge table's {@code filter} call
 * @return a new {@link KGraph} built from the current vertices, the filtered edges and the
 *         current {@code serialized}
 */
public KGraph<K, VV, EV> filterOnEdges(Predicate<Edge<K>, EV> edgeFilter) {
  // The filtered edge table goes into a freshly named store: Kryo for keys, edge value serde for values.
  Materialized<Edge<K>, EV, KeyValueStore<Bytes, byte[]>> filteredStore =
    Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
      .withKeySerde(new KryoSerde<>())
      .withValueSerde(edgeValueSerde());
  return new KGraph<>(vertices, edges.filter(edgeFilter, filteredStore), serialized);
}

代码示例来源:origin: spring-cloud/spring-cloud-stream-samples

/**
 * Re-keys each sensor record to "v2" when its id (as a string) ends with "v2", otherwise to
 * "v1", then counts records per key. Counts are materialized in {@code STORE_NAME} with
 * String keys and Long values and emitted as a stream.
 *
 * @param input stream of sensor records bound to "input"
 * @return stream of (version key, running count) sent to "output"
 */
@StreamListener("input")
@SendTo("output")
public KStream<String, Long> process(KStream<Object, Sensor> input) {
  // NOTE(review): sensorSerde is configured here but never passed to the pipeline below;
  // groupByKey() is called without explicit serdes -- confirm this is intended.
  final SpecificAvroSerde<Sensor> sensorSerde = new SpecificAvroSerde<>();
  sensorSerde.configure(
      Collections.singletonMap(
          AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"),
      false);

  final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> countStore =
      Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
          .withKeySerde(Serdes.String())
          .withValueSerde(Serdes.Long());

  return input
      .map((ignoredKey, sensor) -> {
        final String versionKey = sensor.getId().toString().endsWith("v2") ? "v2" : "v1";
        return new KeyValue<>(versionKey, sensor);
      })
      .groupByKey()
      .count(countStore)
      .toStream();
}

代码示例来源:origin: rayokota/kafka-graphs

/**
 * Left-joins the vertices with the edges grouped by source, using
 * {@code CountNeighborsLeftJoin} to produce a Long per vertex.
 *
 * @return table of vertex key to Long, materialized in a freshly named store
 */
public KTable<K, Long> outDegrees() {
  KTable<K, Long> degrees = vertices.leftJoin(
    edgesGroupedBySource(),
    new CountNeighborsLeftJoin<>(),
    Materialized.<K, Long, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
      .withKeySerde(keySerde())
      .withValueSerde(Serdes.Long()));
  return degrees;
}

代码示例来源:origin: rayokota/kafka-graphs

/**
 * Left-joins the vertices with the edges grouped by target, using
 * {@code CountNeighborsLeftJoin} to produce a Long per vertex.
 *
 * @return table of vertex key to Long, materialized in a freshly named store
 */
public KTable<K, Long> inDegrees() {
  KTable<K, Long> degrees = vertices.leftJoin(
    edgesGroupedByTarget(),
    new CountNeighborsLeftJoin<>(),
    Materialized.<K, Long, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
      .withKeySerde(keySerde())
      .withValueSerde(Serdes.Long()));
  return degrees;
}

代码示例来源:origin: rayokota/kafka-graphs

/**
 * Applies {@code mapper} to every vertex value and returns a graph with the mapped vertices
 * and the current edges.
 *
 * @param mapper              value mapper applied through {@code mapValues}
 * @param newVertexValueSerde serde for the mapped vertex values
 * @param <NV>                new vertex value type
 * @return a new {@link KGraph} whose serdes are the current key serde, {@code newVertexValueSerde}
 *         and the current edge value serde
 */
public <NV> KGraph<K, NV, EV> mapVertices(ValueMapperWithKey<K, VV, NV> mapper, Serde<NV> newVertexValueSerde) {
  Materialized<K, NV, KeyValueStore<Bytes, byte[]>> mappedStore =
    Materialized.<K, NV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
      .withKeySerde(keySerde())
      .withValueSerde(newVertexValueSerde);
  KTable<K, NV> mappedVertices = vertices.mapValues(mapper, mappedStore);
  return new KGraph<>(
    mappedVertices,
    edges,
    GraphSerialized.with(keySerde(), newVertexValueSerde, edgeValueSerde()));
}

代码示例来源:origin: rayokota/kafka-graphs

/**
 * Applies {@code mapper} to every edge value and returns a graph with the current vertices
 * and the mapped edges.
 *
 * @param mapper            value mapper applied through {@code mapValues}
 * @param newEdgeValueSerde serde for the mapped edge values
 * @param <NV>              new edge value type
 * @return a new {@link KGraph} whose serdes are the current key serde, the current vertex value
 *         serde and {@code newEdgeValueSerde}
 */
public <NV> KGraph<K, VV, NV> mapEdges(ValueMapperWithKey<Edge<K>, EV, NV> mapper, Serde<NV> newEdgeValueSerde) {
  Materialized<Edge<K>, NV, KeyValueStore<Bytes, byte[]>> mappedStore =
    Materialized.<Edge<K>, NV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
      .withKeySerde(new KryoSerde<>())
      .withValueSerde(newEdgeValueSerde);
  KTable<Edge<K>, NV> mappedEdges = edges.mapValues(mapper, mappedStore);
  return new KGraph<>(
    vertices,
    mappedEdges,
    GraphSerialized.with(keySerde(), vertexValueSerde(), newEdgeValueSerde));
}

代码示例来源:origin: spring-cloud/spring-cloud-stream-samples

/**
 * Keeps only products whose id is contained in {@code productIds()}, re-keys them by
 * {@code id}, and counts records per id. Counts are materialized in {@code STORE_NAME} with
 * Integer keys and Long values and emitted as a stream.
 *
 * @param input stream of products bound to "input"
 * @return stream of (product id, running count) sent to "output"
 */
@StreamListener("input")
@SendTo("output")
public KStream<Integer, Long> process(KStream<Object, Product> input) {
  final Materialized<Integer, Long, KeyValueStore<Bytes, byte[]>> countStore =
      Materialized.<Integer, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
          .withKeySerde(Serdes.Integer())
          .withValueSerde(Serdes.Long());

  return input
      .filter((ignoredKey, candidate) -> productIds().contains(candidate.getId()))
      .map((ignoredKey, product) -> new KeyValue<>(product.id, product))
      .groupByKey(Serialized.with(Serdes.Integer(), new JsonSerde<>(Product.class)))
      .count(countStore)
      .toStream();
}

代码示例来源:origin: rayokota/kafka-graphs

/**
 * Builds a graph whose edge table is derived by flat-mapping every edge through
 * {@code RegularAndReversedEdgesMap}, grouping by edge key and reducing each group.
 *
 * @return a new {@link KGraph} with the current vertices, the derived edges and the current
 *         {@code serialized}
 */
public KGraph<K, VV, EV> undirected() {
  KTable<Edge<K>, EV> undirectedEdges = edges
    .toStream()
    .flatMap(new RegularAndReversedEdgesMap<>())
    .groupByKey(Serialized.with(new KryoSerde<>(), serialized.edgeValueSerde()))
    .reduce(
      // The reducer returns its second argument.
      (previous, latest) -> latest,
      Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
        .withKeySerde(new KryoSerde<>())
        .withValueSerde(serialized.edgeValueSerde()));
  return new KGraph<>(vertices, undirectedEdges, serialized);
}

代码示例来源:origin: rayokota/kafka-graphs

/**
 * Left-joins the edges grouped by source with {@code inputDataSet}, wrapping
 * {@code edgeJoinFunction} in {@code ApplyLeftJoinToEdgeValuesOnEitherSourceOrTarget}, then
 * flattens the joined groups back into an edge table keyed by {@code Edge(source, target)}.
 *
 * @param inputDataSet     table joined against the source-grouped edges
 * @param edgeJoinFunction function wrapped by the left-join value joiner
 * @param <T>              value type of {@code inputDataSet}
 * @return a new {@link KGraph} with the current vertices, the resulting edges and the current
 *         {@code serialized}
 */
public <T> KGraph<K, VV, EV> joinWithEdgesOnSource(KTable<K, T> inputDataSet,
                          final EdgeJoinFunction<EV, T> edgeJoinFunction) {
  KTable<Edge<K>, EV> resultedEdges = edgesGroupedBySource()
    .leftJoin(
      inputDataSet,
      new ApplyLeftJoinToEdgeValuesOnEitherSourceOrTarget<>(edgeJoinFunction),
      Materialized.with(keySerde(), new KryoSerde<>()))
    .toStream()
    .flatMap((sourceKey, joinedEdges) -> {
      // Emit one (Edge, value) record per joined edge in the group.
      final List<KeyValue<Edge<K>, EV>> flattened = new ArrayList<>();
      for (EdgeWithValue<K, EV> joined : joinedEdges) {
        final Edge<K> edgeKey = new Edge<>(joined.source(), joined.target());
        flattened.add(new KeyValue<>(edgeKey, joined.value()));
      }
      return flattened;
    })
    .groupByKey(Serialized.with(new KryoSerde<>(), edgeValueSerde()))
    .<EV>reduce(
      // The reducer returns its second argument.
      (previous, latest) -> latest,
      Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
        .withKeySerde(new KryoSerde<>())
        .withValueSerde(edgeValueSerde()));
  return new KGraph<>(this.vertices, resultedEdges, serialized);
}

代码示例来源:origin: rayokota/kafka-graphs

/**
 * Filters the vertices with {@code vertexFilter} and rebuilds the edge table from edges that
 * join against the filtered vertices first under their original key and then under
 * {@code edge.target()}.
 *
 * @param vertexFilter predicate applied to the vertex table
 * @return a new {@link KGraph} with the filtered vertices, the remaining edges and the current
 *         {@code serialized}
 */
public KGraph<K, VV, EV> filterOnVertices(Predicate<K, VV> vertexFilter) {
  KTable<K, VV> filteredVertices = vertices.filter(vertexFilter);
  KTable<Edge<K>, EV> remainingEdges = edgesBySource()
    // First join: keep edges whose current key matches a filtered vertex.
    .join(
      filteredVertices,
      (edge, vertexValue) -> edge,
      Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
    // Re-key by target so the second join checks the other endpoint.
    .map((key, edge) -> new KeyValue<>(edge.target(), edge))
    .join(
      filteredVertices,
      (edge, vertexValue) -> edge,
      Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
    .map((key, edge) -> new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()))
    .groupByKey(Serialized.with(new KryoSerde<>(), edgeValueSerde()))
    .reduce(
      // The reducer returns its second argument.
      (previous, latest) -> latest,
      Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
        .withKeySerde(new KryoSerde<>())
        .withValueSerde(edgeValueSerde()));
  return new KGraph<>(filteredVertices, remainingEdges, serialized);
}

代码示例来源:origin: rayokota/kafka-graphs

/**
 * Filters the vertices with {@code vertexFilter}, keeps the edges that join against the
 * filtered vertices first under their original key and then under {@code edge.target()}, and
 * finally filters those edges with {@code edgeFilter}.
 *
 * @param vertexFilter predicate applied to the vertex table
 * @param edgeFilter   predicate applied to the remaining edges
 * @return a new {@link KGraph} with the filtered vertices, the filtered edges and the current
 *         {@code serialized}
 */
public KGraph<K, VV, EV> subgraph(Predicate<K, VV> vertexFilter, Predicate<Edge<K>, EV> edgeFilter) {
  KTable<K, VV> filteredVertices = vertices.filter(vertexFilter);
  KTable<Edge<K>, EV> remainingEdges = edgesBySource()
    // First join: keep edges whose current key matches a filtered vertex.
    .join(
      filteredVertices,
      (edge, vertexValue) -> edge,
      Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
    // Re-key by target so the second join checks the other endpoint.
    .map((key, edge) -> new KeyValue<>(edge.target(), edge))
    .join(
      filteredVertices,
      (edge, vertexValue) -> edge,
      Joined.with(keySerde(), new KryoSerde<>(), vertexValueSerde()))
    .map((key, edge) -> new KeyValue<>(new Edge<>(edge.source(), edge.target()), edge.value()))
    .groupByKey(Serialized.with(new KryoSerde<>(), edgeValueSerde()))
    // The reducer returns its second argument; this intermediate table gets no explicit store name.
    .reduce((previous, latest) -> latest, Materialized.with(new KryoSerde<>(), edgeValueSerde()));

  // Only the final, edge-filtered table is materialized under a generated store name.
  Materialized<Edge<K>, EV, KeyValueStore<Bytes, byte[]>> filteredStore =
    Materialized.<Edge<K>, EV, KeyValueStore<Bytes, byte[]>>as(generateStoreName())
      .withKeySerde(new KryoSerde<>())
      .withValueSerde(edgeValueSerde());
  KTable<Edge<K>, EV> filteredEdges = remainingEdges.filter(edgeFilter, filteredStore);
  return new KGraph<>(filteredVertices, filteredEdges, serialized);
}

相关文章