Can't seem to convert KStream&lt;A,B&gt; into KTable&lt;X,Y&gt;

13z8s7eq · Posted 2021-06-08 in Kafka

This is my first attempt at using a KTable. I have a Kafka stream containing Avro-serialized objects of type A,B. This works fine: I can write a consumer that consumes it without problems, or a trivial KStream application that simply counts records.

The B object has a field containing a country code. I want to feed that code into a KTable so it can count the number of records that contain a particular country code. To do this, I am trying to convert the stream into a stream of X,Y (effectively: country code, count). Eventually I look at the contents of the table and extract an array of KV pairs.

The code I have (included below) always fails with the following error (see the line starting with 'Caused by'):

2018-07-26 13:42:48.688 [com.findology.tools.controller.TestEventGeneratorController-16d7cd06-4742-402e-a679-898b9ef78c41-StreamThread-1; AssignedStreamsTasks] ERROR -- stream-thread [com.findology.tools.controller.TestEventGeneratorController-16d7cd06-4742-402e-a679-898b9ef78c41-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=com.findology.model.traffic.CpaTrackingCallback, partition=0, offset=962649
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:240)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:922)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.Integer / value type: java.lang.Integer). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:59)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224)
        ... 6 more
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
        at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:146)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:94)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
        ... 19 more

Here is the code I am using. I have omitted some classes for brevity. Note that I am not using the Confluent KafkaAvro classes.

private synchronized void createStreamProcessor2() {
    if (streams == null) {
        try {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getName());
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

            StreamsConfig config = new StreamsConfig(props);
            StreamsBuilder builder = new StreamsBuilder();

            Map<String, Object> serdeProps = new HashMap<>();
            serdeProps.put("schema.registry.url", schemaRegistryURL);
            AvroSerde<CpaTrackingCallback> cpaTrackingCallbackAvroSerde = new AvroSerde<>(schemaRegistryURL);
            cpaTrackingCallbackAvroSerde.configure(serdeProps, false);

            // This is the key to telling kafka the specific Serde instance to use
            // to deserialize the Avro encoded value
            KStream<Long, CpaTrackingCallback> stream = builder.stream(CpaTrackingCallback.class.getName(),
                            Consumed.with(Serdes.Long(), cpaTrackingCallbackAvroSerde));

            // provide a way to convert the CpaTrackingCallback info into just country codes
            // (Long, CpaTrackingCallback) -> (countryCode:Integer, placeHolder:Long)
            TransformerSupplier<Long, CpaTrackingCallback, KeyValue<Integer, Long>> transformer = new TransformerSupplier<Long, CpaTrackingCallback, KeyValue<Integer, Long>>() {
                @Override
                public Transformer<Long, CpaTrackingCallback, KeyValue<Integer, Long>> get() {
                    return new Transformer<Long, CpaTrackingCallback, KeyValue<Integer, Long>>() {

                        @Override
                        public void init(ProcessorContext context) {
                            // Not doing Punctuate so no need to store context
                        }

                        @Override
                        public KeyValue<Integer, Long> transform(Long key, CpaTrackingCallback value) {
                            return new KeyValue(value.getCountryCode(), 1);
                        }

                        @Override
                        public KeyValue<Integer, Long> punctuate(long timestamp) {
                            return null;
                        }

                        @Override
                        public void close() {
                        }
                    };
                }
            };

            KTable<Integer, Long> countryCounts = stream.transform(transformer).groupByKey() //
                            .count(Materialized.as("country-counts"));

            streams = new KafkaStreams(builder.build(), config);
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            streams.cleanUp();
            streams.start();

            try {
                countryCountsView = waitUntilStoreIsQueryable("country-counts", QueryableStoreTypes.keyValueStore(),
                                streams);
            }
            catch (InterruptedException e) {
                log.warn("Interrupted while waiting for query store to become available", e);
            }
        }
        catch (Exception e) {
            log.error(e);
        }
    }
}

afdcj2ne1#

The bare groupByKey() method on KStream uses the default serializer/deserializer (which you have not set). Use the overload groupByKey(Serialized&lt;K,V&gt; serialized) instead, e.g.:

.groupByKey(Serialized.with(Serdes.Integer(), Serdes.Long()))
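(Note: in newer Kafka Streams releases, Serialized has been deprecated in favour of the equivalent Grouped.with(Serdes.Integer(), Serdes.Long()).)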

Also note that everything you are doing in your custom TransformerSupplier can be done with a simple KStream.map call instead.
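Putting the two suggestions together, a minimal sketch of the counting part of the topology (reusing the stream variable, types, and imports from your question; not a verified drop-in) might look like this:

// Map each (Long, CpaTrackingCallback) record to (countryCode, 1L).
// Using 1L (a Long) keeps the record value consistent with the Long
// value serde passed to groupByKey, avoiding the Integer/serializer
// mismatch reported in your stack trace.
KTable<Integer, Long> countryCounts = stream
        .map((key, value) -> new KeyValue<>(value.getCountryCode(), 1L))
        .groupByKey(Serialized.with(Serdes.Integer(), Serdes.Long()))
        .count(Materialized.as("country-counts"));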
