spark流媒体工作如何发送Kafka主题的数据并将其保存在elastic中

2admgd59  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(292)

我正在从事一个数据分析项目,在这个项目中,我从一个csv文件中读取数据,遍历Kafka主题上的数据,并使用spark streaming来使用Kafka主题数据。我在单个项目中使用的所有组件。
现在,在使用spark流媒体消耗数据之后,我必须对其进行一些计算,我必须将数据保存到elastic search中,并且我必须将这些数据发送到另一个主题。所以我从spark streaming做这些事情(将数据保存到elastic中并将数据发送到topic)。
下面是我的代码

@Component
public class RawEventSparkConsumer implements Serializable {

    @Autowired
    private ElasticSearchServiceImpl dataModelServiceImpl;

    @Autowired
    private EventKafkaProducer enrichEventKafkaProducer;

    Collection<String> topics = Arrays.asList("rawTopic");

    public void sparkRawEventConsumer(JavaStreamingContext streamingContext) {

        Map<String, Object> kafkaParams = new HashedMap();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", true);

        JavaInputDStream<ConsumerRecord<String, String>> rawEventRDD = KafkaUtils.createDirectStream(streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        JavaDStream<String> dStream = rawEventRDD.map((x) -> x.value());

        JavaDStream<BaseDataModel> baseDataModelDStream = dStream.map(convertIntoBaseModel);
        baseDataModelDStream.foreachRDD(rdd1 -> {
            saveDataToElasticSearch(rdd1.collect());
        });

        JavaDStream<EnrichEventDataModel> enrichEventRdd = baseDataModelDStream.map(convertIntoEnrichModel);

        enrichEventRdd.foreachRDD(rdd -> {
            System.out.println("Inside rawEventRDD.foreachRDD = = = " + rdd.count());
            sendEnrichEventToKafkaTopic(rdd.collect());
        });

        streamingContext.start();

        try {
            streamingContext.awaitTermination();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    static Function convertIntoBaseModel = new Function<String, BaseDataModel>() {

        @Override
        public BaseDataModel call(String record) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            BaseDataModel csvDataModel = mapper.readValue(record, BaseDataModel.class);
            return csvDataModel;
        }
    };

    static Function convertIntoEnrichModel = new Function<BaseDataModel, EnrichEventDataModel>() {

        @Override
        public EnrichEventDataModel call(BaseDataModel csvDataModel) throws Exception {

            EnrichEventDataModel enrichEventDataModel = new EnrichEventDataModel(csvDataModel);
            enrichEventDataModel.setEnrichedUserName("Enriched User");
            User user = new User();
            user.setU_email("Nitin.Tyagi");
            enrichEventDataModel.setUser(user);
            return enrichEventDataModel;
        }
    };

    private void sendEnrichEventToKafkaTopic(List<EnrichEventDataModel> enrichEventDataModels) {
        if (enrichEventKafkaProducer != null && enrichEventDataModels != null && enrichEventDataModels.size() > 0)
            try {
                enrichEventKafkaProducer.sendEnrichEvent(enrichEventDataModels);
            } catch (JsonProcessingException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    }

    private void saveDataToElasticSearch(List<BaseDataModel> baseDataModelList) {
        if(!baseDataModelList.isEmpty())
            dataModelServiceImpl.saveAllBaseModel(baseDataModelList);
    }
}

现在我有几个问题
1) 我的方法行吗,即在ElasticSearch中保存数据,并从spark streaming发送主题数据?
2) 我在一个项目中使用应用程序组件(kafka,spark streaming),有多个spark streaming类。我正在本地系统中通过commandlinerunner运行这些类。那么现在如何提交Spark流作为Spark作业?
对于spark提交,我需要创建单独的spark流类项目吗?

c3frrgcw

c3frrgcw1#

我的方法行吗,即在ElasticSearch中保存数据,并从spark streaming发送主题数据?
我想我应该考虑使用es-hadoop-spark库。看起来您刚刚直接使用了弹性javaapi(假设您正在收集rdd分区)
虽然可能有用,但它是高度耦合的。。。当elasticsearch因维护或其他高度潜在问题而停机时会发生什么情况?整个应用程序停止了吗?
另一种方法是将kafka处理逻辑拆分为自己的部署。这样,您也可以使用elasticsearch kafka connect进程从主题加载数据,而不必自己编写代码(connect api可能已经是正在运行的kafka集群的一部分)
有多个spark流媒体类
多个主类?这不应该是个问题。您需要同时提供一个jar和一个类名来spark submit。在一个jar中可以有多个“入口点”/main方法。
如何将spark流作为spark作业提交?
我不确定我是否理解这个问题。 spark-submit 适用于流式处理作业
注意:csv是kafka中最糟糕的格式之一,如果您打算更改数据类型或它们的顺序,并且您还希望除了您自己之外的任何人都可以使用这个主题。即使elasticsearch也希望您拥有json编码的有效负载

相关问题