flink:从kafka获取byte[]数据

1szpjjfi  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(980)

我使用flink-1.0-snapshot来使用Kafka的数据。数据以snappy压缩字节[]的形式传入,该字节被传递给thrift供以后使用。
当我使用flink来检索数据时,它会被破坏或者处理不当,以至于无法解压缩。代码源自此示例,如下所示:

DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<String, String>() {

    @Override public String map(String value) throws Exception {
    boolean bvalid = Snappy.isValidCompressedBuffer(value.getBytes());
 });

isvalidcompressedbuffer每次都返回false。
当通过其他途径消费时,数据是好的。
我错过了什么?

解决方案:

我张贴这个,因为我找不到任何使用的例子 RawSchema .

public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // parse user parameters
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    DataStream<byte[]> dataStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new RawSchema(), parameterTool.getProperties()));

    dataStream.map(new MapFunction<byte[], Object>() {
        @Override
        public Object map(byte[] bytes) throws Exception {
            boolean bvali = Snappy.isValidCompressedBuffer(bytes);

            });
            return 0;
        }
    }).print();
    env.execute();
}
t9eec4r0

t9eec4r01#

将字节消息作为字符串读取是不正确的。应按原样读取字节,然后解压缩:

public Object map(byte[] bytes) throws Exception {
    boolean bvalid = Snappy.isValidCompressedBuffer(bytes);
    ...

相关问题