We have the following Logstash pipeline from a Kafka topic to webhdfs:
input = Kafka topic
output = webhdfs
I extracted Logstash from source (logstash-6.2.4.tar.gz) and am testing with that install.
Our Hadoop cluster is Kerberos-enabled, and I get the following error when I run the pipeline.
[user@hostname]$ bin/logstash -f /datan1/logstash-6.2.4/bin/pipeline.conf
Sending Logstash's logs to /datan1/logstash-6.2.4/logs which is now configured via log4j2.properties
[2018-05-04T15:04:06,566][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow",
:directory=>"/datan1/logstash-6.2.4/modules/netflow/configuration"}
[2018-05-04T15:04:06,607][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache",
:directory=>"/datan1/logstash-6.2.4/modules/fb_apache/configuration"}
[2018-05-04T15:04:07,880][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file
because modules or command line options are specified
[2018-05-04T15:04:09,193][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.4"}
[2018-05-04T15:04:10,034][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-05-04T15:04:15,906][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>32,
"pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-05-04T15:04:19,373][ERROR][logstash.pipeline ] Error registering plugin {:pipeline_id=>"main",
:plugin=>"#<LogStash::OutputDelegator:0x100380a9
@namespaced_metric=#<LogStash::Instrument::NamespacedMetric:0x5ba440f8
@metric=#<LogStash::Instrument::Metric:0x2affe8ea
@collector=#<LogStash::Instrument::Collector:0x4954cc3d @agent=nil,
@metric_store=#<LogStash::Instrument::MetricStore:0x45de0b76
@store=#<Concurrent::Map:0x00000000000fac entries=4 default_proc=nil>,
@structured_lookup_mutex=#<Mutex:0x41868944>,
@fast_lookup=#<Concurrent::Map:0x00000000000fb0 entries=63
default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main,
:plugins, :outputs,
:d4fc8a80f489c5060bccfb1317f8c420c21a88b6fd6135075b8f7131c356cf29]>,
@metric=#<LogStash::Instrument::NamespacedMetric:0x5c5513c0
@metric=#<LogStash::Instrument::Metric:0x2affe8ea
@collector=#<LogStash::Instrument::Collector:0x4954cc3d @agent=nil,
@metric_store=#<LogStash::Instrument::MetricStore:0x45de0b76
@store=#<Concurrent::Map:0x00000000000fac entries=4 default_proc=nil>,
@structured_lookup_mutex=#<Mutex:0x41868944>,
@fast_lookup=#<Concurrent::Map:0x00000000000fb0 entries=63
default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main,
:plugins, :outputs]>,
@out_counter=org.jruby.proxy.org.logstash.instrument.metrics.counter.LongCounter$Proxy2
- name: out value:0, @strategy=#<LogStash::OutputDelegatorStrategies::Legacy:0x636843ca
@worker_count=1, @workers=[<LogStash::Outputs::WebHdfs
host=>\"xxx.xx.xx.xx\", port=>50070,
path=>\"/user/logstash/ocs_cdr_data/dt=%{+YYYY-MM-dd}/ocs_cdr_data-%{+HH}.log\",
user=>\"user\", use_kerberos_auth=>true,
kerberos_keytab=>\"/home/user/user.keytab\",
id=>\"d4fc8a80f489c5060bccfb1317f8c420c21a88b6fd6135075b8f7131c356cf29\", enable_metric=>true, codec=><LogStash::Codecs::Line
id=>\"line_80e1b3a5-9a38-4674-85cc-c7e519b32c2e\",
enable_metric=>true, charset=>\"UTF-8\", delimiter=>\"\\n\">,
workers=>1, standby_host=>false, standby_port=>50070,
idle_flush_time=>1, flush_size=>500, open_timeout=>30,
read_timeout=>30, use_httpfs=>false, single_file_per_thread=>false,
retry_known_errors=>true, retry_interval=>0.5, retry_times=>5,
compression=>\"none\", snappy_bufsize=>32768,
snappy_format=>\"stream\", use_ssl_auth=>false>],
@worker_queue=#<SizedQueue:0x3a5f35e0>>,
@in_counter=org.jruby.proxy.org.logstash.instrument.metrics.counter.LongCounter$Proxy2
- name: in value:0, @id=\"d4fc8a80f489c5060bccfb1317f8c420c21a88b6fd6135075b8f7131c356cf29\", @time_metric=org.jruby.proxy.org.logstash.instrument.metrics.counter.LongCounter$Proxy2
- name: duration_in_millis value:0, @metric_events=#<LogStash::Instrument::NamespacedMetric:0x25b6080a
@metric=#<LogStash::Instrument::Metric:0x2affe8ea
@collector=#<LogStash::Instrument::Collector:0x4954cc3d @agent=nil,
@metric_store=#<LogStash::Instrument::MetricStore:0x45de0b76
@store=#<Concurrent::Map:0x00000000000fac entries=4 default_proc=nil>,
@structured_lookup_mutex=#<Mutex:0x41868944>,
@fast_lookup=#<Concurrent::Map:0x00000000000fb0 entries=63
default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main,
:plugins, :outputs,
:d4fc8a80f489c5060bccfb1317f8c420c21a88b6fd6135075b8f7131c356cf29,
:events]>, @output_class=LogStash::Outputs::WebHdfs>",
:error=>"uninitialized constant GSSAPI::GssApiError::LibGSSAPI\nDid
you mean? GSSAPI", :thread=>"#<Thread:0x773117db run>"}
[2018-05-04T15:04:19,411][ERROR][logstash.pipeline ] Pipeline aborted due to error {:pipeline_id=>"main",
:exception=>#<NameError: uninitialized constant
GSSAPI::GssApiError::LibGSSAPI
Did you mean? GSSAPI>, :backtrace=>["org/jruby/RubyModule.java:3343:in `const_missing'",
"/datan1/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/webhdfs-0.8.0/lib/webhdfs/exceptions.rb:9:in
`<class:GssApiError>'",
"/datan1/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/webhdfs-0.8.0/lib/webhdfs/exceptions.rb:7:in
`<module:GSSAPI>'",
"/datan1/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/webhdfs-0.8.0/lib/webhdfs/exceptions.rb:6:in
`<main>'", "org/jruby/RubyKernel.java:955:in `require'",
"uri:classloader:/jruby/kernel/kernel.rb:13:in `require_relative'",
"/datan1/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/webhdfs-0.8.0/lib/webhdfs/client_v1.rb:1:in
`<main>'", "org/jruby/RubyKernel.java:955:in `require'",
"uri:classloader:/jruby/kernel/kernel.rb:13:in `require_relative'",
"/datan1/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/webhdfs-0.8.0/lib/webhdfs/client_v1.rb:6:in
`<main>'", "org/jruby/RubyKernel.java:955:in `require'",
"/datan1/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/polyglot-0.3.5/lib/polyglot.rb:65:in
`require'",
"/datan1/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/webhdfs-0.8.0/lib/webhdfs/client.rb:1:in
`<main>'", "org/jruby/RubyKernel.java:955:in `require'",
"/datan1/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/polyglot-0.3.5/lib/polyglot.rb:65:in
`require'",
"/datan1/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/logstash-output-webhdfs-3.0.6/lib/logstash/outputs/webhdfs_helper.rb:12:in
`load_module'",
"/datan1/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/logstash-output-webhdfs-3.0.6/lib/logstash/outputs/webhdfs.rb:135:in
`register'", "org/jruby/RubyArray.java:1734:in `each'",
"/datan1/logstash-6.2.4/logstash-core/lib/logstash/output_delegator_strategies/legacy.rb:17:in
`register'",
"/datan1/logstash-6.2.4/logstash-core/lib/logstash/output_delegator.rb:42:in
`register'",
"/datan1/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:342:in
`register_plugin'",
"/datan1/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:353:in
`block in register_plugins'", "org/jruby/RubyArray.java:1734:in
`each'",
"/datan1/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:353:in
`register_plugins'",
"/datan1/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:730:in
`maybe_setup_out_plugins'",
"/datan1/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:363:in
`start_workers'",
"/datan1/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:290:in
`run'",
"/datan1/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:250:in
`block in start'"], :thread=>"#<Thread:0x773117db run>"}
[2018-05-04T15:04:19,465][ERROR][logstash.agent ] Failed to execute action {:id=>:main,
:action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could
not execute action: LogStash::PipelineAction::Create/pipeline_id:main,
action_result: false", :backtrace=>nil}
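From the backtrace, the NameError is raised while webhdfs-0.8.0/lib/webhdfs/exceptions.rb monkey-patches GSSAPI::GssApiError; that patch references LibGSSAPI, which only exists if the gssapi gem actually loaded. So my suspicion is that the gssapi gem, or the native GSSAPI library it binds through FFI, is missing from this install. Below is a minimal check I can run under the bundled JRuby (the vendor paths are from my install and may differ):

# check_gssapi.rb -- run with Logstash's bundled JRuby, e.g.
#   /datan1/logstash-6.2.4/vendor/jruby/bin/jruby check_gssapi.rb
# (the vendor paths are assumptions based on my 6.2.4 install)

# put any vendored gssapi gem on the load path, as Logstash's bundle would
vendored = Dir.glob('/datan1/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/gssapi-*/lib')
$LOAD_PATH.concat(vendored)

if vendored.empty?
  puts 'no gssapi gem is vendored under this Logstash install'
else
  begin
    require 'gssapi'  # the require that webhdfs silently rescues
    puts "GSSAPI::LibGSSAPI defined: #{defined?(GSSAPI::LibGSSAPI) ? 'yes' : 'no'}"
  rescue LoadError => e
    # FFI raises LoadError when the system GSSAPI library
    # (libgssapi_krb5.so on Linux) cannot be found
    puts "gssapi gem is present but failed to load: #{e.message}"
  end
end

As far as I can tell, gssapi is only an optional dependency of webhdfs, so it is not guaranteed to ship with Logstash.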
My pipeline configuration is as follows:
[username@hostname]$ cat /datan1/logstash-6.2.4/bin/pipeline.conf
input {
  kafka {
    group_id => "logstash"
    jaas_path => "/usr/logstash/"
    sasl_kerberos_service_name => "kafka"
    kerberos_config => "/etc/krb5.conf"
    auto_offset_reset => "earliest"
    topics => ["ocs_cdr_data"]
    codec => "json"
    bootstrap_servers => "ip1:9092,ip2:9092,ip3:9092,ip4:9092"
    type => "ocs_cdr_data"
  }
}
output {
  webhdfs {
    host => "ip.xx.xx.x" # (required)
    port => 50070 # (optional, default: 50070)
    path => "/user/logstash/ocs_cdr_data/dt=%{+YYYY-MM-dd}/ocs_cdr_data-%{+HH}.log" # (required)
    user => "username" # (required)
    use_kerberos_auth => "true"
    kerberos_keytab => "/home/username/username.keytab"
  }
}
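To isolate the output side, the failing register step boils down to building a Kerberos-enabled webhdfs client from the settings above. Here is a sketch that exercises the same vendored gem directly, outside Logstash (host, user, and keytab are the placeholders from my config; the kerberos/kerberos_keytab accessors are my understanding of the gem's API, so treat them as assumptions):

# try_webhdfs.rb -- drive the vendored webhdfs gem directly, outside Logstash;
# run with /datan1/logstash-6.2.4/vendor/jruby/bin/jruby (path assumed)
Dir.glob('/datan1/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/*/lib') do |dir|
  $LOAD_PATH.unshift(dir)  # approximate Logstash's bundled load path
end

require 'webhdfs'  # for me this raises the same NameError as the pipeline

client = WebHDFS::Client.new('ip.xx.xx.x', 50070, 'username')
client.kerberos        = true                             # use_kerberos_auth => true
client.kerberos_keytab = '/home/username/username.keytab' # kerberos_keytab => ...
puts client.list('/user/logstash')  # any simple call that proves auth works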
The jaas.conf is as follows:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  useTicketCache=true
  renewTicket=true
  serviceName="kafka"
  keyTab="/home/username/username.keytab"
  principal="username@DOMAIN";
};
pipeline.conf validates successfully, as shown below:
[username@hostname] $ bin/logstash -f /datan1/logstash-6.2.4/bin/pipeline.conf --config.test_and_exit
Sending Logstash's logs to /datan1/logstash-6.2.4/logs which is now configured via log4j2.properties
[2018-05-04T15:59:45,177][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/datan1/logstash-6.2.4/modules/netflow/configuration"}
[2018-05-04T15:59:45,195][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/datan1/logstash-6.2.4/modules/fb_apache/configuration"}
[2018-05-04T15:59:45,709][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
Configuration OK
[2018-05-04T15:59:48,478][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
My /etc/krb5.conf is as follows:
[username@hostname]$ cat /etc/krb5.conf
[libdefaults]
  default_realm = domain.domainGSM.COM
  dns_lookup_kdc = true
  dns_lookup_realm = false
  ticket_lifetime = 86400
  renew_lifetime = 604800
  forwardable = true
  default_tgs_enctypes = rc4-hmac des3-hmac-sha1 arcfour-hmac des-hmac-sha1 des-cbc-md5 des-cbc-crc
  default_tkt_enctypes = rc4-hmac des3-hmac-sha1 arcfour-hmac des-hmac-sha1 des-cbc-md5 des-cbc-crc
  permitted_enctypes = rc4-hmac des3-hmac-sha1 arcfour-hmac des-hmac-sha1 des-cbc-md5 des-cbc-crc
  udp_preference_limit = 1
  kdc_timeout = 9000
  ignore_acceptor_hostname = true
[realms]
  domain.domainGSM.COM = {
    kdc = hofdmc04.domain.domaingsm.com
    admin_server = hofdmc04.domain.domaingsm.com
  }
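Independently of the gem issue, the keytab and realm can be sanity-checked outside Logstash. Here is a small sketch that shells out to the MIT Kerberos tools (principal and keytab path are the ones from my jaas.conf):

# kinit_check.rb -- confirm the keytab can obtain a TGT for the principal
# used in jaas.conf (values are from my setup; adjust as needed)
keytab    = '/home/username/username.keytab'
principal = 'username@DOMAIN'

# klist -kt lists the principals stored in the keytab
system('klist', '-kt', keytab) or abort 'cannot read keytab'

# kinit -kt obtains a ticket non-interactively using the keytab
if system('kinit', '-kt', keytab, principal)
  system('klist')  # show the ticket we just obtained
else
  abort "kinit failed for #{principal}"
end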
Please advise.