如何将数据从storm导出到elastic search?

vfh0ocws  于 2021-05-30  发布在  Hadoop
关注(0)|答案(2)|浏览(492)

我把我的数据存储到了apachekafka中。然后我就可以使用apachestorm-spout来处理数据了。现在我想把处理过的数据导出到ElasticSearch中。

dnph8jn4

dnph8jn41#

您可以使用es storm集成:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html
或者将数据写回kafka并使用logstash使用此队列并写入elasticsearch。

kzmpq1sx

kzmpq1sx2#

elasticsearch提供maven存储库中可用的java客户端api。
如果您已经实现了将数据从kafka拉到storm,那么您所要做的就是实现一个bolt,将该日志的索引请求发送到ElasticSearch。
在这里,我是从传统的拓扑学Angular 讲的。
例如,在prepare方法实现中,您创建了这样一个传输客户机。

Client client = new TransportClient()
    .addTransportAddress(new InetSocketTransportAddress("host1", 9300))
    .addTransportAddress(new InetSocketTransportAddress("host2", 9300));

在execute方法实现中,发送如下索引请求。

String json = "{" +
    "\"user\":\"kimchy\"," +
    "\"postDate\":\"2013-01-30\"," +
    "\"message\":\"trying out Elasticsearch\"" +
"}";

IndexResponse response = client.prepareIndex("twitter", "tweet")
    .setSource(json)
    .execute()
    .actionGet();

有关更多信息,请参阅http://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html.

相关问题