nsq

nsqd的网络通信和对网络逻辑处理框架的思考

Posted by Liangjf on September 1, 2020

nsqd的网络通信和对网络逻辑处理框架的思考

nsqd网络通信

nsqd中关于通信的一些小点:

  • tcp服务是用于监听客户端连接, 使用了多路复用模型(go标准库的tcp服务是基于epoll/kqueue/select等的)

  • 暴露http服务, 提供接口直接发布消息到nsqd, 获取topic channel信息

  • 元数据持久化元数据到文件是json字符串对象

  • nsqd请求nsqlookup获取topic channel等数据是通过http请求获取的

  • 客户端连接nsqd后,先发送4个字节的head, 主要是验证版本号

  • 按行读取客户端发送的数据, \n或者\r\n切分, 命令之间用 “ “ 隔开

  • nsqd推送消息格式

      // message format:
      // [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
      // |       (int64)        ||    ||      (hex string encoded in ASCII)           || (binary)
      // |       8-byte         ||    ||                 16-byte                      || N-byte
      // ------------------------------------------------------------------------------------------...
      //   nanosecond timestamp    ^^                   message ID                       message body
      //                        (uint16)
      //                         2-byte
      //                        attempts
      head+body
      head: 4字节整个包大小
      body: 数据包
    

客户端连接nsqd的流程:

  • 发送4字节的魔数版本号,验证版本
  • 发送 IDENTIFY 事件, 上报元数据
  • 发送 SUB 事件, 订阅topic
  • 此时客户端的RDY为0, 表示未准备好接收消息, nsqd不推送
  • 当客户端准备好接收消息, 更新RDY, nsqd会定时通过心跳来维持连接和获取RDY的值, 当RDY大于0, nsqd推送消息给客户端
  • 若超过2个心跳客户端未回应, nsqd关闭连接

对于大多数网络逻辑处理框架的思考

关键词: “读写分离”, 生产者消费者模型, 异步

读(面向每个client):

  • 每个client有单独的goroutine来负责client的请求
  • 构造消息(消息id+clientID), 投入内存队列chan
  • 接收客户端请求结束, 剩下的交给核心的消息处理goroutine

写(面向nsqd):

nsqd有一个核心的消息处理goroutine(messagePump)

  • 负责监听内存队列chan的所有消息
  • 根据消息(msg.clientID)找到client对象
  • 推送消息给client

二者的能够联动起来是由以下组成:

  • 消息绑定clientID(有一个消息map key-msgId, value-msg(id+clientID+data…))
  • 内存队列(积压队列)和延时队列, 客户端goroutine负责读数据并构造消息, 投入队列, nsqd的核心的消息处理goroutine监听并取出根据clientID找到client, 最后发送给client就行

同时有以下goroutine辅助:

  • 扫描消息队列goroutine负责把队列中的过期消息取出更新时间+重试+放回队列
  • 统计信息goroutine负责统计topic, channel, 消息等信息

这样的架构很常见, 可以把读和写分离, 其实就是生产者-消费者模型. 通过队列来解耦, 并且起到缓冲的作用, 并且异步推送消息和重试确认消息, 达到提高性能的目的.