一、需求
监控 Azkaban-web 和 Azkaban-exec 的进程状态,并将 Azkaban 任务的执行状态同步到 SQLServer 数据库;每 10 分钟同步一次,发现异常就告警。
1.1.Azkaban 任务状态
30--------正在运行
50--------运行成功
60--------任务被终止(killed)
70--------运行失败
1.2.存储 Azkaban任务状态的表
这里只查询运行成功和运行失败的任务,并将其同步到目标数据库:
-- Finished Azkaban executions only: 50 = succeeded, 70 = failed.
-- start_time / end_time are divided by 1000 before FROM_UNIXTIME
-- (presumably they are stored as epoch milliseconds -- confirm against the Azkaban schema).
SELECT
    exec_id,
    flow_id,
    status,
    FROM_UNIXTIME(start_time / 1000) AS start_time,
    FROM_UNIXTIME(end_time / 1000) AS end_time
FROM execution_flows
WHERE status IN (50, 70);
二、脚本编写
2.1.编写table.ini 的数据库配置文件
xxx 代表客户标识:不同客户对应不同的数据库。脚本可以读取多组数据库配置,将数据写入不同的 SQLServer 数据库。
[pcsprd@client ~]$ cat /hadoop/datadir/script/hadoop/table.ini
[xxx_CONNECT]
url=xxx
port=1433
username=PCS.Support
password=321@win#
dbname=HDP_TEST
customer=xxx_
2.2.任务状态写入SQLServer的shell
# Trace every command as it is executed.
# NOTE(review): the trace also prints the passwords embedded in the command lines below.
set -x

# Connection settings for the MySQL database queried through mysql_cmd.
DBNAME='azkaban'
PORT='3306'
HOSTNAME='xxx'
USER='root'
PASSWD='@001'
# ReadConnect FILE SECTION KEY
#
# Looks up KEY inside the [SECTION] block of the INI file FILE and stores the
# value in the global variable ReadINI (empty when the key is not found).
#
#   $1  path of the INI file
#   $2  section name, without the surrounding brackets
#   $3  key name
#
# The lookup stops at the next "[...]" header, so a key that is missing from
# SECTION is never picked up from a later section, and everything after the
# first "=" is returned, so values that themselves contain "=" stay intact.
function ReadConnect(){
    ReadINI=$(awk -F '=' -v section="[$2]" -v key="$3" '
        # Every section header switches the "inside the wanted section" flag.
        /^[ \t]*\[/ { in_section = (index($0, section) > 0); next }
        in_section && $1 == key { print substr($0, index($0, "=") + 1); exit }
    ' "$1")
}
# Customer prefix: it selects the "[<prefix>CONNECT]" section of the INI file.
batchCustomer=xxx_
table_ini=/hadoop/datadir/script/hadoop/table.ini
ini_section="${batchCustomer}CONNECT"

# ReadConnect leaves each looked-up value in $ReadINI, so copy it right after every call.
ReadConnect $table_ini "$ini_section" url;      server=$ReadINI
ReadConnect $table_ini "$ini_section" port;     port=$ReadINI
ReadConnect $table_ini "$ini_section" dbname;   database=$ReadINI
ReadConnect $table_ini "$ini_section" username; user=$ReadINI
ReadConnect $table_ini "$ini_section" password; paw=$ReadINI

# File that receives the rows exported from MySQL.
azkaban_exec_tmp_file=/hadoop/datadir/temp/monitor/exec_tmp_file.txt

# Command prefixes; the SQL statement is appended as the last argument by the callers.
mysql_cmd="mysql -h$HOSTNAME -P$PORT -u$USER -p$PASSWD $DBNAME -e"
sqlserver_cmd="/opt/mssql-tools/bin/sqlcmd -S ${server} -U ${user} -P ${paw} -d ${database} -Q "
# Monitor the azkaban-web process and record a heartbeat row in SQL Server.
# Heartbeat status codes: 80 = process is running, 90 = process is down.
#
# azwebCount was previously never assigned, so the test below failed with a
# shell error and the "running" branch was always taken. Count the processes
# here, excluding the grep command itself.
azwebCount=$(ps -ef | grep azkaban-web | grep -v "grep" | wc -l)
if [ "${azwebCount}" -eq 0 ];then
    web_status=90
else
    web_status=80
fi
${sqlserver_cmd} "INSERT into task_monitor (flowId,taskId,status,startTime,endTime) VALUES(DATEDIFF(S,'1970-01-01 00:00:00', GETDATE()),'azkban-web-heartbeat',${web_status},GETDATE(),GETDATE())"
# Monitor the azkaban-exec process and record a heartbeat row in SQL Server.
# The count excludes the grep command itself; status 90 is written when no
# matching process is found, 80 otherwise.
azexeCount=$(ps -ef | grep azkaban-exe | grep -v "grep" | wc -l)
if [ "$azexeCount" != 0 ];then
    exec_status=80
else
    exec_status=90
fi
${sqlserver_cmd} "INSERT into task_monitor (flowId,taskId,status,startTime,endTime) VALUES(DATEDIFF(S,'1970-01-01 00:00:00', GETDATE()),'azkban-exec-heartbeat',${exec_status},GETDATE(),GETDATE())"
# Sync Azkaban flows that succeeded (50) or failed (70) into SQL Server.
#
# Each run exports the flows whose end time lies in (stored watermark, sync_time]:
#   * filtering on end_time instead of start_time also picks up flows that were
#     still running when the previous sync happened;
#   * the upper bound is fixed before the export and later stored as the new
#     watermark, so flows finishing while this script runs are neither skipped
#     nor exported twice;
#   * the watermark is advanced only after the load succeeded, so a failed run
#     is simply retried by the next one.
sync_time=$(mysql -h"${HOSTNAME}" -P"${PORT}" -u"${USER}" -p"${PASSWD}" "${DBNAME}" --skip-column-names -e "select NOW();")
if [ -z "${sync_time}" ];then
    echo "cannot read the current time from MySQL, sync skipped"
    exit 1
fi

# Export statement. INTO OUTFILE writes the file on the MySQL server host, so the
# file check below only works when this script runs on that same host.
select_exec_sql="select exec_id,flow_id,status,FROM_UNIXTIME(start_time/1000) as start_time,FROM_UNIXTIME(end_time/1000) as end_time from execution_flows where FROM_UNIXTIME(end_time/1000)>(select task_lastTime from task_lastTime where id=1) and FROM_UNIXTIME(end_time/1000)<=\"${sync_time}\" and (status = 70 or status = 50)
into outfile \"${azkaban_exec_tmp_file}\" fields terminated by \",\" ;"
# Copy the staged rows from the temporary table into the final table.
task_move_sql="insert into ${database}.[dbo].[task_monitor] (flowId,taskId,status,startTime,endTime) select flowId,taskId,status,startTime,endTime from ${database}.[dbo].[task_monitor_tmp];"
# Store the upper bound of this run as the new watermark.
update_task_lastTime_sql="UPDATE task_lastTime SET task_lastTime = '${sync_time}' WHERE id=1;"

# Export the finished flows to a fresh file.
rm -f "${azkaban_exec_tmp_file}"
${mysql_cmd} "${select_exec_sql}"
if [ -f "${azkaban_exec_tmp_file}" ];then
    # -b makes sqlcmd return a non-zero exit code when the statement fails,
    # so every step of the chain can be checked.
    if ${sqlserver_cmd} "truncate table ${database}.[dbo].[task_monitor_tmp]" -b \
        && /opt/mssql-tools/bin/bcp ${database}.dbo.task_monitor_tmp in "${azkaban_exec_tmp_file}" -S${server} -U${user} -P${paw} -c -t, -r'\n' -b 1000 \
        && ${sqlserver_cmd} "${task_move_sql}" -b;then
        ${mysql_cmd} "${update_task_lastTime_sql}"
    else
        echo "loading ${azkaban_exec_tmp_file} into SQL Server failed, watermark not advanced"
    fi
else
    echo file ${azkaban_exec_tmp_file} not exist!
fi
2.3.任务状态写入MySQL的shell
2.3.1.MySQL表创建 azkaban_task_monitor
-- Final table for the synced task states.
-- The sync script fills flowId from execution_flows.exec_id and taskId from
-- execution_flows.flow_id.
DROP TABLE IF EXISTS `azkaban_task_monitor`;
CREATE TABLE `azkaban_task_monitor` (
    `id`         BIGINT(20)   NOT NULL AUTO_INCREMENT,
    `flowId`     INT          NOT NULL,
    `taskId`     VARCHAR(500) DEFAULT NULL,
    `status`     INT          DEFAULT NULL,
    `startTime`  TIMESTAMP    NULL DEFAULT NULL,
    `endTime`    TIMESTAMP    NULL DEFAULT NULL,
    -- Maintained by MySQL: refreshed on every update of the row.
    `modifyTime` TIMESTAMP    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    -- Maintained by MySQL: set when the row is inserted.
    `createTime` TIMESTAMP    NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8;
2.3.2.临时表创建 azkaban_task_monitor_tmp
-- Staging table: the sync script truncates it, loads the exported file into it
-- and then copies its rows into azkaban_task_monitor.
DROP TABLE IF EXISTS `azkaban_task_monitor_tmp`;
CREATE TABLE `azkaban_task_monitor_tmp` (
    `flowId`    INT          NOT NULL,
    `taskId`    VARCHAR(255) DEFAULT NULL,
    `status`    INT          DEFAULT NULL,
    `startTime` TIMESTAMP    NULL DEFAULT NULL,
    `endTime`   TIMESTAMP    NULL DEFAULT NULL,
    PRIMARY KEY (`flowId`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8;
2.3.3.任务业务说明表 azkaban_task_all_info
-- Business description of each task.
DROP TABLE IF EXISTS `azkaban_task_all_info`;
CREATE TABLE `azkaban_task_all_info` (
    `taskId`     VARCHAR(255) NOT NULL,      -- flow ID
    `taskName`   VARCHAR(500) DEFAULT NULL,  -- task name
    `taskDesc`   VARCHAR(500) DEFAULT NULL,  -- task description
    `modifyTime` TIMESTAMP    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    `createTime` TIMESTAMP    NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (`taskId`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8;
2.3.4.任务状态写入MySQL的shell
将任务状态写入 MySQL 的另一个数据库,或另一台服务器上的 MySQL。
# Trace every command as it is executed.
# NOTE(review): the trace also prints the passwords embedded in the command lines below.
set -x

# Azkaban metadata database (source).
DBNAME='azkaban'
PORT='3306'
HOSTNAME='xxx'
USER='root'
PASSWD='@001'

# Alert target database (destination).
WARN_DBNAME='kangll'
WARN_PORT='3306'
WARN_HOSTNAME='xxx'
WARN_USER='root'
WARN_PASSWD='@001'

# File that receives the rows exported from the Azkaban database.
azkaban_exec_tmp_file=/hadoop/datadir/temp/monitor/exec_tmp_file.txt

# Command prefix for the Azkaban metadata database; callers append the SQL statement.
mysql_cmd="mysql -h$HOSTNAME -P$PORT -u$USER -p$PASSWD $DBNAME -e "
# Command prefix for the alert target database; callers append the SQL statement.
mysql_cmd_two="mysql -h$WARN_HOSTNAME -P$WARN_PORT -u$WARN_USER -p$WARN_PASSWD $WARN_DBNAME -e "
# Sync Azkaban flows that succeeded (50) or failed (70) into the alert database.
#
# Each run exports the flows whose end time lies in (last_time, sync_time]:
#   * filtering on end_time instead of start_time also picks up flows that were
#     still running when the previous sync happened;
#   * sync_time is read from the Azkaban database before the export and later
#     stored as the new watermark, so flows finishing while this script runs are
#     neither skipped nor exported twice, and the stored watermark comes from the
#     same server that evaluates the export filter;
#   * the watermark is advanced only after the load succeeded, so a failed run
#     is simply retried by the next one.

# Read the time of the last successful sync from the alert database.
last_time_sql="select task_lastJobTime from azkaban_lastJobTime where id = 1;"
last_time=$(mysql -h"${WARN_HOSTNAME}" -P"${WARN_PORT}" -u"${WARN_USER}" -p"${WARN_PASSWD}" "${WARN_DBNAME}" --skip-column-names -e "${last_time_sql}")
# Upper bound of this sync window.
sync_time=$(mysql -h"${HOSTNAME}" -P"${PORT}" -u"${USER}" -p"${PASSWD}" "${DBNAME}" --skip-column-names -e "select NOW();")
# An empty or NULL bound would silently turn the filter into a full re-export.
if [ -z "${last_time}" ] || [ "${last_time}" = "NULL" ] || [ -z "${sync_time}" ];then
    echo "cannot determine the sync window (last_time='${last_time}', sync_time='${sync_time}'), sync skipped"
    exit 1
fi

# Export statement. INTO OUTFILE writes the file on the MySQL server host, so the
# file check below only works when this script runs on that same host.
select_exec_sql="select exec_id,flow_id,status,FROM_UNIXTIME(start_time/1000) as start_time,FROM_UNIXTIME(end_time/1000) as end_time from execution_flows where FROM_UNIXTIME(end_time/1000)>\"${last_time}\" and FROM_UNIXTIME(end_time/1000)<=\"${sync_time}\" and (status = 70 or status = 50)
into outfile \"${azkaban_exec_tmp_file}\" fields terminated by \",\" ;"
# Load the exported file into the staging table.
load_file_sql="load data local infile \"${azkaban_exec_tmp_file}\" into table azkaban_task_monitor_tmp fields terminated by \",\" (flowId,taskId,status,startTime,endTime)"
# Copy the staged rows into the final table.
task_move_sql="insert into azkaban_task_monitor (flowId,taskId,status,startTime,endTime) select flowId,taskId,status,startTime,endTime from azkaban_task_monitor_tmp;"
# Store the upper bound of this run as the new watermark.
update_task_lastTime_sql="UPDATE azkaban_lastJobTime SET task_lastJobTime = '${sync_time}' WHERE id=1;"

# Export the finished flows to a fresh file.
rm -f "${azkaban_exec_tmp_file}"
${mysql_cmd} "${select_exec_sql}"
if [ -f "${azkaban_exec_tmp_file}" ];then
    # Empty the staging table, load the file, move the rows; stop at the first failure.
    if ${mysql_cmd_two} "truncate table azkaban_task_monitor_tmp" \
        && ${mysql_cmd_two} "${load_file_sql}" \
        && ${mysql_cmd_two} "${task_move_sql}";then
        ${mysql_cmd_two} "${update_task_lastTime_sql}"
    else
        echo "loading ${azkaban_exec_tmp_file} failed, watermark not advanced"
    fi
else
    echo "${azkaban_exec_tmp_file} not exist!"
fi