我让kafka在confluent cloud上运行,在那里我可以用node.js客户端生成数据,数据作为字符串发送,我在confluent cloud中得到以下字段。
然后,我创建了一个 ElasticsearchSink Connector
并将其连接到ElasticSearch云。如果我在ElasticSearch中没有创建任何Map,那么数据的传输将是成功的,格式如下所示。
"_source" : {
"booked" : false,
"phone_number" : "919191919191",
"location" : {
"lon" : 60.23,
"lat" : 78.233
}
}
现在的问题是,如果我想运行任何 geo queries
它不允许我,并给出以下错误:
"root_cause" : [
{
"type" : "query_shard_exception",
"reason" : "failed to find geo_point field [location]",
"index_uuid" : "C8Xxu9QlTMKN4Lk1LjpOmQ",
"index" : "locations"
}
原因是动态Map不支持geo\u字段。因此,当我尝试为ElasticSearch创建自定义Map,同时创建索引时,如下所示:
PUT /locations
{
"mappings": {
"properties": {
"phone_number": {
"type": "text"
},
"booked": {
"type": "boolean"
},
"location": {
"type": "geo_point"
}
}
}
}
然后confluent连接器失败并显示以下错误:
There is a mapping collision in your index: Can't merge a non object mapping with an object mapping.
我也试过了 booked
作为一个 text
但事情似乎并不顺利。我没有在合流云上强制任何模式。下面是汇合云的一些基本配置。
如何强制Map以便可以运行 geo queries
ElasticSearch?
更新:这个问题之所以持续存在,主要是因为发送给Kafka的数据格式不同
{
"phone_number": "919191919191",
"location": {
"lat": 78.233,
"lon": 60.23
},
"booked": false,
}
{
"phone_number": "+919191919190",
"location": " 78.233, 60.23",
"booked": false,
}
两种格式都无法Map到中的上述定义的Map ElasticSearch
以及 connector sink
显示以下错误:
Received Illegal Argument Exception from Elasticsearch: One of your fields' type does not match the mapped type in Elasticsearch
1条答案
按热度按时间rslzwgfq1#
confluent cloud在找出与模式相关的内容并将发现存储在其内置的模式注册表中方面做了一项偷偷摸摸的工作。恐怕您的Map不起作用,因为:
连接器仍在尝试发送以前存储的数据。
以前存储的数据仍附加到旧架构。
合流云没有意识到模式已经进化。
尝试通过在合流云中创建一个新环境(这将强制创建一个新的sr示例)或者使用一个全新的kafka主题来重置设置。无论哪种方法,都从新数据开始。连接器总是试图保持乐观,确保没有数据丢失,但在这个过程中,这可能是错误的,因为模式已经演变。
在elasticsearch上设置Map。完成此操作后,连接器将Map到正确的模式。另外,由于某些原因,它只在我为elasticsearch索引的Map使用动态模板时才起作用。