系统相关类
PluginLoader:插件的加载类,封装了插件信息、配置加载、安装过程。包含如下组件:
- PluginInfo:含有插件的基本信息
- Plugin接口:插件初始化接口
- AuditPlugin接口:包含审计类型插件关联的操作
初始化
PluginMgr.init初始化时将构建内置插件AuditLogBuilder。
其实插件安装的最终目的是将插件和PluginLoader加载到内存中
插件安装
安装插件时将进行如下步骤:
- 下载、解压或拷贝插件文件到临时目录中
- 读取plugin.properties文件内容
- 移动临时目录中的内容到插件标准目录
- 动态加载插件类
- 调用插件init方法(以下对于AuditLoaderPlugin类而言)
- 加载plugin.conf
- 拉起LoadWorker线程
 
- EditLog记录PluginInfo
展示插件列表
核心在展示PluginMgr的plugins: Map<String, PluginLoader>[]属性
消息连结
审计消息注入 - AuditEventProcessor
- StreamLoadRecordMgr
- BulkLoadJob
- ConnectProcessor
此三处调用结束后将调用AuditEventProcessor.handleAuditEvent,将审计内容插入eventQueue队列
   public void handleAuditEvent(AuditEvent auditEvent) {
        try {
            eventQueue.add(auditEvent);
        } catch (Exception e) {
            LOG.warn("encounter exception when handle audit event, ignore", e);
        }
        }
同时存在另一Worker内部线程,从队列中获取audit信息,并从PluginMgr中获取插件列表,遍历调用AuditPlugin.exec
    public class Worker implements Runnable {
        @Override
        public void run() {
            AuditEvent auditEvent;
            while (!isStopped) {
                auditEvent = eventQueue.poll(5, TimeUnit.SECONDS);
                for (Plugin plugin : auditPlugins) {
                    if (((AuditPlugin) plugin).eventFilter(auditEvent.type)) {
                        ((AuditPlugin) plugin).exec(auditEvent);
                    }
                }
            }
        }
    }
默认插件AuditLogBuilder
将AuditEvent转成字符串使用log4j写入日志文件
    public void exec(AuditEvent event) {
        try {
            switch (event.type) {
                case AFTER_QUERY:
                    auditQueryLog(event);
                    break;
                case LOAD_SUCCEED:
                    auditLoadLog(event);
                    break;
                case STREAM_LOAD_FINISH:
                    auditStreamLoadLog(event);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            LOG.debug("failed to process audit event", e);
        }
    }
    
    private void auditQueryLog(AuditEvent event) throws IllegalAccessException {
        StringBuilder sb = new StringBuilder();
        Field[] fields = event.getClass().getFields();
        for (Field f : fields) {
            AuditField af = f.getAnnotation(AuditField.class);
            sb.append("|").append(af.value()).append("=").append(String.valueOf(f.get(event)));
        }
        String auditLog = sb.();
        AuditLog.getQueryAudit().log(auditLog);
    }    
