我有一些flink作业,它使用kafka作为源和汇,我想对它添加跟踪,这样就可以很好地跟踪从kafka消费/产生/到kafka的任何消息,因为我使用kafka拦截器来截取消息并记录跟踪、跨度和父跟踪ID,为此,我将opentracing kafka客户机(v0.1.11)与brave opentracing(v0.35.1)结合使用,这就是我之所以使用自定义拦截器的原因,因为我需要以指定的格式记录消息。
在配置拦截器之后,它们被调用,它使用来自上游系统的跟踪信息(来自报头)并记录它,但是当它再次生成消息给kafka时,跟踪上下文就丢失了,例如考虑下面的场景
1) 某个rest服务放在kafka上的消息2)flink job和拦截器消耗的消息启动并使用头中的跟踪信息并记录下来3)经过处理后的消息由flink job产生给kafka
它在第2步之前一直工作得很好,但在生成消息时,不会使用上一步的跟踪信息,因为它没有任何头信息,因此会生成全新的跟踪。
我注册tracer如下:-
public class MyTracer {
private static final Tracer INSTANCE = BraveTracer.create(Tracing.newBuilder().build());
public static void registerTracer() {
GlobalTracer.registerIfAbsent(INSTANCE);
}
public static Tracer getTracer() {
return INSTANCE;
}
}
我用的是 TracingConsumerInterceptor
以及 TracingProducerInterceptor
来自opentracing kafka。
暂无答案!
目前还没有任何答案,快来回答吧!