kafka+spark集成(需要restapi吗?)

mspsb9vt  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(397)

有一些基本问题,希望有人能解决。
所以我想在我的应用程序中使用apachekafka和apachespark。我已经通过了大量的教程,得到了它是什么和它将如何工作的基本想法。
用例:
数据将以40秒的间隔从一个移动设备(多个设备,比方说1000个)生成,我需要处理这些数据并向数据库添加值,这些值反过来会反映在 Jmeter 板中。
我想做的是使用apache streams并从android本身发出post请求,然后这些数据将由spark应用程序处理,就这样。
问题:
ApacheSpark
我遵循本教程来启动并运行它。(我使用的是java,而不是scala)链接:https://www.santoshsrinivas.com/installing-apache-spark-on-ubuntu-16-04/
完成所有操作后,我执行sparkshell并启动它。我也在我的服务器上安装了zookeeper和kafka,我在后台启动了kafka,所以这不是问题。
当我跑的时候 http://161...:4040/jobs/ 我得到这一页

在我阅读的所有教程中,都有这样一页:https://i.stack.imgur.com/gf1fn.png 但我不明白。是不是Spark没有安装好?
现在,当我想部署一个独立的jar到spark时,(使用以下链接:http://data-scientist-in-training.blogspot.in/2015/03/apache-spark-cluster-deployment-part-1.html )我可以运行它。i、 使用以下命令: spark-submit --class SimpleApp.SimpleApp --master spark://http://161...:7077 --name "try" /opt/spark/bin/try-0.0.1-SNAPSHOT.jar ,我得到输出。
如果我想使用它,我需要每次提交申请吗?
这是我的程序:

package SimpleApp;

/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "/opt/spark/README.md"; // Should be some file on your system
    SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    //System.setProperty("hadoop.home.dir", "C:/winutil");
    sc.setLogLevel("ERROR"); // Don't want the INFO stuff
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    System.out.println("word count : "+logData.first());
    sc.stop();
  }
}

现在我该如何将Kafka融入其中?
如何将应用程序配置为每次Kafka收到消息时都执行它?
此外,我是否需要制作一个restapi,通过它我需要将数据发送到kafka,即restapi将被用作生产者?类似spark java框架的东西?http://sparkjava.com/
如果是,瓶颈将再次发生在restapi级别,即它可以处理多少请求,因为我读到的kafka到处都有很高的吞吐量。
最终的结构会像SparkJava->kafka->ApacheSpark一样吗?
最后,如何在本地设备上设置开发结构?我已经安装了kafka/apache spark。我正在使用eclipse。
谢谢

up9lanfz

up9lanfz1#

好,
你在理解spark如何与Kafka合作时遇到了一些问题。
首先让我们了解一些事情:
kafka是一个低延迟、高吞吐量的流处理平台。这将允许您存储和读取大量的数据非常快。
spark有两种处理方式,spark批处理和spark流处理。您正在学习的是批处理,对于您的问题,我建议您看看apache流媒体。
什么是流媒体?
流是一种实时或近实时传输和转换数据的方法。不需要创建一个需要每10分钟或每10秒调用一次的进程。您将启动作业,它将消耗源并在接收器中发布。
Kafka是一个被动的平台,因此Kafka可以是流进程的源或汇。
就你而言,我的建议是:
为你的kafka创建一个流媒体制作者,你将在你的web服务器上阅读你的移动应用程序的日志。因此,您需要在web服务器上插入一些东西来开始使用数据。我建议你的是fluentdis是一个非常强大的流媒体应用程序,它是ruby的,但是非常容易使用。如果您想在bigdata中获得更健壮和更集中的内容,我建议apachenifi这很难做到,这并不容易,但是您可以创建数据流管道来将信息传输到集群。一个非常简单的解决问题的方法是apacheflume。
开始你的Kafka,你可以用docker来使用它。这将保存你的数据一段时间,并允许你采取你的数据时,你需要非常快速和大量的信息。请阅读文档以了解其工作原理。
spark streaming—如果没有流处理,那么使用kafka是没有意义的,在kafka生成数据的rest解决方案很慢,如果是批处理,那么就没有意义。因此,如果你是以流式写作,你也应该以流式分析。我建议你在这里读一下spark streaming。以及如何将Spark与Kafka结合在一起。
所以,正如你所问:
我需要RESTAPI吗?答案是否定的。
架构如下:
web服务器->fluentd->apache kafka->spark streaming->输出
我希望这会有帮助

相关问题