我试图定义一个kafka流,它接受来自某个主题(比如employee)的记录,其中记录包含关于雇员及其部门的属性,并将其转换为另一个主题department,department包含部门属性和所有雇员的列表(在employee上抛出一些无状态转换)。
员工记录重复部门数据(我实际上是在处理一些dicom头数据,但我会坚持一个更普遍理解的关系。我试图理解一个普遍的解决办法)。此外,主题中的记录仅具有当前数据(即:如果部门发生更改,则没有先前的departmentid)
这看起来像是一份工作。我有一个简单的例子:
...
KStream<String, Employee> stream = kStreamBuilder.stream("EMPLOYEE"); // Stream from raw EMPLOYEE
stream.map((k, v) -> new KeyValue<>(k, transformEmployee(v))) // <-- some stateless enrichment of the employee
.groupBy((k, emp) -> emp.getDepartmentId(), jsonSerialisedWith(Employee.class))
// dummy reduce to a get a ktable for agg:
.reduce((aggValue, newEmp) -> newEmp)
.groupBy((k, emp2) -> new KeyValue<>(emp2.getDepartmentId(), emp2), jsonSerialisedWith(Employee.class))
.aggregate(Department::new, this::addEmployee, this::removeEmployee,
jsonValueMaterializedAs("DEPARTMENT-AGG", Department.class))
.toStream()
.to("DEPARTMENT", jsonProducedWith(Department.class));
...
private Department addEmployee(String deptId, Employee employee, Department department) {
department.addEmployee(employee);
if (department.getId() == null) {
department.setId(employee.getDepartmentId());
department.setName(employee.getDepartmentName());
}
return department;
}
这适用于添加或更新。但是,随着时间的推移,员工可能会被删除或重新分配到另一个部门。我认为删除应该是发送到employee主题的tombstone记录(k:empid,v:null)。但是,我不再拥有departmentid,我必须执行null检查(并为departmentid返回null),因此删除员工时不会发生removeemployee。部门ID变更的类似问题。
那么,Kafka是怎么处理这个问题的呢?
1条答案
按热度按时间5lhxktic1#
我认为使用你的代码就足够了,但是稍微改变一下删除雇员的语义。
你应该加一些
Mock
部门(当用户从部门中删除时将使用)。如果员工被删除,则将部门设置为
null
,应分配给Mock
部门。