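I made a docker-compose.yml file that contains several Apache services, including Hadoop, Kafka and Flume. Right now I am trying to retrieve data with Kafka and send it to Flume, which transforms the data (its structure) and stores it in HDFS. I generate dummy data with a Kafka producer, which sends messages to the Kafka broker. Flume listens on a topic, transforms the data, defines where it should be stored, and tries to send it to HDFS. Whenever the Flume agent notices incoming data, the following error occurs: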
2021-11-14 20:16:13,554 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-11-14 20:16:17,448 WARN hdfs.DataStreamer: Exception in createBlockOutputStream blk_1073742188_1365
java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.Net.pollConnect(Native Method)
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:586)
at org.apache.hadoop.hdfs.DataStreamer.createSocketForPipeline(DataStreamer.java:253)
at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1757)
at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1711)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:707)
2021-11-14 20:16:17,451 WARN hdfs.DataStreamer: Abandoning BP-2051009381-192.168.160.8-1635954925420:blk_1073742188_1365
2021-11-14 20:16:17,462 WARN hdfs.DataStreamer: Excluding datanode DatanodeInfoWithStorage[192.168.192.12:50010,DS-0eb49c38-45e0-46bb-be71-23f07b5ac9dc,DISK]
2021-11-14 20:16:28,525 WARN hdfs.DataStreamer: Exception in createBlockOutputStream blk_1073742189_1366
java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.Net.pollConnect(Native Method)
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:586)
at org.apache.hadoop.hdfs.DataStreamer.createSocketForPipeline(DataStreamer.java:253)
at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1757)
at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1711)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:707)
2021-11-14 20:16:28,525 WARN hdfs.DataStreamer: Abandoning BP-2051009381-192.168.160.8-1635954925420:blk_1073742189_1366
2021-11-14 20:16:28,533 WARN hdfs.DataStreamer: Excluding datanode DatanodeInfoWithStorage[192.168.192.10:50010,DS-829fd615-4b31-4379-874a-ad06769d138e,DISK]
2021-11-14 20:16:29,557 WARN hdfs.DataStreamer: Exception in createBlockOutputStream blk_1073742190_1367
java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.Net.pollConnect(Native Method)
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:586)
at org.apache.hadoop.hdfs.DataStreamer.createSocketForPipeline(DataStreamer.java:253)
at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1757)
at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1711)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:707)
2021-11-14 20:16:29,557 WARN hdfs.DataStreamer: Abandoning BP-2051009381-192.168.160.8-1635954925420:blk_1073742190_1367
2021-11-14 20:16:29,569 WARN hdfs.DataStreamer: Excluding datanode DatanodeInfoWithStorage[192.168.192.11:50010,DS-3c3a744b-d53c-4cb5-97ac-4dd3e128f6a7,DISK]
2021-11-14 20:16:29,588 WARN hdfs.DataStreamer: DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /kafka/last-test-5/14-11-21/sensor-data.1636917373340.tmp could only be written to 0 of the 1 minReplication nodes. There are 3 datanode(s) running and 3 node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2219)
at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2789)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:892)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:574)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:999)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:927)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2915)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1573)
at org.apache.hadoop.ipc.Client.call(Client.java:1519)
at org.apache.hadoop.ipc.Client.call(Client.java:1416)
at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
at jdk.proxy2/jdk.proxy2.$Proxy14.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:530)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at jdk.proxy2/jdk.proxy2.$Proxy15.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1084)
at org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1898)
at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1700)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:707)
2021-11-14 20:16:29,590 WARN hdfs.DFSClient: Error while syncing
The file does get created in HDFS, but it is 0 bytes in size and contains no data.
Does anyone know what causes this error and how to fix it?
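The log repeats the same pattern three times. The client is handed a DataNode, gets "Connection refused" on port 50010, and excludes that node. Once all three nodes are excluded, the NameNode reports that the block could be written to 0 of 1 minReplication nodes.

The sketch below lists the DataNode addresses the client actually tried. `excluded_datanodes` is a hypothetical helper name. The regex assumes the exact `Excluding datanode DatanodeInfoWithStorage[ip:port,...]` format shown in the log above.

```python
import re
from typing import List, Tuple

# Matches the HDFS client's "Excluding datanode" warning and captures the IP and
# port inside DatanodeInfoWithStorage[ip:port,storageId,storageType].
EXCLUDED_RE = re.compile(r"Excluding datanode DatanodeInfoWithStorage\[([^:,\]]+):(\d+),")


def excluded_datanodes(log_text: str) -> List[Tuple[str, int]]:
    """Return the (ip, port) pairs the client excluded, in first-seen order, without duplicates."""
    found: List[Tuple[str, int]] = []
    for match in EXCLUDED_RE.finditer(log_text):
        addr = (match.group(1), int(match.group(2)))
        if addr not in found:
            found.append(addr)
    return found


if __name__ == "__main__":
    import sys

    # Usage: python excluded_datanodes.py <flume-log-file>  (file name is up to you)
    if len(sys.argv) > 1:
        with open(sys.argv[1], encoding="utf-8") as f:
            for ip, port in excluded_datanodes(f.read()):
                print(f"{ip}:{port}")
```

For the log above, this yields 192.168.192.12:50010, 192.168.192.10:50010 and 192.168.192.11:50010. These appear to be container addresses on the Docker network rather than the localhost names configured in /etc/hosts.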
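Docker images used for this project:
- bde2020 (for Hadoop)
- (for Kafka and Zookeeper)
To reproduce the problem, I created a git repo from which you can pull the project and recreate the error: https://github.com/Benjaminbakir/Big-data-test
You also need Flume installed on your local machine to run the agent.conf file.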
Run it with the following command (first cd into the directory where the configuration file is stored): flume-ng agent -c . -f agent.conf --name agent -Xmx512m
Finally, add the following to your /etc/hosts file:
- 127.0.0.1 localhost namenode datanode1 datanode2 datanode3
- ::1 localhost namenode datanode1 datanode2 datanode3
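These hosts entries only take effect if they are actually present on the machine running the Flume agent. The sketch below checks a hosts file for them, assuming the standard `address name [name...]` line format. `hosts_map` and `missing_names` are hypothetical helper names.

```python
from typing import Dict, Iterable, List, Set

# Names the setup expects on the Flume host (taken from the /etc/hosts lines above).
REQUIRED = ["localhost", "namenode", "datanode1", "datanode2", "datanode3"]
LOOPBACK = {"127.0.0.1", "::1"}


def hosts_map(text: str) -> Dict[str, Set[str]]:
    """Parse hosts-file text into {hostname: {addresses}}, skipping comments and blank lines."""
    mapping: Dict[str, Set[str]] = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if not line:
            continue
        addr, *names = line.split()
        for name in names:
            mapping.setdefault(name, set()).add(addr)
    return mapping


def missing_names(text: str, required: Iterable[str] = REQUIRED) -> List[str]:
    """Return the required names that are not mapped to any loopback address."""
    mapping = hosts_map(text)
    return [name for name in required if not mapping.get(name, set()) & LOOPBACK]


if __name__ == "__main__":
    try:
        with open("/etc/hosts", encoding="utf-8") as f:
            missing = missing_names(f.read())
        print("missing:", missing if missing else "none")
    except FileNotFoundError:
        print("/etc/hosts not found")
```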
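Now, when you send a message to the topic named "test" with the Kafka producer, the error should appear.
Command to create the Kafka topic: /opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test --partitions 3 --replication-factor 1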
Command used to start the producer: kafka-console-producer.sh ------------------------------------------------------------
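If anything is unclear, let me know and I will try to explain it in more detail.
PS: The Hadoop cluster itself works. The DataNodes and the NameNode are running, and files can be uploaded and downloaded manually through the Hadoop web UI. The error only occurs when data is sent through Kafka and Flume.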
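The cluster itself seems healthy, so the log points at network reachability from wherever the Flume agent runs (here, the local machine, outside Docker). The sketch below tests that directly, assuming Python 3 on the Flume host. `check_reachable` is a hypothetical helper, and the target list is copied from the "Excluding datanode" lines in the log.

```python
import socket
from typing import Dict, Iterable, Tuple


def check_reachable(addrs: Iterable[Tuple[str, int]],
                    timeout: float = 2.0) -> Dict[Tuple[str, int], bool]:
    """Try a plain TCP connect to each (host, port); True means the connect succeeded."""
    results: Dict[Tuple[str, int], bool] = {}
    for host, port in addrs:
        try:
            # create_connection resolves the host and opens a TCP connection.
            with socket.create_connection((host, port), timeout=timeout):
                results[(host, port)] = True
        except OSError:
            # Covers "Connection refused", timeouts and unresolvable host names.
            results[(host, port)] = False
    return results


if __name__ == "__main__":
    # DataNode addresses from the log; add ("datanode1", 50010) etc. to test
    # the /etc/hosts names as well.
    targets = [
        ("192.168.192.10", 50010),
        ("192.168.192.11", 50010),
        ("192.168.192.12", 50010),
    ]
    for (host, port), ok in check_reachable(targets, timeout=1.0).items():
        print(f"{host}:{port} -> {'reachable' if ok else 'NOT reachable'}")
```

If these report NOT reachable while the web UI still works, that would suggest the problem lies on the client-to-DataNode path on port 50010 rather than in HDFS itself.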
1 answer
You just need to set up some "port proxying", i.e. forward ip:port to other_ip:port.
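"Port proxying" here means relaying connections that arrive on one ip:port to another. The answer does not give its exact setup, so the sketch below is only an illustration: a minimal TCP forwarder built on Python's asyncio. `start_forwarder` and the addresses in `main` are hypothetical.

Two conditions apply:
- The forwarder has to run somewhere that can reach the target.
- It only helps if the HDFS client actually dials the forwarded address, for example through hostnames that resolve to it. The HDFS client setting `dfs.client.use.datanode.hostname` controls whether the client connects to DataNodes by hostname instead of IP.

```python
import asyncio


async def _pipe(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Copy bytes from reader to writer until EOF, then half-close the writer."""
    try:
        while data := await reader.read(65536):
            writer.write(data)
            await writer.drain()
        if writer.can_write_eof():
            writer.write_eof()  # signal EOF downstream but keep the other direction open
    except OSError:
        pass  # peer reset or already closed; the caller closes both sockets


async def start_forwarder(listen_host: str, listen_port: int,
                          target_host: str, target_port: int) -> asyncio.AbstractServer:
    """Listen on listen_host:listen_port and relay every connection to target_host:target_port."""

    async def handle(client_r: asyncio.StreamReader, client_w: asyncio.StreamWriter) -> None:
        try:
            upstream_r, upstream_w = await asyncio.open_connection(target_host, target_port)
        except OSError:
            # Target unreachable (e.g. "Connection refused"): drop the client.
            client_w.close()
            return
        try:
            # Relay both directions concurrently until each side reaches EOF.
            await asyncio.gather(_pipe(client_r, upstream_w), _pipe(upstream_r, client_w))
        finally:
            upstream_w.close()
            client_w.close()

    return await asyncio.start_server(handle, listen_host, listen_port)


async def main() -> None:
    # Hypothetical example: make one DataNode's transfer port (50010, from the log)
    # available on 127.0.0.1. Each DataNode would need its own forwarder and listen port.
    server = await start_forwarder("127.0.0.1", 50010, "192.168.192.10", 50010)
    async with server:
        await server.serve_forever()
```

Calling `asyncio.run(main())` would listen on 127.0.0.1:50010 and relay each connection to 192.168.192.10:50010. In practice, tools such as socat or Docker's own port publishing (`ports:` in docker-compose.yml) do the same job; the sketch only shows the mechanism.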