我把我的数据存储到了apachekafka中。然后我就可以使用apachestorm-spout来处理数据了。现在我想把处理过的数据导出到ElasticSearch中。
dnph8jn41#
您可以使用es storm集成:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html或者将数据写回kafka并使用logstash使用此队列并写入elasticsearch。
kzmpq1sx2#
elasticsearch提供maven存储库中可用的java客户端api。如果您已经实现了将数据从kafka拉到storm,那么您所要做的就是实现一个bolt,将该日志的索引请求发送到ElasticSearch。在这里,我是从传统的拓扑学Angular 讲的。例如,在prepare方法实现中,您创建了这样一个传输客户机。
Client client = new TransportClient() .addTransportAddress(new InetSocketTransportAddress("host1", 9300)) .addTransportAddress(new InetSocketTransportAddress("host2", 9300));
在execute方法实现中,发送如下索引请求。
String json = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}"; IndexResponse response = client.prepareIndex("twitter", "tweet") .setSource(json) .execute() .actionGet();
有关更多信息,请参阅http://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html.
2条答案
按热度按时间dnph8jn41#
您可以使用es storm集成:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html
或者将数据写回kafka并使用logstash使用此队列并写入elasticsearch。
kzmpq1sx2#
elasticsearch提供maven存储库中可用的java客户端api。
如果您已经实现了将数据从kafka拉到storm,那么您所要做的就是实现一个bolt,将该日志的索引请求发送到ElasticSearch。
在这里,我是从传统的拓扑学Angular 讲的。
例如,在prepare方法实现中,您创建了这样一个传输客户机。
在execute方法实现中,发送如下索引请求。
有关更多信息,请参阅http://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html.