kafka流聚合为删除和/或密钥更改

wyyhbhjk  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(260)

我试图定义一个kafka流,它接受来自某个主题(比如employee)的记录,其中记录包含关于雇员及其部门的属性,并将其转换为另一个主题department,department包含部门属性和所有雇员的列表(在employee上抛出一些无状态转换)。
员工记录重复部门数据(我实际上是在处理一些dicom头数据,但我会坚持一个更普遍理解的关系。我试图理解一个普遍的解决办法)。此外,主题中的记录仅具有当前数据(即:如果部门发生更改,则没有先前的departmentid)
这看起来像是一份工作。我有一个简单的例子:

...
        KStream<String, Employee> stream = kStreamBuilder.stream("EMPLOYEE"); // Stream from raw EMPLOYEE
        stream.map((k, v) -> new KeyValue<>(k, transformEmployee(v))) // <-- some stateless enrichment of the employee
                .groupBy((k, emp) -> emp.getDepartmentId(), jsonSerialisedWith(Employee.class))

                // dummy reduce to a get a ktable for agg:
                .reduce((aggValue, newEmp) -> newEmp) 
                .groupBy((k, emp2) -> new KeyValue<>(emp2.getDepartmentId(), emp2), jsonSerialisedWith(Employee.class))

                .aggregate(Department::new, this::addEmployee, this::removeEmployee,
                           jsonValueMaterializedAs("DEPARTMENT-AGG", Department.class))
                .toStream()
                .to("DEPARTMENT", jsonProducedWith(Department.class));
        ...

    private Department addEmployee(String deptId, Employee employee, Department department) {
        department.addEmployee(employee);
        if (department.getId() == null) {
            department.setId(employee.getDepartmentId());
            department.setName(employee.getDepartmentName());
        }
        return department;
    }

这适用于添加或更新。但是,随着时间的推移,员工可能会被删除或重新分配到另一个部门。我认为删除应该是发送到employee主题的tombstone记录(k:empid,v:null)。但是,我不再拥有departmentid,我必须执行null检查(并为departmentid返回null),因此删除员工时不会发生removeemployee。部门ID变更的类似问题。
那么,Kafka是怎么处理这个问题的呢?

5lhxktic

5lhxktic1#

我认为使用你的代码就足够了,但是稍微改变一下删除雇员的语义。
你应该加一些 Mock 部门(当用户从部门中删除时将使用)。
如果员工被删除,则将部门设置为 null ,应分配给 Mock 部门。

相关问题