在通过kafka和spark处理avro消息流时,我将处理后的数据保存为elasticsearch索引中的文档。下面是代码(简化):
directKafkaStream.foreachRDD(rdd ->{
rdd.foreach(avroRecord -> {
byte[] encodedAvroData = avroRecord._2;
MyType t = deserialize(encodedAvroData);
// Creating the ElasticSearch Transport client
Settings settings = Settings.builder()
.put("client.transport.ping_timeout", 5, TimeUnit.SECONDS).build();
TransportClient client = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
IndexRequest indexRequest = new IndexRequest("index", "item", id)
.source(jsonBuilder()
.startObject()
.field("name", name)
.field("timestamp", new Timestamp(System.currentTimeMillis()))
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "item", id)
.doc(jsonBuilder()
.startObject()
.field("name", name)
.field("timestamp", new Timestamp(System.currentTimeMillis()))
.endObject())
.upsert(indexRequest);
client.update(updateRequest).get();
client.close();
一切正常;唯一的问题是性能:保存到es需要一些时间,我想这是因为我为每个rdd打开/关闭了es传输客户机。spark文档表明这种方法是非常正确的:据我所知,唯一可能的优化是使用rdd.foreachpartition,但我只有一个分区,所以我不确定这是否有益。有没有其他解决方案可以实现更好的性能?
2条答案
按热度按时间ocebsuys1#
我会将处理过的消息流回到一个单独的kafka主题,然后使用kafka connect将它们放到elasticsearch上。这将使特定于spark的处理与将数据导入elasticsearch分离。
它在行动中的例子:https://www.confluent.io/blog/blogthe-simplest-useful-kafka-connect-data-pipeline-in-the-world-or-thereabouts-part-2/
gstyhher2#
因为只要处理rdd的记录,就创建新的connect。所以,我想用
foreachPartition
无论只使用一个分区,都可以获得更好的性能,因为它可以帮助您将es连接示例带到外部,并在循环中重用它。