从http接收器向不同的kafka主题发送失败的消息

4xrmg8kj  于 2021-07-15  发布在  Flume
关注(0)|答案(0)|浏览(278)

我使用apacheflume处理从apachekafka通道到httpsink的数据。我正在尝试为失败的http调用自定义http接收器行为。当使用4xx响应代码调用http端点失败时,我需要将这些事件发送到不同的kafka主题(比如bad events kafka topic)。flume管道不会进一步处理来自此坏事件主题的事件(不同的应用程序将处理它)。
问题:
在我的flume.conf文件中,这个坏事件kafka应该声明为flume source、flume channel或flume sink吗?
在我的自定义httpsink process()方法中,如何获取对这个bad events kafka主题的引用?
在此处发布http sink的处理方法:

public class CustomHttpSink extends AbstractSink implements Configurable {

    @Override
    public final Status process() throws EventDeliveryException {
        Status status = null;
        OutputStream outputStream = null;

        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();

        try {
            Event event = ch.take();

            byte[] eventBody = null;
            if (event != null) {
                eventBody = event.getBody();
            }

            if (eventBody != null && eventBody.length > 0) {
                sinkCounter.incrementEventDrainAttemptCount();
                LOG.debug("Sending request : " + new String(event.getBody()));

                    HttpURLConnection connection = connectionBuilder.getConnection();

                    outputStream = connection.getOutputStream();
                    outputStream.write(eventBody);
                    outputStream.flush();
                    outputStream.close();

                    int httpStatusCode = connection.getResponseCode();
                    LOG.debug("Got status code : " + httpStatusCode);

                    if (httpStatusCode == HttpURLConnection.HTTP_BAD_REQUEST) {
                        // -------> WRITE this MESSAGE TO BAD events KAFKA TOPIC
                    }

            } 

        } catch (Throwable t) {
            txn.rollback();
            status = Status.BACKOFF;

            LOG.error("Error sending HTTP request, retrying", t);
            sinkCounter.incrementEventWriteOrChannelFail(t);

            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }

        } finally {
            txn.close();

            if (outputStream != null) {
                try {
                    outputStream.close();
                } catch (IOException e) {
                    // ignore errors
                }
            }
        }

        return status;
    }
}

my flume.conf文件内容:


# Below agent defines kafka as a channel and http as a sink.

# Name the components on this agent

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source. Adding netcat as sample source for testing.

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink

a1.sinks.k1.type = com.flume.poc.sink.CustomHttpSink
a1.sinks.k1.endpoint = http://localhost:8080/process
a1.sinks.k1.contentTypeHeader = application/json

# Use a channel which buffers events in kafka

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = loalhost:9092
a1.channels.c1.kafka.topic = flume-channel-topic
a1.channels.c1.kafka.consumer.group.id = flume-consumer1
a1.channels.c1.parseAsFlumeEvent=false

# Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题