CustomerBuilder cannot accept a string as a tuple in PyFlink

jdg4fx2g · posted 2023-11-15 · in Apache

I have incoming Kafka data that looks like ("field1", "field2"). I am now trying to parse this input with a custom deserializer in PyFlink 1.17.1, written by referring to this link, which looks like the following:

    from pyflink.common import DeserializationSchema, Types, TypeInformation
    from model.exceptions import SystemException


    class StringToTupleDeserializationSchema(DeserializationSchema):

        def __init__(self):
            super().__init__()

        def deserialize(self, message):
            parts = message.split(',')
            try:
                if len(parts) == 2:
                    return (parts[0], parts[1])
            except Exception as e:
                raise SystemException(e)

        def get_produced_type(self):
            return TypeInformation.of((Types.STRING(), Types.STRING()))

Now I pass this class to the KafkaSource instead of SimpleStringSchema(), like this:

    source = KafkaSource.builder() \
        .set_bootstrap_servers("localhost:9092") \
        .set_topics("test-topic1") \
        .set_group_id("my-group") \
        .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        .set_value_only_deserializer(StringToTupleDeserializationSchema()) \
        .build()

    ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")


But this raises an error:

    py4j.protocol.Py4JJavaError: An error occurred while calling o25.fromSource.
    : java.lang.NullPointerException
        at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.getProducedType(KafkaValueOnlyDeserializationSchemaWrapper.java:56)
        at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:216)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2643)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:2015)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)


I could get the elements I want with string.strip / string.split, but that is not an efficient way to access them.
What am I missing?
TIA

sxpgvts3 · 1#

Kafka messages are not strings but bytes, so they have to be converted to strings first.
If you look at SimpleStringSchema and JsonRowDeserializationSchema, they rely on the corresponding Java classes, which I guess convert the bytes to strings internally.
I would suggest deserializing the value with SimpleStringSchema and applying a map function to split it into a tuple.
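
A minimal sketch of that suggestion, assuming the broker, topic, and group settings from the question and that each record arrives as the literal text ("field1", "field2"); the to_tuple helper is hypothetical, and the characters it strips must be adjusted to the real message format:

    from pyflink.common import Types, WatermarkStrategy
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

    env = StreamExecutionEnvironment.get_execution_environment()

    # Let the Java-side SimpleStringSchema turn the raw bytes into a string.
    source = KafkaSource.builder() \
        .set_bootstrap_servers("localhost:9092") \
        .set_topics("test-topic1") \
        .set_group_id("my-group") \
        .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()

    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

    def to_tuple(raw):
        # Hypothetical parser for text like ("field1", "field2");
        # adapt the split/strip logic to your actual format.
        parts = raw.split(',')
        return (parts[0].strip(' ("'), parts[1].strip(' )"'))

    # Split into a tuple in Python, declaring the produced type so that
    # downstream operators know the element schema.
    tuples = ds.map(to_tuple,
                    output_type=Types.TUPLE([Types.STRING(), Types.STRING()]))

This keeps the byte-to-string decoding in the existing Java deserializer and does the tuple parsing in Python, which sidesteps the getProducedType NullPointerException entirely because no Python DeserializationSchema is ever handed to the Java wrapper.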


