增量数据入hudi数据湖测试方案
增量数据入hudi数据湖测试方案
基本要求:
- 有一台机器部署docker用于安装生成数据datafaker
- MySQL数据库开启binlog
- Flink 1.13.6
- Flink CDC 2.2.0
- Hudi 0.11.0
基本流程:
- 使用datafaker生成测试数据写入MySQL表中
- 使用Flink CDC工具将MySQL中的数据写到Hudi表
- 查询Hudi表中的数据
- 手动修改MySQL中的数据
使用datafaker生成测试数据
假定MySQL连接信息为:
- hostname:10.45.46.120
- port:3306
- username:root
- password:Pass-123-root
MySQL中创建hudi数据库和student表。
create database hudi;
CREATE TABLE `hudi`.`student` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
`name` varchar(20) NOT NULL COMMENT '学生名字',
`school` varchar(20) NOT NULL COMMENT '学校名字',
`nickname` varchar(20) NOT NULL COMMENT '学生小名',
`age` int(11) NOT NULL COMMENT '学生年龄',
`class_num` int(11) NOT NULL COMMENT '班级人数',
`score` decimal(4,2) NOT NULL COMMENT '成绩',
`phone` bigint(20) NOT NULL COMMENT '电话号码',
`email` varchar(64) DEFAULT NULL COMMENT '家庭网络邮箱',
`ip` varchar(32) DEFAULT NULL COMMENT 'IP地址',
`address` text COMMENT '家庭地址',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8;
新建测试数据生成工具元数据文件,保存如下数据为meta.txt
id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(zhongda, beida, xidian, huagong)]
nickname||varchar(20)||学生小名[:enum(鬼泣, 高小王子, 歌神, 逗比)]
age||int||学生年龄[:age]
class_num||int||班级人数[:int(10, 100)]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
address||text||家庭地址[:address]
生成1000条测试数据并写入MySQL表
(在116服务器执行,由于自增ID从1开始,第二次执行需要修改自增ID起始值)
docker run -v /root/xzc/datafaker/meta.txt:/opt/meta.txt xiaozhch5/datafaker:v1.0 rdb mysql+mysqldb://root:Pass-123-root@10.45.46.120:3306/hudi?charset=utf8 student 1000 --meta /opt/meta.txt
查询student表中的数据
select count(*) from hudi.student;
select * from hudi.student limit 10;
将测试数据同步到hudi表
在117服务器的 /data/flink目录执行
yarn上启动flink session
export HADOOP_CLASSPATH
bin/yarn-session.sh -s 8 -jm 2048 -tm 2048 -nm flink-hudi-test -d
启动flink sql
bin/sql-client.sh
以hdfs上的/user/hudi/warehouse为目录创建catalog
create catalog hudi with('type'='hudi','catalog.path'='hdfs://10.45.46.117:8020/user/hudi/warehouse');
创建test数据库和student表:
create database hudi.test;
drop table if exists hudi.test.student;
create table hudi.test.student (
`id` INT COMMENT '自增id',
`name` STRING NOT NULL COMMENT '学生名字',
`school` STRING NOT NULL COMMENT '学校名字',
`nickname` STRING NOT NULL COMMENT '学生小名',
`age` INT NOT NULL COMMENT '学生年龄',
`class_num` INT NOT NULL COMMENT '班级人数',
`score` decimal(4,2) NOT NULL COMMENT '成绩',
`phone` INT NOT NULL COMMENT '电话号码',
`email` STRING NULL COMMENT '家庭网络邮箱',
`ip` STRING NULL COMMENT 'IP地址',
`address` STRING COMMENT '家庭地址',
`update_time` timestamp NOT NULL COMMENT '更新时间',
`create_time` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ'
);
Flink SQL中创建student表读取数据
create table student_mysql (
`id` INT COMMENT '自增id',
`name` STRING NOT NULL COMMENT '学生名字',
`school` STRING NOT NULL COMMENT '学校名字',
`nickname` STRING NOT NULL COMMENT '学生小名',
`age` INT NOT NULL COMMENT '学生年龄',
`class_num` INT NOT NULL COMMENT '班级人数',
`score` decimal(4,2) NOT NULL COMMENT '成绩',
`phone` INT NOT NULL COMMENT '电话号码',
`email` STRING NULL COMMENT '家庭网络邮箱',
`ip` STRING NULL COMMENT 'IP地址',
`address` STRING COMMENT '家庭地址',
`update_time` timestamp NOT NULL COMMENT '更新时间',
`create_time` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
'connector'='mysql-cdc',
'hostname'='10.45.46.120',
'port'='3306',
'username'='root',
'password'='Pass-123-root',
'database-name'='hudi',
'table-name'='student'
);
将mysql数据同步到hudi表
insert into hudi.test.student select * from student_mysql;
flink引擎对于hudi表的刷写策略为执行checkpoint或者write buffer达到256MB时刷写一次,本测试使用的checkpoint时间设置为20秒,所以等到20秒后查询hudi表数据
界面上可以看到
查看数据
SET 'sql-client.execution.result-mode' = 'table';
select count(*) from hudi.test.student;
得到:
对比MySQL与hudi中的数据
mysql数据查询:
select * from hudi.student where id = 100;
hudi数据查询
SET 'sql-client.execution.result-mode' = 'tableau';
select * from hudi.test.student where id = 100;
数据修改与删除测试
在mysql中将id为1的数据进行删除,将id为100的数据进行修改,将name由qOQKm改为qOQKn
delete from hudi.student where id = 1;
update hudi.stu set name = "qOQKn" where id = 100;
执行完上述操作后,等待程序执行完checkpoint之后,再次查看hudi表中的数据
SET 'sql-client.execution.result-mode' = 'tableau';
select * from hudi.test.student where id = 1;
select * from hudi.test.student where id = 100;
分别得到
hudi表数据实时查询能力测试
为了实时查询hudi表,我们需要在flink中新建stu表映射原有hudi表,并新增实时查询参数
create table student_streaming_read (
`id` INT COMMENT '自增id',
`name` STRING NOT NULL COMMENT '学生名字',
`school` STRING NOT NULL COMMENT '学校名字',
`nickname` STRING NOT NULL COMMENT '学生小名',
`age` INT NOT NULL COMMENT '学生年龄',
`class_num` INT NOT NULL COMMENT '班级人数',
`score` decimal(4,2) NOT NULL COMMENT '成绩',
`phone` INT NOT NULL COMMENT '电话号码',
`email` STRING NULL COMMENT '家庭网络邮箱',
`ip` STRING NULL COMMENT 'IP地址',
`address` STRING COMMENT '家庭地址',
`update_time` timestamp NOT NULL COMMENT '更新时间',
`create_time` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled'='true',
'path' = 'hdfs://10.45.46.117:8020/user/hudi/warehouse/test/student'
);
在刚刚的操作中,我们使用datafaker测试数据生成工具写了1000条数据,并在后续的操作中删除了id为1的数据,这里为了测试hudi表的实时查询能力,再次生成1000条数据
与此同时,我们开启实时查询hudi表操作
select * from student_streaming_read;
修改meta.txt为
id||int||自增id[:inc(id,1001)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(中大, 北大, 西电, 华工)]
nickname||varchar(20)||学生小名[:enum(鬼泣, 高小王子, 歌神, 逗比)]
age||int||学生年龄[:age]
class_num||int||班级人数[:int(10, 100)]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
address||text||家庭地址[:address]
执行
docker run -v /root/xzc/datafaker/meta.txt:/opt/meta.txt xiaozhch5/datafaker:v1.0 rdb mysql+mysqldb://root:Pass-123-root@10.45.46.120:3306/hudi?charset=utf8 student 3000 --meta /opt/meta.txt
可以发现flink任务在完成checkpoint之后即可实时查询写入的数据