一、数据同步
1.1、什么是数据同步
我们知道 elasticsearch 的数据是来源于 数据库(比如 mysql). 当我们在写了代码将 mysql 中的数据导入 es 中,那么这次导入之后 mysql 的数据并不会一成不变,将来我们的业务中会有 crud,数据库中的数据就和有新增、修改、删除,那么 mysql 数据一定那发生变化, es 如果不跟着变化,就会出现问题.
比如在一个商城系统中,到了 双11 ,数据库中的商品的价格下降,而 es 还是老价格,那么用户搜索时看到的商品的价格还是没有变化,用户可能就得考虑换软件了~
因此,我们要保证 mysql 数据变化的时候 es 也能跟着同步变化,这就是数据同步.
Ps:实际上不光是 es 存在数据同步问题,凡是涉及到数据库双写的情况,比如 redis 和 mysql,都会存在数据同步问题.
1.2、解决数据同步面临的问题
如果你现在是一个单体式的项目,所有业务都写在一个项目中,那就比较好办,无非就是在及逆行新增、修改、删除业务的时候,同时把 es 也一起更新就 ok.
但是如果我们是一个 微服务 架构的项目上,不同的业务往往会在不同的微服务上,比如 “商品数据管理业务” 和 “商品数据搜索业务” 肯定会在两个不同的微服务上,那么跨微服务的项目就没办法直接操作了.
1.3、解决办法
1.3.1、同步调用
假设我们现在有两个微服务,一个是 酒店数据管理服务,另一个是酒店数据搜索服务. 假设这两个服务之间互相不能访问对方的数据库,也就是说,酒店管理服务只能访问 mysql,而 酒店搜索服务 只能访问 es,这也符合 微服务 里的标准和规范.
这里有一种办法是同步调用,步骤如下:
1.比如用户做新增操作时,首先把数据写到数据库里.
2. 数据库写完了以后紧接着调用 酒店搜索服务中的 “更新索引库” 的接口.
3. 更新 es.
最后更新完了 es 就把响应反馈给搜索服务,然后搜索服务才会把把响应反馈给 管理服务,然后再反馈给用户. 这整个过程依次执行,因此也叫同步调用.
缺陷:
1. 数据耦合,业务耦合:原本只是写数据库,写完就结束了,然后现在还需要再写完数据库的代码后面再加上 调用 “更新索引库” 接口的代码,而且这调用 这个接口的业务跟我新增业务显然没有关系啊,现在业务耦合在一起,将来必然也会影响性能.
2. 影响性能(耦合带来的问题):原本数据库写完了,比如耗时 50ms,但是现在写完数据库,你还得等待后台调用这个接口返回的响应,而这个接口又要等待 es 这里的响应,假如这里也耗时 50ms,那么总的耗时不就是这个三个步骤相加的,达到 100 ms.
3. 牵一发而动全身(耦合带来的问题):如果 步骤 2 和 步骤 3 任意一个位置出现了异常,就会导致整个业务也出现崩溃.
同步调用这么多问题,那么就需要考虑别的方案了.
1.3.2、异步通知(推荐)
这里就需要使用到 mq 来实现了,步骤如下:
1. 当有人做新增操作时,先去写数据库.
2. 写完之后,不调用任何服务的接口,而是向 mq 发送一个消息,通知一下 其他服务:“我这里数据新增了啊~”,整个步骤到这里结束.
那么至于谁来监听这个消息,监听了以后做什么,跟我有关系吗?没关系,这样一来,业务的耦合就解除了; 至于其他服务耗时多少秒,跟我也没关系,我写完数据库,发完消息就结束了,因此性能也提升了;再者,就算其他服务出现异常了,跟我这里也没关系.
缺陷:
1. 这样一来,比较依赖 mq 消息的可靠性.
2. 引入新的中间件,实现的复杂度也会有一定的上升.
不过这些缺陷,跟同步调用比起来,算不了什么,因此这也是比较推荐的方案.
1.3.3、监听 binlog
mysql 默认情况下 binlog 时关闭的,一旦开启,那么每次 mysql 在做增删改的时候,都会记录相应的操作到 binlog 中.
那么可以使用类似于 canal 这样的中间件来监听 binlog,一旦发现变化,立马通知对应的微服务,这个时候就知道数据发生变更,就可以进行更新了.
优势:
这种方案他既不给任何中间件发消息,也不去调用任何接口,因此耦合度是最低的.
劣势:
1.开启 binlog,对 mysql 的压力就增加了.
2.引入新的中间件.
1.3、基于 RabbitMQ 实现数据同步
1.3.1、需求
现在有两个微服务:“酒店管理服务” 和 "酒店搜索服务"
用户操作 “酒店管理服务” 进行增删改数据、要求对 "酒店搜索服务" 中的 es 的数据也要完成相同的数据更改操作.
这里使用 MQ 的异步方式实现数据同步.
1.3.2、在“酒店搜索服务”中 声明 exchange、queue、routingKey,同时开启监听
在 “酒店搜索服务” 中去声明一个 exchange,用来接收 增删改 的消息,接着声明两个队列即可,一个队列用来增改(这两用一个队列是因为在 es 中,增改可以使用同一个 DSL 语句实现),另一个用来删除(这里我是以 Bean 的方式注入到容器中了).
public class MqConstants {
//主题交换机
public static final String EXCHANGE_TOPIC = "hotel.topic";
//增加 or 修改酒店 队列
public static final String INSERT_QUEUE = "hotel.insert.queue";
//删除酒店 队列
public static final String DELETE_QUEUE = "hotel.delete.queue";
//增加 or 修改酒店 routingKey
public static final String INSERT_KEY = "hotel.insert.key";
//删除酒店 routingKey
public static final String DELETE_KEY = "hotel.delete.key";
}
@Configuration
public class MqConfig {
@Bean
public TopicExchange hotelTopicExchange() {
return new TopicExchange(MqConstants.EXCHANGE_TOPIC, true, false);
}
@Bean
public Queue hotelInsertQueue() {
return new Queue(MqConstants.INSERT_QUEUE, true);
}
@Bean
public Queue hotelDeleteQueue() {
return new Queue(MqConstants.DELETE_QUEUE, true);
}
@Bean
public Binding hotelInsertBinding() {
return BindingBuilder.bind(hotelInsertQueue()).to(hotelTopicExchange()).with(MqConstants.INSERT_KEY);
}
@Bean
public Binding hotelDeleteBinding() {
return BindingBuilder.bind(hotelDeleteQueue()).to(hotelTopicExchange()).with(MqConstants.DELETE_KEY);
}
}
Ps:此类(MqConfig)的包必须与启动类同级,否则声明交换机和队列失败.
最后就可以使用 @RabbitListener 监听 队列 了.
@Component
public class MqListener {
@Autowired
private IHotelService hotelService;
@RabbitListener(queues = MqConstants.INSERT_QUEUE)
public void HotelInsertOrUpdateListener(Long id) {
hotelService.insertHotelById(id);
}
@RabbitListener(queues = MqConstants.DELETE_QUEUE)
public void HotelDeleteListener(Long id) {
hotelService.deleteHotelById(id);
}
}
Ps:此类(MyListener,监听者类)上必须要有 @Component 注解(交由给 Spring 来管理),否则声明的交换机和队列无效.
1.3.3、在“酒店管理服务”中发布消息
酒店管理服务中,一旦用户进行酒店的 增删改,就会对数据库信息进行修改,然后将增删改的消息发布到 MQ 中.
Ps:这里不要发送 hotel 整体数据,太大可能会导致占满队列(Mq 是基于内存存储的,因此会设定队列上限),因此这里发送 id 即可. es 这边拿到 id,就可进行相应的增删改.
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
// 新增酒店
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_TOPIC, MqConstants.INSERT_KEY, hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
//修改酒店信息
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_TOPIC, MqConstants.INSERT_KEY, hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
//删除酒店信息
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_TOPIC, MqConstants.DELETE_KEY, id);
}
1.3.4、启动微服务并测试
a)在酒店管理页面中,修改 “7天连锁酒店(上海莘庄地铁站店)” 为 “7天连锁酒店(此处正在施工,请谨慎前往)”,如下.
b)在酒店搜索页面中,搜索 “施工” 关键词,就可以看到在 “酒店管理服务” 中更新的信息,已经同步到了 “酒店搜索服务” 中.