如何配置flume来侦听web api http请求

6ljaweal  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(428)

我已经构建了一个api web应用程序,它发布在iis服务器上,我正在尝试配置apache flume以侦听该web api并将http请求的响应保存在hdfs中,这是我需要侦听的post方法:

[HttpPost]
    public IEnumerable<Data> obtenerValores(arguments arg)
    {
        Random rdm = new Random();

        int ano = arg.ano;
        int rdmInt;
        decimal rdmDecimal;

        int anoActual = DateTime.Now.Year;
        int mesActual = DateTime.Now.Month;

        List<Data> ano_mes_sales = new List<Data>();

        while (ano <= anoActual)
        {
            int mes = 1;
            while ((anoActual == ano && mes <= mesActual) || (ano < anoActual && mes <= 12))
            {
                rdmInt = rdm.Next();
                rdmDecimal = (decimal)rdm.NextDouble();
                Data anoMesSales = new Data(ano, mes,(rdmInt * rdmDecimal));
                ano_mes_sales.Add(anoMesSales);

                mes++;
            }
            ano++;
        }
        return ano_mes_sales;
    }

flume正在vmware虚拟机centos上运行,这是我尝试配置flume以侦听该应用程序:


# Sources, channels, and sinks are defined per # agent name, in this case 'tier1'.

a1.sources  = source1
a1.channels = channel1
a1.sinks    = sink1
a1.sources.source1.interceptors = i1 i2 
a1.sources.source1.interceptors.i1.type = host
a1.sources.source1.interceptors.i1.preserveExisting = false
a1.sources.source1.interceptors.i1.hostHeader = host
a1.sources.source1.interceptors.i2.type = timestamp

# For each source, channel, and sink, set # standard properties.

a1.sources.source1.type     = org.apache.flume.source.http.HTTPSource
a1.sources.source1.bind     = transacciones.misionempresarial.com/CSharpFlume
a1.sources.source1.port     = 80

# JSONHandler is the default for the httpsource #

a1.sources.source1.handler = org.apache.flume.source.http.JSONHandler
a1.sources.source1.channels = channel1
a1.channels.channel1.type   = memory
a1.sinks.sink1.type         = hdfs
a1.sinks.sink1.hdfs.path = /monthSales
a1.sinks.sink1.hdfs.filePrefix = event-file-prefix-
a1.sinks.sink1.hdfs.round = false
a1.sinks.sink1.channel      = channel1

# Other properties are specific to each type of # source, channel, or sink. In this case, we # specify the capacity of the memory channel.

a1.channels.channel1.capacity = 1000

我使用curl来发帖,下面是我的尝试:

curl -X POST -H 'Content-Type: application/json; charset=UTF-8' -d '[{"ano":"2010"}]' http://transacciones.misionempresarial.com/CSharpFlume/api/SourceFlume/ObtenerValores

我只得到这个错误:

{"Message":"Error."}

我的问题是,哪种配置flume来监听web api的http请求的方法是正确的,我遗漏了什么?

oyxsuwqo

oyxsuwqo1#

标准flume“httpsource”及其默认值 JSONHandler ,将仅以特定的、以flume为中心的格式处理事件。
该格式记录在用户手册中,也记录在jsonhandler源代码开头的注解中。
总之,它希望收到一个json对象列表,每个对象包含 headers (键/值对,Map到flume事件头)和 body (一个简单的字符串,Map到flume事件主体)。
举个例子,如果您发送:

[{"headers": {}, "body": "{\"ano\":\"2010\"}"}]

我想你会得到你想要的。
如果你不能灵活地更改发送的内容,那么你可以使用 org.apache.flume.source.http.BLOBHandler ,取决于您尝试执行的处理(注意。手册中没有这方面的文档,只有 org.apache.flume.sink.solr.morphline.BlobHandler -它们不是一回事,但是flume-2718)中有一些注解,或者您可能需要提供自己的flume的实现 HTTPSourceHandler 而不是接口。
旁注:http源代码 bind 选项需要主机名或ip地址。幸运的是,您的值被视为主机名,而路径被忽略。

相关问题