I have incoming Kafka data that looks like ("field1", "field2"). I'm trying to parse this input with PyFlink 1.17.1 using a custom deserializer (based on this link), which looks like the following:
from pyflink.common import DeserializationSchema, Types, TypeInformation

from model.exceptions import SystemException


class StringToTupleDeserializationSchema(DeserializationSchema):
    def __init__(self):
        super().__init__()

    def deserialize(self, message):
        parts = message.split(',')
        try:
            if len(parts) == 2:
                return (parts[0], parts[1])
        except Exception as e:
            raise SystemException(e)

    def get_produced_type(self):
        return TypeInformation.of((Types.STRING(), Types.STRING()))
Now I pass this class to the KafkaSource instead of SimpleStringSchema(), as shown below:
source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("test-topic1") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
    .set_value_only_deserializer(StringToTupleDeserializationSchema()) \
    .build()
ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
But this raises an error:
py4j.protocol.Py4JJavaError: An error occurred while calling o25.fromSource.
: java.lang.NullPointerException
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.getProducedType(KafkaValueOnlyDeserializationSchemaWrapper.java:56)
at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:216)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2643)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:2015)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
I can get the elements I want using string.strip and string.split, but that is not an efficient way to access them.
What am I missing?
TIA
1 Answer
Kafka messages are not strings but bytes, so they must first be converted to strings. If you look at SimpleStringSchema or JsonRowDeserializationSchema, they delegate to the corresponding Java classes, which I assume convert the bytes to strings internally. I suggest using SimpleStringSchema to deserialize the value as a string and then applying a map function to split it into a tuple.