增量数据入hudi数据湖测试方案

肖钟城
  • 大数据技术栈
  • Hudi
大约 6 分钟

增量数据入hudi数据湖测试方案

基本要求:

  • 有一台机器部署docker用于安装生成数据datafaker
  • MySQL数据库开启binlog
  • Flink 1.13.6
  • Flink CDC 2.2.0
  • Hudi 0.11.0

基本流程:

  • 使用datafaker生成测试数据写入MySQL表中
  • 使用Flink CDC工具将MySQL中的数据写到Hudi表
  • 查询Hudi表中的数据
  • 手动修改MySQL中的数据

使用datafaker生成测试数据

假定MySQL连接信息为:

  • hostname:10.45.46.120
  • port:3306
  • username:root
  • password:Pass-123-root

MySQL中创建hudi数据库和student表。

create database hudi;
CREATE TABLE `hudi`.`student` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `name` varchar(20) NOT NULL COMMENT '学生名字',
  `school` varchar(20) NOT NULL COMMENT '学校名字',
  `nickname` varchar(20) NOT NULL COMMENT '学生小名',
  `age` int(11) NOT NULL COMMENT '学生年龄',
  `class_num` int(11) NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` bigint(20) NOT NULL COMMENT '电话号码',
  `email` varchar(64) DEFAULT NULL COMMENT '家庭网络邮箱',
  `ip` varchar(32) DEFAULT NULL COMMENT 'IP地址',
  `address` text COMMENT '家庭地址',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8;

新建测试数据生成工具元数据文件,保存如下数据为meta.txt

id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(zhongda, beida, xidian, huagong)]
nickname||varchar(20)||学生小名[:enum(鬼泣, 高小王子, 歌神, 逗比)]
age||int||学生年龄[:age]
class_num||int||班级人数[:int(10, 100)]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
address||text||家庭地址[:address]

生成1000条测试数据并写入MySQL表

(在116服务器执行,由于自增ID从1开始,第二次执行需要修改自增ID起始值)

docker run -v /root/xzc/datafaker/meta.txt:/opt/meta.txt xiaozhch5/datafaker:v1.0 rdb mysql+mysqldb://root:Pass-123-root@10.45.46.120:3306/hudi?charset=utf8 student 1000 --meta /opt/meta.txt

查询student表中的数据

select count(*) from hudi.student;

select * from hudi.student limit 10;

将测试数据同步到hudi表

在117服务器的 /data/flink目录执行

yarn上启动flink session

export HADOOP_CLASSPATH
bin/yarn-session.sh -s 8 -jm 2048 -tm 2048 -nm flink-hudi-test -d

启动flink sql

bin/sql-client.sh

以hdfs上的/user/hudi/warehouse为目录创建catalog

create catalog hudi with('type'='hudi','catalog.path'='hdfs://10.45.46.117:8020/user/hudi/warehouse');

创建test数据库和student表:

create database hudi.test;

drop table if exists hudi.test.student;

create table hudi.test.student (
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ'
);

Flink SQL中创建student表读取数据

create table student_mysql (
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
  'connector'='mysql-cdc',
  'hostname'='10.45.46.120',
  'port'='3306',
  'username'='root',
  'password'='Pass-123-root',
  'database-name'='hudi',
  'table-name'='student'
);

将mysql数据同步到hudi表

insert into hudi.test.student select * from student_mysql;

flink引擎对于hudi表的刷写策略为执行checkpoint或者write buffer达到256MB时刷写一次,本测试使用的checkpoint时间设置为20秒,所以等到20秒后查询hudi表数据

界面上可以看到

查看数据

SET 'sql-client.execution.result-mode' = 'table';

select count(*) from hudi.test.student;

得到:

对比MySQL与hudi中的数据

mysql数据查询:

select * from hudi.student where id = 100;

hudi数据查询

SET 'sql-client.execution.result-mode' = 'tableau';

select * from hudi.test.student where id = 100;

数据修改与删除测试

在mysql中将id为1的数据进行删除,将id为100的数据进行修改,将name由qOQKm改为qOQKn

delete from hudi.student where id = 1;

update hudi.stu set name = "qOQKn" where id = 100;

执行完上述操作后,等待程序执行完checkpoint之后,再次查看hudi表中的数据

SET 'sql-client.execution.result-mode' = 'tableau';

select * from hudi.test.student where id = 1;
select * from hudi.test.student where id = 100;

分别得到

hudi表数据实时查询能力测试

为了实时查询hudi表,我们需要在flink中新建stu表映射原有hudi表,并新增实时查询参数

create table student_streaming_read (
  `id` INT COMMENT '自增id',
  `name` STRING NOT NULL COMMENT '学生名字',
  `school` STRING NOT NULL COMMENT '学校名字',
  `nickname` STRING NOT NULL COMMENT '学生小名',
  `age` INT NOT NULL COMMENT '学生年龄',
  `class_num` INT NOT NULL COMMENT '班级人数',
  `score` decimal(4,2) NOT NULL COMMENT '成绩',
  `phone` INT NOT NULL COMMENT '电话号码',
  `email` STRING NULL COMMENT '家庭网络邮箱',
  `ip` STRING NULL COMMENT 'IP地址',
  `address` STRING COMMENT '家庭地址',
  `update_time` timestamp NOT NULL COMMENT '更新时间',
  `create_time` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled'='true',
  'path' = 'hdfs://10.45.46.117:8020/user/hudi/warehouse/test/student'
);

在刚刚的操作中,我们使用datafaker测试数据生成工具写了1000条数据,并在后续的操作中删除了id为1的数据,这里为了测试hudi表的实时查询能力,再次生成1000条数据

与此同时,我们开启实时查询hudi表操作

select * from student_streaming_read;

修改meta.txt为

id||int||自增id[:inc(id,1001)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(中大, 北大, 西电, 华工)]
nickname||varchar(20)||学生小名[:enum(鬼泣, 高小王子, 歌神, 逗比)]
age||int||学生年龄[:age]
class_num||int||班级人数[:int(10, 100)]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
address||text||家庭地址[:address]

执行

docker run -v /root/xzc/datafaker/meta.txt:/opt/meta.txt xiaozhch5/datafaker:v1.0 rdb mysql+mysqldb://root:Pass-123-root@10.45.46.120:3306/hudi?charset=utf8 student 3000 --meta /opt/meta.txt

可以发现flink任务在完成checkpoint之后即可实时查询写入的数据

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.14.1