一、转发规则实现
1.1、需求分析
这里主要实现 routingKey 和 bindingKey 参数的校验,以及 TopicExchange 类型绑定规则的实现.
这里重点来看一下 Topic 交换机的转发规则
bindingkey:创建绑定的时候,给绑定指定特殊字符串,相当于出题;
routingKey:发布消息的时候,给消息上指定的特殊字符串,相当于做答案;
当 routingKey 和 bindingKey 匹配(答案对的上),就可以将消息转发给对应的队列。
什么叫做能匹配的上?
routingKey 的组成:
- 数字、字母、下划线
- 使用 "." 把整个routingKey 分成多个部分
例如:
- aaa.bbb.ccc 合法
- aaa.564.bbb 合法
- aaa 合法
bindingKey 的组成:
- 数字、字母、下划线
- 使用 "." 把整个 bindingKey 分成多个部分
- 支持两种特殊的符号作为通配符(* 和 # 必须是作为被 . 分割出来的独立部分)
- * 可以匹配任何一个独立的部分
- # 可以匹配任何 0 个或者多个独立的部分
例如:
- aaa.*.bbb 合法
- aaa.*.bbb 非法
- aaa.#b.ccc 非法
是否能匹配上,有如下几个栗子:
情况一:bindingKey 中没有 * 和 # ,此时,必须要求 routingKey 和 bindingKey 一模一样,才能匹配成功
假设 bindingKey:aaa.bbb.ccc
此时 routingKey 如下:
aaa.bbb.ccc (匹配成功)
aaa.cbb.ccc (匹配失败)
情况二: bindingKey 中有 *
假设 bindingKey:aaa.*.ccc
此时 routingKey 如下:
aaa.bbb.ccc (匹配成功)
aaa.gafdga.ccc (匹配成功)
aaa.bbb.eee.ccc (匹配失败)
情况三:bindingKey 中有 #
假设 bindingKey:aaa.#.ccc
此时 routingKey 如下:
aaa.bbb.ccc (匹配成功)
aaa.bbb.ddd.ccc (匹配成功)
aaa.ccc(匹配成功)
aaa.b.ccc.d(匹配失败)
特殊情况:如果把 bindingKey 设置成 #,就可以匹配到所有 routingKey,如下
aaa
aaa.bbb
aaa.bbb.ccc
.......
此时,topic 交换机就相当用户 fanout 交换机了
Ps:上述规则是 AMQP 协议约定的
1.2、实现 Router 转发规则
1.2.1、bindingKey 和 routingKey 参数校验
bindingKey:
* 1.数字、字母、下划线
* 2.使用 . 作为分隔符
* 3.允许存在 * 和 # 作为通配符,但是通配符只能作为独立的分段
routingKey:
* 1.字母、数组、下划线
* 2.使用 . 进行分割
/**
* bindingKey 的构造规则
* @param bindingKey
* @return
*/
public boolean checkBindingKey(String bindingKey) {
if(bindingKey.length() == 0) {
//空字符串,也是一种合法情况,比如使用 direct(routingKey 直接当作队列名字去匹配) / fanout 交换机的时候,bindingKey 是用不上的
return true;
}
//检查字符串中不能存在的非法字符
for(int i = 0; i < bindingKey.length(); i++) {
char ch = bindingKey.charAt(i);
if(ch >= 'A' && ch <= 'Z') {
continue;
}
if(ch >= 'a' && ch <= 'z') {
continue;
}
if(ch >= '0' && ch <= '9') {
continue;
}
if(ch == '.' || ch == '_' || ch == '*' || ch == '#') {
continue;
}
return false;
}
//检查 * 或者 # 是否是独立的部分
//aaa.*.bbb 是合法的;aaa.a*.bbb 是非法的
String[] words = bindingKey.split("\\.");//这里是正则表达式
for(String word : words) {
//检查 word 长度 > 1,并且包含了 * 或者 # ,就是非法格式了
if(word.length() > 1 && (word.contains("*") || word.contains("#"))) {
return false;
}
}
//约定一下,通配符之间的相邻关系(个人约定,不这样约定太难实现)
//为什么这么约定。因为前三种相邻的时候,实现匹配的逻辑是非常繁琐,同时功能性提升不大
//1. aaa.#.#.bbb => 非法
//2. aaa.#.*.bbb => 非法
//3. aaa.*.#.bbb => 非法
//4. aaa.*.*.bbb => 合法
for(int i = 0; i < words.length - 1; i++) {
if(words[i].equals("#") && words[i + 1].equals("#")) {
return false;
}
if(words[i].equals("#") && words[i + 1].equals("*")) {
return false;
}
if(words[i].equals("*") && words[i + 1].equals("#")) {
return false;
}
}
return true;
}
/**
* routingKey 的构造规则
* @param routingKey
* @return
*/
public boolean checkRoutingKey(String routingKey) {
if(routingKey.length() == 0) {
//空字符串是合法情况,比如使用 faout 交换机的时候,routingKey 用不上,就可以设置为 “”
return true;
}
for(int i = 0; i < routingKey.length(); i++) {
char ch = routingKey.charAt(i);
// 检查大写字母
if(ch >= 'A' && ch <= 'Z') {
continue;
}
// 检查小写字母
if(ch >= 'a' && ch <= 'z') {
continue;
}
//检查数字
if(ch >= '0' && ch <= '9') {
continue;
}
//检查下划线和.
if(ch == '_' || ch == '.') {
continue;
}
//不是上述规则,就是错误
return false;
}
return true;
}
Ps:split 方法中的参数是一个正则表达式~ 首先 "." 在正则表达式中是一个特殊的符号,"\\." 是把 . 当作原始文本进行匹配,想要使用 . 作为为原始文本,就需要在正则表达式中使用 \. 来表示,又因为,在 Java 的字符串中,"\" 是一个特殊字符,需要使用 "\\" 来转义,此时才能通过 "\\." 的方式真正录入 "." 这个文本
(这个不懂没关系,记住就行了)
1.2.2、消息匹配规则
之前已经具体实现了 直接交换机和扇出交换机的逻辑了,这里主要关注主题交换机.
/**
* 判定消息是否可以转发给这个绑定的队列
* @param exchangeType
* @param binding
* @param message
* @return
*/
public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException {
//1.判断当前交换机类型
if(exchangeType == ExchangeType.FANOUT) {
// 扇出交换机,要给每一个绑定的队列都发送消息
return true;
} else if(exchangeType == ExchangeType.TOPIC) {
// 主题交换机
return routeTopic(binding, message);
} else {
// 其他情况是不存在的
throw new MqException("[Router] 交换机类型非法!exchangeType=" + exchangeType);
}
}
1.2.3、主题交换机匹配规则
假设 bindingKey:aaa.*.bbb.#.ccc
假设 routingKey:aaa.11.bbb.22.33.ccc
这里我们使用双指针算法进行匹配:
- 先将这两个字符串使用 split 通过 "." 进行分割.
- 如果指向的是一个普通的字符串,此时要求和 routingKey 的对应下标执行的内容完全一致.、
- 如果指向的是 *,此时无论 routingKey 指向的是什么,双方都同时下标前进.
- 遇到 # 了,并且如果 # 后面没有其他内容了,直接返回 true,匹配成功.
- 遇到 # 了,并且 # 后面还有内容,拿着 # 后面的一段内容,去 routingKey 中查找,找到后面的部分,在 routingKey 中出现的位置(如果后面部分在 routingKey 中不存在,直接认为匹配失败).
- 两个指针同时到达末尾,则匹配成功,反之匹配失败
public boolean routeTopic(Binding binding, Message message) {
//先进行切分
String[] bindingTokens = binding.getBindingKey().split("\\.");
String[] routingTokens = message.getRoutingKey().split("\\.");
//使用双指针
int bindingIndex = 0;
int routingIndex = 0;
while(bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {
if(bindingTokens[bindingIndex].equals("*")) {
//遇到 * ,继续向后走
bindingIndex++;
routingIndex++;
} else if(bindingTokens[bindingIndex].equals("#")) {
//如果遇到 #,先看看还有没有下一个位置
bindingIndex++;
if(bindingIndex == bindingTokens.length) {
//后面没有东西,一定匹配成功
return true;
}
//如果后面还有东西,就去 routingKey 后面的位置去找
//findNextMatch 这个方法用来查找该部分再 routingKey 的位置,返回该下标,没找到,就返回 -1
routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);
if(routingIndex == -1) {
//后面没有匹配结果,失败
return false;
}
//找到了,就继续往后匹配
routingIndex++;
bindingIndex++;
} else {
//如果遇到普通字符串,要求两边的内容是一样的
if(!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {
return false;
}
bindingIndex++;
routingIndex++;
}
}
//判定是否双方同时到达末尾
//比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的
return (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length);
}
private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {
for(int i = routingIndex; i < routingTokens.length; i++) {
if(routingTokens[i].equals(bindingToken)) {
return i;
}
}
return -1;
}