Flink实战之电商用户行为实时分析

x33g5p2x  于2022-02-20 转载在 Flink  
字(4.1k)|赞(0)|评价(0)|浏览(973)

🌻在前面的章节中,我们学习了flink的DataStream API、Table API和SQL,想必大家学完后都想找个项目实战一下,于是它来了,对往期内容感兴趣的同学可以参考👇:

  • 链接: Flink学习之flink sql.
  • 链接: Flink学习之Table API(python版本).
  • 链接: Flink学习之DataStream API(python版本).
  • 链接: Hadoop专题.
  • 链接: Spark专题.

🍀今天我们实战的项目是关于电商用户行为实时分析,主要是用flink Table 和 SQL来实现,让我们更加了解flink处理的一整套流程。让我们开始今日份的学习吧!

1. 环境部署

我这才用的是docker安装,参考的是阿里云flink教程,关于安装的部分,我在前面的章节中有详细说明👇:

  • flink环境搭建: Flink学习之环境搭建.

我这里建议使用docker,因为我们这个项目涉及到的东西较多:

  • mysql
  • kafka
  • es
  • kibana(BI工具)

结构如下:

1.1 启动容器

  1. #启动容器
  2. sudo docker-compose up -d

各项服务将会进行启动:

  1. #启动flink clt客户端
  2. sudo docker-compose exec sql-client ./sql-client.sh

启动成功如下:

1.3 容器的组成

  1. flink sql client:flink sql的客户端,用于提交sql
  2. flink集群:这里包括了jobmanager和taskmanager 用于执行sql任务
  3. datagen:数据源,用于生成用户数据,然后发送到kafka中。默认每秒生成2000条数据。
  4. mysql:作为维度表使用
  5. kafka:用作消息队列为flink提供数据
  6. zookeeper:kafka容器的依赖
  7. elasticsearch:用于存储flink产生的数据
  8. kibana:可视化es中的数据

2. 从kafka中导入数据

我们先看一下kafka里面的数据:

  1. --查看10条数据
  2. docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'

结果如下:

启动 flink sql clienk,你就可以看见一只大松鼠。

  1. sudo docker-compose exec sql-client ./sql-client.sh

创建数据表,用来接受kafka中的数据。

  1. CREATE TABLE user_behavior (
  2. user_id BIGINT,
  3. item_id BIGINT,
  4. category_id BIGINT,
  5. behavior STRING,
  6. ts TIMESTAMP(3),
  7. proctime AS PROCTIME(), -- 数据处理时间
  8. WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- 水位线
  9. ) WITH (
  10. 'connector' = 'kafka', --kafka数据源
  11. 'topic' = 'user_behavior', -- kafka数据主题
  12. 'scan.startup.mode' = 'earliest-offset', -- 数据读取策略
  13. 'properties.bootstrap.servers' = 'kafka:9094', -- kafka 服务地址
  14. 'format' = 'json' -- 数据格式
  15. );

结果如下:显示table已经创建成功了。

查看表的信息:

  1. --展示所有表
  2. show tables
  3. -- 查看表的信息
  4. desc 表名

结果如下:

select 的时候数据是一直变化的

flink 任务管理器中查看任务运行情况,端口号为localhost:8081

在这一步中,我们使用DDL语句创建了user_behavior表,并用WITH语句从kafka中导入了数据。

3. 统计每小时的成交量

创建数据表buy_cnt_per_hour,将数据导出到es数据库中,主要导出的数据是2列:小时、成交量

  1. CREATE TABLE buy_cnt_per_hour (
  2. hour_of_day BIGINT,
  3. buy_cnt BIGINT
  4. ) WITH (
  5. 'connector' = 'elasticsearch-7', -- es数据源
  6. 'hosts' = 'http://elasticsearch:9200', -- es服务地址
  7. 'index' = 'buy_cnt_per_hour' -- 数据库表名
  8. );

结果如下:

需要查询每小时有多少buy的行为,并将数据导出到es中(insert语句)

  1. --这里涉及到flink的窗口函数,tumble滚动窗口。
  2. INSERT INTO buy_cnt_per_hour
  3. SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
  4. FROM user_behavior
  5. WHERE behavior = 'buy'
  6. GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

结果如下:

看一下flink的执行任务,显示正在运行,说明数据正在源源不断输入到es中。

4. 可视化结果

我们这里用kibana来可视化上面输出的结果。我们这里需要通过localhost:5601来访问kibanba。

创建数据源,点击红色的框框,即可创建成功。

数据源展示:

创建BI报表

画出图像,进行保存。

5. 统计一天每10分钟累计用户数

这一部分的统计是可视化一天中,某个时刻的累计独立用户数,也就是每一个时刻的UV数都是从0到该时刻的累加。

创建一个es表用于汇总和导出数据。

  1. CREATE TABLE cumulative_uv (
  2. date_str STRING,
  3. time_str STRING,
  4. uv BIGINT,
  5. PRIMARY KEY (date_str, time_str) NOT ENFORCED
  6. ) WITH (
  7. 'connector' = 'elasticsearch-7',
  8. 'hosts' = 'http://elasticsearch:9200',
  9. 'index' = 'cumulative_uv'
  10. );

创建成功:

数据开发,使用data_format抽出基本的日期和时间,再用substr和字符串连接函数’||’ 将时间修正到10分钟级别。

  1. INSERT INTO cumulative_uv
  2. SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as uv
  3. FROM (
  4. SELECT
  5. DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str,
  6. SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0' as time_str,
  7. user_id
  8. FROM user_behavior)
  9. GROUP BY date_str;

连接成功:

依旧是在kibana中可视化:

BI数据报表展示如下:

6. 顶级项目排行

这一部分要制作的是类目排行榜,了解哪些类目是支柱类目,由于类目分类太细,于是采用维度表进行映射,规约到顶级类目,而mysql容器中准备了子类目和顶级类目的映射关系,作为dim表。

连接mysql表,用于后续连接

  1. CREATE TABLE category_dim (
  2. sub_category_id BIGINT,
  3. parent_category_name STRING
  4. ) WITH (
  5. 'connector' = 'jdbc',
  6. 'url' = 'jdbc:mysql://mysql:3306/flink',
  7. 'table-name' = 'category',
  8. 'username' = 'root',
  9. 'password' = '123456',
  10. 'lookup.cache.max-rows' = '5000',
  11. 'lookup.cache.ttl' = '10min'
  12. );

创建成功

创建es表,用于存储类目统计结果

  1. CREATE TABLE top_category (
  2. category_name STRING PRIMARY KEY NOT ENFORCED,
  3. buy_cnt BIGINT
  4. ) WITH (
  5. 'connector' = 'elasticsearch-7',
  6. 'hosts' = 'http://elasticsearch:9200',
  7. 'index' = 'top_category'
  8. );

创建成功:

通过对mysql的dim表进行关联,补全类目名称,这里使用一个视图,简化逻辑,就不用建表了。

  1. CREATE VIEW rich_user_behavior AS
  2. SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
  3. FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
  4. ON U.category_id = C.sub_category_id;

创建成功:

统计完后,根据类目分组,统计出buy的次数,并写入到es中

  1. INSERT INTO top_category
  2. SELECT category_name, COUNT(*) buy_cnt
  3. FROM rich_user_behavior
  4. WHERE behavior = 'buy'
  5. GROUP BY category_name;

写入成功:

可视化类目排行,过程同上几张报表,最终结果如下:

但是感觉差一张图显示的效果不好看,我们最后用张目标图来补充上,用来统计我们用户数到100000的进度。

最后展示结果:

7. 总结

  • 本项目参考的是阿里云apache flink的官方教程,从数据源到flink处理,到输出到es,制作报表这一整套流程都涵盖在内,很好的对我们前面学习的流处理框架、流处理的API进行了实践。
  • 本实践的运行环境是ubuntu20.04,内存4G,CPU分配1core。

8. 参考资料

官方文档: 官方Flink SQL demo.
《Flink入门与实战》
《PyDocs》(pyflink官方文档)
《Kafka权威指南》
《Apache Flink 必知必会》
《Apache Flink 零基础入门》
《Flink 基础教程》

相关文章