Kafka connectelasticsearchsink从JSON中提取并执行值

ybzsozfc  于 2022-11-21  发布在  Apache
关注(0)|答案(1)|浏览(111)

我使用Elasticsearch Sink连接器将数据从Kafka流到Elasticsearch,我还有下一个问题。
我有下一个结构在Kafka主题document

Partition : 0 
Offset: 0
Key: 
Value: 
{
  "attributes": {
    "3": "Mike"
  }
}
Timestamp: 2022-11-03 19:03:34.866

对于此数据,我在ElasticSearch中有下一个索引模板

{
  "version": 1,
  "index_patterns": [
    "documents-*"
  ],
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "cashier": {
        "type": "text"
      }
    }
  }
}

接下来是Elastcisearch接收器连接器配置

{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "document, document-processing-error",
    "key.ignore": "true",
    "schema.ignore": "true",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "name": "elasticsearch-sink",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "flush.synchronously": "true",

    "transforms": "appendTimestampToIX",
    "transforms.appendTimestampToIX.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.appendTimestampToIX.topic.format": "${topic}-${timestamp}",
    "transforms.appendTimestampToIX.timestamp.format": "yyyy-MM-dd"
  }
}

在输出中,我的索引document-2022-11-03中有next数据

{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "document-2022-11-03",
                "_type": "_doc",
                "_id": "document-2022-11-03+0+0",
                "_score": 1.0,
                "_source": {
                    "attributes": {
                        "3": "Mike"
                    }
                }
            }
        ]
    }
}

这样做很好,但是我需要对我的数据进行额外的转换,例如,如果在属性中我有关键字3,我需要替换这个字段并添加关键字cashier,然后将此结构变形为带有随机id的平面JSON文档,因此,在最后的输出中,我需要下一个结构(例如)

{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "document-2022-11-03",
                "_type": "_doc",
                "_id": "134DaBfWAE6AZUyKUAbjRksjXHTmP6hDxedGm4YhBnZW",
                "_score": 1.0,
                "_source": {
                      "cashier": "Mike"
                }
            }
        ]
    }
}

我尝试使用替换字段的下一个配置,但这对我不起作用

"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "arrtubites.3:cashier"

我该怎么做?

sczxawaw

sczxawaw1#

ReplaceField 变换 不 适用 于 嵌套 属性 ( 如 " 贴图 " 或 " 对象 " ) , 仅 适用 于 其中 一 个 的 顶级 字段 。
如果 要 转换

{
  "attributes": {
    "3": "Mike"
  }
}

中 的 每 一 个
进入

{
  "cashier": "Mike"
}

格式
然后 , Kafka Streams 或 ksqlDB 是 常见 的 建议 ( 也 就是 在 其他 地方 消费 , 并 产生 一 个 新 的 主题 与 您 想要 执行 的 逻辑 ) 。
Logstash 也 可能 是 一 个 选项 , 而 不是 + Kafka 连接 。

相关问题