KStream-GlobalKTable left join on complex objects

Posted by 8ehkhllq on 2021-06-04 in Kafka

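I am new to Kafka. I want to perform the following KStream-GlobalKTable left join using the pure DSL, rather than a map operation.
I have an input stream a.topic of type <String, A>, whose values look like this: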

{
    "b_obj": {
        "b_value": "xyz",
        "c_list": [
            {
                "d_obj": {
                    "d_id1": "value1",
                    "d_id2": "value2",
                    "d_value": "some value"
                },
                "c_value": "jkl"
            },
            {
                "d_obj": {
                    "d_id1": "value3",
                    "d_id2": "value4",
                    "d_value": "some value 2"
                },
                "c_value": "pqr"
            }
        ]
    },
    "a_value": "abcd"
}

and another input topic e.topic of type <String, E>, whose values look like this:

{
    "e_id1": "value1",
    "e_id2": "value2",
    "e_value": "some value"
}

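I want to perform a left join in which a.topic is the stream and e.topic, which holds the master data, is the GlobalKTable, so that the result value looks like this: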

{
    "b_obj": {
        "b_value": "xyz",
        "c_list": [
            {
                "d_obj": {
                    "d_id1": "value1",
                    "d_id2": "value2",
                    "d_value": "some value"
                },
                "e_obj": {
                    "e_id1": "value1",
                    "e_id2": "value2",
                    "e_value": "some value a"
                },
                "c_value": "jkl"
            },
            {
                "d_obj": {
                    "d_id1": "value3",
                    "d_id2": "value4",
                    "d_value": "some value 2"
                },
                "e_obj": {
                    "e_id1": "value3",
                    "e_id2": "value4",
                    "e_value": "some value b"
                },
                "c_value": "pqr"
            }
        ]
    },
    "a_value": "abcd"
}

The join condition is a.b.c[i].d.d_id1 == e.e_id1 && a.b.c[i].d.d_id2 == e.e_id2. Code:

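import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
// JsonSerde is not part of Kafka Streams; it is assumed here to be Spring Kafka's implementation.
import org.springframework.kafka.support.serializer.JsonSerde;

public class ComplexBeanStream {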

    public static void main(String[] args) {

        Serde<A> aSerde = new JsonSerde<>(A.class);

        Serde<E> eSerde = new JsonSerde<>(E.class);

        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "complex-bean-app");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "complex-bean-client");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        final GlobalKTable<String, E> eGlobalTable =
                builder.globalTable(
                        "e.topic",
                        Materialized.<String, E, KeyValueStore<Bytes, byte[]>>
                                as("E-STORE")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(eSerde)
                );

        final KStream<String, A> aStream =
                builder.stream(
                        "a.topic",
                        Consumed.with(Serdes.String(), aSerde));

        // perform left-join here

        Topology topology = builder.build();

        System.out.println("\n\nComplexBeanStream Topology: \n" + topology.describe());

        final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);

        streams.cleanUp();

        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

}

class A {

    private B b_obj;

    private String a_value;
}

class B {

    private List<C> c_list;

    private String b_value;
}

class C {

    private D d_obj;

    private E e_obj;

    private String c_value;
}

class D {

    private String d_id1;

    private String d_id2;

    private String d_value;
}

class E {

    private String e_id1;

    private String e_id2;

    private String e_value;
}
Answer 1 (g6ll5ycj)

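In your case, as long as the join lookup is based on the value rather than the key, it is not possible to join without mapping.
Kafka Streams can only join on the same key on both sides. This means you have to map both sides of the join and select a new key for each (re-keying) in order to express a.b.c[i].d.d_id1 == e.e_id1 && a.b.c[i].d.d_id2 == e.e_id2.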
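As a rough illustration of the re-keying idea, the plain-Java sketch below derives the same composite key from both sides. The `compositeKey` helper, the `|` separator and the `Map`-based records are assumptions made for this example only; they are not part of the Kafka Streams API. In an actual topology, the stream-side derivation would typically go into the `KeyValueMapper` argument of `KStream#leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)`, while e.topic would have to be keyed by the same composite value, because a GlobalKTable is looked up by its record key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class: builds one join key from the two id fields.
class CompositeKeySketch {

    // The separator is an assumption; choose one that cannot occur inside the ids.
    static String compositeKey(String id1, String id2) {
        return id1 + "|" + id2;
    }

    // Key for one element of c_list, taken from its d_obj.
    static String keyForD(Map<String, String> dObj) {
        return compositeKey(dObj.get("d_id1"), dObj.get("d_id2"));
    }

    // Key for one record of e.topic.
    static String keyForE(Map<String, String> e) {
        return compositeKey(e.get("e_id1"), e.get("e_id2"));
    }

    public static void main(String[] args) {
        Map<String, String> d = new HashMap<>();
        d.put("d_id1", "value1");
        d.put("d_id2", "value2");

        Map<String, String> e = new HashMap<>();
        e.put("e_id1", "value1");
        e.put("e_id2", "value2");

        // Both sides yield the same key, so a key-based lookup can match them.
        System.out.println(keyForD(d).equals(keyForE(e)));
    }
}
```

A string concatenation is used here only to keep the sketch short; a dedicated key class with its own serde would avoid any ambiguity if the ids could contain the separator.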
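In that case, one side would use [a.b.c[i].d.d_id1, a.b.c[i].d.d_id2] as its key and the other side [e.e_id1, e.e_id2]. When the keys match, the two values can be joined into a new object. You will probably need to flatten your c_list before re-keying.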
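The flatten-then-join logic described above can be sketched in plain Java, with a `HashMap` standing in for the global table's store. The class names, the `key` helper and the `|` separator are assumptions for illustration, not Kafka Streams API. In a real topology the per-element loop would roughly correspond to `flatMapValues` followed by a left join against the GlobalKTable; reassembling the joined elements into a single A value is a separate step that is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the join logic; the Map plays the role of the global table.
class FlattenAndJoinSketch {

    static class D {
        final String id1, id2, value;
        D(String id1, String id2, String value) { this.id1 = id1; this.id2 = id2; this.value = value; }
    }

    static class E {
        final String id1, id2, value;
        E(String id1, String id2, String value) { this.id1 = id1; this.id2 = id2; this.value = value; }
    }

    static class C {
        final D d;
        final String cValue;
        final E e; // null when the table has no matching entry
        C(D d, String cValue, E e) { this.d = d; this.cValue = cValue; this.e = e; }
    }

    // Composite key shared by both sides (separator is an assumption).
    static String key(String id1, String id2) {
        return id1 + "|" + id2;
    }

    // Left join: every element of c_list is kept; e is set only on a key match.
    static List<C> leftJoin(List<C> cList, Map<String, E> eTable) {
        List<C> joined = new ArrayList<>();
        for (C c : cList) {
            E match = eTable.get(key(c.d.id1, c.d.id2));
            joined.add(new C(c.d, c.cValue, match));
        }
        return joined;
    }

    public static void main(String[] args) {
        // Table side, keyed by the composite of e_id1 and e_id2.
        Map<String, E> eTable = new HashMap<>();
        E e1 = new E("value1", "value2", "some value a");
        eTable.put(key(e1.id1, e1.id2), e1);

        // Stream side: the flattened c_list of one A value.
        List<C> cList = Arrays.asList(
                new C(new D("value1", "value2", "some value"), "jkl", null),
                new C(new D("value3", "value4", "some value 2"), "pqr", null));

        for (C c : leftJoin(cList, eTable)) {
            System.out.println(c.cValue + " -> " + (c.e == null ? "no match" : c.e.value));
        }
    }
}
```

Because this is a left join, the second element is kept even though the table has no entry for its key; its e_obj simply stays null.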
It is also helpful to read up on joins in Kafka Streams.
