// setting the topic.
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("myTopic"));
// setting the broker list.
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
// To read the messages from start.
kafkaParams.put("auto.offset.reset", "smallest");
// creating the DStream
JavaPairInputDStream<byte[], byte[]> kafkaStream = KafkaUtils.createDirectStream(streamingContext, byte[].class, byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaParams, topicsSet);
mapr流api:
// setting the topic.
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("myTopic"));
// setting the broker list.
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
// To read the messages from start.
kafkaParams.put("auto.offset.reset", "earliest");
// setting up the key and value deserializer
kafkaParams.put("key.deserializer", StringDeserializer.class.getName());
kafkaParams.put("value.deserializer", ByteArrayDeserializer.class.getName());
// creating the DStream
JavaPairInputDStream<byte[], byte[]> kafkaStream = KafkaUtils.createDirectStream(streamingContext, byte[].class, byte[].class, kafkaParams, topicsSet);
2条答案
按热度按时间mnemlml81#
因此,kafka和mapr流api在编码方面没有很大的区别。
但在配置和api参数方面存在一些差异:
kafka支持接收器和直接两种方法,但mapr流只支持直接方法。
用于从开始读取数据的偏移重置配置值在kafka中最小,但在mapr流中最早。
kafka api支持在方法中传递键和值反序列化器参数,但在mapr stream api中,必须根据key.deserializer和value.deserializer键在kafka params map中配置它们。
kafka和mapr stream api调用接收数据流的直接方法示例:
Kafkaapi:
mapr流api:
我希望上面的解释能帮助您理解kafka和maprstreamapi之间的区别。
谢谢,
霍坎
www.streamanalytix.com
d4so4syb2#
我没有使用mapr流(因为它不是开源的),但我的理解是它们克隆了kafka0.9javaapi。因此,如果您使用的是kafka0.9客户机,那么应该非常类似(但是您需要使用他们的客户机,而不是apache的)。
此外,请注意,其他语言的客户端将不可用。其他使用不同api(特别是spark流)的apache项目将需要特殊的mapr兼容版本。