使用kafka变更数据捕获的数据库审计跟踪

kmb7vmvb  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(294)

我正在尝试使用kafka对mysql数据库进行审计跟踪,系统应检测到源数据库中的任何更改,并相应地在相关表的目标数据库中插入一条记录,对该特定表进行审计跟踪。我目前正在使用debezium for cdc和jdbc接收器/源连接器。所需的用例是:
如果在源数据库的表students中插入一条记录,则应在students\u trail destination数据库中插入一行,其中包含该行的旧值和新值
更新和删除时相同
源数据库中有多个表,以上同样适用于所有表
目前,我有以下源和接收器连接器配置,可以将源数据库中发生的任何更改复制到目标数据库中,比如插入将插入同一行,而更新将更新该行,由于kafka的新手知识有限,我们如何实现上述审计日志记录场景。谢谢
@source.json文件:

{
    "name": "jdbc-source",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.port": "3306",
        "database.user": "",
        "database.password": "",
        "database.server.name": "localhost",
        "database.whitelist": "dbname",
        "database.server.id": "1234",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.topicname",
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3"
    }
}

@sink.json:

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "",
        "connection.url": "jdbc:mysql://localhost:3306",
        "connection.user": "",
        "connection.password": "",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "insert.mode": "upsert",
        "auto.create": "true",
        "delete.enabled": "true",
        "auto.evolve": "true",
        "pk.fields": "id",
        "pk.mode": "record_key"
    }
}

更新:
假设这是一个源数据库表(student):

id  name   birth_date
1   den    2001-09-12
2   jeff   2002-09-02

如果我在这里插入新行,例如 3 foo 1999-09-28 然后,当前设置将在相关的自动创建表的dest db中插入同一行,但我希望在自定义模式表(例如,在本例中为students\u trail)中插入before和after状态值(josn表示),它们的列(id、student\u id、before、after、timestamp)可以是自预创建的或自动创建的,无论哪一个有效。在这种插入情况下,应在dest table中创建新行,并应如下所示:
学生之路

id   student_id   before   after                 timestamp

98   3            null     "3, foo, 1999-09-28"  current_timestamp

如果插入前的状态为空,删除后的状态为空,更新前和更新后的状态都存在,并且对于每个操作,在相关的dest表中插入一行。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题