searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Golang中优秀的消息队列NSQ基础安装及使用

2024-08-29 09:42:14
41
0

前言

NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。

背景介绍

在服务器最开始的时候,基本上在一台主机上就能解决大部分问题,所以一般架构设计如下:

但是,突然某一天,来了一个新需求,我们服务器上不只是简单的储存一些文本信息,我们需要储存图片甚至视频,显然直接在一台主机上再部署一个文件服务是不科学的,因为它会占用大量的流量,这时候我们就要考虑单独使用一个文件服务器了,所以我们调整一下架构:

通过nginx这个高性能代理服务器,有效的处理不同用户请求的诉求:

然后接下来某一天,随着用户越来越多,你发现服务器的负载越来越高,这个时候,就要考虑去分离业务服务和数据库服务了,设计再次修改:

 

接下来又过了一些日子,新的问题又出现了,你们公司的业务越做越大,用户越来越多,有用户投诉,收到的响应延迟很大甚至出现了无响应的情况,你检查了一下服务器,发现每天峰值的时候,有一小部分数据是经常要使用的,频繁的从数据库读出这部分数据,占用了大量的数据库资源,然后我们的设计又要改了:

把一些常用的数据储存到缓存中,遵循国际惯例的二八原则(80%的请求读取20%的数据),这样一来,数据库的压力负担自然可以减少很多。

当你觉得高枕无忧的时候,突然老板一时兴起,我们来搞一个秒杀活动吧,这时候你一定会绞尽脑汁解决不能多买多卖的问题(一瞬间高并发的常见问题),好吧,终于轮到我们消息队列出场了(当然消息队列不是唯一的解决方案),我们先把设计贴出来:

好的,所有的请求都先注入消息队列,然后立刻给予请求响应,因为注入消息队列实际上向内存写入数据的过程,所以响应的速度会非常的快,然后后面的业务服务器再根据消息的队列的内容逐个读出(相当于异步读取),所以可以大大削减数据库的压力,避免多买多卖的问题。当然后面随着业务的扩大,还有很多问题要解决,比如使用负载均衡搭建多个业务服务器,使用分布式部署数据库,搭载高可用架构等,但是今天只是仅仅为了引出消息队列它的应用场景,所以我们就不往下讨论了。

正文

从官网下载对应的nsq版本,我下载的是linux最新稳定版

下载解压之后,在/usr/下建立一个目录,接着把解压文件夹/bin/下面的文件全部拷贝进去,最后在/etc/profile添加引用路径,这样就可以直接使用命令启动nsq服务了,我的配置如下

我们先介绍一下几个必要服务的作用

nsqlookupd:

主要负责服务发现 负责nsqd的心跳、状态监测,给客户端、nsqadmin提供nsqd地址与状态,启动命令如下:

nsqd:

负责接收消息,存储队列和将消息发送给客户端,nsqd 可以多机器部署,当你使用客户端向一个topic发送消息时,可以配置多个nsqd地址,消息会随机的分配到各个nsqd上,nsqd优先把消息存储到内存channel中,当内存channel满了之后,则把消息写到磁盘文件中。他监听了两个tcp端口,一个用来服务客户端,一个用来提供http的接口

 nsqadmin:

nsqadmin是一个web管理界面 

启动之后,通过管理地址可以访问这个管理页面, 默认使用4171端口

我们先来说明一下这个后台里面的一些内容,因为我们的NSQ所使用的是经典的pub/sub模式(发布/订阅,典型的生产者/消费者模式),我们可以先发布一个主题到NSQ,然后所有订阅的服务器就会异步的从这里读取主题的内容:

Topic(左上角):发布的主题名字

NSQd Host:Nsq主机服务地址

Channel:消息通道

NSQd Host:Nsq主机服务地址

Depth:消息积压量

In-flight:已经投递但是还未消费掉的消息

Deferred:没有消费掉的延时消息

Messages:服务器启动之后,总共接收到的消息量

Connections:通道里面客户端的订阅数

TimeOut:超时时间内没有被响应的消息数

Memory + Disk:储存在内存和硬盘中总共的消息数

------------------------------------------------------------------------------华丽的分割线-------------------------------------------------------------------------

接着,我们讲解如何在代码中发布主题内容,然后通过订阅某主题去异步读取消息

使用官方提供的下载地址:

go get github.com/nsqio/go-nsq

先创建一个主题,并且发布100条消息:

package main
 
import (
    "github.com/nsqio/go-nsq"
    "fmt"
)
 
var (
    //nsqd的地址,使用了tcp监听的端口
    tcpNsqdAddrr = "10.10.6.147:4150"
)
 
func main() {
    //初始化配置
    config := nsq.NewConfig()
    for i := 0; i < 100; i++ {
        //创建100个生产者
        tPro, err := nsq.NewProducer(tcpNsqdAddrr, config)
        if err != nil {
            fmt.Println(err)
        }
        //主题
        topic := "Insert"
        //主题内容
        tCommand := "new data!"
        //发布消息
        err = tPro.Publish(topic, []byte(tCommand))
        if err != nil {
            fmt.Println(err)
        }
    }
 
}

接下来我们看看admin的显示内容:

我们可以看到Nsqd接收到了100条信息,100条信息都储存在内存中,没有被消化。

现在没有任何服务订阅了我们的主题,所以主题的消息都没有被消化,那我们创建一个消费者去订阅我们的主题:

package main
 
import (
    "github.com/nsqio/go-nsq"
    "fmt"
    "sync"
    "time"
)
 
var (
    //nsqd的地址,使用了tcp监听的端口
    tcpNsqdAddrr = "10.10.6.147:4150"
)
 
//声明一个结构体,实现HandleMessage接口方法(根据文档的要求)
type NsqHandler struct {
    //消息数
    msqCount int64
    //标识ID
    nsqHandlerID string
}
 
//实现HandleMessage方法
//message是接收到的消息
func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
    //没收到一条消息+1
    s.msqCount++
    //打印输出信息和ID
    fmt.Println(s.msqCount,s.nsqHandlerID)
    //打印消息的一些基本信息
    fmt.Printf("msg.Timestamp=%v, msg.nsqaddress=%s,msg.body=%s \n", time.Unix(0  , message.Timestamp).Format("2006-01-02 03:04:05") , message.NSQDAddress, string(message.Body))
    return nil
}
 
func main() {
 
    //初始化配置
    config := nsq.NewConfig()
    //创造消费者,参数一时订阅的主题,参数二是使用的通道
    com, err := nsq.NewConsumer("Insert", "channel1", config)
    if err != nil {
        fmt.Println(err)
    }
    //添加处理回调
    com.AddHandler(&NsqHandler{nsqHandlerID:"One"})
    //连接对应的nsqd
    err = com.ConnectToNSQD(tcpNsqdAddrr)
    if err != nil {
        fmt.Println(err)
    }
 
    //只是为了不结束此进程,这里没有意义
    var wg  = &sync.WaitGroup{}
    wg.Add(1)
    wg.Wait()
    
    /*
    result:
    
    msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data! 
    98 One
    msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data! 
    99 One
    msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data! 
    100 One
    msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data!
    
    */
    
}
 
 

这里可以看到,之前挤压的100条信息,都被我们的订阅者消化掉了,也就是读取了

所以我们的订阅者(可以有多个)如果提前订阅主题的话,只要对应的主题有发布新内容,就可以马上异步读取。

完结

消息队列的应用场景还有很多,Nsq这里也只是先介绍了一下入门知识,有兴趣的朋友可以继续深入了解,最后还是欢迎大家多提意见。

0条评论
0 / 1000
l****n
3文章数
0粉丝数
l****n
3 文章 | 0 粉丝
原创

Golang中优秀的消息队列NSQ基础安装及使用

2024-08-29 09:42:14
41
0

前言

NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。

背景介绍

在服务器最开始的时候,基本上在一台主机上就能解决大部分问题,所以一般架构设计如下:

但是,突然某一天,来了一个新需求,我们服务器上不只是简单的储存一些文本信息,我们需要储存图片甚至视频,显然直接在一台主机上再部署一个文件服务是不科学的,因为它会占用大量的流量,这时候我们就要考虑单独使用一个文件服务器了,所以我们调整一下架构:

通过nginx这个高性能代理服务器,有效的处理不同用户请求的诉求:

然后接下来某一天,随着用户越来越多,你发现服务器的负载越来越高,这个时候,就要考虑去分离业务服务和数据库服务了,设计再次修改:

 

接下来又过了一些日子,新的问题又出现了,你们公司的业务越做越大,用户越来越多,有用户投诉,收到的响应延迟很大甚至出现了无响应的情况,你检查了一下服务器,发现每天峰值的时候,有一小部分数据是经常要使用的,频繁的从数据库读出这部分数据,占用了大量的数据库资源,然后我们的设计又要改了:

把一些常用的数据储存到缓存中,遵循国际惯例的二八原则(80%的请求读取20%的数据),这样一来,数据库的压力负担自然可以减少很多。

当你觉得高枕无忧的时候,突然老板一时兴起,我们来搞一个秒杀活动吧,这时候你一定会绞尽脑汁解决不能多买多卖的问题(一瞬间高并发的常见问题),好吧,终于轮到我们消息队列出场了(当然消息队列不是唯一的解决方案),我们先把设计贴出来:

好的,所有的请求都先注入消息队列,然后立刻给予请求响应,因为注入消息队列实际上向内存写入数据的过程,所以响应的速度会非常的快,然后后面的业务服务器再根据消息的队列的内容逐个读出(相当于异步读取),所以可以大大削减数据库的压力,避免多买多卖的问题。当然后面随着业务的扩大,还有很多问题要解决,比如使用负载均衡搭建多个业务服务器,使用分布式部署数据库,搭载高可用架构等,但是今天只是仅仅为了引出消息队列它的应用场景,所以我们就不往下讨论了。

正文

从官网下载对应的nsq版本,我下载的是linux最新稳定版

下载解压之后,在/usr/下建立一个目录,接着把解压文件夹/bin/下面的文件全部拷贝进去,最后在/etc/profile添加引用路径,这样就可以直接使用命令启动nsq服务了,我的配置如下

我们先介绍一下几个必要服务的作用

nsqlookupd:

主要负责服务发现 负责nsqd的心跳、状态监测,给客户端、nsqadmin提供nsqd地址与状态,启动命令如下:

nsqd:

负责接收消息,存储队列和将消息发送给客户端,nsqd 可以多机器部署,当你使用客户端向一个topic发送消息时,可以配置多个nsqd地址,消息会随机的分配到各个nsqd上,nsqd优先把消息存储到内存channel中,当内存channel满了之后,则把消息写到磁盘文件中。他监听了两个tcp端口,一个用来服务客户端,一个用来提供http的接口

 nsqadmin:

nsqadmin是一个web管理界面 

启动之后,通过管理地址可以访问这个管理页面, 默认使用4171端口

我们先来说明一下这个后台里面的一些内容,因为我们的NSQ所使用的是经典的pub/sub模式(发布/订阅,典型的生产者/消费者模式),我们可以先发布一个主题到NSQ,然后所有订阅的服务器就会异步的从这里读取主题的内容:

Topic(左上角):发布的主题名字

NSQd Host:Nsq主机服务地址

Channel:消息通道

NSQd Host:Nsq主机服务地址

Depth:消息积压量

In-flight:已经投递但是还未消费掉的消息

Deferred:没有消费掉的延时消息

Messages:服务器启动之后,总共接收到的消息量

Connections:通道里面客户端的订阅数

TimeOut:超时时间内没有被响应的消息数

Memory + Disk:储存在内存和硬盘中总共的消息数

------------------------------------------------------------------------------华丽的分割线-------------------------------------------------------------------------

接着,我们讲解如何在代码中发布主题内容,然后通过订阅某主题去异步读取消息

使用官方提供的下载地址:

go get github.com/nsqio/go-nsq

先创建一个主题,并且发布100条消息:

package main
 
import (
    "github.com/nsqio/go-nsq"
    "fmt"
)
 
var (
    //nsqd的地址,使用了tcp监听的端口
    tcpNsqdAddrr = "10.10.6.147:4150"
)
 
func main() {
    //初始化配置
    config := nsq.NewConfig()
    for i := 0; i < 100; i++ {
        //创建100个生产者
        tPro, err := nsq.NewProducer(tcpNsqdAddrr, config)
        if err != nil {
            fmt.Println(err)
        }
        //主题
        topic := "Insert"
        //主题内容
        tCommand := "new data!"
        //发布消息
        err = tPro.Publish(topic, []byte(tCommand))
        if err != nil {
            fmt.Println(err)
        }
    }
 
}

接下来我们看看admin的显示内容:

我们可以看到Nsqd接收到了100条信息,100条信息都储存在内存中,没有被消化。

现在没有任何服务订阅了我们的主题,所以主题的消息都没有被消化,那我们创建一个消费者去订阅我们的主题:

package main
 
import (
    "github.com/nsqio/go-nsq"
    "fmt"
    "sync"
    "time"
)
 
var (
    //nsqd的地址,使用了tcp监听的端口
    tcpNsqdAddrr = "10.10.6.147:4150"
)
 
//声明一个结构体,实现HandleMessage接口方法(根据文档的要求)
type NsqHandler struct {
    //消息数
    msqCount int64
    //标识ID
    nsqHandlerID string
}
 
//实现HandleMessage方法
//message是接收到的消息
func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
    //没收到一条消息+1
    s.msqCount++
    //打印输出信息和ID
    fmt.Println(s.msqCount,s.nsqHandlerID)
    //打印消息的一些基本信息
    fmt.Printf("msg.Timestamp=%v, msg.nsqaddress=%s,msg.body=%s \n", time.Unix(0  , message.Timestamp).Format("2006-01-02 03:04:05") , message.NSQDAddress, string(message.Body))
    return nil
}
 
func main() {
 
    //初始化配置
    config := nsq.NewConfig()
    //创造消费者,参数一时订阅的主题,参数二是使用的通道
    com, err := nsq.NewConsumer("Insert", "channel1", config)
    if err != nil {
        fmt.Println(err)
    }
    //添加处理回调
    com.AddHandler(&NsqHandler{nsqHandlerID:"One"})
    //连接对应的nsqd
    err = com.ConnectToNSQD(tcpNsqdAddrr)
    if err != nil {
        fmt.Println(err)
    }
 
    //只是为了不结束此进程,这里没有意义
    var wg  = &sync.WaitGroup{}
    wg.Add(1)
    wg.Wait()
    
    /*
    result:
    
    msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data! 
    98 One
    msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data! 
    99 One
    msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data! 
    100 One
    msg.Timestamp=2018-11-02 04:37:18, msg.nsqaddress=10.10.6.147:4150,msg.body=new data!
    
    */
    
}
 
 

这里可以看到,之前挤压的100条信息,都被我们的订阅者消化掉了,也就是读取了

所以我们的订阅者(可以有多个)如果提前订阅主题的话,只要对应的主题有发布新内容,就可以马上异步读取。

完结

消息队列的应用场景还有很多,Nsq这里也只是先介绍了一下入门知识,有兴趣的朋友可以继续深入了解,最后还是欢迎大家多提意见。

文章来自个人专栏
NSQ
1 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
1
1