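I'm trying to use Flume to write text to HDFS on a remote machine, but my attempts have failed.
I'm using the Cloudera QuickStart VM as the remote machine. Here are my steps:
I started Flume with: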
sudo /etc/init.d/flume-ng-agent start
Then I edited the Flume configuration in Cloudera Manager:
# Please paste flume.conf here. Example:
# Sources, channels, and sinks are defined per
# agent name, in this case 'tier1'.
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
# For each source, channel, and sink, set
# standard properties.
tier1.sources.source1.type = avro
# address of remote machine (cloudera quickstart VM)
tier1.sources.source1.bind = 172.24.***.***
tier1.sources.source1.port = 41414
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type = memory
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.hdfs.path = /tmp/%y-%m-%d/%H%M/%S
tier1.sinks.sink1.hdfs.fileType = DataStream
# Format to be written
tier1.sinks.sink1.hdfs.writeFormat = Text
tier1.sinks.sink1.hdfs.maxOpenFiles = 10
# rollover file based on maximum size of 10 MB
tier1.sinks.sink1.hdfs.rollSize = 10485760
# never rollover based on the number of events
tier1.sinks.sink1.hdfs.rollCount = 0
# rollover file based on max time of 1 minute
tier1.sinks.sink1.hdfs.rollInterval = 60
# Specify the channel the sink should use
tier1.sinks.sink1.channel = channel1
# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
tier1.channels.channel1.capacity = 100
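A note on the file format, assuming the agent loads this configuration with java.util.Properties (as I understand Flume's properties-file configuration provider does): a `#` only starts a comment at the beginning of a line, so anything written after a value on the same line becomes part of that value, and when a key is assigned twice the later assignment silently wins. The JDK-only sketch below shows both effects; the class name `FlumeConfParsing`, the `lookup` helper and the sample lines are hypothetical illustrations, not Flume code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class FlumeConfParsing {
    // Hypothetical excerpt: a bind line with a trailing "comment",
    // and a sink channel that is assigned twice.
    static final String SAMPLE =
          "tier1.sources.source1.bind = 172.24.***.***# address of remote machine\n"
        + "tier1.sinks.sink1.channel = channel1\n"
        + "tier1.sinks.sink1.channel = memoryChannel\n";

    /** Parses properties text with java.util.Properties and returns the value for one key. */
    static String lookup(String text, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        // '#' is only a comment marker at the start of a line, so the
        // trailing text ends up inside the bind address.
        System.out.println("[" + lookup(SAMPLE, "tier1.sources.source1.bind") + "]");
        // When a key is repeated, the last assignment wins.
        System.out.println(lookup(SAMPLE, "tier1.sinks.sink1.channel"));
    }
}
```

Running it prints the bind value with the comment glued on (`[172.24.***.***# address of remote machine]`) and `memoryChannel` for the sink channel, a name that is not declared under `tier1.channels`.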
Here is my code, which should send messages to the remote machine:
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

public class FlumeTransport {
    public static void main(String[] args) {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        client.init("172.24.***.***", 41414);
        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
            client.sendDataToFlume(sampleData);
        }
        client.cleanUp();
    }
}

class MyRpcClientFacade {
    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }

    public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
        // Send the event
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // clean up and recreate the client
            System.out.println(e.getMessage());
            client.close();
            client = null;
            client = RpcClientFactory.getDefaultInstance(hostname, port);
            // Use the following method to create a thrift client (instead of the above line):
            // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
    }

    public void cleanUp() {
        // Close the RPC connection
        client.close();
    }
}
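One more thing worth checking once events do reach the agent: the client builds events with `EventBuilder.withBody(data, charset)`, which attaches no headers, while `hdfs.path` uses the time escapes `%y-%m-%d/%H%M/%S`. As far as I know the HDFS sink fills those escapes from a `timestamp` event header and fails when it is missing, unless `tier1.sinks.sink1.hdfs.useLocalTimeStamp = true` is set (or the client adds a `timestamp` header through the `EventBuilder.withBody` overload that takes a header map). The JDK-only sketch below approximates that expansion for the six escapes used here; `BucketPathSketch` and `expand` are hypothetical names, and it formats in UTC for reproducible output, whereas the real sink uses the agent's local time zone by default.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;

public class BucketPathSketch {
    // Time escapes used in hdfs.path above, mapped to SimpleDateFormat
    // patterns (assumed to match the sink's meaning of each escape).
    private static final Map<Character, String> ESCAPES = new HashMap<>();
    static {
        ESCAPES.put('y', "yy");
        ESCAPES.put('m', "MM");
        ESCAPES.put('d', "dd");
        ESCAPES.put('H', "HH");
        ESCAPES.put('M', "mm");
        ESCAPES.put('S', "ss");
    }

    /** Expands %y %m %d %H %M %S in path using an epoch-millis timestamp, formatted in UTC. */
    static String expand(String path, Long timestampMillis) {
        if (timestampMillis == null) {
            // Mirrors the failure case: no timestamp header and no local timestamp.
            throw new IllegalStateException("no timestamp available for time escapes in " + path);
        }
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < path.length(); i++) {
            char c = path.charAt(i);
            if (c == '%' && i + 1 < path.length() && ESCAPES.containsKey(path.charAt(i + 1))) {
                SimpleDateFormat fmt = new SimpleDateFormat(ESCAPES.get(path.charAt(i + 1)));
                fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
                out.append(fmt.format(new Date(timestampMillis)));
                i++; // skip the escape letter
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String path = "/tmp/%y-%m-%d/%H%M/%S";
        System.out.println(expand(path, 0L)); // prints /tmp/70-01-01/0000/00
        // Roughly what useLocalTimeStamp would do: use the current time (in UTC here).
        System.out.println(expand(path, System.currentTimeMillis()));
    }
}
```

With a one-minute `rollInterval` and a per-second directory (`%S`), each second of traffic lands in its own directory, so the layout may be worth revisiting too.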
When I run the application, I get this error:
13/2-14:57:14,202 WARN : o.a.f.a.NettyAvroRpcClient - Invalid value for batchSize: 0; Using default value.
13/2-14:57:14,209 WARN : o.a.f.a.NettyAvroRpcClient - Using default maxIOWorkers
NettyAvroRpcClient { host: quickstart.cloudera.*******.com.ua, port: 41414 }: Failed to send event
Answer:
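Your Avro RPC client cannot connect to the Flume agent. Check the log file /var/log/flume-ng/flume.log to see what is going on. Most likely your agent cannot bind to the interface. Consider replacing
tier1.sources.source1.bind = 172.24.***.***
with
tier1.sources.source1.bind = 0.0.0.0
which effectively binds to all interfaces. Also try to telnet to port 41414 locally to test whether the port actually responds.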
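If telnet isn't installed on the VM, a plain-JDK check does the same job by opening a TCP connection with a timeout. `PortCheck` and `isListening` are hypothetical names for this sketch. Running it on the VM against both 127.0.0.1 and the VM's 172.24.***.*** address shows which interfaces the source actually listens on (with `bind = 0.0.0.0` both should succeed), and running it from the client machine against 172.24.***.*** also reveals firewall or routing problems.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable/unresolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Usage: java PortCheck <host> [port]  (port defaults to 41414, the Avro source port)
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 41414;
        boolean open = isListening(host, port, 2000);
        System.out.println(host + ":" + port + (open ? " accepts connections" : " does not accept connections"));
    }
}
```

A successful connect only proves that something accepts TCP connections on that port; the agent log is still the place to confirm that it is the Avro source.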