Does Kafka Streams flatMapValues split a record into multiple records when passed a JSON object?

cu6pst1q  asked on 2021-06-07 in Kafka

I am using Confluent version 5.0.0.
I have a JSON record like the one below:

    {
      "name" : "David,Corral,Babu",
      "age" : 23
    }

Using Kafka Streams, I want to split the record above into three records, one per comma-separated value of the "name" field. The output should look like:

    {
      "name" : "David",
      "age" : 23
    },
    {
      "name" : "Corral",
      "age" : 23
    },
    {
      "name" : "Babu",
      "age" : 23
    }
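Stripped of Kafka, the transformation being asked for is a fan-out: one record whose "name" holds a comma-separated list becomes one record per name, with every other field copied. A minimal sketch of that logic in plain Java (the `Person` class here is a hypothetical stand-in for the deserialized JSON record):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical POJO standing in for the JSON record above.
class Person {
    private final String name;
    private final int age;
    Person(String name, int age) { this.name = name; this.age = age; }
    String getName() { return name; }
    int getAge() { return age; }
}

public class FanOutSketch {
    // One input record with a comma-separated name becomes one record per name,
    // each keeping the original age.
    static List<Person> fanOut(Person person) {
        return Arrays.stream(person.getName().split(","))
                .map(name -> new Person(name, person.getAge()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Person> out = fanOut(new Person("David,Corral,Babu", 23));
        // prints: David 23 / Corral 23 / Babu 23, one per line
        out.forEach(p -> System.out.println(p.getName() + " " + p.getAge()));
    }
}
```

This is exactly the shape of function `flatMapValues` expects: one value in, a collection of values out.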

For this I used flatMapValues, but so far I have not achieved the expected result.
Is flatMapValues the right function for my requirement?
I used the following code:

    package test;

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.*;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.ValueMapper;
    import org.apache.kafka.streams.kstream.KeyValueMapper;
    import org.apache.kafka.streams.kstream.Produced;

    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    public class RecordSplliter {

        public static void main(String[] args) throws Exception {
            System.out.println("**STARTING RecordSplliter STREAM APP**");
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "json-e44nric2315her");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSeder.class);

            final Serde<String> stringSerde = Serdes.String();
            final StreamsBuilder builder = new StreamsBuilder();

            // Consume JSON and enrich it
            KStream<String, Person> source = builder.stream("streams-plaintext-input");
            KStream<String, String> output = source
                    .flatMapValues(person -> Arrays.asList(person.getName().split(",")));
            output.to("streams-output");

            final Topology topology = builder.build();
            final KafkaStreams streams = new KafkaStreams(topology, props);
            final CountDownLatch latch = new CountDownLatch(1);

            // Attach shutdown handler to catch Ctrl-C
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });

            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }
    }

At runtime I get the following exception:

    08:31:10,822 ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387-StreamThread-1] Failed to process stream task 0_0 due to the following error:
    org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=streams-plaintext-input, partition=0, offset=0
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:304)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:957)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
    Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: myapps.PersonSerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
        at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:42)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
        ... 6 more
    Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to myapps.Person
        at myapps.PersonSerializer.serialize(PersonSerializer.java:1)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:154)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:98)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
        ... 18 more
    08:31:10,827 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
    08:31:10,827 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387-StreamThread-1] Shutting down
    08:31:10,833 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
    08:31:10,843 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
    08:31:10,843 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387] State transition from RUNNING to ERROR
    08:31:10,843 WARN org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387] All stream threads have died. The instance will be in error state and should be closed.
    08:31:10,843 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387-StreamThread-1] Shutdown complete
    Exception in thread "json-enricher-0f8bc964-40c0-41f2-a724-dfa638923387-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=streams-plaintext-input, partition=0, offset=0
    Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: myapps.PersonSerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to myapps.Person

sycxhyv7 #1

The exception occurs because your flatMapValues produces values of type String. In your code you do not pass any Produced argument to the KStream::to function, so it falls back to the default serdes (from the properties you passed in), and in your case the default value serde is PersonSeder.class.
So your values are of type String, but PersonSeder.class is being used to serialize them.
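The root ClassCastException can be reproduced without Kafka at all: serializers receive whatever value the topology actually produced, and one written for a different type blows up on the cast. A minimal sketch of the failure mode, using hypothetical stand-in types rather than the real Kafka interfaces:

```java
// Hypothetical stand-ins, not the real Kafka classes: the serializer is
// declared for Person but is handed whatever value the topology emits.
interface RawSerializer {
    byte[] serialize(Object data);
}

class Person {
    final String name;
    Person(String name) { this.name = name; }
}

class PersonSerializer implements RawSerializer {
    @Override
    public byte[] serialize(Object data) {
        Person person = (Person) data; // ClassCastException when data is a String
        return person.name.getBytes();
    }
}

public class SerdeMismatchDemo {
    public static void main(String[] args) {
        RawSerializer serializer = new PersonSerializer();
        serializer.serialize(new Person("David")); // fine: value really is a Person
        try {
            // After flatMapValues the value type is String, so the cast fails.
            serializer.serialize("David");
        } catch (ClassCastException e) {
            System.out.println("cannot serialize a String with the Person serializer");
        }
    }
}
```

The same mismatch is what the "A serializer ... is not compatible to the actual key or value type" message in the log is reporting.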
If you want to split the record, you need something like this:

    KStream<String, Person> output = source
            .flatMapValues(person ->
                    Arrays.stream(person.getName().split(","))
                            .map(name -> new Person(name, person.getAge()))
                            .collect(Collectors.toList()));

I used the following code with a symmetric serializer and deserializer (also based on Gson), and it works:

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSerdes.class);

    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Person> source = builder.stream("input");
    KStream<String, Person> output = source
            .flatMapValues(person ->
                    Arrays.stream(person.getName().split(","))
                            .map(name -> new Person(name, person.getAge()))
                            .collect(Collectors.toList()));
    output.to("output");

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Update 1:
Regarding your question about using JSON instead of a POJO: it all depends on your serdes. With a generic serde you can serialize and deserialize JSON as a Map.
Below is a simple MapSerdes that can be used for that, followed by sample code that uses it.

    import com.google.gson.Gson;
    import com.google.gson.reflect.TypeToken;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serializer;

    import java.lang.reflect.Type;
    import java.nio.charset.Charset;
    import java.util.Map;

    public class MapSerdes implements Serde<Map<String, String>> {

        private static final Charset CHARSET = Charset.forName("UTF-8");

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {}

        @Override
        public void close() {}

        @Override
        public Serializer<Map<String, String>> serializer() {
            return new Serializer<Map<String, String>>() {
                private Gson gson = new Gson();

                @Override
                public void configure(Map<String, ?> configs, boolean isKey) {}

                @Override
                public byte[] serialize(String topic, Map<String, String> data) {
                    String line = gson.toJson(data);
                    return line.getBytes(CHARSET); // return the bytes of the JSON string
                }

                @Override
                public void close() {}
            };
        }

        @Override
        public Deserializer<Map<String, String>> deserializer() {
            return new Deserializer<Map<String, String>>() {
                private Type type = new TypeToken<Map<String, String>>(){}.getType();
                private Gson gson = new Gson();

                @Override
                public void configure(Map<String, ?> configs, boolean isKey) {}

                @Override
                public Map<String, String> deserialize(String topic, byte[] data) {
                    Map<String, String> result = gson.fromJson(new String(data), type);
                    return result;
                }

                @Override
                public void close() {}
            };
        }
    }

Example usage (instead of "name" you can use a different attribute, depending on your map):

    public class GenericJsonSplitterApp {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MapSerdes.class);

            final StreamsBuilder builder = new StreamsBuilder();
            KStream<String, Map<String, String>> source = builder.stream("input");
            KStream<String, Map<String, String>> output = source
                    .flatMapValues(map ->
                            Arrays.stream(map.get("name").split(","))
                                    .map(name -> {
                                        HashMap<String, String> splittedMap = new HashMap<>(map);
                                        splittedMap.put("name", name);
                                        return splittedMap;
                                    })
                                    .collect(Collectors.toList()));
            output.to("output");

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }
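The lambda inside that flatMapValues can be exercised on its own, without a broker: copy the map once per comma-separated name, overwriting only the "name" entry. A self-contained sketch of just that core logic:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MapFanOutSketch {
    // Same fan-out as the flatMapValues lambda: one map in, one map out per
    // name, with all other entries (e.g. "age") copied unchanged.
    static List<Map<String, String>> fanOut(Map<String, String> map) {
        return Arrays.stream(map.get("name").split(","))
                .map(name -> {
                    Map<String, String> copy = new HashMap<>(map);
                    copy.put("name", name);
                    return copy;
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> record = new HashMap<>();
        record.put("name", "David,Corral,Babu");
        record.put("age", "23");
        fanOut(record).forEach(System.out::println);
    }
}
```

With MapSerdes in place, each emitted map is serialized back to a standalone JSON object on the output topic, which matches the expected output in the question.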
