kafkautils.createdirectstream没有使用正确的参数-spark streaming+kafka

z18hc3ub  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(488)

我有一个应用程序,它将序列化的twitter数据发送到kafka主题。目前一切正常。
使用者应用程序应该读取数据并将其反序列化。现在,当我打电话的时候 KafkaUtils.createDirectStream ,我认为我设置了正确的参数(正如您将在抛出的错误中看到的),所以我无法理解为什么它不起作用。
kafkautils类型中的方法createdirectstream(javastreamingcontext,class-k-,class-v-,class-kd-,class-vd-,map-string,string-,set-string-)不适用于参数(javastreamingcontext,class-string-,class-status-,class-stringdeserializer-,class-statusdeserializer-,map-string,string-,set-string-)
查了一下spark javadoc,我的爱人对我来说还是对的。
我的代码是:

Set<String> topics = new HashSet<>();
topics.add("twitter-test");
JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(duration));
Map<String, String> props = new HashMap<>();
//some properties...
JavaPairInputDStream messages =  KafkaUtils.createDirectStream(jssc, String.class, Status.class, org.apache.kafka.common.serialization.StringDeserializer.class, stream_data.StatusDeserializer.class, props, topics);

状态序列化程序代码:

public class StatusSerializer implements Serializer<Status> {

  @Override public byte[] serialize(String s, Status o) {

           try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(baos);
                oos.writeObject(o);
                oos.close();
                byte[] b = baos.toByteArray();
                return b;
            } catch (IOException e) {
                return new byte[0];
            }
        }

      @Override public void close() {

      }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

}
nukf8bse

nukf8bse1#

看起来问题出在“stream\u data.statusdeserializer.class”上。请输入这个自定义反序列化程序类的代码。另外,你能看看这个用scalaforkafkaapi0.10编写的kafkaconsumerforspark:customavro反序列化程序吗。
在kafkaparam参数中包含以下内容。

key.deserializer -> classOf[StringDeserializer]
value.deserializer -> classOf[StatusDeserializer]

相关问题