上一篇文章讨论了写缓存的架构解决方案,它虽然可以减少数据库写操作的压力,但也存在不足。比如需要长期高频插入数据时,这个解决方案就无法满足,本篇文章我们就围绕这个问题逐步提出解决方案。在架构方案层层展开的过程中,你会发现不断会有新问题需要考虑。
一、业务背景
因业务快速发展,公司系统日活用户高达500万,基于现有业务模式,业务侧要求我们根据用户行为做埋点,旨在记录用户在指定页面的所有行为、开展数据分析与第三方进行费用结算。(至于为啥要做结算,我就不说了,嘿嘿)
当然,在埋点过程中,业务侧还要求能在后台实时查询用户行为和统计报表。(这里虽然说是实时,其实特定时间内的延迟业务方还是能接受的,为确保描述的准确性,我们把它称之为“准实时”吧)
为了能够清晰的理解后续方案的建设思路,我来简单列举点儿数据结构,免得后面出现理解偏差。首先,我们需要收集的原始数据结构如下表所示:
指标 | 备注 |
---|---|
IMEI | 用户设备的IMEI |
定位点 | 经纬度 |
用户ID | |
目标ID | 每个页面、按钮、banner都有唯一识别id |
目标类型 | 页面、按钮、banner等 |
事件动作 | 点击、进入、跳出等 |
From Url | 来源URL |
Current Url | 当前URL |
To URL | 去向URL |
动作时间 | 触发这个动作的时间 |
进入时间 | 进入该页面的时间 |
跳出时间 | 跳出该页面的时间 |
... | ... |
通过以上的数据结构,在后台查询原始数据时,业务侧不仅可以以城市(根据经纬度换算)、性别(从业务表中查询)、年龄、目标类型、目标ID、事件动作等作为查询条件实时查看用户行为数据,还可以以时间、性别、年龄等维度实时查看每个目标ID的总点击数、平均点击数、页面转换率等统计报表数据。
二、技术选型思路
根据以上业务场景,我们提炼出了6点业务需求,并针对业务需求梳理了技术选型相关思路。
1、原始数据海量:对于这点,我们初步考虑HBase进行持久化。
2、对于埋点记录的请求要快:埋点记录服务会把原始埋点记录存放在一个缓冲的地方,以此来保证响应速度。关于这点有好几个缓存方案,下面会展开讨论。
3、可通过后台查询原始数据:如果使用HBASE直接作为查询引擎,查询速度太慢了,所以我们还需要使用ES来保存查询页面上作为查询条件的字段和活动ID。
4、各种统计报表的需求:关于数据可视化工具也有很多选择,比如kibana、grafana等,考虑使用过程的灵活性,我们最终选择自己设计功能。
5、能根据埋点日志生成费用结算数据:我们将费用结算数据保存在MySQL中。
6、需要一个框架将缓存中的数据进行处理,并保存到ES、HBase和MySQL中:因为业务有准实时查询的需求,所以我们需要使用实时处理工具。目前,市面上流行的实时处理工具主要分为Storm、spark Steaming、Apache Flink这三种,一会儿我们也会展开说明。
为了更快理解这部分内容,画了个简单的架构图来说明,如下图所示:
仔细观察这张架构图,你会发现图上还有两个地方打了问好,这是为什么呢?这就涉及到我们接下来需要讨论的4个问题。
1、使用什么技术保存埋点数据的第一现场?
市面上关于快速保存埋点数据的技术主要分为Redis、kafka、本地日志这三种,在上面的业务场景中,我们最终选择了本地日志。
说到这,你可能想问:Redis跟Kafka到底哪里不好,为什么不使用呢?我们先来说说Redis的AOF机制,这点在前面第六篇文章也说过。
Redis的AOF机制会持久化保存Redis所有操作记录,用于服务器宕机后数据还原。那么Redis什么时候将AOF落盘呢?
在Redis中存在一个AOF配置:appendfsync,如果appendfsync配置成everysec,AOF每秒落盘一次,不过这种配置方式有可能会丢失1秒的数据。如果appendfsync配置成always,每次操作请求的记录都落盘后再返回成功信息给客户端,不过这种配置方式系统性能就会很慢。因为对于埋点记录的请求要求响应快,所以我们没有选择Redis。
接下来我们讨论下kafka的技术方案。
kafka的冗余设计是每个分区都有多个副本,其中一个副本是Leader,其他副本都是Follower,Leader主要负责处理所有的读写请求,并同步给其他Follwer。
那么kafka什么时候将数据从Leader同步给Follower呢?kafka的producer configs中也有个acks配置,它的配置方式分为三种。
- acks=0:不等Leader将数据落到日志,kafka直接返回完成信号给客户端。这种方式虽然响应很快,但数据持久化没有保障,数据如果没有落到本地日志,系统就会出现宕机,导致数据丢失。
- acks=1:等Leader将数据落到本地日志,但是不等Follower同步数据,kafka就直接返回完成信号给客户端。
- acks=all:等Leader将数据落到日志,且等min.insync.replicas个Follower都同步数据后,kafka再返回完成信号给客户端。这种配置方式虽然数据有保证,但响应慢。
通过以上的解释,是否发现了使用Redis与kafka都会出现问题呢?
如果我们想保证数据的可靠性,必然需要牺牲系统性能,那么有没有一个方案可以性能+可靠性同时兼得呢?有的,所以我们最终决定把埋点数据保存到本地日志中。
2、使用什么技术(ES、HBase、Mysql)把缓冲数据搬到持久化层呢?
关于这个问题,最简单的方式是通过Logstash直接把日志文件中的数据搬运到ES,但是问题来了,业务侧要求存放ES中的记录包含城市、性别、年龄等原始数据(这些字段需要调用业务系统的数据进行抽取),而这些原始数据日志文件中并没有,所以我们并没有选择Logstash。
如果你坚持通过Logstash把日志文件的数据搬运到ES,我分享3种实现方式。
- 自定义filter:先在Logstash自定义的filter里封装业务数据,再保存到ES。因Logstash自定义的filter是使用Ruby语言编写,也就是说我们需要使用其他语言编写业务逻辑,因此Logstash自定义filter的方案被我们pass了。
- 修改客户端的埋点逻辑:每次记录埋点的数据发送到服务端之前,我们先在客户端将业务的相关字段提取出来再上传到服务端。这个方法也直接被业务端pass了,理由是后期业务侧每更新一次后台查询条件,我们就需要重新发一次版,实在太麻烦了。
- 修改埋点服务端的逻辑:每次服务端在记录埋点的数据发送到日志文件之前,我们先从数据库获取业务字段组合埋点记录。这个方法也被服务端pass了,因为这种操作会直接影响每个请求的效率,间接印象用户体验。
另外,我们没选择logstash还有两点原因。
- 日志文件中的数据需要同时输出ES和Hbase两个输出源,因Logstash的多输出源基于同一个pipeline,如果1个输出源出错了,另1个输出源也会出错,两者之间会互相影响。
- MySQL中需要生成费用结算数据,而费用结算数据需要通过分析埋点的数据动态来计算,显然Logstash并不适合这样的业务场景,因为filter可以改变每条数据某些字段的值。
在上面的业务场景中,我们最终决定引入了一个计算框架了,此时整个解决方案的架构图如下:
这个方案中就是先通过logstash把日志文件搬运到MQ中,再通过实时计算框架处理MQ中的数据,最后保存处理转换出来的数据到持久层中。
实际上,引入实时计算框架是为了再原始的埋点数据中填充业务数据,并统计埋点数据生成费用结算数据,最后分别保存到持久层中。
最后,关于Logstash的注意点,我们需要再强调下。
Logstash系统是通过Ruby语言编写的,资源消耗大,所以官方又推出一个轻量的Filbeat。我们可以使用Filebeat收集数据,再通过Logstash进行数据过滤。如果你不想使用Logstash的强大过滤功能,你可以直接使用Filebeat收集日志数据发送给kafka。
但问题又来了,Filebeat是使用轮询的方式采集文件变动,存在一定(有时候很大)延时,不像Logstash可直接监听文件变动,所以最后最终我们选择继续使用Logstash。(因为我们扛得住资源的消耗)
下面,我们开始讨论kafka、处理框架。
3、为什么使用Kafka?
kafka是LinkedIn推出的开源消息中间件,它天生是为收集日志而设计,且它具备超高的吞吐量和数据量的扩展性,号称无线堆积。
根据LinkedIn官方说法,他们使用3台便宜的机器部署kafka,就能每秒写入2百万条记录,官方博客有说明,感兴趣的可以自行查找一下。。。
看到这里,大家肯定会好奇为什么它的吞吐量这么高?这里我们就有必要了解kafka的存储结构了,我们先看一张架构示意图:
图片来源:kafka官方文档:http:///documentation/#log
Kafka 的存储结构中每个Topic分区相当于1个巨型文件,而每个巨型文件又是由多个segment小文件组成。其中,producer负责对该巨型文件进行“顺序写”,Consumer负责对该文件进行“顺序读”。
这里,我们可以把kafka的存储架构简单理解为kafka写数据通过追加数据到文件尾实现顺序写,读取数据时直接从文件中读,好处是读操作不会阻塞写操作,这也是吞吐量大的原因。
另外,理论上只要磁盘空间足够,kafka可以实现消息无限堆积,因此它特别适合处理日志收集这种场景,可见我们选择使用kafka是有一定理论依据的哦。
4、使用什么技术把kafka的数据搬运到持久层?
为了把kafka的数据搬运到持久层,我们需要使用一个分布式实时计算框架,原因有2点:
- 数据量特别大,为此我们需要使用一个处理框架将上亿的埋点数据每天进行快速分析和处理(且必须使用多个节点并发处理才来得及),再存放到ES、Hbase和MySQL中,即大数据计算,因此它有分布式计算的诉求。
- 业务要求实时查询统计报表数据,因此我们需要一个实时计算框架处理埋点数据。
目前,市面上流行的分布式实时计算框架有3种:Storm、Spark Stream、Apache Flink,到底使用哪个好呢?
我认为都可以,这就看公司的具体情况了,比如公司已经使用实时计算框架了,你就不需要再考虑这个问题了,如果公司还没使用,那就看个人喜好了。
我个人偏好Apache Flink,不仅因为它性能强(听说阿里双11使用它后,1秒内处理了17亿条数据),还因为它的容错机制能保证每条数据仅仅处理1次,且它有时间窗口处理功能。
关于流处理、容错机制、时间窗口这三个概念,我们具体展开说明一下。
在流处理这个过程中,往往会引发一系列的问题,比如一条消息处理过程中,如果系统出现故障该怎么办?你会处理吗?如果重试会不会出现重复处理?如果不重复,消息是否会丢失?你能保证每条消息最多或最少处理几次?
在不同流处理框架中采取不同的容错机制,他们也就保证了不一样的一致性。
- At-Most-Once:至多一次,表示一条消息不管后续处理成功与否只会被消费处理一次,存在数据丢失可能。
- Exactly-Once:精确一次,表示一条消息从其消费到后续的处理成功,只会发生一次。
- At-Least-Once:至少一次,表示一条消息从消费到后续的处理成功,可能会发生多次,存在重复消费的可能。
以上三种方式中,Exactly-Once无疑是最优的选择,因为在正常的业务场景中,一般只要求消息处理一次。而Apache Flink的容错机制就可以保证所有消息只处理一次(Exactly-Once)的一致性,还能保证系统安全性能,所以很多人最终都使用它。
接下来,我们来说说Apache Flink的时间窗口计算功能,以下是Apache Flink的一个代码示例,它把每个小时里发生事件的用户聚合在一个列表中。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
stream
.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
我们知道,日志中事件发生的时间有可能与计算框架处理消息的时间不一致。
假定实时计算框架收到消息的时间是2秒后,比如有一条消息,这个事件发生的时间是6:30,因你接收到消息后处理的时间延后了2秒,即变成了6:32,因此当你计算6:01-6:30的数据和,这条消息并不会计算在6:01-6:30范围内,这就不符合实际的业务需求了。
在实际业务场景中,如果需要按照时间窗口统计数据,我们往往是根据消息的事件时间来计算。而Apache Flink的特性恰恰是基于消息的事件时间,而不是基于计算框架的处理时间,这也是它的另一个撒手锏。