How to rename/transform nested fields in a JSON object with the ElasticsearchSinkConnector

Posted by 3pmvbmvn on 2021-06-04 in Kafka

I'm using the ElasticsearchSinkConnector to store data from a Kafka topic in an Elasticsearch index. Here is an example Kafka message:

```
{
  "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
  "Timestamp": "2020-11-02T12:05:57.87639003Z",
  "Type": "CREATION",
  "PlaceType": "home",
  "Location": {
    "Lat": 43.7575119,
    "Lon": 11.2921363
  },
  "Created": "2020-11-02T12:05:57.876390266Z",
  "LastUpdated": "2020-11-02T12:05:57.876390398Z"
}
```

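I want to represent the `Location` object as a geo_point in Elasticsearch, but the lat/lon keys must be lowercase for the object to be accepted as a geo_point. I'm using `ReplaceField$Value` to rename `Location` to `location`, but I can't rename the nested fields `Lat`/`Lon`. Here is the snippet I use to rename the location, latitude and longitude: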

```
transforms: 'RenameField'
transforms.RenameField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames: 'Location:location,location.Lat:lat,location.Lon:lon'
```

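A likely reason only the first rename works: as far as I understand it, the `ReplaceField` transform in the Kafka releases current when this was asked matches each `renames` entry against top-level field names only, so `location.Lat` is looked up as a literal top-level field called `location.Lat`, which never exists. The hypothetical Python emulation below (plain dicts, not the Kafka Connect API) reproduces that top-level-only behaviour on a trimmed copy of the example message:

```python
import json


def parse_renames(spec):
    """Parse a ReplaceField-style "old:new,old2:new2" string into a dict."""
    pairs = (item.split(":", 1) for item in spec.split(",") if item)
    return {old: new for old, new in pairs}


def rename_top_level(record, renames):
    """Rename keys of the outermost object only; nested objects pass through.

    Hypothetical emulation of the top-level-only behaviour, not Kafka Connect code.
    """
    return {renames.get(key, key): value for key, value in record.items()}


message = json.loads(
    '{"ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e", "PlaceType": "home",'
    ' "Location": {"Lat": 43.7575119, "Lon": 11.2921363}}'
)

renames = parse_renames("Location:location,location.Lat:lat,location.Lon:lon")
result = rename_top_level(message, renames)

# "Location" was renamed, but "location.Lat" / "location.Lon" matched no
# top-level key, so the nested names are still capitalised.
print(result["location"])  # {'Lat': 43.7575119, 'Lon': 11.2921363}
```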
Renaming `Location` works, but `Lat`/`Lon` does not. In short, I want to end up with the following document in Elasticsearch:

```
{
  "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
  "Timestamp": "2020-11-02T12:05:57.87639003Z",
  "Type": "CREATION",
  "PlaceType": "home",
  "location": {
    "lat": 43.7575119,
    "lon": 11.2921363
  },
  "Created": "2020-11-02T12:05:57.876390266Z",
  "LastUpdated": "2020-11-02T12:05:57.876390398Z"
}
```
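In other words, the transformation needed is a rename keyed on the full path of each field, not just its top-level name. A minimal Python sketch of that logic, assuming the message is plain JSON; the `rename_nested` helper and the dotted-path rename table are hypothetical, and this is not an existing Kafka Connect transform:

```python
import json


def rename_nested(obj, renames, prefix=""):
    """Return a copy of obj with keys renamed.

    `renames` is keyed by the dotted path of the ORIGINAL field names,
    e.g. "Location.Lat" -> "lat". Hypothetical helper, not a Kafka Connect API.
    """
    if isinstance(obj, list):
        # Array elements share their parent's path.
        return [rename_nested(item, renames, prefix) for item in obj]
    if not isinstance(obj, dict):
        return obj
    renamed = {}
    for key, value in obj.items():
        path = f"{prefix}.{key}" if prefix else key
        renamed[renames.get(path, key)] = rename_nested(value, renames, path)
    return renamed


message = json.loads("""{
  "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
  "Timestamp": "2020-11-02T12:05:57.87639003Z",
  "Type": "CREATION",
  "PlaceType": "home",
  "Location": {"Lat": 43.7575119, "Lon": 11.2921363},
  "Created": "2020-11-02T12:05:57.876390266Z",
  "LastUpdated": "2020-11-02T12:05:57.876390398Z"
}""")

renames = {"Location": "location", "Location.Lat": "lat", "Location.Lon": "lon"}
result = rename_nested(message, renames)
print(json.dumps(result["location"]))  # {"lat": 43.7575119, "lon": 11.2921363}
```

Keying on the original path, rather than on the already-renamed parent, keeps each entry independent of the order in which renames are applied.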

Update
Great, thank you very much. However, I'm running into a problem when creating the target stream in the ksql CLI:

```
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
>    SELECT *,
>        STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS "location"
>    FROM PLACES_EVENT;
Can't find any functions with the name 'STRUCT'
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
>    SELECT *,
>        STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS 'location'
>    FROM PLACES_EVENT;
line 3:64: mismatched input ''location'' expecting {'NO', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: org.antlr.v4.runtime.InputMismatchException
```

I tried giving the struct alias without quotes, but ksql throws the same error as in the first attempt:

```
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
>    SELECT *,
>        STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS GeoPointLocation
>    FROM PLACES_EVENT;
Can't find any functions with the name 'STRUCT'
```

Can you help me?

Answer from 50pmv0ei

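I ran into exactly the same problem, and I don't know of an existing Single Message Transform that helps here. You have a couple of options:

- Write your own Single Message Transform to do it
- Use ksqlDB to wrangle the schema, which is the route I chose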

```
CREATE STREAM OUTPUT_STREAM AS
  SELECT *,
         STRUCT("lat" := LATITUDE, "lon" := LONGITUDE) AS "location"
  FROM SOURCE_STREAM
  EMIT CHANGES;
```
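Per row, the statement keeps every existing column (`SELECT *`) and appends one new struct column whose double-quoted field names keep their lowercase spelling; the original latitude/longitude columns are still present, just not the ones Elasticsearch should treat as a geo point. A rough Python model of that projection, assuming a flat source row with upper-cased `LATITUDE`/`LONGITUDE` columns (the function name is hypothetical):

```python
def select_star_with_struct(row, lat_column, lon_column, alias="location"):
    """Model of SELECT *, STRUCT("lat" := <lat>, "lon" := <lon>) AS "location"."""
    projected = dict(row)  # SELECT * keeps every existing column unchanged
    # The double-quoted names keep their lowercase spelling in the new struct.
    projected[alias] = {"lat": row[lat_column], "lon": row[lon_column]}
    return projected


source_row = {"ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
              "LATITUDE": 43.7575119, "LONGITUDE": 11.2921363}
output_row = select_star_with_struct(source_row, "LATITUDE", "LONGITUDE")
print(output_row["location"])  # {'lat': 43.7575119, 'lon': 11.2921363}
```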

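You will also need to create a mapping template to prepare the Elasticsearch index, if you haven't already.
To expand on the ksqlDB example:
Populate the source topic with the sample data: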

```
kafkacat -b localhost:9092 -P -t input_topic <<EOF
{ "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e", "Timestamp": "2020-11-02T12:05:57.87639003Z", "Type": "CREATION", "PlaceType": "home", "Location": { "Lat": 43.7575119, "Lon": 11.2921363 }, "Created": "2020-11-02T12:05:57.876390266Z", "LastUpdated": "2020-11-02T12:05:57.876390398Z" }
EOF
```
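kafkacat in producer mode (`-P`) splits its input into messages on newlines by default, so each event has to sit on a single line, as it does in the heredoc above. If you generate test events programmatically, `json.dumps` without `indent` already produces that single-line form; a small Python sketch that writes the event to stdout, so it could be piped into the same kafkacat command:

```python
import json

event = {
    "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
    "Timestamp": "2020-11-02T12:05:57.87639003Z",
    "Type": "CREATION",
    "PlaceType": "home",
    "Location": {"Lat": 43.7575119, "Lon": 11.2921363},
    "Created": "2020-11-02T12:05:57.876390266Z",
    "LastUpdated": "2020-11-02T12:05:57.876390398Z",
}

# Without `indent`, json.dumps emits one line, i.e. exactly one kafkacat message.
line = json.dumps(event)
print(line)
```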

Declare a ksqlDB `STREAM` object over the source topic (essentially the Kafka topic with a schema overlaid):

```
CREATE STREAM SOURCE_STREAM (ID VARCHAR,
                            Timestamp VARCHAR,
                            Type VARCHAR,
                            PlaceType VARCHAR,
                            Location STRUCT<Lat DOUBLE, Lon DOUBLE>,
                            Created VARCHAR,
                            LastUpdated VARCHAR)
        WITH (KAFKA_TOPIC='input_topic',
              VALUE_FORMAT='JSON');
```
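Unquoted identifiers in this declaration are upper-cased by ksqlDB, which is why the columns surface as `LOCATION`, `LAT` and `LON` in the query output and in the Elasticsearch mapping further down, and why the new struct fields are written as double-quoted `"lat"`/`"lon"` to stay lowercase. A toy Python model of that rule (an illustration only, not ksqlDB's parser):

```python
def resolve_identifier(token):
    """Toy model of ksqlDB identifier case handling (not ksqlDB's parser)."""
    # Double-quoted or backticked identifiers keep their exact spelling...
    if len(token) >= 2 and token[0] == token[-1] and token[0] in ('"', "`"):
        return token[1:-1]
    # ...while unquoted identifiers are upper-cased.
    return token.upper()


for token in ["Location", "Lat", '"lat"', "`lon`", '"location"']:
    print(token, "->", resolve_identifier(token))
```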

Confirm that the stream's schema is valid by selecting fields from the first message:

```
ksql> SET 'auto.offset.reset' = 'earliest';
>
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

ksql> SELECT ID, PLACETYPE, LOCATION->LAT, LOCATION->LON FROM SOURCE_STREAM EMIT CHANGES LIMIT 1;
+---------------------------------------+----------+-----------+-----------+
|ID                                     |PLACETYPE |LAT        |LON        |
+---------------------------------------+----------+-----------+-----------+
|7d6203f4-3ae7-4daa-af03-71f98d619f7e   |home      |43.7575119 |11.2921363 |
Limit Reached
Query terminated
```

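Create a target stream that maps the lat/lon fields to lowercase names. Here I also show an alternative way of combining them into a single `lat,lon` string, which Elasticsearch also accepts: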

```
CREATE STREAM TARGET_STREAM WITH (KAFKA_TOPIC='target_topic') AS
    SELECT *,
        STRUCT("lat" := LOCATION->LAT, "lon" := LOCATION->LON) AS "location_example_01",
        CAST(LOCATION->LAT AS VARCHAR) + ',' + CAST(LOCATION->LON AS VARCHAR) AS "location_example_02"
    FROM SOURCE_STREAM;
```
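The two aliases hold two different encodings of the same point, both of which the Elasticsearch `geo_point` type understands: an object with `lat`/`lon` keys, and a `"lat,lon"` string with latitude first. A Python sketch of what each column would contain for the example message, with `LOCATION` modelled as a dict; Python's float formatting is an assumption here and may not match ksqlDB's `CAST(... AS VARCHAR)` output digit for digit:

```python
def geo_point_object(location):
    """location_example_01: STRUCT("lat" := LOCATION->LAT, "lon" := LOCATION->LON)."""
    return {"lat": location["LAT"], "lon": location["LON"]}


def geo_point_string(location):
    """location_example_02: CAST(LAT AS VARCHAR) + ',' + CAST(LON AS VARCHAR).

    Elasticsearch reads this string form as "lat,lon" (latitude first).
    """
    return f"{location['LAT']},{location['LON']}"


location = {"LAT": 43.7575119, "LON": 11.2921363}
print(geo_point_object(location))  # {'lat': 43.7575119, 'lon': 11.2921363}
print(geo_point_string(location))  # 43.7575119,11.2921363
```

Keep the order in mind if you ever switch to the array form: Elasticsearch reads `[lon, lat]` arrays in the opposite, GeoJSON order.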

If the index does not already have a `geo_point` mapping declared, create an index template for it. This one will match any index created whose name begins with `target`:

```
curl --silent --show-error -XPUT -H 'Content-Type: application/json' \
     http://localhost:9200/_index_template/rmoff_template01/ \
     -d'{
  "index_patterns": [ "target*" ],
  "template": {
    "mappings": {
      "properties": {
        "location_example_01": {
          "type": "geo_point"
        },
        "location_example_02": {
          "type": "geo_point"
        }
      }
    }
  }
}'
```
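The same template can also be sent from code. A Python sketch using only the standard library; it builds the `PUT` request but leaves the actual `urlopen` call commented out so it runs without a cluster, and, like the curl command, it assumes Elasticsearch on `localhost:9200` in a version that supports composable index templates (`_index_template`):

```python
import json
import urllib.request

# Same body as the curl command: map both example fields as geo_point in any
# index whose name starts with "target".
template = {
    "index_patterns": ["target*"],
    "template": {
        "mappings": {
            "properties": {
                "location_example_01": {"type": "geo_point"},
                "location_example_02": {"type": "geo_point"},
            }
        }
    },
}

request = urllib.request.Request(
    "http://localhost:9200/_index_template/rmoff_template01",
    data=json.dumps(template).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="PUT",
)

# Uncomment to actually send it to a running Elasticsearch node:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode())
print(request.get_method(), request.full_url)
```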

Stream the data from Kafka to Elasticsearch using Kafka Connect. You can configure this with the native Kafka Connect REST API, or directly from ksqlDB itself:

```
CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
  'connector.class'                = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'topics'                         = 'target_topic',
  'key.converter'                  = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'                = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter.schemas.enable' = 'false',
  'connection.url'                 = 'http://elasticsearch:9200',
  'type.name'                      = '_doc',
  'key.ignore'                     = 'true',
  'schema.ignore'                  = 'true');
```
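For the REST API route instead, Kafka Connect's `PUT /connectors/<name>/config` endpoint creates the connector if it does not exist or updates its configuration otherwise. The Python sketch below only builds that request with the same settings as the ksqlDB statement; the worker address `localhost:8083` (Connect's default REST port) is an assumption, and the send is commented out:

```python
import json
import urllib.request

# Same settings as the CREATE SINK CONNECTOR statement.
connector_config = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "target_topic",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true",
}

# PUT /connectors/<name>/config creates the connector or updates its config.
# The worker address localhost:8083 is an assumption.
request = urllib.request.Request(
    "http://localhost:8083/connectors/SINK_ELASTIC_01/config",
    data=json.dumps(connector_config).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="PUT",
)

# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode())
print(request.get_method(), request.full_url)
```

Using `PUT .../config` rather than `POST /connectors` makes the call idempotent, so re-running it after a config change simply updates the connector.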

Check the mappings in the new Elasticsearch index:

```
curl -XGET --silent --show-error "http://localhost:9200/target_topic/_mappings" | jq '.'
{
  "target_topic": {
    "mappings": {
      "properties": {
        "CREATED": {
          "type": "date"
        },
        "ID": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "LASTUPDATED": {
          "type": "date"
        },
        "LOCATION": {
          "properties": {
            "LAT": {
              "type": "float"
            },
            "LON": {
              "type": "float"
            }
          }
        },
        "PLACETYPE": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "TIMESTAMP": {
          "type": "date"
        },
        "TYPE": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "location_example_01": {
          "type": "geo_point"
        },
        "location_example_02": {
          "type": "geo_point"
        }
      }
    }
  }
}
```
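To check this from a script rather than by eye, a Python sketch that parses a `_mappings` response and lists the fields Elasticsearch mapped as `geo_point`. The response is a trimmed, hard-coded copy of the output above (the `fields` sub-mappings are dropped); with a live cluster you would fetch it from the same URL as the curl command. It also shows that the original `LOCATION` object was mapped as a plain object with two `float` subfields rather than as a geo point:

```python
import json

# Trimmed copy of the _mappings response above (hard-coded so this runs offline).
mappings_response = json.loads("""{
  "target_topic": {"mappings": {"properties": {
    "LOCATION": {"properties": {"LAT": {"type": "float"}, "LON": {"type": "float"}}},
    "PLACETYPE": {"type": "text"},
    "location_example_01": {"type": "geo_point"},
    "location_example_02": {"type": "geo_point"}
  }}}
}""")


def fields_of_type(response, index, field_type):
    """Names of top-level fields in `index` mapped with the given type."""
    properties = response[index]["mappings"]["properties"]
    return sorted(name for name, spec in properties.items()
                  if spec.get("type") == field_type)


print(fields_of_type(mappings_response, "target_topic", "geo_point"))
# ['location_example_01', 'location_example_02']
```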

View the data:
![](https://i.stack.imgur.com/fguaB.png)
![](https://i.stack.imgur.com/UpxbS.png)
