I am using Apache Flume to move data from an Apache Kafka channel to an HTTP sink. I am trying to customize the HTTP sink's behavior for failed HTTP calls: when a call to the HTTP endpoint fails with a 4xx response code, I need to send those events to a different Kafka topic (say, a bad-events topic). The Flume pipeline will not process events from this bad-events topic any further (a different application will handle them).
Questions:
In my flume.conf file, should this bad-events Kafka topic be declared as a Flume source, a Flume channel, or a Flume sink?
In my custom HTTP sink's process() method, how do I get a reference to this bad-events Kafka topic?
Here is the process() method of my HTTP sink:
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class CustomHttpSink extends AbstractSink implements Configurable {

    @Override
    public final Status process() throws EventDeliveryException {
        Status status = Status.READY;
        OutputStream outputStream = null;
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            Event event = ch.take();
            byte[] eventBody = null;
            if (event != null) {
                eventBody = event.getBody();
            }
            if (eventBody != null && eventBody.length > 0) {
                sinkCounter.incrementEventDrainAttemptCount();
                LOG.debug("Sending request : " + new String(eventBody));
                HttpURLConnection connection = connectionBuilder.getConnection();
                outputStream = connection.getOutputStream();
                outputStream.write(eventBody);
                outputStream.flush();
                outputStream.close();
                int httpStatusCode = connection.getResponseCode();
                LOG.debug("Got status code : " + httpStatusCode);
                if (httpStatusCode == HttpURLConnection.HTTP_BAD_REQUEST) {
                    // -------> WRITE this MESSAGE TO BAD events KAFKA TOPIC
                }
            }
            // commit the transaction on the success path, otherwise
            // txn.close() fails with an open transaction
            txn.commit();
        } catch (Throwable t) {
            txn.rollback();
            status = Status.BACKOFF;
            LOG.error("Error sending HTTP request, retrying", t);
            sinkCounter.incrementEventWriteOrChannelFail(t);
            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            txn.close();
            if (outputStream != null) {
                try {
                    outputStream.close();
                } catch (IOException e) {
                    // ignore errors
                }
            }
        }
        return status;
    }
}
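For reference, my current idea is to create a plain Kafka producer inside the sink itself rather than declaring the bad-events topic anywhere in flume.conf. The helper class below is my own placeholder (not part of Flume or Kafka); only the property keys are standard Kafka producer settings. The actual wiring would be `new KafkaProducer<byte[], byte[]>(props)` in the sink's start(), and `producer.send(new ProducerRecord<>(topic, eventBody))` at the placeholder comment above:

```java
import java.util.Properties;

// Hypothetical helper: builds producer configuration for the bad-events
// topic. The class name is a placeholder; the keys are standard Kafka
// producer settings.
public class BadEventsProducerConfig {

    public static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Flume event bodies are raw bytes, so use the byte-array serializer
        // for both key and value.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Wait for broker acknowledgement so bad events are not lost silently.
        props.setProperty("acks", "all");
        return props;
    }
}
```

The sink would then hold a `KafkaProducer<byte[], byte[]>` field built from these properties in start(), and close it in stop().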
Contents of my flume.conf file:
# Below agent defines kafka as a channel and http as a sink.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source. Adding netcat as sample source for testing.
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.flume.poc.sink.CustomHttpSink
a1.sinks.k1.endpoint = http://localhost:8080/process
a1.sinks.k1.contentTypeHeader = application/json
# Use a channel which buffers events in kafka
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = localhost:9092
a1.channels.c1.kafka.topic = flume-channel-topic
a1.channels.c1.kafka.consumer.group.id = flume-consumer1
a1.channels.c1.parseAsFlumeEvent=false
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
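If the right answer is that the producer lives inside the custom sink, I assume the topic name could still be passed through flume.conf as ordinary sink properties (the property names below are my own invention, read back via context.getString(...) in the sink's configure() method):

```
# Hypothetical custom properties for CustomHttpSink (not built-in Flume keys)
a1.sinks.k1.badEventsTopic = bad-events-topic
a1.sinks.k1.badEventsBootstrapServers = localhost:9092
```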