Go语言 使用Kafka Exporter和OpenTelemetry推送指标

0kjbasz6  于 2023-10-14  发布在  Go
关注(0)|答案(1)|浏览(160)

我正在尝试收集一些指标并使用OpenTelemetry将其导出到Kafka。对于一个简单的POC,我有一个OpenSSL格式的本地文件,我希望解析它并将指标推送到Kafka上。
我知道我应该使用kafkaexporter,但我无法理解端到端流程是什么样子的。一个代码片段会很有帮助。
我正在使用以下代码创建Kafka导出器:

  1. func newExporter(ctx context.Context) (exporter.Metrics, error) {
  2. f := kafkaexporter.NewFactory(kafkaexporter.WithMetricsMarshalers())
  3. cfg := f.CreateDefaultConfig()
  4. ts := component.TelemetrySettings{
  5. Logger: logger,
  6. MeterProvider: meterProvider(),
  7. MetricsLevel: configtelemetry.LevelNormal,
  8. }
  9. cs := exporter.CreateSettings{
  10. ID: component.NewID("metrics"),
  11. TelemetrySettings: ts,
  12. BuildInfo: component.NewDefaultBuildInfo(),
  13. }
  14. return f.CreateMetricsExporter(ctx, cs, cfg)
  15. }
  16. func meterProvider() *metric.MeterProvider {
  17. return metric.NewMeterProvider()
  18. }

现在假设我有几个计数器/计量表,我如何使用上述导出器推它?

yyyllmsg

yyyllmsg1#

我能够运行端到端流程,从下游依赖项收集一些指标,解析它并推送到Kafka。
下面是代码片段:

  1. func collectMetrics() (pmetric.ResourceMetrics, error) {
  2. // collect and return metrics
  3. }
  4. func CollectMetrics(ctx context.Context) error {
  5. me, err := newExporter(ctx)
  6. if err != nil {
  7. return err
  8. }
  9. err = me.Start(ctx, nil)
  10. if err != nil {
  11. return err
  12. }
  13. defer me.Shutdown(ctx)
  14. ms := pmetric.NewMetrics()
  15. rm, err := collectMetrics()
  16. r := ms.ResourceMetrics().AppendEmpty()
  17. rm.CopyTo(r)
  18. err = me.ConsumeMetrics(ctx, ms)
  19. if err != nil {
  20. return err
  21. }
  22. return nil
  23. }

可以在here中找到完整的POC。

展开查看全部

相关问题