Foreign key join in Apache Flink using the DataStream API

t5zmwmid posted on 2024-01-04 in Apache

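Disclaimer: I am doing an Apache Flink POC for my organization that covers various scenarios, and I am still in the learning phase.
Currently we use Kafka Streams (along with KTable) to join multiple streams. However, we are concerned about the latency of Kafka Streams, which is why we are exploring Apache Flink as an option.
One of the scenarios involves a foreign key (FK) join. For simplicity, I use Employee and Department as an example, where employee.deptId = department.deptId and a department can have many employees (a one-to-many relationship). I implemented this with stateful streams as follows: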

//DataStreamSource<Long> streamSource = env.fromSequence(1, 10000000);
        SingleOutputStreamOperator<Employee> employeeSourceStreamOperator = env
                .fromSource(employeeSource, WatermarkStrategy.noWatermarks(), "Employee")
                .map(value -> objectMapper.readValue(value, Employee.class));
        SingleOutputStreamOperator<Department> departmentSingleOutputStreamOperator = env
                .fromSource(departmentSource, WatermarkStrategy.noWatermarks(), "Department")
                .map(value -> objectMapper.readValue(value, Department.class));
        employeeSourceStreamOperator
                .connect(departmentSingleOutputStreamOperator)
                .keyBy(Employee::getDeptId, Department::getDeptId)
                .map(new RichCoMapFunction<Employee, Department, Object>() {
                    // ListState to store multiple Employee instances for each department
                    private ListState<Employee> employeeListState;
                    // ValueState to store Department information
                    private ValueState<Department> departmentState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Initialize ListState for Employee
                        ListStateDescriptor<Employee> employeeListStateDescriptor =
                                new ListStateDescriptor<>("employeeListState", Employee.class);
                        employeeListState = getRuntimeContext().getListState(employeeListStateDescriptor);
                        // Initialize ValueState for Department
                        ValueStateDescriptor<Department> departmentStateDescriptor =
                                new ValueStateDescriptor<>("departmentState", Department.class);
                        departmentState = getRuntimeContext().getState(departmentStateDescriptor);
                    }

                    @Override
                    public Object map1(Employee employee) throws Exception {
                        // Process Employee stream
                        // Store Employee information in ListState (state is scoped to the current deptId key)
                        employeeListState.add(employee);
                        // Try to join with Department information
                        Department department = departmentState.value();
                        if (department != null) {
                            // Join Employee and Department information
                            return "Employee join:" + employee.getName() + " works in " + department.getDeptName();
                        }
                        return ""; // No immediate result, need to wait for Department information
                    }

                    @Override
                    public Object map2(Department department) throws Exception {
                        // Process Department stream
                        // Store Department information in ValueState
                        departmentState.update(department);
                        // Try to join with Employee information
                        Iterable<Employee> employees = employeeListState.get();
                        if (employees != null) {
                            // Join Employee and Department information for each employee in the list
                            StringBuilder result = new StringBuilder();
                            for (Employee employee : employees) {
                                result.append("Department join:" + generateOutput(employee, department)).append("\n");
                            }
                            return result.toString();
                        }
                        return ""; // No immediate result, need to wait for Employee information
                    }

                    private String generateOutput(Employee employee, Department department) {
                        return employee.getName() + " works in " + department.getDeptName();
                    }
                })
                .sinkTo(new PrintSink<>());

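This works as expected. However, I can't figure out the following scenario. An employee can change departments. When that happens, the link from the employee to the new department is easy to update, but removing the employee from the old department is challenging. Can someone please point me to Flink features that would let me get at the old department's state and remove the employee from it?
Let me know if you need any information that is missing above.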
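One way to handle a department change in the DataStream API is to split the work into two keyed stages. The code below is a plain-Java model of that idea, not Flink code, and all class and method names in it (DeptChangeModel, EmployeeEvent, onEmployee, applyToDepartment) are hypothetical.

- **Stage 1** is keyed by employee ID. It remembers each employee's current deptId (in Flink, a ValueState). When a record arrives with a different deptId, it emits a REMOVE event for the old department before the ADD event for the new one.
- **Stage 2** is keyed by deptId. It stores a department's employees in a map keyed by employee ID (in Flink, a MapState) instead of a ListState, so that one employee can be removed.

The TreeMap fields stand in for Flink's per-key state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (NOT Flink code) of a two-stage keyed design.
// TreeMap fields stand in for Flink keyed state; all names are hypothetical.
final class DeptChangeModel {
    // Stage 1 state, keyed by empId: the employee's current deptId
    // (in Flink: a ValueState<Integer> in a function after keying by empId).
    private final Map<Integer, Integer> currentDeptByEmp = new TreeMap<>();
    // Stage 2 state, keyed by deptId: that department's employees by empId
    // (in Flink: a MapState<Integer, String> instead of ListState<Employee>).
    private final Map<Integer, Map<Integer, String>> employeesByDept = new TreeMap<>();

    // Stage 1: turn one employee record into REMOVE (old dept) / ADD (new dept) events.
    List<EmployeeEvent> onEmployee(int empId, String name, int deptId) {
        List<EmployeeEvent> out = new ArrayList<>();
        Integer oldDept = currentDeptByEmp.put(empId, deptId);
        if (oldDept != null && oldDept != deptId) {
            out.add(new EmployeeEvent(true, empId, oldDept, name));
        }
        out.add(new EmployeeEvent(false, empId, deptId, name));
        return out;
    }

    // Stage 2: apply an event to the state of the department it is keyed by.
    void applyToDepartment(EmployeeEvent e) {
        Map<Integer, String> emps = employeesByDept.computeIfAbsent(e.deptId, k -> new TreeMap<>());
        if (e.isRemove) {
            emps.remove(e.empId);
        } else {
            emps.put(e.empId, e.name);
        }
    }

    // Employees that a new Department record for deptId would be joined with.
    List<String> employeesOf(int deptId) {
        return new ArrayList<>(employeesByDept.getOrDefault(deptId, new TreeMap<>()).values());
    }

    public static void main(String[] args) {
        DeptChangeModel model = new DeptChangeModel();
        for (EmployeeEvent e : model.onEmployee(6, "Employee 6", 1)) model.applyToDepartment(e);
        // Employee 6 moves from department 1 to department 2.
        for (EmployeeEvent e : model.onEmployee(6, "Employee 6", 2)) model.applyToDepartment(e);
        System.out.println("dept 1: " + model.employeesOf(1)); // dept 1: []
        System.out.println("dept 2: " + model.employeesOf(2)); // dept 2: [Employee 6]
    }
}

// Event passed from stage 1 (keyed by empId) to stage 2 (keyed by deptId).
final class EmployeeEvent {
    final boolean isRemove; // true: remove empId from deptId; false: add or update it
    final int empId;
    final int deptId;
    final String name;

    EmployeeEvent(boolean isRemove, int empId, int deptId, String name) {
        this.isRemove = isRemove;
        this.empId = empId;
        this.deptId = deptId;
        this.name = name;
    }
}
```

In a Flink job, stage 1 would roughly correspond to keying the employee stream by its empId field and running a stateful flatMap. Stage 2 would correspond to the existing connect(...).keyBy(...) function, with MapState replacing ListState. On a REMOVE event, stage 2 could also emit a retraction for any join result it previously produced for that employee.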
Update: following David's suggestion, I changed the code to use the Table API:

SingleOutputStreamOperator<Employee> employeeSourceStreamOperator = env
                .fromSource(employeeSource, WatermarkStrategy.noWatermarks(), "Employee")
                .map(value -> objectMapper.readValue(value, Employee.class));
        Table employeeTable = tableEnvironment
                .fromDataStream(employeeSourceStreamOperator, $("deptId").as("empDeptId"),$("name").as("name"));

        SingleOutputStreamOperator<Department> departmentSingleOutputStreamOperator = env
                .fromSource(departmentSource, WatermarkStrategy.noWatermarks(), "Department")
                .map(value -> objectMapper.readValue(value, Department.class));

        Table departmentTable = tableEnvironment
                .fromDataStream(departmentSingleOutputStreamOperator, Schema.newBuilder().build());

        Table table = employeeTable.join(departmentTable)
                .where($("empDeptId").isEqual($("deptId")))
                .select($("name"), $("deptName"));

        DataStream<Row> rowDataStream =
                tableEnvironment.toChangelogStream(table);
        rowDataStream.sinkTo(new PrintSink<>());


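I published Department 1, which has Employee 6 in it: {"deptId":1,"deptName":"Department 1"}
Output: 5> +I[Employee 6, Department 1]

I published Employee 6 with its deptId changed from 1 to 2: {"empId":6,"name":"Employee 6","deptId":2}
Employee 6 is updated to Department 2: 5> +I[Employee 6, Department 2]

I then published an update for Department 1 and received the following output, in which Employee 6 still belongs to Department 1:
5> +I[Employee 6, Department 1]

So Employee 6 is still related to Department 1, even though Employee 6 moved to Department 2.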
I created a git repo, flink-test. Its EmployeeDepartmentTestDataGenerator.java is a sample test data tool that publishes random data to the Kafka topics.
Am I missing something?

m2xkgtsf


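This kind of application is much simpler to implement with Flink's Table API (or Flink SQL). Table/SQL joins automatically handle the updates you are concerned about. Implementing this by hand with the DataStream API is a lot of unnecessary work: the join has to be materialized in Flink state so that updates can be produced.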
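One likely explanation for the output in the updated question is that a join can only retract "Employee 6, Department 1" if it knows that the second Employee 6 record replaces the first. A table created with fromDataStream from a plain POJO stream is insert-only and has no primary key. The two Employee 6 records are therefore two distinct rows, which is consistent with the output shown above.

A common way to give the inputs update semantics is to declare primary keys. For example, you can convert the streams to DataStream<Row> and create the tables with fromChangelogStream(rows, schema, ChangelogMode.upsert()), where the Schema declares primaryKey("empId") or primaryKey("deptId"). Alternatively, you can read the topics with the upsert-kafka SQL connector.

The plain-Java model below is not Flink code, and its class and method names are hypothetical. It sketches what a regular join over two such upsert tables does in the question's scenario: it keeps both sides in state keyed by primary key, and when an employee changes department it retracts the old result before emitting the new one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (NOT Flink code) of a regular inner join between two upsert
// tables, Employee (primary key empId) and Department (primary key deptId).
// Row strings mimic Flink's +I/-D changelog notation; names are hypothetical.
final class UpsertJoinModel {
    private final Map<Integer, Integer> empDept = new TreeMap<>();  // empId -> deptId
    private final Map<Integer, String> empName = new TreeMap<>();   // empId -> name
    private final Map<Integer, String> deptName = new TreeMap<>();  // deptId -> deptName
    final List<String> changelog = new ArrayList<>();               // emitted join rows

    // Upsert an employee: retract the old join result, then emit the new one.
    void upsertEmployee(int empId, String name, int deptId) {
        Integer oldDept = empDept.get(empId);
        if (oldDept != null && deptName.containsKey(oldDept)) {
            changelog.add("-D[" + empName.get(empId) + ", " + deptName.get(oldDept) + "]");
        }
        empDept.put(empId, deptId);
        empName.put(empId, name);
        if (deptName.containsKey(deptId)) {
            changelog.add("+I[" + name + ", " + deptName.get(deptId) + "]");
        }
    }

    // Upsert a department: re-emit results only for employees CURRENTLY in it.
    // (A real join indexes its state by the join key instead of scanning.)
    void upsertDepartment(int deptId, String name) {
        String oldName = deptName.put(deptId, name);
        for (Map.Entry<Integer, Integer> e : empDept.entrySet()) {
            if (e.getValue() == deptId) {
                String emp = empName.get(e.getKey());
                if (oldName != null) {
                    changelog.add("-D[" + emp + ", " + oldName + "]");
                }
                changelog.add("+I[" + emp + ", " + name + "]");
            }
        }
    }

    public static void main(String[] args) {
        UpsertJoinModel join = new UpsertJoinModel();
        join.upsertDepartment(1, "Department 1");
        join.upsertDepartment(2, "Department 2");
        join.upsertEmployee(6, "Employee 6", 1);
        join.upsertEmployee(6, "Employee 6", 2);  // Employee 6 moves to department 2
        join.upsertDepartment(1, "Department 1"); // emits nothing: dept 1 has no employees now
        join.changelog.forEach(System.out::println);
    }
}
```

Running main prints three rows: +I[Employee 6, Department 1], -D[Employee 6, Department 1] and +I[Employee 6, Department 2]. The final Department 1 update produces nothing. The exact row kinds Flink itself emits (for example -U/+U instead of -D/+I) may differ from this model.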
