介绍
Flink是一个大数据流处理引擎,可以为不同行业提供实时大数据处理解决方案。随着Flink的快速发展和改进,世界各地的许多公司现在都能看到它的存在。目前,北美、欧洲和金砖国家都是全球Flink应用的热门地区。当然,Flink在中国的知名度特别高,部分原因是一些互联网大厂的贡献和引领效应,也符合中国的反应与场景密切相关。想象一下,在中国,一个网站可能需要面对数以亿计的日活跃用户和每秒数亿的计算峰值,这对许多外国公司来说是难以想象的。Flink为我们提供了高速准确处理海量流媒体数据的可能性。
在目前的云原生时代,容器化、K8S等技术已经在各个互联网大厂中独占鳌头,大部分的应用已经实现了上云。对于大数据引擎家族中的一员,flink实现与K8S结合、实现云原生下的severless模式的需求日渐增加,。因此,在本文中,主要为实现面对云原生+flink进行讲解,希望能够给读者带来获得新知识的喜悦。
在这里,将会提供flink的使用方法,和一个flink可视化平台StreamPark中的使用方式。本文将实时更新,将依次介绍其中各个方式的使用方法。在这里将会涉及以下知识点:
-
DataStreamApi的使用
-
UDF的开发
-
FlinkSql的使用
-
Flink cdc功能
-
原生flink k8s application的使用
-
翼flink-StreamPark的使用要点
本文的目录暂定如此,后续将会对其中的内容加以补充,请广大读者提出宝贵意见,如需添加或删减某些知识点可留言或私信本文作者。
FLINK 与Flink可视化平台StreamPark教程介绍基础环境数据源搭建构键k8s集群下载flink客户端提供flink运行任务的环境DataStreamApiMAP-REDUCE流程水位线功能水位线设置窗口设置窗口API窗口函数MapReduce窗口函数Aggregate窗口处理函数JOIN功能时间窗Inner JoinFlink的状态算子状态checkpoint和savepointFlinkSql功能FlinkSql与连接器(Connector)相结合sql与DataStream混合编码SQL模式与原生Flink的关系与差异与适配FlinkSql的动态表FlinkSql的持续查询FlinkSql时间与窗口FlinkSql UDF编写方式标量函数表函数聚合函数FlinkSql JOIN功能Regular JoinFlinkCDC功能基本概念使用api进行操作使用flinksql进行操作断点续传K8s Application运行方式任务jar生成 k8s Application运行flink任务Application模式架构启动命令PodTemplate翼flink-StreamPark使用要点概述常规使用依赖导入
基础环境
在本文中,将面向开发程序员、面向一线码农,带来最详细的flink教程。从基础环境搭建到最后的平台应用均会涉及。
对于flink而言,少不了对流式数据的处理,一般而言面对kafka、rabbitmq、cdc等消息为数据源主流,在这里,为简化基础环境搭建流程,将提供mysql数据源并开启binlog模式作为我们的数据源,实现流(CDC功能接入binlog)批(常规查询)一体的输入。
数据源搭建
在本文中,我们使用mysql作为数据源,并开启binlog作为流数据作为本实例中的数据源。在这里首先需要安装一个docker运行mysql容器,已实现统一基础环境。
# 移除掉旧的版本
sudo yum remove docker \
docker-client \
docker-client-latest \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-selinux \
docker-engine-selinux \
docker-engine
# 删除所有旧的数据
sudo rm -rf /var/lib/docker
# 安装依赖包
sudo yum install -y yum-utils \
device-mapper-persistent-data \
lvm2
# 添加源,使用了阿里云镜像
sudo yum-config-manager \
--add-repo \
http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# 配置缓存
sudo yum makecache fast
# 安装最新稳定版本的docker
sudo yum install -y docker-ce
# 配置镜像加速器
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["http://hub-mirror.c.163.com"]
}
EOF
# 启动docker引擎并设置开机启动
sudo systemctl start docker
sudo systemctl enable docker
# 配置当前用户对docker的执行权限
sudo groupadd docker
sudo gpasswd -a ${USER} docker
sudo systemctl restart docker
完成docker的安装后,可以执行如下命令,实现mysql的安装
docker run -p 3307:3306 --name myMysql -v /mydata/mysql/log:/var/log/mysql -v /mydata/mysql/data:/var/lib/mysql -v /mydata/mysql/conf:/etc/mysql -e MYSQL_ROOT_PASSWORD=***** -d mysql:5.7.25
注意这里我们建议开启mysql的binlog功能,供我们后续的CDC功能的使用,因此在启动后需修改mysql的配置文件,以使其支持binlog功能。开启此功能后,关于mysql中数据的修改将会被记录,在后续连接mysql后,将会以流
修改my.cnf文件
[mysqld]
log-bin=/var/lib/mysql/mysql-bin
server-id=123654
expire_logs_days = 30
之后重启容器
docker restart myMysql
构键k8s集群
-
在这里,我们需要搭建一个K8S环境用于提供flink任务的运行时环境。在这里推荐使用kubeadm或者一些脚本工具【链接】github中的脚本工具搭建。具体过程在这里省略,可以参考上述链接中的文档进行操作。
-
需要注意的是,我们需要在相应用户的目录下提供一个kubeconfig文件,一般而言,该文件在安装好k8s后将会在~/.kube/目录下出现,如下图所示,通过该文件,才能顺利地调用K8S客户端提交任务,该config的内容为与K8S的ApiServer进行连接时需要使用的信息。
下载flink客户端
flink客户端是控制flink的核心,需要下载并部署
wget https://archive.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz
tar -xf flink-1.14.3-bin-scala_2.12.tgz
提供flink运行任务的环境
-
将kubeconfig提供出来,供flink客户端调用,在这里要保证我们使用的客户端时,我们的用户下拥有kubeconfig文件
-
在这里主要提供一个供flink使用的命名空间、和SA。在K8S Application模式下,service acount(SA)是flink的jobmanager使用的服务账号,jobmanager以此来获得启动相应的taskamanager的权限。这一点在后续的K8S application模式下比较重要。
# 创建namespace
kubectl create ns flink-dev
# 创建serviceaccount
kubectl create serviceaccount flink-service-account -n flink-dev
# 用户授权
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink-dev:flink-service-account