kafka数据到elasticsearch

afdcj2ne  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(293)

我正在尝试使用kafka connect elasticsearch将apache kafka producer的数据发送到elasticsearch
elasticsearch中出现以下错误:

[2017-12-21T11:00:54,979][DEBUG][o.e.a.b.TransportShardBulkAction] 
[pageviews7][0] failed to execute bulk item (index) BulkShardRequest 
[[pageviews7][0]] containing [index {[pageviews7][kafkaconnect]
[pageviews7+0+0], source[{"key1":"value1"}]}]
org.elasticsearch.index.mapper.MapperParsingException: failed to find 
type parsed [string] for [key1]

以下是我的kafka connect elasticsearch属性文件:

name=elasticsearch-sink
connector.class=
io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=pageviews7
key.ignore=true
connection.url=http://localhost:9200
type.name=kafkaconnect
schemas.enable=false
schema.ignore=true
wwtsj6pe

wwtsj6pe1#

试试这个 stream-reactor KafkaElasticSearch连接器。其中一个使用tcp协议,另一个使用http协议,它们已经在生产中经过了战斗测试,每天将50多亿个事件放入es索引中,而且它们非常容易配置,而且有很好的文档记录
https://github.com/landoop/stream-reactor

inb24sb2

inb24sb22#

您正在运行哪个版本的elasticsearch?你可能会喜欢那种环境 schema.ignore=true 在您的Kafka连接连接器配置将解决这个错误。

5cnsuln7

5cnsuln73#

这看起来像是将文档索引到elasticsearch时出现问题:
尝试手动将文档发布到es中
你应该得到同样的错误。这意味着索引Map有问题。
如果索引有我怀疑的Map问题,您可以使用topic.index.map属性写入另一个索引,也可以删除当前索引(仅当您不关心数据时)。
如何手动 curl 到es的示例:

curl -XPOST <ES-url>/pageviews7/test_id -d '{"key1":"value1"}'

删除当前索引:

curl -XDELETE <ES-url>/pageviews7

相关问题