Storing data in Hadoop using Flume

yxyvkwin  posted on 2021-06-04 in Flume

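I have completed all the steps in the Hadoop installation and Flume tutorials. I am new to big data tools. I am getting the following error, and I don't understand where the problem is.
I have also read many posts about the installation, but I am still facing this issue. My ultimate goal is to use R.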

17/09/29 02:25:39 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
17/09/29 02:25:39 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/shivam/apache-flume-1.6.0-bin/conf/flume.conf
17/09/29 02:25:39 INFO conf.FlumeConfiguration: Processing:HDFS
17/09/29 02:25:39 INFO conf.FlumeConfiguration: Processing:HDFS
17/09/29 02:25:39 INFO conf.FlumeConfiguration: Processing:HDFS
17/09/29 02:25:39 INFO conf.FlumeConfiguration: Processing:HDFS
17/09/29 02:25:39 INFO conf.FlumeConfiguration: Processing:HDFS
17/09/29 02:25:39 INFO conf.FlumeConfiguration: Processing:HDFS
17/09/29 02:25:39 INFO conf.FlumeConfiguration: Added sinks: HDFS Agent: TwitterAgent
17/09/29 02:25:39 INFO conf.FlumeConfiguration: Processing:HDFS
17/09/29 02:25:39 INFO conf.FlumeConfiguration: Processing:HDFS
17/09/29 02:25:39 INFO conf.FlumeConfiguration: Processing:HDFS
17/09/29 02:25:39 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [TwitterAgent]
17/09/29 02:25:39 INFO node.AbstractConfigurationProvider: Creating channels
17/09/29 02:25:39 INFO channel.DefaultChannelFactory: Creating instance of channel MemChannel type memory
17/09/29 02:25:39 INFO node.AbstractConfigurationProvider: Created channel MemChannel
17/09/29 02:25:39 INFO source.DefaultSourceFactory: Creating instance of source Twitter, type org.apache.flume.source.twitter.TwitterSource
17/09/29 02:25:39 INFO twitter.TwitterSource: Consumer Key:        'fRw12aumIqkAWD6PP5ZHk7vva'
17/09/29 02:25:39 INFO twitter.TwitterSource: Consumer Secret:     'K9K0yL2pwngp3JXEdMGWUOEB7AaGWswXcq72WveRvnD4ZSphNQ'
17/09/29 02:25:39 INFO twitter.TwitterSource: Access Token:        '771287280438968320-XnbtNtBt40cs6gUOk6F9bjgmUABM0qG'
17/09/29 02:25:39 INFO twitter.TwitterSource: Access Token Secret: 'afUppGRqcRi2p9fzLhVdYQXkfMEm72xduaWD6uNs3HhKg'
17/09/29 02:25:39 INFO sink.DefaultSinkFactory: Creating instance of sink: HDFS, type: hdfs
17/09/29 02:25:39 INFO node.AbstractConfigurationProvider: Channel MemChannel connected to [Twitter, HDFS]
17/09/29 02:25:39 INFO node.Application: Starting new configuration:{ sourceRunners:{Twitter=EventDrivenSourceRunner: { source:org.apache.flume.source.twitter.TwitterSource{name:Twitter,state:IDLE} }} sinkRunners:{HDFS=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3012a48f counterGroup:{ name:null counters:{} } }} channels:{MemChannel=org.apache.flume.channel.MemoryChannel{name: MemChannel}} }
17/09/29 02:25:39 INFO node.Application: Starting Channel MemChannel
17/09/29 02:25:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: MemChannel: Successfully registered new MBean.
17/09/29 02:25:39 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: MemChannel started
17/09/29 02:25:39 INFO node.Application: Starting Sink HDFS
17/09/29 02:25:39 INFO node.Application: Starting Source Twitter
17/09/29 02:25:39 INFO twitter.TwitterSource: Starting twitter source org.apache.flume.source.twitter.TwitterSource{name:Twitter,state:IDLE} ...
17/09/29 02:25:39 INFO twitter.TwitterSource: Twitter source Twitter started.
17/09/29 02:25:39 INFO twitter4j.TwitterStreamImpl: Establishing connection.
17/09/29 02:25:39 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: HDFS: Successfully registered new MBean.
17/09/29 02:25:39 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: HDFS started
17/09/29 02:25:42 INFO twitter4j.TwitterStreamImpl: Connection established.
17/09/29 02:25:42 INFO twitter4j.TwitterStreamImpl: Receiving status stream.
17/09/29 02:25:42 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
17/09/29 02:25:42 INFO hdfs.BucketWriter: Creating hdfs://localhost:9000/user/flume/tweets/FlumeData.1506632142370.tmp
17/09/29 02:25:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/29 02:25:44 WARN hdfs.HDFSEventSink: HDFS IO error
java.net.ConnectException: Call From maverick/127.0.1.1 to localhost:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
    at org.apache.hadoop.ipc.Client.call(Client.java:1480)
    at org.apache.hadoop.ipc.Client.call(Client.java:1407)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy13.create(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:296)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy14.create(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1623)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1703)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1638)
    at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
    at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:444)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:776)
    at org.apache.flume.sink.hdfs.HDFSDataStream.doOpen(HDFSDataStream.java:86)
    at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:113)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:246)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)
    at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
    at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:609)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:707)
    at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:370)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1529)
    at org.apache.hadoop.ipc.Client.call(Client.java:1446)
    ... 34 more
17/09/29 02:25:45 INFO twitter.TwitterSource: Processed 100 docs
17/09/29 02:25:45 INFO hdfs.BucketWriter: Creating hdfs://localhost:9000/user/flume/tweets/FlumeData.1506632142371.tmp
17/09/29 02:25:45 WARN hdfs.HDFSEventSink: HDFS IO error
java.net.ConnectException: Call From maverick/127.0.1.1 to localhost:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
17/09/29 02:25:48 INFO twitter.TwitterSource: Processed 200 docs
17/09/29 02:25:50 INFO twitter.TwitterSource: Processed 300 docs
17/09/29 02:25:50 INFO hdfs.BucketWriter: Creating hdfs://localhost:9000/user/flume/tweets/FlumeData.1506632142373.tmp
17/09/29 02:25:50 WARN hdfs.HDFSEventSink: HDFS IO error
java.net.ConnectException: Call From maverick/127.0.1.1 to localhost:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused

Is there a complete solution? I am willing to start over from scratch.

kx5bkwkv  #1

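Flume is trying to connect to Hadoop's NameNode, which is supposedly listening at localhost:9000, without success. "Connection refused" means that no process accepted the connection on that port.
The attempt itself is expected: Hadoop's NameNode usually listens on TCP port 8020 or 9000 for the inter-process communication (IPC) related to the Hadoop file system (HDFS). Here Flume is trying TCP/9000 because that is the port in the URL the sink writes to (hdfs://localhost:9000/user/flume/tweets/, as the BucketWriter lines in the log show).
Can you confirm that such a process is running on localhost and listening on TCP/9000? You can use the lsof or netstat commands for that. You should also check your Hadoop configuration to see which port Hadoop opens for the NameNode's IPC.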
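If lsof or netstat are not at hand, the same question ("is anything accepting connections on that port?") can be answered with a few lines of Python. This is only a minimal sketch: the function name is_port_open is made up for illustration, and localhost:9000 is simply the address that appears in the log.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        # create_connection raises an OSError subclass (for example
        # ConnectionRefusedError) when nothing is listening on the port.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # localhost:9000 is the address Flume tried in the log.
    print("NameNode IPC reachable:", is_port_open("localhost", 9000))
```

A result of False matches the "Connection refused" in the stack trace: either the NameNode is not running, or it is listening on a different port or interface.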
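Then, there are two options:
- Change the NameNode's IPC listening port to 9000. This is done by configuring the fs.default.name property in the core-site.xml file (in Hadoop 2 and later the property is named fs.defaultFS, and fs.default.name is its deprecated alias).
- Configure Flume to connect to the port configured in Hadoop. This is done by setting the hdfs.path property of your Flume sink to hdfs://127.0.0.1:<your_port>/your/path/.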
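Both options come down to making the two configuration files agree on the same host and port. The sketch below compares them; it is an illustration under assumptions, not a tool shipped with Flume or Hadoop. The helper names are hypothetical, the 8020 value in the sample core-site.xml is invented for the example, and the Flume line reuses the agent name (TwitterAgent), sink name (HDFS) and URL seen in the log.

```python
import xml.etree.ElementTree as ET
from urllib.parse import urlparse


def namenode_uri(core_site_xml: str):
    """Return the value of fs.defaultFS (or the older fs.default.name), if set."""
    root = ET.fromstring(core_site_xml)
    props = {p.findtext("name"): p.findtext("value") for p in root.iter("property")}
    return props.get("fs.defaultFS") or props.get("fs.default.name")


def flume_hdfs_paths(flume_conf: str):
    """Return every value whose key ends in .hdfs.path in a Flume properties text."""
    paths = []
    for line in flume_conf.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        if key.strip().endswith(".hdfs.path"):
            paths.append(value.strip())
    return paths


def same_endpoint(uri_a: str, uri_b: str) -> bool:
    """Naive comparison of host and port; 'localhost' and '127.0.0.1' count as different."""
    a, b = urlparse(uri_a), urlparse(uri_b)
    return (a.hostname, a.port) == (b.hostname, b.port)


# Sample inputs (assumed values, see the note above).
CORE_SITE = """<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:8020</value>
  </property>
</configuration>"""

FLUME_CONF = """
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/flume/tweets/
"""

nn = namenode_uri(CORE_SITE)
for path in flume_hdfs_paths(FLUME_CONF):
    status = "OK" if same_endpoint(nn, path) else "MISMATCH"
    print(status, path, "vs", nn)
# prints: MISMATCH hdfs://localhost:9000/user/flume/tweets/ vs hdfs://localhost:8020
```

With real files, the two strings would be read from your own core-site.xml and flume.conf. A mismatch means one of the two options above is needed; a match with the error still present suggests the NameNode is not running at all.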
