如何从Kafka连接器的响应体中获取特定的头

3phpmpom  于 2023-02-18  发布在  Apache
关注(0)|答案(1)|浏览(124)

有一个属于ApacheKafka连接的React,我想从那个身体里得到“ID”,有什么方法可以消除那个身体吗?

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"Id"},
{"type":"string","optional":false,"field":"__dbz__physicalTableIdentifier"}],
"optional":false,"name":"****.Key"},
"payload":{"Id":20030726,"__dbz__physicalTableIdentifier":
"test.***.dbo.Log"}}   Timestamp: 2023-02-16 11:05:22.496 Headers: empty 
{
"Id": 20030726,
"Date": 1652485947593,
"Thread": "ClusteredScheduler_Worker-9",
"Level": "DEBUG",
"Logger": "Quartz.Core.JobRunShell",
"Message": "Trigger instruction : NoInstruction",
"Exception": "",
"HostName": "****",
"Domain": "*****",
"Identity": "",
"UserName": "*****\\SYSTEM",
"Tier": "AppServer",
"ActivityId": "(null)",
"SessionId": "(null)",
"RequestPath": "(null)",
"DiagnosticsData": "(null)"
}

我已经显示如下请求创建连接器。

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" 
localhost:8083/connectors/ -d '{
"name": "****",
"config": {
 "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
 "tasks.max" : "1",
 "database.server.name" : "test",
 "database.hostname" : "****",
 "database.port" : "1433",
 "database.user" : "*****",
 "database.password" : "*****",
 "database.dbname" : "******",
 "table.include.list" : "dbo.Merchant",
 "database.history.kafka.bootstrap.servers" : "******:9092",
 "database.history.kafka.topic" : "schema-changes.****",
 "tombstones.on.delete" : "false",
 "transforms": "Reroute, unwrap",
 "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
 "transforms.Reroute.topic.regex": "(.*)",
 "transforms.Reroute.topic.replacement": "test.****",
 "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
 }
 }'
xienkqul

xienkqul1#

可以使用ExtractField变换仅获取一个字段

相关问题