文章18 | 阅读 11229 | 点赞0
主要针对直播/短视频平台审核指标的统计
开发介绍采用的是 Java 代码实现的,完整工程代码及 Scala 代码的实现详见底部 GitHub 代码地址
功能: 聚合数据代码
package henry.flink.function;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
/** * @Author: Henry * @Description: 聚合数据代码 **/
public class MyAggFunction implements WindowFunction<Tuple3<Long, String, String>, Tuple4<String, String, String, Long>, Tuple, TimeWindow>{
@Override
public void apply(Tuple tuple,
TimeWindow window,
Iterable<Tuple3<Long, String, String>> input,
Collector<Tuple4<String, String, String, Long>> out)
throws Exception {
//获取分组字段信息
String type = tuple.getField(0).toString();
String area = tuple.getField(1).toString();
Iterator<Tuple3<Long, String, String>> it = input.iterator();
//存储时间,为了获取最后一条数据的时间
ArrayList<Long> arrayList = new ArrayList<>();
long count = 0;
while (it.hasNext()) {
Tuple3<Long, String, String> next = it.next();
arrayList.add(next.f0);
count++;
}
System.err.println(Thread.currentThread().getId()+",window触发了,数据条数:"+count);
//排序,默认正排
Collections.sort(arrayList);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String time = sdf.format(new Date(arrayList.get(arrayList.size() - 1)));
//组装结果
Tuple4<String, String, String, Long> res = new Tuple4<>(time, type, area, count);
out.collect(res);
}
}
主要代码:
/** * @Author: Henry * @Description: 数据报表 * **/
public class DataReport {
private static Logger logger = LoggerFactory.getLogger(DataReport.class);
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
env.setParallelism(5);
// 设置使用eventtime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// checkpoint配置
...
// 配置 kafkaSource
String topic = "auditLog"; // 审核日志
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "master:9092");
prop.setProperty("group.id", "con1");
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<String>(
topic, new SimpleStringSchema(),prop);
/* * 获取到kafka的数据 * {"dt":"审核时间{年月日 时分秒}", "type":"审核类型","username":"审核人姓名","area":"大区"} * */
DataStreamSource<String> data = env.addSource(myConsumer);
// 对数据进行清洗
DataStream<Tuple3<Long, String, String>> mapData = data.map(
new MapFunction<String, Tuple3<Long, String, String>>() {
@Override
public Tuple3<Long, String, String> map(String line) throws Exception {
JSONObject jsonObject = JSON.parseObject(line);
String dt = jsonObject.getString("dt");
long time = 0;
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date parse = sdf.parse(dt);
time = parse.getTime();
} catch (ParseException e) {
//也可以把这个日志存储到其他介质中
logger.error("时间解析异常,dt:" + dt, e.getCause());
}
String type = jsonObject.getString("type");
String area = jsonObject.getString("area");
return new Tuple3<>(time, type, area);
}
});
// 过滤掉异常数据
DataStream<Tuple3<Long, String, String>> filterData = mapData.filter(
new FilterFunction<Tuple3<Long, String, String>>() {
@Override
public boolean filter(Tuple3<Long, String, String> value) throws Exception {
boolean flag = true;
if (value.f0 == 0) { // 即 time 字段为0
flag = false;
}
return flag;
}
});
// 保存迟到太久的数据
OutputTag<Tuple3<Long, String, String>> outputTag = new OutputTag<Tuple3<Long, String, String>>("late-data"){};
/* * 窗口统计操作 * */
SingleOutputStreamOperator<Tuple4<String, String, String, Long>> resultData = filterData.assignTimestampsAndWatermarks(
new MyWatermark())
.keyBy(1, 2) // 根据第1、2个字段,即type、area分组,第0个字段是timestamp
.window(TumblingEventTimeWindows.of(Time.minutes(30))) // 每隔一分钟统计前一分钟的数据
.allowedLateness(Time.seconds(30)) // 允许迟到30s
.sideOutputLateData(outputTag) // 记录迟到太久的数据
.apply(new MyAggFunction());
// 获取迟到太久的数据
DataStream<Tuple3<Long, String, String>> sideOutput = resultData.getSideOutput(outputTag);
// 存储迟到太久的数据到kafka中
String outTopic = "lateLog";
Properties outprop = new Properties();
outprop.setProperty("bootstrap.servers", "master:9092");
// 设置事务超时时间
outprop.setProperty("transaction.timeout.ms", 60000*15+"");
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
outTopic,
new KeyedSerializationSchemaWrapper<String>(
new SimpleStringSchema()),
outprop,
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
// 迟到太久的数据存储到 kafka 中
sideOutput.map(new MapFunction<Tuple3<Long, String, String>, String>() {
@Override
public String map(Tuple3<Long, String, String> value) throws Exception {
return value.f0+"\t"+value.f1+"\t"+value.f2;
}
}).addSink(myProducer);
/* * 把计算的结存储到 ES 中 * */
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("master", 9200, "http"));
ElasticsearchSink.Builder<Tuple4<String, String, String, Long>> esSinkBuilder = new ElasticsearchSink.Builder<
Tuple4<String, String, String, Long>>(
httpHosts,
new ElasticsearchSinkFunction<Tuple4<String, String, String, Long>>() {
public IndexRequest createIndexRequest(Tuple4<String, String, String, Long> element) {
Map<String, Object> json = new HashMap<>();
json.put("time",element.f0);
json.put("type",element.f1);
json.put("area",element.f2);
json.put("count",element.f3);
//使用time+type+area 保证id唯一
String id = element.f0.replace(" ","_")+"-"+element.f1+"-"+element.f2;
return Requests.indexRequest()
.index("auditindex")
.type("audittype")
.id(id)
.source(json);
}
@Override
public void process(Tuple4<String, String, String, Long> element,
RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);
// ES是有缓冲区的,这里设置1代表,每增加一条数据直接就刷新到ES
esSinkBuilder.setBulkFlushMaxActions(1);
resultData.addSink(esSinkBuilder.build());
env.execute("DataReport");
}
}
kafka 生产者消息数据:
创建 kafka topic:
./kafka-topics.sh --create --topic lateLog --zookeeper localhost:2181 --partitions 5 --replication-factor 1
./kafka-topics.sh --create --topic auditLog --zookeeper localhost:2181 --partitions 5 --replication-factor 1
1. 启动:
# 后台启动 ES
./bin/elasticsearch -d
# 后台启动 kibana
nohup ./bin/kibana >/dev/null 2>&1 &
2. 启动 UI 访问:
3. 创建索引:
创建索引脚本代码:
# 注意,如果索引建的有问题,则先删除再重新创建索引
curl -XDELETE 'localhost:9200/auditindex'
# 创建索引
curl -XPUT 'http://master:9200/auditindex?pretty'
# 创建type的mapping信息
curl -H "Content-Type: application/json" -XPOST 'http://master:9200/auditindex/audittype/_mapping?pretty' -d ' { "audittype":{ "properties":{ "area":{"type":"keyword"}, "type":{"type":"keyword"}, "count":{"type":"long"}, "time":{"type":"date","format": "yyyy-MM-dd HH:mm:ss"} } } } '
# 执行创建索引脚本
sh -x createindex.sh
创建索引执行结果:
4. 查看 ES 数据:
# 访问网址
http://master:9200/auditindex/_search?pretty=true
1. Kibana 关联 ES:
2. Discover 查看数据:
显示数据差8个小时原因:
在程序中向 ES 写数据没有带时区,就是默认时区,并且会检查浏览器时区
解决方法:
设置时区方法如下:
重新刷新并查看数据:
3. 新建饼图对数据可视化操作:
4. 新建 Dashboard 添加可视化图:
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/hongzhen91/article/details/92795891
内容来源于网络,如有侵权,请联系作者删除!