基于RocketMQ与Flink构建网页访问用户实时统计分析系统

肖钟城
  • 大数据技术栈
  • Flink
大约 9 分钟

基于RocketMQ与Flink构建网页访问用户实时统计分析系统

总览

访客实时统计业务在很多网站应用中是非常常见的,它不仅仅能够实时展示网站的访问情况,还可以给决策者提供一定的决策依据,本文描述了如何基于RocketMQ消息中间件以及Flink实时引擎构建网站访问用户实时分析系统。并以网站不同省份访问用户模拟数据为例展示如何将数据发送至RocketMQ、如何使用Flink查询RocketMQ数据并写入Clickhouse以及如何使用Superset对接Clickhouse并实时刷新不同省份用户的访问量,以下为本文的架构图。

imagec54dba3c16027ccc.png

接下来,将从以下几个方面对此进行阐述:

  1. 技术选型
  2. 环境搭建
  3. 实例展示
  4. 总结

技术选型

为了适应业务需求,技术选型是需要相当慎重的,基于本业务的实时性,我们需要对消息中间件以及处理框架进行分析判断,并选择合适的组件。

消息中间件选择

由于网站用户访问数据都是实时发生的,需要将其保存在消息中间件中。当前市面上存在许多开源消息中间件,包括:Kafka,Pulsar、RocketMQ、RabbitMQ、ActivateMQ等,可谓是百花齐放。但是基于我们的场景需求,我们最终选择了RocketMQ,下面是我们关注的几个点:

组件安装部署与维护是否简单。RocketMQ的架构模式为主从模式,只需要维护本身的master和slave节点,无需引入其他组件;而相比之下Kafka和Pulsar则需要依赖于Zookeeper,ActivateMQ如果使用levelDB的话也需要依赖于Zookeeper。

组件使用是否简单。RocketMQ是一个开箱即用的消息中间件,用户不需要调整大量的参数即可达到很好的性能;而相比之下,Pulsar与Kafka存在着大量的参数,并且很多参数需要用户去指定,这对于使用者来说并不是那么方便。除此之外,ActivateMQ提供的参数也是底层参数,需要做大量的调优才能达到和好的性能。

消息是否支持过滤。数据在全量写入消息中间件之后,我们希望能够在消费时根据某些条件进行数据过滤。那么RocketMQ就给我们提供了这个完美的特性。在源端将数据写入RocketMQ时,我们直接对某些数据加上tag标签,那么在消费时即可直接进行过滤。

而在消息可靠性、消息的延迟等方面,RocketMQ与Kafka等其他组件都较为相近,所以在结合本场景以及RocketMQ的特性之后,我们选择了RocketMQ作为我们的消息中间件系统。

其他组件选择

基于可靠的消息中间件,我们需要对原始数据进行处理与展示,所以我们选择了Flink作为实时处理引擎,Clickhouse作为OLAP数据库,Superset用于数据查询与展示。

为了便于演示,本文下述所使用的RocketMQ等组件的安装方式均为快速安装方式,生产使用请参阅各组件的高可用部署。

RocketMQ安装部署指南

基本环境

centos7

安装JDK8

yum install java-1.8.0-openjdk-devel -y

RocketMQ下载

mkdir /data && cd /data
wget http://192.168.1.9:11180/downloads/rocketmq-all-4.9.3-bin-release.zip
unzip rocketmq-all-4.9.3-bin-release.zip
ln -s rocketmq-4.9.3 rocketmq

快速启动

nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -n localhost:9876 &

clickhouse安装部署指南

基本环境

centos7

安装与启动

sudo yum install -y yum-utils
sudo yum-config-manager --add-repo https://packages.clickhouse.com/rpm/clickhouse.repo
sudo yum install -y clickhouse-server clickhouse-client

sudo /etc/init.d/clickhouse-server start
clickhouse-client # or "clickhouse-client --password" if you set up a password.

设置默认用户密码

vim /etc/clickhouse-server/users.xml

在<password><password>标签中输入123456作为密码。即:

<password>123456<password>

Superset安装部署指南

基本环境

centos7

基本安装

# 安装conda
wget https://repo.anaconda.com/archive/Anaconda3-2021.11-Linux-x86_64.sh
sh Anaconda3-2021.11-Linux-x86_64.sh

## 使用conda创建python 3.7环境
conda create -n superset python=3.7

## 激活superset环境
conda activate superset

## 使用清华源进行安装
pip install apache-superset -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install clickhouse-driver==0.2.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install clickhouse-sqlalchemy==0.1.6 -i https://pypi.tuna.tsinghua.edu.cn/simple

初始化数据库

superset db upgrade

执行初始化数据库时,如果出现如下报错,重新指定安装markupsafe版本即可

(superset) [root@hadoop ~]# superset db upgrade
Traceback (most recent call last):
  File "/data/anaconda3/envs/superset/bin/superset", line 5, in <module>
    from superset.cli import superset
  File "/data/anaconda3/envs/superset/lib/python3.7/site-packages/superset/__init__.py", line 18, in <module>
    from flask import current_app, Flask
  File "/data/anaconda3/envs/superset/lib/python3.7/site-packages/flask/__init__.py", line 14, in <module>
    from jinja2 import escape
  File "/data/anaconda3/envs/superset/lib/python3.7/site-packages/jinja2/__init__.py", line 12, in <module>
    from .environment import Environment
  File "/data/anaconda3/envs/superset/lib/python3.7/site-packages/jinja2/environment.py", line 25, in <module>
    from .defaults import BLOCK_END_STRING
  File "/data/anaconda3/envs/superset/lib/python3.7/site-packages/jinja2/defaults.py", line 3, in <module>
    from .filters import FILTERS as DEFAULT_FILTERS  # noqa: F401
  File "/data/anaconda3/envs/superset/lib/python3.7/site-packages/jinja2/filters.py", line 13, in <module>
    from markupsafe import soft_unicode
ImportError: cannot import name 'soft_unicode' from 'markupsafe' (/data/anaconda3/envs/superset/lib/python3.7/site-packages/markupsafe/__init__.py)

解决办法:

pip install markupsafe==2.0.1 -i https://pypi.tuna.tsinghua.edu.cn/simple

初始化与启动

# 设置账号密码,后续在登录web页面时需要用到
export FLASK_APP=superset
superset fab create-admin

# Create default roles and permissions
superset init

# 启动
superset run -h 0.0.0.0 -p 8065 --with-threads --reload --debugger

浏览器打开

浏览器输入ip和port即可看到如下界面

image.png

使用刚设置的密码进行登录,得到

imagec3fcd583501365e2.png

生成模拟数据

RocketMQ针对多种语言提供了丰富的客户端,本文以Java客户端为例,将模拟数据写入RocketMQ。需要在项目中导入下述依赖:

    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.9.3</version>
    </dependency>

随机生成网站不同省份访问用户模拟数据并将其写入RocketMQ消息中间件。模拟数据的schema为:

id        String  # 用户ID
name      String  # 用户名称
province  String  # 用户来自的省份

数据生成过程中需要用到省份编码数据,provinceShortCodes.csv内容如下,需将其保存在resources文件夹中。

code,province
JIN1,津
JI1,冀
JIN2,晋
MENG,蒙
LIAO,辽
JI2,吉
HEI,黑
HU,沪
SU,苏
ZHE,浙
WAN,皖
MIN,闽
GAN,赣
LU,鲁
YU1,豫
E,鄂
XIANG,湘
YUE,粤
GUI1,桂
QIONG,琼
CHUAN,川
GUI2,贵
YUN,云
YU2,渝
ZANG,藏
SHAN,陕
GAN,甘
QING,青
NING,宁
XIN,新
GANG,港
AO,澳
TAI,台
JING,京

相关Java类则如下:

/*
 * Copyright © 2022 https://www.lrting.top/ All rights reserved.
 */

package com.zh.ch.bigdata.dategen;

import com.alibaba.fastjson.JSON;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * @author: bigDataToAi
 * @date: 2022/4/10
 * @description:
 * @modifiedBy:
 * @version: 1.0
 */
public class MQProducer {

    public static List<String> getProvinceShortCodes(String csvPath) throws IOException {
        String[] header = new String[]{"code", "province"};
        CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader(header);
        FileReader fileReader = new FileReader(csvPath);
        List<CSVRecord> records = new CSVParser(fileReader, csvFormat).getRecords();
        List<String> codes = new ArrayList<>(34);
        for (int i = 1; i<records.size(); i++) {
            codes.add(records.get(i).get("code"));
        }
        return codes;
    }

    public static String getRandomName(int length){
        String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
        Random random = new Random();
        StringBuilder sb = new StringBuilder();
        for(int i=0; i<length; i++){
            int number = random.nextInt(52);
            sb.append(str.charAt(number));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("group1");
            producer.setNamesrvAddr("192.168.241.138:9876");
            producer.setVipChannelEnabled(false);
            producer.start();
            producer.setSendMsgTimeout(60000);
            List<String> provinceShortCodes = getProvinceShortCodes("src/main/resources/provinceShortCodes.csv");
            long recordNum = 1000000L;
            for (int i=100; i<recordNum; i++) {
                String provinceShortCode = provinceShortCodes.get(new Random().nextInt(provinceShortCodes.size()));
                String randomName = getRandomName(8);
                UserInfo userInfo = new UserInfo(String.valueOf(i+1), randomName, provinceShortCode);
                Message message = new Message("test1", "tag1", JSON.toJSON(userInfo).toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendRes = producer.send(message);
                System.out.printf("%d条消息发送结果:%s%n", i+1,sendRes);
                Thread.sleep(1000);
            }
            producer.shutdown();
        }
        catch (IOException | MQClientException | MQBrokerException | RemotingException | InterruptedException e) {
            e.printStackTrace();
        }

    }
    static class UserInfo {

        private String id;

        private String name;

        private String province;

        public UserInfo(String id, String name, String province) {
            this.id = id;
            this.name = name;
            this.province = province;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getProvince() {
            return province;
        }

        public void setProvince(String province) {
            this.province = province;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }
    }

}

完整代码可见:https://git.lrting.top/rocketmq-quickstart/data-generation.gitopen in new window

启动数据生成工具类,向RocketMQ发送模拟数据:

imagebc02b597cec81579.png

Flink读取RocketMQ数据并写入clickhouse

Flink读取RocketMQ和写入clickhouse需要额外依赖rocketmq-flink以及flink-connector-clickhouse,目前上述两个组件需要适配Flink版本重新编译,本文以flink 1.14.0版本为例,重新编译项目jar包,并将其安装到本地。

git clone https://github.com/apache/rocketmq-flink.git
cd rocketmq-flink
mvn clean package -DskipTests -Dflink.version=1.14.0 -Drocketmq.version=4.9.3

将得到的jar包安装到本地。

git clone https://github.com/itinycheng/flink-connector-clickhouse.git

cd flink-connector-clickhouse

# vim pom.xml
#将版本号改为1.14.0-SNAPSHOT
mvn clean package -DskipTests -Dflink=1.14.0

将得到的jar包安装到本地。

特别说明: 如果你不想编译上述两个jar包,可直接从下述阿里网盘下载: https://www.aliyundrive.com/s/TdHGJRdGksTopen in new window 提取码:6x0q

数据读取与写入

使用flink读取RocketMQ数据并将数据写入clickhouse,注意在将数据写入clickhouse之前需要先在clickhouse中新建userInfo4表。

create table user.userInfo4(`id` String, `name` String, `province` String) ENGINE = TinyLog;

Java代码如下:

/*
 * Copyright © 2020 https://www.lrting.top/ All rights reserved.
 */

package com.zh.ch.bigdata.flink;

import com.alibaba.fastjson.JSONObject;
import com.zh.ch.bigdata.flink.source.deserialize.SimpleStringDeserializationSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.clickhouse.catalog.ClickHouseCatalog;
import org.apache.flink.connector.clickhouse.config.ClickHouseConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
import org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * @author: bigDataToAi
 * @date: 2022/4/10
 * @description:
 * @modifiedBy:
 * @version: 1.0
 */
public class Main implements Serializable {

    public static RocketMQSourceFunction getRocketMQSource() {
        Properties consumerProps = new Properties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "192.168.241.138:9876");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "group1");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "test1");
        KeyValueDeserializationSchema<String> schema = new SimpleStringDeserializationSchema();
        return new RocketMQSourceFunction(schema, consumerProps);
    }


    public static void main(String[] args) {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        env.setParallelism(4);

        // enable checkpoint
        env.enableCheckpointing(8000, CheckpointingMode.AT_LEAST_ONCE);
        DataStream<String> dataStream = env.addSource(getRocketMQSource());
        SingleOutputStreamOperator<Tuple3<String, String, String>> singleOutputStreamOperator =  dataStream.map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String s) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(s);
                return Tuple3.of(jsonObject.getString("id"), jsonObject.getString("name"), jsonObject.getString("province"));
            }
        });
        Table inputTable = tableEnv.fromDataStream(singleOutputStreamOperator).as("id","name", "province");
        tableEnv.createTemporaryView("InputTable", inputTable);

        Map<String, String> props = new HashMap<>();
        props.put(ClickHouseConfig.DATABASE_NAME, "default");
        props.put(ClickHouseConfig.URL, "clickhouse://192.168.241.134:8123");
        props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "5s");
        props.put(ClickHouseConfig.USERNAME, "default");
        props.put(ClickHouseConfig.PASSWORD, "123456");
        Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
        tableEnv.registerCatalog("clickhouse", cHcatalog);

        tableEnv.executeSql("INSERT INTO `clickhouse`.`user`.`userInfo4` select id, name, province from InputTable");

        try {
            env.execute("rocketmq-flink-example");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

运行上述主函数,实现使用Flink将数据从RocketMQ读取之后,直接写入到clickhouse中。

image2ff78911f6194b71.png

完整代码可见:https://git.lrting.top/rocketmq-quickstart/flink-clickhouse-quickstart.gitopen in new window

通过Superset接入clickhouse进行数据展示

superset链接clickhouse数据源

imagec5a9f58055abf3b4.png

image2555418e1935ed60.png

出现如下提示则表示连接配置成功。

image7477d8e38bb60f0d.png

接下来选择user库中的userInfo4表

image66120af133a1b1b3.png

选择数据查询条件

image0f4326ebc576e8ce.png

imagee4ed649e37d6b0b4.png

在DashBoard面板设置自动刷新

image964e6b6db230d829.png

将面板数据放大后,展现数据实时刷新

2cf3e79c783bc46b4302f399b2b7bcfd.gif

总结

本文展示了基于RocketMQ消息中间件以及Flink实时引擎构建的网页访问用户实时统计分析系统,针对业务需求以及组件的基本特性选择了RocketMQ作为消息中间件的技术选型以及选择Flink作为实时引擎、clickhouse作为OLAP数据库、superset作为数据看板,并以实际用例展示了系统的构建步骤。

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.14.1