How can I make Flume periodically read from a GET API?

2ul0zpep · posted 2021-06-04 · in Flume

I have a REST API at the endpoint localhost:8081/users that supports the GET method.
I am trying to configure an HTTPSource in Flume. The agent runs fine, but no data is ever read from the API endpoint.
The configuration is below.

a1.sources = r1
a1.sinks = sample 
a1.channels = sample-channel

a1.sources.r1.interceptors = i1 i2 
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = host
a1.sources.r1.interceptors.i2.type = timestamp

a1.sources.r1.type     = org.apache.flume.source.http.HTTPSource
a1.sources.r1.bind     = localhost
a1.sources.r1.port     = 8200

# JSONHandler is the default handler for HTTPSource

a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

a1.channels.sample-channel.type = memory
a1.channels.sample-channel.capacity = 1000
a1.channels.sample-channel.transactionCapacity = 100
a1.sources.r1.channels = sample-channel

a1.sinks.sample.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sample.kafka.topic = simple
a1.sinks.sample.kafka.bootstrap.servers = localhost:9092
a1.sinks.sample.kafka.producer.acks = 1
a1.sinks.sample.flumeBatchSize = 20
a1.sinks.sample.channel = sample-channel
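For reference, HTTPSource with the default JSONHandler accepts events POSTed to it as a JSON array of objects with a "headers" map and a string "body". A minimal sketch for sending a test event to the source configured above, assuming the agent is listening on localhost:8200 as configured (the helper names here are illustrative, not Flume API):

```python
import json
import urllib.request

def flume_payload(body, headers=None):
    """Build the request body that HTTPSource's default JSONHandler expects:
    a JSON array of events, each with a "headers" map and a string "body"."""
    return json.dumps([{"headers": headers or {}, "body": body}])

def post_to_flume(payload, url="http://localhost:8200"):
    """POST a JSONHandler payload to the HTTPSource; returns the HTTP status."""
    req = urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status
```

Calling `post_to_flume(flume_payload("hello flume"))` while the agent is up should return 200 and land one event on the Kafka topic, which is a quick way to confirm the source-to-sink pipeline itself works.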

I know the HTTPSource is configured to listen on port 8200 rather than 8081; when I tried to listen on 8081, it failed with an error saying the service was already bound to that port. With this configuration I get the following logs.

2020-12-03 14:27:49,026 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink sample
2020-12-03 14:27:49,030 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2020-12-03 14:27:49,041 (lifecycleSupervisor-1-0) [INFO - org.eclipse.jetty.util.log.Log.initialized(Log.java:192)] Logging initialized @623ms to org.eclipse.jetty.util.log.Slf4jLog
2020-12-03 14:27:49,051 (lifecycleSupervisor-1-1) [INFO - org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:279)] ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.id =
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2020-12-03 14:27:49,165 (lifecycleSupervisor-1-0) [INFO - org.eclipse.jetty.server.Server.doStart(Server.java:372)] jetty-9.4.6.v20170531
2020-12-03 14:27:49,203 (lifecycleSupervisor-1-0) [INFO - org.eclipse.jetty.server.session.DefaultSessionIdManager.doStart(DefaultSessionIdManager.java:364)] DefaultSessionIdManager workerName=node0
2020-12-03 14:27:49,203 (lifecycleSupervisor-1-0) [INFO - org.eclipse.jetty.server.session.DefaultSessionIdManager.doStart(DefaultSessionIdManager.java:369)] No SessionScavenger set, using defaults
2020-12-03 14:27:49,206 (lifecycleSupervisor-1-0) [INFO - org.eclipse.jetty.server.session.HouseKeeper.startScavenging(HouseKeeper.java:149)] Scavenging every 600000ms
2020-12-03 14:27:49,210 (lifecycleSupervisor-1-0) [WARN - org.eclipse.jetty.security.ConstraintSecurityHandler.checkPathsWithUncoveredHttpMethods(ConstraintSecurityHandler.java:806)] ServletContext@o.e.j.s.ServletContextHandler@4a2be708{/,null,STARTING} has uncovered http methods for path: /*
2020-12-03 14:27:49,214 (lifecycleSupervisor-1-0) [INFO - org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:788)] Started o.e.j.s.ServletContextHandler@4a2be708{/,null,AVAILABLE}
2020-12-03 14:27:49,412 (lifecycleSupervisor-1-0) [INFO - org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:280)] Started ServerConnector@208e9495{HTTP/1.1,[http/1.1]}{localhost:8200}
2020-12-03 14:27:49,412 (lifecycleSupervisor-1-0) [INFO - org.eclipse.jetty.server.Server.doStart(Server.java:444)] Started @993ms
2020-12-03 14:27:49,414 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2020-12-03 14:27:49,417 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
2020-12-03 14:27:49,422 (lifecycleSupervisor-1-1) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:109)] Kafka version : 2.0.1
2020-12-03 14:27:49,423 (lifecycleSupervisor-1-1) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:110)] Kafka commitId : fa14705e51bd2ce5
2020-12-03 14:27:49,425 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: sample: Successfully registered new MBean.
2020-12-03 14:27:49,427 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: sample started

The agent runs without any problems, but I don't understand how to make it fetch data from the specified endpoint, localhost:8081/users.
Does anyone know where I went wrong?
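Note that HTTPSource is a server: it only accepts events POSTed to its port and never pulls from a remote endpoint, so it will not call localhost:8081/users on its own. One possible workaround (a sketch, not a tested solution) is a small external poller that GETs the API on a schedule and re-posts each record to the HTTPSource in JSONHandler format; the header value and the 30-second interval below are assumptions, and the API is assumed to return a JSON array:

```python
import json
import time
import urllib.request

API_URL = "http://localhost:8081/users"  # the GET endpoint (assumed to return a JSON array)
FLUME_URL = "http://localhost:8200"      # HTTPSource bind/port from the config above

def to_flume_events(records):
    """Wrap API records in the shape JSONHandler expects:
    a list of {"headers": ..., "body": ...} objects, with the body as a string."""
    return [{"headers": {"source": "users-api"}, "body": json.dumps(r)} for r in records]

def poll_once(api_url=API_URL, flume_url=FLUME_URL):
    """GET the API once and forward every record to the HTTPSource."""
    with urllib.request.urlopen(api_url) as resp:
        records = json.loads(resp.read())
    payload = json.dumps(to_flume_events(records)).encode("utf-8")
    req = urllib.request.Request(
        flume_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    urllib.request.urlopen(req).close()

def run(interval_seconds=30):
    """Poll forever at a fixed interval."""
    while True:
        poll_once()
        time.sleep(interval_seconds)
```

An alternative that avoids the extra process would be to replace the HTTPSource with an exec source running a looping script that curls the API, but that trades Flume's delivery guarantees for simplicity.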
