Spark:Streaming 实践 Dstream 转换算子、窗口、输出文件

x33g5p2x  于2021-12-07 转载在 Spark  
字(11.6k)|赞(0)|评价(0)|浏览(770)

1、启动集群

  1. 启动zookeeper,hadoop,flume
  2. # 1、三个节点
  3. /usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh start
  4. /usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh status
  5. # 2、master节点 启动hadoop
  6. /usr/hadoop/hadoop-2.7.3/sbin/start-all.sh

2、IDEA安装依赖

  1. <properties>
  2. <scala.version>2.11.8</scala.version>
  3. <hadoop.version>2.7.3</hadoop.version>
  4. <spark.version>2.4.0</spark.version>
  5. <hbase.version>1.2.4</hbase.version>
  6. <hive.version>2.1.1</hive.version>
  7. </properties>
  8. <dependencies>
  9. <!--Scala-->
  10. <dependency>
  11. <groupId>org.scala-lang</groupId>
  12. <artifactId>scala-library</artifactId>
  13. <version>${scala.version}</version>
  14. </dependency>
  15. <!--Spark-->
  16. <dependency>
  17. <groupId>org.apache.spark</groupId>
  18. <artifactId>spark-core_2.11</artifactId>
  19. <version>${spark.version}</version>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.apache.spark</groupId>
  23. <artifactId>spark-streaming_2.11</artifactId>
  24. <version>2.4.0</version>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.apache.spark</groupId>
  28. <artifactId>spark-sql_2.11</artifactId>
  29. <version>2.4.0</version>
  30. </dependency>
  31. <dependency>
  32. <groupId>org.apache.spark</groupId>
  33. <artifactId>spark-mllib_2.11</artifactId>
  34. <version>${spark.version}</version>
  35. </dependency>
  36. <dependency>
  37. <groupId>org.apache.spark</groupId>
  38. <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  39. <version>${spark.version}</version>
  40. </dependency>
  41. <!--Spark & flume-->
  42. <dependency>
  43. <groupId>org.apache.spark</groupId>
  44. <artifactId>spark-streaming-flume_2.11</artifactId>
  45. <version>2.3.0</version>
  46. </dependency>
  47. <!--Hadoop-->
  48. <dependency>
  49. <groupId>org.apache.hadoop</groupId>
  50. <artifactId>hadoop-client</artifactId>
  51. <version>${hadoop.version}</version>
  52. </dependency>
  53. <dependency>
  54. <groupId>org.apache.hadoop</groupId>
  55. <artifactId>hadoop-common</artifactId>
  56. <version>2.7.3</version>
  57. </dependency>
  58. <dependency>
  59. <groupId>org.apache.hadoop</groupId>
  60. <artifactId>hadoop-hdfs</artifactId>
  61. <version>2.7.3</version>
  62. </dependency>
  63. <!--Hbase-->
  64. <dependency>
  65. <groupId>org.apache.hbase</groupId>
  66. <artifactId>hbase-client</artifactId>
  67. <version>${hbase.version}</version>
  68. </dependency>
  69. <dependency>
  70. <groupId>org.apache.hbase</groupId>
  71. <artifactId>hbase-common</artifactId>
  72. <version>${hbase.version}</version>
  73. </dependency>
  74. <dependency>
  75. <groupId>org.apache.hbase</groupId>
  76. <artifactId>hbase-server</artifactId>
  77. <version>${hbase.version}</version>
  78. </dependency>
  79. <dependency>
  80. <groupId>org.apache.hbase</groupId>
  81. <artifactId>hbase-protocol</artifactId>
  82. <version>${hbase.version}</version>
  83. </dependency>
  84. <dependency>
  85. <groupId>org.apache.hbase</groupId>
  86. <artifactId>hbase-annotations</artifactId>
  87. <version>${hbase.version}</version>
  88. <type>test-jar</type>
  89. <scope>test</scope>
  90. </dependency>
  91. <!--Hive-->
  92. <dependency>
  93. <groupId>org.apache.hive</groupId>
  94. <artifactId>hive-exec</artifactId>
  95. <version>${hive.version}</version>
  96. </dependency>
  97. <dependency>
  98. <groupId>org.apache.hive</groupId>
  99. <artifactId>hive-jdbc</artifactId>
  100. <version>${hive.version}</version>
  101. </dependency>
  102. <!--kafka-->
  103. <dependency>
  104. <groupId>org.apache.kafka</groupId>
  105. <artifactId>kafka-clients</artifactId>
  106. <version>2.4.0</version>
  107. </dependency>
  108. <dependency>
  109. <groupId>org.apache.kafka</groupId>
  110. <artifactId>kafka-streams</artifactId>
  111. <version>2.4.0</version>
  112. </dependency>
  113. <!--mysql-->
  114. <dependency>
  115. <groupId>mysql</groupId>
  116. <artifactId>mysql-connector-java</artifactId>
  117. <version>5.1.46</version>
  118. </dependency>
  119. <dependency>
  120. <groupId>com.alibaba</groupId>
  121. <artifactId>fastjson</artifactId>
  122. <version>1.2.17</version>
  123. </dependency>
  124. <dependency>
  125. <groupId>com.huaban</groupId>
  126. <artifactId>jieba-analysis</artifactId>
  127. <version>1.0.2</version>
  128. </dependency>
  129. <dependency>
  130. <groupId>junit</groupId>
  131. <artifactId>junit</artifactId>
  132. <version>4.12</version>
  133. <scope>test</scope>
  134. </dependency>
  135. </dependencies>
  136. <build>
  137. <plugins>
  138. <plugin>
  139. <groupId>org.scala-tools</groupId>
  140. <artifactId>maven-scala-plugin</artifactId>
  141. <version>2.15.2</version>
  142. <executions>
  143. <execution>
  144. <goals>
  145. <goal>compile</goal>
  146. <goal>testCompile</goal>
  147. </goals>
  148. </execution>
  149. </executions>
  150. </plugin>
  151. <plugin>
  152. <groupId>org.apache.maven.plugins</groupId>
  153. <artifactId>maven-compiler-plugin</artifactId>
  154. <version>3.1</version>
  155. <configuration>
  156. <source>1.8</source>
  157. <target>1.8</target>
  158. </configuration>
  159. </plugin>
  160. <plugin>
  161. <groupId>org.apache.maven.plugins</groupId>
  162. <artifactId>maven-surefire-plugin</artifactId>
  163. <version>2.12.4</version>
  164. <configuration>
  165. <skip>true</skip>
  166. </configuration>
  167. </plugin>
  168. </plugins>
  169. <defaultGoal>compile</defaultGoal>
  170. </build>
  171. </project>

在之前安装过的只需要安装:

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-streams</artifactId>
  4. <version>2.4.0</version>
  5. </dependency>

2.1 启动服务端监听 Socket 服务

命令:nc -lk 9999

2.2 实现 transform() 方法,分割多个单词

  1. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object TransformTest {
  5. def main(args: Array[String]): Unit = {
  6. // 1.创建SparkConf对象
  7. val sparkConf: SparkConf = new SparkConf()
  8. .setAppName("TransformTest").setMaster("local[2]")
  9. // 2.创建SparkContext对象,它是所有任务计算的源头
  10. val sc: SparkContext = new SparkContext(sparkConf)
  11. // 3.设置日志级别
  12. sc.setLogLevel("WARN")
  13. // 4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
  14. val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
  15. // 5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
  16. // 以上是固定搭配结构
  17. val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128",9999)
  18. // 6.使用RDD-to-RDD函数,返回新的DStream对象(即words),并空格切分每行
  19. val words: DStream[String] = dstream.transform(rdd => rdd.flatMap(_.split(" ")))
  20. // 7.打印输出结果
  21. words.print()
  22. // 8.开启流式计算
  23. ssc.start()
  24. // 9.让程序一直运行,除非人为干预停止
  25. ssc.awaitTermination()
  26. }
  27. }

2.3 UpdateStateByKeyTest 更新值

  1. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object UpdateStateByKeyTest {
  5. //newValues 表示当前批次汇总成的(word,1)中相同单词的所有1
  6. //runningCount 表示历史的所有相同key的value总和
  7. def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  8. val newCount =runningCount.getOrElse(0)+newValues.sum
  9. Some(newCount)
  10. }
  11. def main(args: Array[String]): Unit = {
  12. // 1.创建SparkConf对象 设置appName和master地址 local[2] 表示本地采用2个线程运行任务
  13. val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
  14. // 2.创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskScheduler
  15. val sc: SparkContext = new SparkContext(sparkConf)
  16. // 3.设置日志级别
  17. sc.setLogLevel("WARN")
  18. // 4.创建StreamingContext,需要2个参数,一个是SparkContext,一个是批处理的时间间隔
  19. val ssc: StreamingContext = new StreamingContext(sc,Seconds(2))
  20. // 5.配置检查点目录,使用updateStateByKey方法必须配置检查点目录
  21. ssc.checkpoint("./")
  22. // 6.对接socket数据创建DStream对象,需要socket服务的地址、端口号及存储级别(默认的)
  23. val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("master",9999)
  24. // 7.按空格进行切分每一行,并将切分的单词出现次数记录为1
  25. val wordAndOne: DStream[(String, Int)] = dstream.flatMap(_.split(" ")).map(word =>(word,1))
  26. // 8.调用updateStateByKey操作,统计单词在全局中出现的次数
  27. var result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunction)
  28. // 9.打印输出结果
  29. result.print()
  30. // 10.开启流式计算
  31. ssc.start()
  32. // 11.让程序一直运行,除非人为干预停止
  33. ssc.awaitTermination()
  34. }
  35. }

2.4 Dstream 窗口操作

  • 事先设定一个滑动窗口的长度(也就是窗口的持续时间);
  • 设定滑动窗口的时间间隔(每隔多长时间执行一次计算),让窗口按照指定时间间隔在源DStream上滑动;
  • 每次窗口停放的位置上,都会有一部分Dstream(或者一部分RDD)被框入窗口内,形成一个小段的Dstream;
  • 可以启动对这个小段DStream的计算。

方法名称相关说明
window(windowLength, slideInterval)基于源DStream产生的窗口化的批数据,计算得到一个新的Dstream;
countByWindow(windowLength, slideInterval)返回流中元素的一个滑动窗口数;
reduceByWindow(func, windowLength, slideInterval)返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算;
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])更加高效的 reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce‖操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入)。
countByValueAndWindow(windowLength, slideInterval, [numTasks])当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream;每个key的值都是它们在滑动窗口中出现的频率。
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3. import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
  4. object WindowTest {
  5. def main(args: Array[String]): Unit = {
  6. // 1.创建SparkConf对象
  7. val sparkConf: SparkConf = new SparkConf()
  8. .setAppName("WindowTest ").setMaster("local[2]")
  9. // 2.创建SparkContext对象,它是所有任务计算的源头
  10. val sc: SparkContext = new SparkContext(sparkConf)
  11. // 3.设置日志级别
  12. sc.setLogLevel("WARN")
  13. // 4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
  14. val ssc: StreamingContext = new StreamingContext(sc,Seconds(1))
  15. // 5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
  16. val dstream: ReceiverInputDStream[String] = ssc
  17. .socketTextStream("master",9999)
  18. // 6.按空格进行切分每一行
  19. val words: DStream[String] = dstream.flatMap(_.split(" "))
  20. // 7.调用window操作,需要两个参数,窗口长度和滑动时间间隔
  21. val windowWords: DStream[String] = words.window(Seconds(3),Seconds(1))
  22. // 8.打印输出结果
  23. windowWords.print()
  24. // 9.开启流式计算
  25. ssc.start()
  26. // 10.让程序一直运行,除非人为干预停止
  27. ssc.awaitTermination()
  28. }
  29. }

把批处理时间间隔、窗口长度和滑动时间间隔进行变换,在其中两者非整数倍的情况下,会报错。

2.5 DStream 输出操作

  1. import org.apache.spark.{SparkConf, SparkContext}
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3. import org.apache.spark.streaming.dstream.ReceiverInputDStream
  4. object SaveAsTextFilesTest {
  5. def main(args: Array[String]): Unit = {
  6. System.setProperty("HADOOP_USER_NAME", "root")
  7. //1.创建SparkConf对象 设置appName和master地址 local[2] 表示本地采用2个线程运行任务
  8. val sparkConf: SparkConf = new SparkConf().setAppName("SaveAsTextFilesTest").setMaster("local[2]")
  9. //2.创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskScheduler
  10. val sc: SparkContext = new SparkContext(sparkConf)
  11. //3.设置日志级别
  12. sc.setLogLevel("WARN")
  13. //4.创建StreamingContext,需要2个参数,一个是SparkContext,一个是批处理的时间间隔
  14. val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
  15. //5.对接socket数据创建DStream对象,需要socket服务的地址、端口号及存储级别(默认的)
  16. val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128",9999)
  17. //6.调用saveAsTextFiles操作,将nc窗口输出的内容保存到HDFS上
  18. dstream.saveAsTextFiles("hdfs://master:8020//saveAsTextFiles/satf","txt")
  19. //7.开启流式计算
  20. ssc.start()
  21. //8.让程序一直运行,除非人为干预停止
  22. ssc.awaitTermination()
  23. }
  24. }

相关文章

最新文章

更多