通过REST API(kibana或logtsash)创建/更新logstash管道,并将管道配置(*.conf文件的内容)作为主体参数传递

l3zydbqr  于 2022-12-09  发布在  Kibana
关注(0)|答案(1)|浏览(222)

编写和部署logstash管道的最常用方法是创建一个my_pipeline.conf文件,并像下面这样运行它

bin/logstash -f conf/my_pipeline.conf

Elastic提供了一个由API组成的替代方案:
日志存储PUT API

PUT _logstash/pipeline/my_pipeline
{
  "description": "Sample pipeline for illustration purposes",
  "last_modified": "2021-01-02T02:50:51.250Z",
  "pipeline_metadata": {
    "type": "logstash_pipeline",
    "version": "1"
  },
  "username": "elastic",
  "pipeline": "input {}\n filter { grok {} }\n output {}",
  "pipeline_settings": {
    "pipeline.workers": 1,
    "pipeline.batch.size": 125,
    "pipeline.batch.delay": 50,
    "queue.type": "memory",
    "queue.max_bytes.number": 1,
    "queue.max_bytes.units": "gb",
    "queue.checkpoint.writes": 1024
  }
}

以及同样插入logstah管道的kibana API
kibana api
PUT <kibana host>:<port>/api/logstash/pipeline/<id>

$ curl -X PUT api/logstash/pipeline/hello-world
{
  "pipeline": "input { stdin {} } output { stdout {} }",
  "settings": {
    "queue.type": "persisted"
  }
}

正如您在这两个API中所看到的,logstash"pipeline.conf"文件的内容包含在HTTP调用json主体的**“pipeline”键**中。
基本上我有几十个 *.conf管道文件,我想避免开发复杂的代码来解析它们,重新格式化其内容与空格字符的新行,回车...

我的问题是:您是否知道一种“简单”的方法,可以在HTTP调用的主体中提供此“pipeline”参数,并且对原始.conf文件进行尽可能少的格式转换?

为了 * 说明 * 这个格式化操作可能有多复杂,我提供了一个terraform提供程序在后台如何从一个简单的管道“.conf”文件 * 生成正确的预期格式的示例。下面是文件logs_alerts_pubsub.conf的原始内容:

input {
      google_pubsub {
        project_id => "pj-becfr-monitoring-mgmt"
        topic => "f7_monitoring_topic_${environment}_alerting_eck"
        subscription => "f7_monitoring_subscription_${environment}_alerting_eck"
        json_key_file => "/usr/share/logstash/config/logstash-sa.json"
        codec => "json"
      }
    }
filter {
  mutate {
    add_field => { "application_code" => "a-alerting-eck"
    "leanix_id" => "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
    "workfront_id" => "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    }
  }
}
output {
  elasticsearch {
    index => "alerts-%%{+yyyy.MM.dd}"
    hosts => [ "${url}" ]
    user => "elastic"
    ssl => true
    ssl_certificate_verification => false
    password => "${pwd}"
    cacert => "/etc/logstash/certificates/ca.crt"
  }
}

下面是terraform代码:

locals {
  pipeline_list = fileset(path.root, "./modules/elasticsearch_logstash_pipeline/*.conf")
  splitpipepath = split("/", var.pipeline)
  pipename      = element(local.splitpipepath, length(local.splitpipepath) - 1)
  pipename_ex   = split(".", local.pipename)[0]
  category      = split("_", local.pipename_ex)[1]
}

resource "kibana_logstash_pipeline" "newpipeline" {
  for_each = local.pipeline_list
  name        = "tf-${local.category}-${var.environment}-${local.pipename_ex}"
  description = "Logstash Pipeline through Kibana from file"
  pipeline    = templatefile(var.pipeline, { environment = var.environment, url = var.elastic_url, pwd = var.elastic_password })
  settings = {
    "queue.type" = "persisted"
  }
}

下面您可以看到tf.state文件的内容(重点放在**“pipeline”键**上):

{
      "module": "module.elasticsearch_logstash_pipeline[\"modules/elasticsearch_logstash_pipeline/logs_alerts_pubsub.conf\"]",
      "mode": "managed",
      "type": "kibana_logstash_pipeline",
      "name": "newpipeline",
      "provider": "provider[\"registry.terraform.io/disaster37/kibana\"]",
      "instances": [
        {
          "schema_version": 0,
          "attributes": {
            "description": "Logstash Pipeline through Kibana from file",
            "id": "tf-alerts-dev-logs_alerts_pubsub",
            "name": "tf-alerts-dev-logs_alerts_pubsub",
            "pipeline": "input {\n      google_pubsub {\n        project_id =\u003e \"pj-becfr-monitoring-mgmt\"\n        topic =\u003e \"f7_monitoring_topic_dev_alerting_eck\"\n        subscription =\u003e \"f7_monitoring_subscription_dev_alerting_eck\"\n        json_key_file =\u003e \"/usr/share/logstash/config/logstash-sa.json\"\n        codec =\u003e \"json\"\n      }\n    }\nfilter {\n  mutate {\n    add_field =\u003e { \"application_code\" =\u003e \"a-alerting-eck\"\n    \"leanix_id\" =\u003e \"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx\"\n    \"workfront_id\" =\u003e \"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\"\n    }\n  }\n}\noutput {\n  elasticsearch {\n    index =\u003e \"alerts-gcp\"\n    hosts =\u003e [ \"https://35.187.29.254:9200\" ]\n    user =\u003e \"elastic\"\n    ssl =\u003e true\n    ssl_certificate_verification =\u003e false\n    password =\u003e \"HIDDEN\"\n    cacert =\u003e \"/etc/logstash/certificates/ca.crt\"\n  }\n}",
            "settings": {
              "queue.type": "persisted"
            },
            "username": "elastic"
          },
          "sensitive_attributes": [
            [
              {
                "type": "get_attr",
                "value": "pipeline"
              }
            ]
          ],
          "private": "bnVsbA=="
        }
      ]
    }

如果您对bash或任何语言中的简单命令有任何想法,我可以在其中执行转储/加载或编码/解码或任何简单的正则表达式,尽可能通用,这将是有帮助的(仅供参考,在此特定上下文中,我不能使用terraform)

yv5phkfx

yv5phkfx1#

我找到了一种方法来替换<pipeline>.conf文件中的变量,以及一种方法来正确地将该文件的内容格式化为json字符串。为了从头开始,下面是logstash管道文件logs_alerts_pubsub.conf的内容:

input {
      google_pubsub {
        project_id => "pj-becfr-monitoring-mgmt"
        topic => "f7_monitoring_topic_${environment}_alerting_eck"
        subscription => "f7_monitoring_subscription_${environment}_alerting_eck"
        json_key_file => "/usr/share/logstash/config/logstash-sa.json"
        codec => "json"
      }
    }
filter {
  mutate {
    add_field => { "application_code" => "a-alerting-eck"
    "leanix_id" => "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
    "workfront_id" => "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    }
  }
}
output {
  elasticsearch {
    index => "alerts-%%{+yyyy.MM.dd}"
    hosts => [ "${url}" ]
    user => "elastic"
    ssl => true
    ssl_certificate_verification => false
    password => "${pwd}"
    cacert => "/etc/logstash/certificates/ca.crt"
  }
}

现在将变量替换为它们的值:
导出网址= google.com导出密码=HjkTdddddss导出环境=dev
envsubst < logs_alerts_pubsub.conf

input {
      google_pubsub {
        project_id => "pj-becfr-monitoring-mgmt"
        topic => "f7_monitoring_topic_dev_alerting_eck"
        subscription => "f7_monitoring_subscription_dev_alerting_eck"
        json_key_file => "/usr/share/logstash/config/logstash-sa.json"
        codec => "json"
      }
    }
filter {
  mutate {
    add_field => { "application_code" => "a-alerting-eck"
    "leanix_id" => "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
    "workfront_id" => "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    }
  }
}
output {
  elasticsearch {
    index => "alerts-%%{+yyyy.MM.dd}"
    hosts => [ "google.com" ]
    user => "elastic"
    ssl => true
    ssl_certificate_verification => false
    password => "HjkTdddddss"
    cacert => "/etc/logstash/certificates/ca.crt"
  }
}

现在将管道文件的格式设置为json字符串:jq -c -Rs "." <(envsubst < logs_alerts_pubsub.conf)

"input {\n      google_pubsub {\n        project_id => \"pj-becfr-monitoring-mgmt\"\n        topic => \"f7_monitoring_topic_dev_alerting_eck\"\n        subscription => \"f7_monitoring_subscription_dev_alerting_eck\"\n        json_key_file => \"/usr/share/logstash/config/logstash-sa.json\"\n        codec => \"json\"\n      }\n    }\nfilter {\n  mutate {\n    add_field => { \"application_code\" => \"a-alerting-eck\"\n    \"leanix_id\" => \"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx\"\n    \"workfront_id\" => \"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\"\n    }\n  }\n}\noutput {\n  elasticsearch {\n    index => \"alerts-%%{+yyyy.MM.dd}\"\n    hosts => [ \"google.com\" ]\n    user => \"elastic\"\n    ssl => true\n    ssl_certificate_verification => false\n    password => \"HjkTdddddss\"\n    cacert => \"/etc/logstash/certificates/ca.crt\"\n  }\n}"

相关问题