Unable to write to remote HDFS using Flume

0s0u357o posted on 2021-06-03 in Hadoop

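I am trying to use Flume to write text to HDFS on a remote machine, but my attempts have failed.
I am using the Cloudera QuickStart VM as the remote machine. Here are my steps:
I started Flume with: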

sudo init.d/flume-ng-agent start

I edited the Flume configuration in Cloudera Manager:


# Please paste flume.conf here. Example:

# Sources, channels, and sinks are defined per
# agent name, in this case 'tier1'.

tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1

# For each source, channel, and sink, set
# standard properties.

tier1.sources.source1.type     = avro
# Address of the remote machine (Cloudera QuickStart VM)
tier1.sources.source1.bind     = 172.24.***.***
tier1.sources.source1.port     = 41414
tier1.sources.source1.channels = channel1

tier1.channels.channel1.type   = memory

tier1.sinks.sink1.type          = hdfs
tier1.sinks.sink1.channel       = channel1
tier1.sinks.sink1.hdfs.path     = /tmp/%y-%m-%d/%H%M/%S
tier1.sinks.sink1.hdfs.fileType = DataStream

# Format to be written
tier1.sinks.sink1.hdfs.writeFormat = Text

tier1.sinks.sink1.hdfs.maxOpenFiles = 10

# Roll over the file based on a maximum size of 10 MB
tier1.sinks.sink1.hdfs.rollSize = 10485760

# Never roll over based on the number of events
tier1.sinks.sink1.hdfs.rollCount = 0

# Roll over the file based on a maximum time of 1 minute
tier1.sinks.sink1.hdfs.rollInterval = 60

# Specify the channel the sink should use
tier1.sinks.sink1.channel = memoryChannel

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.

tier1.channels.channel1.capacity = 100

Here is my code, which should send messages to the remote machine:

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

public class FlumeTransport {
    public static void main(String[] args) {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        client.init("172.24.***.***", 41414);

        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
            client.sendDataToFlume(sampleData);
        }

        client.cleanUp();
    }
}

class MyRpcClientFacade {
    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }

    public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

        // Send the event
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // clean up and recreate the client
            System.out.println(e.getMessage());
            client.close();
            client = null;
            client = RpcClientFactory.getDefaultInstance(hostname, port);
            // Use the following method to create a thrift client (instead of the above line):
            // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
    }

    public void cleanUp() {
        // Close the RPC connection
        client.close();
    }

}

When I run the application, I get the following exception:

13/2-14:57:14,202 WARN : o.a.f.a.NettyAvroRpcClient - Invalid value for batchSize: 0; Using default value.
13/2-14:57:14,209 WARN : o.a.f.a.NettyAvroRpcClient - Using default maxIOWorkers
NettyAvroRpcClient { host: quickstart.cloudera.*******.com.ua, port: 41414 }: Failed to send event
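The output above contains only the top-level message, because the catch block prints `e.getMessage()`. The underlying reason (for example a refused connection or a timeout) usually sits further down the exception's cause chain. The following is a minimal sketch of a helper that walks that chain; the class name `Causes` and the method `rootCause` are hypothetical names chosen for illustration and are not part of Flume.

```java
/** Hypothetical helper (not part of Flume) for finding the underlying reason of a failure. */
final class Causes {

    /** Follows getCause() until the innermost throwable is reached. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        // Stop when there is no further cause, or when a throwable reports itself as its cause
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }
}
```

In the catch block of `sendDataToFlume`, printing `Causes.rootCause(e)` instead of `e.getMessage()` would show the innermost exception type together with its message.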
Answer 1 (jtjikinw)

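Your Avro RPC client cannot connect to the Flume agent. Check the log file at /var/log/flume-ng/flume.log to see what is happening. Most likely the agent cannot bind to the interface. Consider replacing

tier1.sources.source1.bind     = 172.24.***.***

with

tier1.sources.source1.bind     = 0.0.0.0

which effectively binds to all interfaces. Then try telnet to port 41414 locally to test whether the port actually responds.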
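If telnet is not available on the machine, the same check can be approximated from Java. This is only a sketch using the standard `java.net.Socket` API; the class name `PortProbe`, the method `isReachable`, and the 3-second timeout are assumptions made for illustration. It only tests that something accepts TCP connections on the port, not that it speaks Avro.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Minimal TCP reachability check, roughly comparable to "telnet host port". */
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names
            System.err.println("Cannot reach " + host + ":" + port + " - " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are assumptions: probe localhost on the Avro source port from the question
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 41414;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 3000));
    }
}
```

Running it first on the VM itself and then from the client machine helps separate a bind problem on the agent from a network or firewall problem between the two hosts.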
