Flume NG: Avro event delivery reliability

Posted by uelo1irk on 2021-06-04 in Flume

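I am using Flume NG to aggregate data generated by applications into a central location for processing. Producers post events to a local HTTP source, which forwards them to the collectors through Avro sinks. The collectors receive the events from those Avro sinks through Avro sources. Every producer replicates its events to two collectors.
I have noticed a few things that concern me, and I have not been able to fully understand all the implications of my configuration from the documentation and the other information available on Flume NG. Comparing the events published by the producers with those received by the collectors shows that not all events arrive. The latency of the events that are delivered successfully is higher than expected. Occasionally we see events land at the collectors' front door hours later than expected. Intermittent exceptions also appear in both the downstream and upstream Flume NG logs.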
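Using file channels on the producers leads me to believe that we are still durable and that Flume NG will eventually deliver the events upstream (is that correct?).
With the collectors using memory channels, are we at risk of data loss? If so, is there any technique for determining how many events were lost, or at least how often loss occurs?
Can problems arise because the collectors have a lower transaction capacity than the producers?
What is a reasonable checkpoint interval for a file channel that needs guaranteed delivery? I assume the interval is how often Flume NG makes sure events are on local disk (e.g., with my configuration I would expect to lose at most 1 s of data on a power failure).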
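Below are the relevant fragments of the Flume NG configuration for each stage, along with stack trace snippets. I am wondering whether anyone can shed light on any discrepancies in the configuration (assuming there are some) as they relate to the issues mentioned above.
Upstream exceptions: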

(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed:209)  - Connection to /<ip address>:<port> disconnected.
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught:201)  - Unexpected exception from downstream.

Downstream exceptions:

(org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
        at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:382)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:679)
Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient { host: <producer ip>, port: <producer port> }: RPC connection error
        at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:161)
        at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:115)
        at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:590)
        at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
        at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
        at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:209)
        at org.apache.flume.sink.AbstractRpcSink.verifyConnection(AbstractRpcSink.java:269)
        at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:339)
        ... 3 more
Caused by: java.io.IOException: Error connecting to /<producer ip>:<producer port>
        at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
        at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
        at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
        at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:147)
        ... 10 more
Caused by: java.net.NoRouteToHostException: No route to host
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:592)
        at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.connect(NioClientSocketPipelineSink.java:396)
        at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.processSelectedKeys(NioClientSocketPipelineSink.java:358)
        at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.run(NioClientSocketPipelineSink.java:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        ... 1 more

Producer configuration:

agent.sources.t1d.type     = HTTP
agent.sources.t1d.bind     = <source ip>
agent.sources.t1d.port     = <source port>
agent.sources.t1d.channels = c01 c02 c03
agent.sources.t1d.selector.type = replicating

agent.channels.c01.type               = FILE
agent.channels.c01.checkpointDir = /path/to/checkpoint1
agent.channels.c01.dataDirs = /path/to/data1
agent.channels.c01.capacity   = 4000000
agent.channels.c01.transactionCapacity = 10000
agent.channels.c01.checkpointInterval = 1000
agent.channels.c01.keep-alive = 3

agent.channels.c02.type               = FILE
agent.channels.c02.checkpointDir = /path/to/checkpoint2
agent.channels.c02.dataDirs = /path/to/data2
agent.channels.c02.capacity   = 4000000
agent.channels.c02.transactionCapacity = 10000
agent.channels.c02.checkpointInterval = 1000
agent.channels.c02.keep-alive = 3

agent.channels.c03.type               = FILE
agent.channels.c03.checkpointDir = /path/to/checkpoint3
agent.channels.c03.dataDirs = /path/to/data3
agent.channels.c03.capacity   = 4000000
agent.channels.c03.transactionCapacity = 10000
agent.channels.c03.checkpointInterval = 1000
agent.channels.c03.keep-alive = 3

agent.sinks.s01.type          = AVRO
agent.sinks.s01.hostname      = <collector ip>
agent.sinks.s01.port          = <collector port>
agent.sinks.s01.channel       = c01
agent.sinks.s01.batch-size    = 1000
agent.sinks.s01.compression-type  = deflate
agent.sinks.s01.compression-level = 5

agent.sinks.s02.type          = AVRO
agent.sinks.s02.hostname      = <collector ip>
agent.sinks.s02.port          = <collector port>
agent.sinks.s02.channel       = c02
agent.sinks.s02.batch-size    = 1000
agent.sinks.s02.compression-type  = deflate
agent.sinks.s02.compression-level = 5

agent.sinks.s03.type         = FILE_ROLL
agent.sinks.s03.channel      = c03
agent.sinks.s03.sink.directory = /path/to/producer/reconciliation
agent.sinks.s03.sink.rollInterval = 1200

Collector A configuration:

agent.sources.a.type     = avro
agent.sources.a.bind     = <source ip>
agent.sources.a.port     = <source port>
agent.sources.a.channels = c01 c03
agent.sources.a.compression-type = deflate

agent.channels.c01.type   = memory
agent.channels.c01.capacity = 1000000
agent.channels.c01.transactionCapacity = 1000

agent.channels.c03.type   = memory
agent.channels.c03.capacity = 1000000
agent.channels.c03.transactionCapacity = 1000

agent.sinks.s01.type         = FILE_ROLL
agent.sinks.s01.channel      = c01
agent.sinks.s01.sink.directory = /path/to/storage/s01
agent.sinks.s01.sink.rollInterval = 300

agent.sinks.s03.type         = FILE_ROLL
agent.sinks.s03.channel      = c03
agent.sinks.s03.sink.directory = /path/to/storage/s03
agent.sinks.s03.sink.rollInterval = 300

Collector B configuration:

agent.sources.b.type     = avro
agent.sources.b.bind     = <source ip>
agent.sources.b.port     = <source port>
agent.sources.b.channels = c11 c13
agent.sources.b.compression-type = deflate

agent.channels.c11.type   = memory
agent.channels.c11.capacity = 1000000
agent.channels.c11.transactionCapacity = 1000

agent.channels.c13.type   = memory
agent.channels.c13.capacity = 1000000
agent.channels.c13.transactionCapacity = 1000

agent.sinks.s11.type         = FILE_ROLL
agent.sinks.s11.channel      = c11
agent.sinks.s11.sink.directory = /path/to/storage/s11
agent.sinks.s11.sink.rollInterval = 300

agent.sinks.s13.type         = FILE_ROLL
agent.sinks.s13.channel      = c13
agent.sinks.s13.sink.directory = /path/to/storage/s13
agent.sinks.s13.sink.rollInterval = 300
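
On the question of measuring how many events are lost: the producer's FILE_ROLL sink (s03) and the collectors' FILE_ROLL sinks all write events to disk, so one possible approach is to compare the two outputs offline. The following is a minimal Python sketch of that idea, not a tested procedure. It assumes that both directories have been copied to one machine, that the sinks use the default text serializer (one event body per line), and that event bodies contain no embedded newlines. The function names `count_events` and `reconcile` are hypothetical and are not part of Flume.

```python
from collections import Counter
from pathlib import Path


def count_events(directory):
    """Count event bodies across all files in a FILE_ROLL output directory.

    Assumption: one event body per line (default text serializer).
    """
    counts = Counter()
    for path in sorted(Path(directory).iterdir()):
        if path.is_file():
            with path.open("r", encoding="utf-8", errors="replace") as handle:
                for line in handle:
                    counts[line.rstrip("\n")] += 1
    return counts


def reconcile(producer_dir, collector_dir):
    """Return (missing, unexpected) multisets of event bodies.

    missing:    written by the producer but absent at the collector
    unexpected: present at the collector more often than at the producer,
                for example duplicates caused by redelivery
    """
    produced = count_events(producer_dir)
    collected = count_events(collector_dir)
    # Counter subtraction keeps only positive counts, so each result is
    # a one-sided multiset difference.
    missing = produced - collected
    unexpected = collected - produced
    return missing, unexpected
```

For example, `missing, unexpected = reconcile("/path/to/producer/reconciliation", "/path/to/storage/s01")` followed by `sum(missing.values())` would give a count of events that never reached that collector directory. Files that are still being written at comparison time will show up as spurious differences, so the comparison is only meaningful for a time window in which both sides have finished rolling.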
