系统相关类
PluginLoader:插件的加载类,封装了插件信息、配置加载、安装过程。包含如下组件:
- PluginInfo:含有插件的基本信息
- Plugin接口:插件初始化接口
- AuditPlugin接口:包含审计类型插件关联的操作
初始化
PluginMgr.init初始化时将构建内置插件AuditLogBuilder。
其实插件安装的最终目的是将插件和PluginLoader加载到内存中
插件安装
安装插件时将进行如下步骤:
- 下载、解压或拷贝插件文件到临时目录中
- 读取plugin.properties文件内容
- 移动临时目录中的内容到插件标准目录
- 动态加载插件类
- 调用插件init方法(以下对于AuditLoaderPlugin类而言)
- 加载plugin.conf
- 拉起LoadWorker线程
- EditLog记录PluginInfo
展示插件列表
核心在展示PluginMgr的plugins: Map<String, PluginLoader>[]属性
消息连结
审计消息注入 - AuditEventProcessor
- StreamLoadRecordMgr
- BulkLoadJob
- ConnectProcessor
此三处调用结束后将调用AuditEventProcessor.handleAuditEvent,将审计内容插入eventQueue队列
public void handleAuditEvent(AuditEvent auditEvent) {
try {
eventQueue.add(auditEvent);
} catch (Exception e) {
LOG.warn("encounter exception when handle audit event, ignore", e);
}
}
同时存在另一Worker内部线程,从队列中获取audit信息,并从PluginMgr中获取插件列表,遍历调用AuditPlugin.exec
public class Worker implements Runnable {
@Override
public void run() {
AuditEvent auditEvent;
while (!isStopped) {
auditEvent = eventQueue.poll(5, TimeUnit.SECONDS);
for (Plugin plugin : auditPlugins) {
if (((AuditPlugin) plugin).eventFilter(auditEvent.type)) {
((AuditPlugin) plugin).exec(auditEvent);
}
}
}
}
}
默认插件AuditLogBuilder
将AuditEvent转成字符串使用log4j写入日志文件
public void exec(AuditEvent event) {
try {
switch (event.type) {
case AFTER_QUERY:
auditQueryLog(event);
break;
case LOAD_SUCCEED:
auditLoadLog(event);
break;
case STREAM_LOAD_FINISH:
auditStreamLoadLog(event);
break;
default:
break;
}
} catch (Exception e) {
LOG.debug("failed to process audit event", e);
}
}
private void auditQueryLog(AuditEvent event) throws IllegalAccessException {
StringBuilder sb = new StringBuilder();
Field[] fields = event.getClass().getFields();
for (Field f : fields) {
AuditField af = f.getAnnotation(AuditField.class);
sb.append("|").append(af.value()).append("=").append(String.valueOf(f.get(event)));
}
String auditLog = sb.();
AuditLog.getQueryAudit().log(auditLog);
}