我的spark应用程序从kafka接收二进制数据。数据以字节原始信息的形式发送给Kafka。原始信息是:
message Any {
string type_url=1;
bytes value=2;
}
使用scalapb库,我可以将任意消息反序列化为其原始格式。如何将值从字节反序列化为可读格式?serializationutils不起作用。这就是反序列化后any消息的外观。
# +-----------------------------------------|
# | type_url | value |
# +-----------------------------------------|
# |type.googleapis.c...|[0A 8D D8 04 0A 1...|
# +-----------------------------------------|
值仍为字节格式。使用serializationutils反序列化后,数据不正确。
# +-----------------+
# |value |
# +-----------------+
# |2020-09-04T10:...|
# +-----------------+
还有别的选择吗?有没有办法将字节反序列化为字符串、结构或字符串json?
我使用带有udf的scalapbs示例将字节反序列化到any消息中。
val parseCloud = ProtoSQL.udf { bytes: Array[Byte] => CloudEvent.parseFrom(bytes) }
字节值带有serializationutils的udf如下所示。
val parseBytes = ProtoSQL.udf {bytes: Array[Byte] => deserialize(bytes)}
1条答案
按热度按时间bz4sfanl1#
如果您知道
Any
,可以使用unpack方法反序列化。