Start ZooKeeper, Hadoop, and Flume
# 1. On all three nodes, start ZooKeeper and check its status
/usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh start
/usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh status
# 2. On the master node, start Hadoop
/usr/hadoop/hadoop-2.7.3/sbin/start-all.sh
The project's Maven dependencies (pom.xml):
<properties>
<scala.version>2.11.8</scala.version>
<hadoop.version>2.7.3</hadoop.version>
<spark.version>2.4.0</spark.version>
<hbase.version>1.2.4</hbase.version>
<hive.version>2.1.1</hive.version>
</properties>
<dependencies>
<!--Scala-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!--Spark-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--Spark & flume-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<!--Hadoop-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--Hbase-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-annotations</artifactId>
<version>${hbase.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!--Hive-->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
</dependency>
<!--kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
</dependency>
<!--mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>com.huaban</groupId>
<artifactId>jieba-analysis</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
<defaultGoal>compile</defaultGoal>
</build>
</project>
If you already set up this project earlier, you only need to add the following dependency:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
</dependency>
Command (start a netcat socket source for the examples below): nc -lk 9999
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object TransformTest {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf object
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("TransformTest").setMaster("local[2]")
    // 2. Create the SparkContext object, the entry point for all computation
    val sc: SparkContext = new SparkContext(sparkConf)
    // 3. Set the log level
    sc.setLogLevel("WARN")
    // 4. Create the StreamingContext; it takes two parameters, the SparkContext and the batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // 5. Connect to the socket service, giving the host, port, and (default) storage level
    //    The steps above are the standard boilerplate for a streaming job
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128", 9999)
    // 6. Apply an RDD-to-RDD function that splits each line on spaces, returning a new DStream (words)
    val words: DStream[String] = dstream.transform(rdd => rdd.flatMap(_.split(" ")))
    // 7. Print the results
    words.print()
    // 8. Start the streaming computation
    ssc.start()
    // 9. Keep the program running until it is stopped manually
    ssc.awaitTermination()
  }
}
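The value of transform is that it exposes the full RDD API inside a streaming job. As a hedged sketch (reusing the dstream from the example above; the topWords name is chosen here for illustration), you could compute a per-batch word count sorted by frequency, using RDD operations that have no direct DStream equivalent:

// Per-batch word count, sorted by frequency, using RDD operations inside transform.
// Assumes the same dstream created in step 5 above.
val topWords: DStream[(String, Int)] = dstream.transform { rdd =>
  rdd.flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .sortBy(_._2, ascending = false)   // sortBy is an RDD operation, not a DStream one
}
topWords.print()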
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object UpdateStateByKeyTest {
  // newValues: all the 1s from the current batch for the same word
  // runningCount: the accumulated total for that key across all previous batches
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf object, setting appName and master; local[2] runs the job locally with 2 threads
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    // 2. Create the SparkContext object, the entry point for all computation; it creates the DAGScheduler and TaskScheduler
    val sc: SparkContext = new SparkContext(sparkConf)
    // 3. Set the log level
    sc.setLogLevel("WARN")
    // 4. Create the StreamingContext; it takes two parameters, the SparkContext and the batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(2))
    // 5. Configure the checkpoint directory; updateStateByKey requires one
    ssc.checkpoint("./")
    // 6. Create a DStream from the socket source, giving the host, port, and (default) storage level
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("master", 9999)
    // 7. Split each line on spaces and map every word to (word, 1)
    val wordAndOne: DStream[(String, Int)] = dstream.flatMap(_.split(" ")).map(word => (word, 1))
    // 8. Call updateStateByKey to count each word's occurrences across the whole stream
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunction)
    // 9. Print the results
    result.print()
    // 10. Start the streaming computation
    ssc.start()
    // 11. Keep the program running until it is stopped manually
    ssc.awaitTermination()
  }
}
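The same update logic can also be written inline as a lambda instead of the named updateFunction. A minimal sketch, assuming the wordAndOne DStream from step 7 (resultInline is an illustrative name):

// Inline equivalent of updateFunction: add this batch's 1s for a key onto the
// running total stored for that key (0 if the key has not been seen before).
val resultInline: DStream[(String, Int)] =
  wordAndOne.updateStateByKey[Int]((newValues: Seq[Int], running: Option[Int]) =>
    Some(running.getOrElse(0) + newValues.sum))

For example, if you type "hello spark" into the nc window and "hello" in a later batch, the printed totals grow from (hello,1), (spark,1) to (hello,2), (spark,1), because the per-key state is carried across batches via the checkpoint directory.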
Method | Description |
---|---|
window(windowLength, slideInterval) | Returns a new DStream computed from windowed batches of the source DStream. |
countByWindow(windowLength, slideInterval) | Returns a sliding-window count of the elements in the stream. |
reduceByWindow(func, windowLength, slideInterval) | Returns a single-element stream created by aggregating the stream's elements over the sliding interval with func. func must be associative so that the aggregation can be computed in parallel. |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | When applied to a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs in which the value for each key is aggregated over the window with the given reduce function func. Note: by default this uses Spark's default number of parallel tasks for the grouping; set numTasks to use a different number. |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of reduceByKeyAndWindow, in which each window's reduce value is computed incrementally from the previous window's value: new data entering the window is reduced in, and old data leaving the window is "inverse-reduced" out. It only works with "invertible" reduce functions, i.e. reduce functions that have a corresponding inverse reduce function (passed as invFunc). |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | When applied to a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs in which the value of each key is its frequency within the sliding window. |
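As a hedged sketch of the incremental form of reduceByKeyAndWindow, assuming a (word, 1) pair DStream like the wordAndOne in the updateStateByKey example above and a configured checkpoint directory (which the inverse-reduce form requires):

// Word counts over a 30-second window that slides every 10 seconds.
// Batches entering the window are added and batches leaving it are subtracted,
// so each window is computed incrementally from the previous one.
val windowedCounts: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,   // reduce function: combine counts entering the window
  (a: Int, b: Int) => a - b,   // inverse reduce: remove counts leaving the window
  Seconds(30),                 // window length
  Seconds(10)                  // slide interval
)
windowedCounts.print()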
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
object WindowTest {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf object
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("WindowTest").setMaster("local[2]")
    // 2. Create the SparkContext object, the entry point for all computation
    val sc: SparkContext = new SparkContext(sparkConf)
    // 3. Set the log level
    sc.setLogLevel("WARN")
    // 4. Create the StreamingContext; it takes two parameters, the SparkContext and the batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(1))
    // 5. Connect to the socket service, giving the host, port, and (default) storage level
    val dstream: ReceiverInputDStream[String] = ssc
      .socketTextStream("master", 9999)
    // 6. Split each line on spaces
    val words: DStream[String] = dstream.flatMap(_.split(" "))
    // 7. Call window, which takes two parameters: the window length and the slide interval
    val windowWords: DStream[String] = words.window(Seconds(3), Seconds(1))
    // 8. Print the results
    windowWords.print()
    // 9. Start the streaming computation
    ssc.start()
    // 10. Keep the program running until it is stopped manually
    ssc.awaitTermination()
  }
}
If you vary the batch interval, window length, and slide interval, Spark throws an error whenever the window length or the slide interval is not an integer multiple of the batch interval.
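A small sketch of this constraint, reusing the words DStream and the 1-second batch interval from WindowTest above (the 5-second case is hypothetical):

// Valid: with a 1-second batch interval, a 4s window and a 2s slide are both
// whole multiples of the batch interval.
val ok: DStream[String] = words.window(Seconds(4), Seconds(2))

// Invalid: if the batch interval were Seconds(5) instead, a 3-second window is
// not a multiple of it, and Spark rejects the windowed DStream when it is created.
// val bad: DStream[String] = words.window(Seconds(3), Seconds(1))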
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
object SaveAsTextFilesTest {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    // 1. Create the SparkConf object, setting appName and master; local[2] runs the job locally with 2 threads
    val sparkConf: SparkConf = new SparkConf().setAppName("SaveAsTextFilesTest").setMaster("local[2]")
    // 2. Create the SparkContext object, the entry point for all computation; it creates the DAGScheduler and TaskScheduler
    val sc: SparkContext = new SparkContext(sparkConf)
    // 3. Set the log level
    sc.setLogLevel("WARN")
    // 4. Create the StreamingContext; it takes two parameters, the SparkContext and the batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // 5. Create a DStream from the socket source, giving the host, port, and (default) storage level
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128", 9999)
    // 6. Call saveAsTextFiles to save whatever is typed in the nc window to HDFS
    dstream.saveAsTextFiles("hdfs://master:8020/saveAsTextFiles/satf", "txt")
    // 7. Start the streaming computation
    ssc.start()
    // 8. Keep the program running until it is stopped manually
    ssc.awaitTermination()
  }
}
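Note on the output: saveAsTextFiles writes each batch to its own directory named prefix-&lt;batch time in milliseconds&gt;.suffix, so on HDFS you should see directories such as /saveAsTextFiles/satf-&lt;timestamp&gt;.txt, each containing the usual part files for that 5-second batch.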
Copyright notice: this article is a repost; copyright belongs to the original author.
Original article: https://blog.csdn.net/weixin_44775255/article/details/121765631