关键字:Kafka、消息中间件、MQ、数据审计、生产、消费
Kafka介绍:Kafka是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。这是Kafka官网首页用一句话对Kakfa的介绍,这句话中首先提到的“高性能数据管道”,在笔者看来这就是对消息队列(Message Queue ,简写为MQ)的另外一种描述。
本文的主要目的不是对Kafka展开来进行详细介绍,引用官网这幅图就能够将Kafka从技术的角度介绍清楚。Kafka 是一种分布式的,基于发布、订阅的消息系统,具有以下特点:
- 高吞吐量:可以满足每秒百万级别消息的生产和消费;
- 持久性:有一套完善的消息存储机制,确保数据高效安全且持久化;
- 分布式:基于分布式的扩展;Kafka的数据都会复制到几台服务器上,当某台故障失效时,生产者和消费者转而使用其它的Broker;
- 实时性:同时支持离线数据处理和实时数据处理。
审计类程序对于MQ的需求:数据审计类程序的最大特点是有海量的数据需要进行记录并持久化,为以后的数据审计工作提供数据基础。与此同时,审计类的数据也业务没有强关联性,不需要同步进行保存。因此,通过消息中间件对审计类的数据通过消息中间件进行异步保存的方案成为了业界的主流。在这样的架构下,MQ就成为了整个审计程序的最关键组件,MQ的性能与可用性就制约了审计程序的整体稳定性。
基于以上阐述,审计类程序对消息中间件的吞吐量、高可用性有很高的要求。在某些场景下,为保障数据的安全性和及时性,审计程序对消息中间件的持久性和实时性也提出了要求。结合各个消息中间件的特性,Kafka就理所当然的成为了审计程序在MQ的最佳选择。
Kafka在云审计产品的实践:随着云计算技术的发展,越来越多的IT资产被转移到云端,在云上部署了大量的IT资产,包括但不限于虚拟机、数据库、业务系统。而IT资产被转移到云端之后,用户通过云平台对IT资产进行查看和管理,云上系统与数据的安全性就变得尤为重要。站在用户自身的角度来看,通过云管平台的操作需要有记录、可溯源,这样在遇到操作事故的时候才能复盘和追责。从国家与监管的层面看,等级保护等相关法律要求对云平台上的操作进行保存。
云审计需要关注海量的用户操作事件,并进行落盘和归档。在这样技术场景下,Kafka作为消息中间件出现在云审计的技术架构的C位。
通过上面的图我们可以看出Kafka在云审计中的重要性,这样对Kafka高度依赖的技术架构就需要我们充分利用Kafka的优势并且规避Kafka存在的问题,这样我们整个云审计的系统稳定性才有保障。Kafka的优势在本文的开始就已经跟大家共同探讨,下文主要总结云审计在使用Kafka的过程中遇到的问题。Kafak在云审计产品的实践过程中遇到的问题主要分为两大类:一类是因为Kafka本身的局限性可预期的问题,另一类是在研发与测试过程遇到的未预料到的问题。
- 预期的问题:
- Topic的数量问题
根据Kafka的技术架构我们不难发现Partition的数量越多Kafka的吞吐量在一定程度上是越高的,但是在已经确定的节点和分区的环境下Topic数量无限制的增加将严重影响Kafka的吞吐量,因此云审计在程序设计上没有把Kafka的Topic的创建能力下放给用户,而是严格控制Topic数量来保障吞吐量的稳定。
- 消息丢失问题
Broker丢失消息是由于Kafka本身的原因造成的,Kafka为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中。消息的刷盘过程,为了提高性能,减少刷盘次数,Kafka采用了批量刷盘的做法。即,按照一定的消息量,和时间间隔进行刷盘。也就是说,理论上,要完全让Kafka保证单个broker不丢失消息是做不到的,只能通过调整刷盘机制的参数缓解该情况。比如,减少刷盘间隔,减少刷盘数据量大小。时间越短,性能越差,可靠性越好。但是审计类数据为非业务类数据,少量的丢失而保证了整个系统的高可用性是完全可以接受的。
- 消息顺序问题
由于Kafka的一个Topic可以分为了多个Partition,Producer发送消息的时候,是分散在不同 Partition的。当Producer按顺序发消息给Broker,但进入Kafka之后,这些消息就不一定进到哪个Partition,会导致顺序是乱的。但是云审计场景对消息的顺序没有要求,因此系统并没有为了消息的顺序而控制Topic的Partition。
- 消息重复问题
Kafka的消息重复问题主要分为两个个场景:一是生产者重试造成消息重复;二是消费者重复消费消息。重复的消息并不会对审计业务造成影响,因此云审计并未对消息重复的场景进行特殊处理。
- 实际遇到的问题:
- 数据的压缩与加密
通过Kafka传输给云审计的数据主要为用户的操作事件,在测试的过程中发现数据中可能包含用户的敏感数据,因此决定对数据进行加密处理。但是加解密过程会影响Producer和consumer的性能,在进行性能与安全的权衡之后决定还是需要对传输的数据进行加密处理,综合评估采用对称加密的算法。结合各对称加密算法的特性,本系统采用AES加密算法。与此同时,操作事件种类繁多,数据长度不统一,且跨资源的传输要走公网传输,因此对数据进行压缩变得很重要。好在Kafka原生支持对数据进行压缩操作,经过综合评估本项目采用了lz4算法进行压缩。
- 公网传输协议的安全性问题
前期对项目进行规划的时候期望整个数据流都不走公网链路进行传输,但是基于实际情况,和云审计的部署架构,跨资源池的数据必须要通过公网进行传输,因此数据的传输协议就需要调整为SASL_SSL的安全协议,以保证数据的安全性。
网上的很多证书制作过程不完善,通过阅读官方文档总结出了可用的制作脚本供读者使用。
#!/bin/bash
#######################服务端验证证书#######################
#Step 1 生成服务端密钥库
keytool -keystore server.keystore.jks -alias localhost -validity 3650 -genkey -keyalg RSA -storetype pkcs12
#Step 2 创建CA认证证书
openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650
#Step 3 服务端信任客户端证书
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
#Step 4 从密钥库导出证书
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
#Step 5 用CA签名
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:test1234
#Step 6 导入CA的证书和已签名的证书到密钥仓库:
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
#######################客户端验证证书#######################
#Step 1 生成客户端密钥库
keytool -keystore client.keystore.jks -alias localhost -validity 3650 -genkey -keyalg RSA -storetype pkcs12
#Step 2 将CA添加到服务端端信任库
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
#Step 3 从密钥库导出证书
keytool -keystore client.keystore.jks -alias localhost -certreq -file cert-file
#Step 4 用CA签名
# 执行本步骤之前 执行rm -rf cert-signed
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:test1234
#Step 5 导入CA的证书和已签名的证书到密钥仓库:
keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.keystore.jks -alias localhost -import -file cert-signed
- 跨语言的Kafka Client对证书格式支持性问题
由于不通产品在上报操作事件的时候使用的是不通的开发语言,因此就存在不通的Kafka Client。本项目在通过Python进行数据上报时,Kafka-Python库不支持jks格式的证书,需要将生产的jks的证书装维pem格式的。