向flink作业添加跟踪和span id

lo8azlld  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(505)

我需要向集群中运行的flink作业添加track和span id,请求流如下所示
用户-->rest api->kafka-topic-1-->flinkjob-1-->kafka-topic-2-->flinkjob-2-->使用者-->数据库
我使用SpringBoot创建RESTAPI,并使用SpringSleuth向生成的日志添加track和span id,track和span id是在调用restapi和将消息放在kakfa-topic-1上时添加的,但是我不知道如何在使用flinkjob-1和flinkjob-2上的消息时添加track和span id,因为它们不在spring上下文中。
一种方法是将track和span id设置为kafka消息头,让kafka消费者/生产者拦截器提取并记录track和span id,我尝试了这个方法,但是我的拦截器没有被调用,因为flink api使用的是flink版本的kafka客户机。
无法调用我的自定义kafkadeserializationschema

public class MyDeserializationSchema implements KafkaDeserializationSchema<String> {

private static final Logger LOGGER = LoggerFactory.getLogger(MyDeserializationSchema.class);

@Override
public TypeInformation<String> getProducedType() {
    System.out.println("**************Invoked 1");
    LOGGER.debug("**************Invoked 1");
    return null;
}

@Override
public boolean isEndOfStream(String nextElement) {
    System.out.println("**************Invoked 2");
    LOGGER.debug("**************Invoked 2");
    return true;
}

@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    System.out.println("**************Invoked 3");
    LOGGER.debug("**************Invoked 3");
    return record.toString();
}

 }

有没有人能建议我如何做到这一点。

uajslkp6

uajslkp61#

您在这里使用的是一个简单的字符串,在序列化字节到字符串时,可以执行如下代码所示的操作。

public class MyDeserializationSchema  implements KafkaDeserializationSchema<String> {
    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new String(record.value(), "UTF-8");
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
  }
4xy9mtcn

4xy9mtcn2#

也可以使用kafkadeserializationschema来获取头
为了访问kafka消息的键、值和元数据,kafkadeserializationschema具有以下反序列化方法t deserialize(consumerrecord record)。

public class Bla implements KafkaDeserializationSchema {
    @Override
    public boolean isEndOfStream(Object dcEvents) {
        return false;
    }

    @Override
    public Object deserialize(ConsumerRecord consumerRecord) throws Exception {
        return null;
    }

    @Override
    public TypeInformation<DCEvents> getProducedType() {
        return null;
    }

相关问题