我正在从事一个数据分析项目,在这个项目中,我从一个csv文件中读取数据,遍历Kafka主题上的数据,并使用spark streaming来使用Kafka主题数据。我在单个项目中使用的所有组件。
现在,在使用spark流媒体消耗数据之后,我必须对其进行一些计算,我必须将数据保存到elastic search中,并且我必须将这些数据发送到另一个主题。所以我从spark streaming做这些事情(将数据保存到elastic中并将数据发送到topic)。
下面是我的代码
@Component
public class RawEventSparkConsumer implements Serializable {
@Autowired
private ElasticSearchServiceImpl dataModelServiceImpl;
@Autowired
private EventKafkaProducer enrichEventKafkaProducer;
Collection<String> topics = Arrays.asList("rawTopic");
public void sparkRawEventConsumer(JavaStreamingContext streamingContext) {
Map<String, Object> kafkaParams = new HashedMap();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "group1");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", true);
JavaInputDStream<ConsumerRecord<String, String>> rawEventRDD = KafkaUtils.createDirectStream(streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
JavaDStream<String> dStream = rawEventRDD.map((x) -> x.value());
JavaDStream<BaseDataModel> baseDataModelDStream = dStream.map(convertIntoBaseModel);
baseDataModelDStream.foreachRDD(rdd1 -> {
saveDataToElasticSearch(rdd1.collect());
});
JavaDStream<EnrichEventDataModel> enrichEventRdd = baseDataModelDStream.map(convertIntoEnrichModel);
enrichEventRdd.foreachRDD(rdd -> {
System.out.println("Inside rawEventRDD.foreachRDD = = = " + rdd.count());
sendEnrichEventToKafkaTopic(rdd.collect());
});
streamingContext.start();
try {
streamingContext.awaitTermination();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
static Function convertIntoBaseModel = new Function<String, BaseDataModel>() {
@Override
public BaseDataModel call(String record) throws Exception {
ObjectMapper mapper = new ObjectMapper();
BaseDataModel csvDataModel = mapper.readValue(record, BaseDataModel.class);
return csvDataModel;
}
};
static Function convertIntoEnrichModel = new Function<BaseDataModel, EnrichEventDataModel>() {
@Override
public EnrichEventDataModel call(BaseDataModel csvDataModel) throws Exception {
EnrichEventDataModel enrichEventDataModel = new EnrichEventDataModel(csvDataModel);
enrichEventDataModel.setEnrichedUserName("Enriched User");
User user = new User();
user.setU_email("Nitin.Tyagi");
enrichEventDataModel.setUser(user);
return enrichEventDataModel;
}
};
private void sendEnrichEventToKafkaTopic(List<EnrichEventDataModel> enrichEventDataModels) {
if (enrichEventKafkaProducer != null && enrichEventDataModels != null && enrichEventDataModels.size() > 0)
try {
enrichEventKafkaProducer.sendEnrichEvent(enrichEventDataModels);
} catch (JsonProcessingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void saveDataToElasticSearch(List<BaseDataModel> baseDataModelList) {
if(!baseDataModelList.isEmpty())
dataModelServiceImpl.saveAllBaseModel(baseDataModelList);
}
}
现在我有几个问题
1) 我的方法行吗,即在ElasticSearch中保存数据,并从spark streaming发送主题数据?
2) 我在一个项目中使用应用程序组件(kafka,spark streaming),有多个spark streaming类。我正在本地系统中通过commandlinerunner运行这些类。那么现在如何提交Spark流作为Spark作业?
对于spark提交,我需要创建单独的spark流类项目吗?
1条答案
按热度按时间c3frrgcw1#
我的方法行吗,即在ElasticSearch中保存数据,并从spark streaming发送主题数据?
我想我应该考虑使用es-hadoop-spark库。看起来您刚刚直接使用了弹性javaapi(假设您正在收集rdd分区)
虽然可能有用,但它是高度耦合的。。。当elasticsearch因维护或其他高度潜在问题而停机时会发生什么情况?整个应用程序停止了吗?
另一种方法是将kafka处理逻辑拆分为自己的部署。这样,您也可以使用elasticsearch kafka connect进程从主题加载数据,而不必自己编写代码(connect api可能已经是正在运行的kafka集群的一部分)
有多个spark流媒体类
多个主类?这不应该是个问题。您需要同时提供一个jar和一个类名来spark submit。在一个jar中可以有多个“入口点”/main方法。
如何将spark流作为spark作业提交?
我不确定我是否理解这个问题。
spark-submit
适用于流式处理作业注意:csv是kafka中最糟糕的格式之一,如果您打算更改数据类型或它们的顺序,并且您还希望除了您自己之外的任何人都可以使用这个主题。即使elasticsearch也希望您拥有json编码的有效负载