Kibana: Logstash pipeline pushes data to Elasticsearch without transforming it

Posted by ego6inou on 2022-12-09 in Kibana

I am using a docker-compose file with the ELK stack (Elasticsearch, Logstash, Kibana). The docker-compose.yml file is quite simple:

version: '3.8'
services: 
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.3.2
    ports:
      - 9300:9300
      - 9200:9200
    environment:
      - http.cors.enabled=true
      - http.cors.allow-origin=*
      - http.cors.allow-methods=OPTIONS,HEAD,GET,POST,PUT,DELETE
      - http.cors.allow-headers=X-Requested-With,X-Auth-Token,Content-Type,Content-Length,Authorization
      - transport.host=127.0.0.1
      - cluster.name=docker-cluster
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    networks:
      - share-network
  kibana:
    image: docker.elastic.co/kibana/kibana:7.3.2
    ports:
      - 5601:5601
    networks:
      - share-network
    depends_on:
      - elasticsearch
  logstash:
    build: 
      dockerfile: Dockerfile
      context: .
    env_file:
      - .local.env
    volumes: 
      - ./pipelines/provider_scores.conf:/usr/share/logstash/pipeline/logstash.conf
    ports:
      - 9600:9600
      - 5044:5044
    networks:
      - share-network
    depends_on:
      - elasticsearch
      - kibana
volumes:
  elasticsearch_data:
networks:
  share-network:

The Dockerfile for the Logstash service only installs a few plugins into the Logstash Docker image:

FROM docker.elastic.co/logstash/logstash:7.3.2

# install dependency
RUN /usr/share/logstash/bin/logstash-plugin install logstash-input-jdbc
RUN /usr/share/logstash/bin/logstash-plugin install logstash-filter-aggregate
RUN /usr/share/logstash/bin/logstash-plugin install logstash-filter-jdbc_streaming
RUN /usr/share/logstash/bin/logstash-plugin install logstash-filter-mutate

# copy lib database jdbc jars
COPY ./drivers/mysql/mysql-connector-java-8.0.11.jar /usr/share/logstash/logstash-core/lib/jars/mysql-connector-java.jar
COPY ./drivers/sql-server/mssql-jdbc-7.4.1.jre11.jar /usr/share/logstash/logstash-core/lib/jars/mssql-jdbc.jar
COPY ./drivers/oracle/ojdbc6-11.2.0.4.jar /usr/share/logstash/logstash-core/lib/jars/ojdbc6.jar
COPY ./drivers/postgres/postgresql-42.2.8.jar /usr/share/logstash/logstash-core/lib/jars/postgresql.jar

The provider_scores.conf file looks like this:

input {
    jdbc {
        jdbc_driver_library => "${LOGSTASH_JDBC_DRIVER_JAR_LOCATION}"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://${DbServer};database=${DataDbName}"
        jdbc_user => "${DataUserName}"
        jdbc_password => "${DataPassword}"
        schedule => "${CronSchedule_Metrics}"
        statement => "
            select pws.ProviderID,
                pws.SpeedScore,
                pws.QualityScore
            from ProviderWeightedOverallScore pws
            order by pws.ProviderID
            "
    }
}
filter {
    aggregate {
        task_id => "%{ProviderID}"
        code => "
             map['providerid'] ||= event.get('ProviderID')
             map['kpi'] ||= []
             map['kpi'] << {
                'speedscore' => event.get('SpeedScore'),
                'qualityscore' => event.get('QualityScore')
                }
             event.cancel()
        "      
        push_previous_map_as_event => true
        timeout => 3
    }
}
output {
    elasticsearch {
        hosts => ["${LOGSTASH_ELASTICSEARCH_HOST}"]
        document_id => "%{providerid}"
        index => "testing-%{+YYYY.MM.dd.HH.mm.ss}"
        action => "update"
        doc_as_upsert => true
    }
    stdout {  }

}

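That is my Docker configuration. Everything runs fine; the only problem is that the filter -> aggregate section does not work, and the Elasticsearch index is populated with the raw rows, with no transformation applied.
Any clue why the filter section is not transforming the data?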


ix0qys7i #1

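The most common cause is that a Logstash pipeline is processed by multiple worker threads, depending on how many CPUs you have. By default, the pipeline.workers setting equals the number of CPUs available on the host.
To work correctly, the aggregate filter must run with a single worker thread. It keeps its per-task maps in memory and, with push_previous_map_as_event, relies on events arriving in order; with several workers, events for the same task_id may not all pass through the same worker thread or may be processed out of sequence.
So make sure to set pipeline.workers: 1, or make sure the environment variable PIPELINE_WORKERS is set to 1.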
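
As a minimal sketch of that suggestion, assuming the official Logstash Docker image (which maps environment variables such as PIPELINE_WORKERS onto the corresponding logstash.yml settings), the logstash service in the question's docker-compose.yml could be extended as below. Only the environment block is new; it has not been tested against the asker's setup.

```yaml
# Sketch: only the logstash service is shown; the other services stay unchanged.
services:
  logstash:
    build:
      dockerfile: Dockerfile
      context: .
    env_file:
      - .local.env
    environment:
      # Equivalent to pipeline.workers: 1 in logstash.yml.
      # A single worker keeps all events for a task_id on one thread, in order.
      - PIPELINE_WORKERS=1
    volumes:
      - ./pipelines/provider_scores.conf:/usr/share/logstash/pipeline/logstash.conf
```

Since the service already loads .local.env, adding the line PIPELINE_WORKERS=1 to that file should have the same effect. A single worker lowers throughput, which is the trade-off for ordered processing in the aggregate filter.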
