使用套接字在flink中发送数据流;序列化问题

7vhp5slm  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(297)

我想把数据流从虚拟机发送到主机,我正在使用这个方法 writeToSocket() 如下图所示:

joinedStreamEventDataStream.writeToSocket("192.168.1.10", 6998) ;

在这里 joinedStreamEventDataStream 属于类型 DataStream<Integer,Integer> .
有人能告诉我我应该如何将序列化程序传递给上述方法吗。
提前谢谢

jogvjijk

jogvjijk1#

这取决于您希望如何从套接字读取数据。如果您希望它是数据的字符串表示形式,则可以通过以下方式实现:

joinedStreamEventDataStream.map(new MapFunction<Type, String>() {
    @Override
    public String map(Type value) throws Exception {
        return value.toString();
    }
}).writeToSocket(hostname, port, new SimpleStringSchema());

如果要保留flink的序列化格式,则可以编写:

joinedStreamEventDataStream.writeToSocket(
    hostname, 
    port, 
    new TypeInformationSerializationSchema<>(
        joinedStreamEventDataStream.getType(), 
        env.getConfig()));

如果您想以自己的序列化格式输出它,那么您必须实现自己的序列化格式 SerializationSchema 正如亚历克斯所指出的。

t3irkdon

t3irkdon2#

这个 writeToSocket() 方法接受3个参数:套接字主机和端口,以及 SerializationSchema 用于序列化数据的接口。所以你的实现可能是这样的:

joinedStreamEventDataStream.writeToSocket(
    "192.168.1.10",  // host name
    6998,  // port
    new SerializationSchema<Integer>() {

        @Override
        public byte[] serialize(Integer element) {
            return ByteBuffer.allocate(4).putInt(element).array();
        }
    }
);

这是真的如果 joinedStreamEventDataStreamDataStream<Integer> 类型。

相关问题