文章18 | 阅读 11232 | 点赞0
Transformations 常见API:
同Spark中的操作
package com.Streaming.StreamAPI;
import com.Streaming.custormSource.MyNoParalleSource;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Filter demo: reads {@code Long} values from {@link MyNoParalleSource}, keeps
 * only the even ones, and prints their sum over 2-second windows.
 *
 * @author Henry
 * @since 2019/5/12
 */
public class StreamingDemoFilter {

    public static void main(String[] args) throws Exception {
        // Obtain the Flink execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Note: for this source the parallelism may only be set to 1.
        final DataStreamSource<Long> source = env.addSource(new MyNoParalleSource()).setParallelism(1);

        // Log every record as it arrives and pass it on unchanged.
        final DataStream<Long> received = source.map(value -> {
            System.out.println("原始接收到数据:" + value);
            return value;
        });

        // filter() keeps the records that satisfy the predicate, so all odd numbers are dropped.
        final DataStream<Long> evens = received.filter(value -> value % 2 == 0);

        // Log the records that survived the filter.
        final DataStream<Long> kept = evens.map(value -> {
            System.out.println("过滤之后的数据:" + value);
            return value;
        });

        // Sum the surviving records over 2-second windows.
        final DataStream<Long> windowedSum = kept.timeWindowAll(Time.seconds(2)).sum(0);

        // Print the result.
        windowedSum.print().setParallelism(1);

        env.execute(StreamingDemoFilter.class.getSimpleName());
    }
}
将满足条件的数据进行保留
package cn.Streaming.StreamAPI
import cn.Streaming.custormSource.MyNoParallelSourceScala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Filter demo: keeps the even numbers emitted by `MyNoParallelSourceScala`
 * and prints their sum over 2-second windows.
 *
 * @author Henry
 * @since 2019/5/14
 */
object StreamingDemoFilter {

  /** Builds the streaming job and submits it for execution. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Implicit conversions required by the Flink Scala API.
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyNoParallelSourceScala)

    // Log every incoming record, then keep only the even ones.
    val mapData = text
      .map { line =>
        println(s"原始接收到的数据:$line")
        line
      }
      .filter(_ % 2 == 0)

    // Log the records that passed the filter and sum them per 2-second window.
    val sum = mapData
      .map { line =>
        println(s"过滤之后的数据:$line")
        line
      }
      .timeWindowAll(Time.seconds(2))
      .sum(0)

    sum.print().setParallelism(1)

    // Name the job after this demo; the previous name was copied from another example.
    env.execute("StreamingDemoFilter")
  }
}
keyBy()中的数据类型要么是tuple,要么是自定义类型;像Long这样的类型就不可以进行
keyBy操作
两种典型用法:
dataStream.keyBy("someKey") // 按字段"someKey"指定key
dataStream.keyBy(0) // 按Tuple中的第一个元素指定key
注意:以下类型是无法作为key的:
1、没有重写hashCode()方法、依赖Object.hashCode()实现的POJO类型
2、任意类型的数组
同SocketWordCount代码中使用部分:
// 3.2 Split the input text into disjoint partitions, where each partition holds the elements that share the same key.
// In other words, identical words land in the same partition, and the following reduce step counts the entries in each partition.
.keyBy("word")
同SocketWordCount
合并多个数据流(可以大于2个流),新的流会包含所有流中的数据,但是union有一个限制,
就是所有合并的流类型必须是一致的。
DataStreamSource<Long> text1 = env.addSource(...);
DataStreamSource<Long> text2 = env.addSource(...);
// Combine text1 and text2 into a single stream; union() yields a plain DataStream.
DataStream<Long> text = text1.union(text2);
package com.Streaming.StreamAPI;
import com.Streaming.custormSource.MyNoParalleSource;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Union demo: merges several streams into one. The new stream contains the
 * data of all input streams; the one restriction of union is that every
 * merged stream must have the same element type.
 *
 * @author Henry
 * @since 2019/5/12
 */
public class StreamingDemoUnion {

    /** Prints each incoming value and forwards it unchanged. */
    private static final class PrintingMapper implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) throws Exception {
            System.out.println("原始接收到数据:" + value);
            return value;
        }
    }

    public static void main(String[] args) throws Exception {
        // Obtain the Flink execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Two instances of the same source; for this source the parallelism may only be set to 1.
        final DataStreamSource<Long> first = env.addSource(new MyNoParalleSource()).setParallelism(1);
        final DataStreamSource<Long> second = env.addSource(new MyNoParalleSource()).setParallelism(1);

        // Merge both inputs into one stream.
        final DataStream<Long> merged = first.union(second);

        // Log every record of the merged stream.
        final DataStream<Long> received = merged.map(new PrintingMapper());

        // Sum the records over 2-second windows.
        final DataStream<Long> windowedSum = received
                .timeWindowAll(Time.seconds(2))
                .sum(0);

        // Print the result.
        windowedSum.print().setParallelism(1);

        env.execute(StreamingDemoUnion.class.getSimpleName());
    }
}
package cn.Streaming.StreamAPI
import cn.Streaming.custormSource.MyNoParallelSourceScala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Union demo: merges two streams produced by `MyNoParallelSourceScala` and
 * prints the sum of the merged records over 2-second windows.
 *
 * @author Henry
 * @since 2019/5/14
 */
object StreamingDemoUnionScala {

  /** Builds the streaming job and submits it for execution. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Implicit conversions required by the Flink Scala API.
    import org.apache.flink.api.scala._

    val text1 = env.addSource(new MyNoParallelSourceScala)
    val text2 = env.addSource(new MyNoParallelSourceScala)

    // Both inputs have the same element type, so they can be unioned.
    val unionall = text1.union(text2)

    // Log every merged record and sum them per 2-second window.
    val sum = unionall
      .map { line =>
        println(s"接收到的数据:$line")
        line
      }
      .timeWindowAll(Time.seconds(2))
      .sum(0)

    sum.print().setParallelism(1)

    // Name the job after this demo; the previous name was copied from another example.
    env.execute("StreamingDemoUnionScala")
  }
}
Connect和union类似,但是只能连接两个流,两个流的数据类型可以不同,
会对两个流中的数据应用不同的处理方法。
CoMap, CoFlatMap:在ConnectedStreams中需要使用这种函数,类似于map和flatmap。
package com.Streaming.StreamAPI;
import com.Streaming.custormSource.MyNoParalleSource;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Connect demo: connect is similar to union, but it can only join two
 * streams. The two streams may have different element types, and a separate
 * processing method is applied to each of them.
 *
 * @author Henry
 * @since 2019/5/12
 */
public class StreamingDemoConnect {

    /**
     * A CoMapFunction has to implement two map methods: map1 receives the
     * Long records and map2 the String records, so each input type is handled
     * separately. Both simply forward the value here.
     */
    private static final class PassThroughCoMapper implements CoMapFunction<Long, String, Object> {
        @Override
        public Object map1(Long value) throws Exception {
            return value;
        }

        @Override
        public Object map2(String value) throws Exception {
            return value;
        }
    }

    public static void main(String[] args) throws Exception {
        // Obtain the Flink execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Two instances of the same Long source.
        final DataStreamSource<Long> numbers = env.addSource(new MyNoParalleSource()).setParallelism(1);
        final DataStreamSource<Long> toPrefix = env.addSource(new MyNoParalleSource()).setParallelism(1);

        // Turn the second stream into Strings so the two inputs differ in type.
        final SingleOutputStreamOperator<String> prefixed = toPrefix.map(value -> "str_" + value);

        final ConnectedStreams<Long, String> connected = numbers.connect(prefixed);
        final SingleOutputStreamOperator<Object> result = connected.map(new PassThroughCoMapper());

        // Print the result.
        result.print().setParallelism(1);

        env.execute(StreamingDemoConnect.class.getSimpleName());
    }
}
分别处理Long和String类型的两种数据:
package cn.Streaming.StreamAPI
import cn.Streaming.custormSource.MyNoParallelSourceScala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
 * Connect demo: connects a Long stream with a String stream and maps each
 * side with its own function.
 *
 * @author Henry
 * @since 2019/5/14
 */
object StreamingDemoConnectScala {

  /** Builds the streaming job and submits it for execution. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Implicit conversions required by the Flink Scala API.
    import org.apache.flink.api.scala._

    val text1 = env.addSource(new MyNoParallelSourceScala)
    val text2 = env.addSource(new MyNoParallelSourceScala)

    // Convert the second stream to Strings; the "str_" prefix matches the Java version of this demo.
    val text2_str = text2.map("str_" + _)

    val connectedStreams = text1.connect(text2_str)

    // The first function handles records from text1, the second those from text2_str.
    val result = connectedStreams.map(
      line1 => line1,
      line2 => line2
    )

    result.print().setParallelism(1)

    // Name the job after this demo; the previous name was copied from another example.
    env.execute("StreamingDemoConnectScala")
  }
}
根据规则把一个数据流切分为多个流
package com.Streaming.StreamAPI;
import com.Streaming.custormSource.MyNoParalleSource;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.ArrayList;
/**
 * Split demo: splits one data stream into several streams according to a rule.
 *
 * @author Henry
 * @since 2019/5/12
 */
public class StreamingDemoSplit {

    public static void main(String[] args) throws Exception {
        // Obtain the Flink execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Note: for this source the parallelism may only be set to 1.
        final DataStreamSource<Long> numbers = env.addSource(new MyNoParalleSource()).setParallelism(1);

        // Split the stream by parity: every value is tagged either "even" or "odd".
        final SplitStream<Long> byParity = numbers.split(new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long value) {
                final ArrayList<String> tags = new ArrayList<>();
                tags.add(value % 2 == 0 ? "even" : "odd");
                return tags;
            }
        });

        // Select one or more of the split streams by tag.
        final DataStream<Long> evens = byParity.select("even");
        final DataStream<Long> odds = byParity.select("odd");
        final DataStream<Long> both = byParity.select("odd", "even");

        // Print the stream that selects both tags.
        both.print().setParallelism(1);

        env.execute(StreamingDemoSplit.class.getSimpleName());
    }
}
打印偶数数据流:
// Key processing step: tag every value by parity so the stream can be split.
val splitStream = text.split(new OutputSelector[Long] {
  override def select(value: Long) = {
    // The list here has to be Java's ArrayList type.
    val tags = new util.ArrayList[String]()
    // "even" for even numbers, "odd" for odd numbers.
    tags.add(if (value % 2 == 0) "even" else "odd")
    tags
  }
})
打印偶数数据流:
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/hongzhen91/article/details/90056602
内容来源于网络,如有侵权,请联系作者删除!