我正在使用elasticsearchsinkconnector将kafka主题中的数据存储到elasticsearch索引中。这是一个Kafka信息的例子:
{"ID" : "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
"Timestamp" : "2020-11-02T12:05:57.87639003Z",
"Type" : "CREATION",
"PlaceType" : "home",
"Location" : {
"Lat" : 43.7575119,
"Lon" : 11.2921363
},
"Created" : "2020-11-02T12:05:57.876390266Z",
"LastUpdated" : "2020-11-02T12:05:57.876390398Z"}
我想代表 Location
对象作为es中的地理点,但lat/lon必须小写才能作为地理点对象。我正在使用 ReplaceField$Value
将location重命名为“location”,但无法重命名嵌套字段lat/lon。这是我重命名位置、纬度和经度的代码片段:
transforms: 'RenameField'
transforms.RenameField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames: 'Location:location,location.Lat:lat,location.Lon:lon'
定位是可行的,但lat/lon不行。简而言之,我想在es中得到以下结果:
{"ID" : "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
"Timestamp" : "2020-11-02T12:05:57.87639003Z",
"Type" : "CREATION",
"PlaceType" : "home",
"location" : {
"lat" : 43.7575119,
"lon" : 11.2921363
},
"Created" : "2020-11-02T12:05:57.876390266Z",
"LastUpdated" : "2020-11-02T12:05:57.876390398Z"}
更新
太棒了,非常感谢。在ksql cli中创建目标流时出现问题。
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
> SELECT *,
> STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS "location"
> FROM PLACES_EVENT;
Can't find any functions with the name 'STRUCT'
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
> SELECT *,
> STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS 'location'
> FROM PLACES_EVENT;
line 3:64: mismatched input ''location'' expecting {'NO', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: org.antlr.v4.runtime.InputMismatchException
我试图设置没有引号的结构名,但是ksql抛出了一个与第一个一样的错误。
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
> SELECT *,
> STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS GeoPointLocation
> FROM PLACES_EVENT;
Can't find any functions with the name 'STRUCT'
你能帮助我吗?
1条答案
按热度按时间50pmv0ei1#
我遇到了完全相同的问题—我不知道现有的单个消息转换是否有帮助。你有几个选择:
编写您自己的单个消息转换来完成此操作
使用ksqldb来整理模式,这是我选择的路线
如果您还没有创建Map模板,那么您还需要创建Map模板来准备elasticsearch索引
要扩展ksqldb示例:
使用示例数据填充源主题:
作为主题的来源
source
,声明ksqldbSTREAM
对象(基本上是覆盖了模式的Kafka主题):通过从第一条消息中选择字段,确认流的架构有效:
创建一个目标流,将lat/lon字段Map为小写名称。这里我还展示了连接它们的另一种方法,elasticsearch也会接受这种方法:
如果索引还没有
geo_point
已声明Map。在这里,它将匹配以target
```curl --silent --show-error -XPUT -H 'Content-Type: application/json'
http://localhost:9200/_index_template/rmoff_template01/
-d'{
"index_patterns": [ "target*" ],
"template": {
"mappings": {
"properties": {
"location_example_01": {
"type": "geo_point"
},
"location_example_02": {
"type": "geo_point"
}
}
}
} }'
CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'topics' = 'target_topic',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false',
'connection.url' = 'http://elasticsearch:9200',
'type.name' = '_doc',
'key.ignore' = 'true',
'schema.ignore' = 'true');
curl -XGET --silent --show-error http://localhost:9200"/target_topic/_mappings" | jq '.'
{
"target_topic": {
"mappings": {
"properties": {
"CREATED": {
"type": "date"
},
"ID": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"LASTUPDATED": {
"type": "date"
},
"LOCATION": {
"properties": {
"LAT": {
"type": "float"
},
"LON": {
"type": "float"
}
}
},
"PLACETYPE": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"TIMESTAMP": {
"type": "date"
},
"TYPE": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"location_example_01": {
"type": "geo_point"
},
"location_example_02": {
"type": "geo_point"
}
}
}
}
}