I created the following application, which prints particular messages seen within a 20-second window:
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

public class SparkMain {

    public static void main(String[] args) {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, localhost:9093");
        kafkaParams.put(GROUP_ID_CONFIG, "spark-consumer-id");
        kafkaParams.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaParams.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // the "events" topic has 2 partitions
        Collection<String> topics = Arrays.asList("events");

        // local[*]: run Spark locally with as many worker threads as logical cores on the machine
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SsvpSparkStreaming");

        // create the context with a 1-second batch interval
        JavaStreamingContext streamingContext =
                new JavaStreamingContext(conf, Durations.seconds(1));

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        // extract the event name from the record value
        stream.map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> rec) throws Exception {
                return rec.value().substring(0, 5);
            }
        })
        // keep only "msg" events
        .filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String eventName) throws Exception {
                return eventName.contains("msg");
            }
        })
        // count values over a 20-second window with a 5-second slide
        .countByValueAndWindow(Durations.seconds(20), Durations.seconds(5))
        .print();

        // a checkpoint directory is required for windowed operations
        streamingContext.checkpoint("c:\\projects\\spark\\");

        streamingContext.start();
        try {
            streamingContext.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
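For completeness, here is a minimal producer sketch that could be used to feed the events topic during testing (the message format is only an assumption of mine; all that matters for the substring(0, 5) / contains("msg") logic above is the leading "msgNN" prefix):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestEventProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                // the first 5 characters ("msg00", "msg01", ...) become the event name in the streaming job
                String value = String.format("msg%02d-payload-%d", i % 5, i);
                producer.send(new ProducerRecord<>("events", Integer.toString(i), value));
            }
            producer.flush();
        }
    }
}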
After running the main method, in the logs I see only a single consumer being initialized, and it gets both partitions:
2018-10-25 18:25:56,007 INFO [org.apache.kafka.common.utils.LogContext$KafkaLogger.info] - <[Consumer clientId=consumer-1, groupId=spark-consumer-id] Setting newly assigned partitions [events-0, events-1]>
Shouldn't the number of consumers be equal to the number of Spark workers? According to https://spark.apache.org/docs/2.3.2/submitting-applications.html#master-urls,
local[*] means: run Spark locally with as many worker threads as logical cores on your machine.
I have 8 CPU cores, so I expected 8 consumers to be created, or at least 2, so that each consumer gets one partition of the "events" topic (which has 2 partitions).
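To double-check this, one small diagnostic I could add to the job before streamingContext.start() (just a sketch, reusing the stream variable from above) is to print the number of logical cores the JVM sees and the number of partitions in each micro-batch RDD; as far as I understand, the direct stream maps Kafka partitions to RDD partitions 1:1, so I would expect 2 here:

        // how many logical cores does the JVM see? (this is what local[*] is based on)
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());

        // one RDD per batch; its partition count should match the Kafka partition count
        stream.foreachRDD(rdd ->
                System.out.println("partitions in this batch: " + rdd.getNumPartitions()));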
It looks to me as if I would need to run a full standalone Spark master/worker cluster with 2 nodes, where each node starts its own consumer...
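If that is really the way to go, I assume the only change in the code itself would be the master URL plus the executor/core settings, something like this (host name and values are placeholders, untested):

        SparkConf clusterConf = new SparkConf()
                .setAppName("SsvpSparkStreaming")
                // point at a standalone master instead of local[*]; host/port are placeholders
                .setMaster("spark://spark-master:7077")
                // spread the work over 2 executors with 1 core each (values are assumptions)
                .set("spark.executor.cores", "1")
                .set("spark.cores.max", "2");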