嗨,伙计们,我和Kafka一起工作>spark streaming>elasticsearch。但我不会将spark streaming javainputdstream json转换为elasticsearch。
我的代码:
SparkConf conf = new SparkConf()
.setAppName("Streaming")
.setMaster("local")
.set("es.nodes","localhost:9200")
.set("es.index.auto.create","true");
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, new Duration(5000));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "exastax");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("loglar");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
JavaPairDStream<String, String> finisStream = stream.mapToPair(record -> new Tuple2<>("", record.value()));
finisStream.print();
JavaEsSparkStreaming.saveJsonToEs(finisStream,"spark/docs");
streamingContext.start();
streamingContext.awaitTermination();
}
javaessparkstreaming.savejsontoes(finisstream,“spark/docs”);>>FinishStream无法工作,因为它不是javadstream。如何转换javadstream?
2条答案
按热度按时间yduiuuwa1#
JavaEsSparkStreaming.saveJsonToEs
使用JavaDStream
JavaEsSparkStreaming.saveToEsWithMeta
使用JavaPairDStream
要修复代码:dtcbnfnu2#
答案太多了!但我破解了这个密码: