nsq

nsq中的内部优化和技巧

Posted by Liangjf on September 1, 2020

nsq中的内部优化和技巧

1.内存队列和磁盘队列的选择

func (c *Channel) put(m *Message) error {
    select {
    //推入内存队列
    case c.memoryMsgChan <- m:
    default:
            //内存队列满了, 走落盘的方式
            b := bufferPoolGet()
            err := writeMessageToBackend(b, m, c.backend)
            bufferPoolPut(b)
            c.ctx.nsqd.SetHealth(err)
            if err != nil {
                    ...
            }
    }
    return nil
}

这里是通过select阻塞等待case chan不符合条件后, 走default分支的特性. 实现了先检查内存队列是否可写入, 满了直接写入本地磁盘. 通过这个技巧可言用 简单的几行代码实现消息写内存队列或磁盘队列的策略

2.降低GC压力

nsqd以statsd格式暴露GC统计信息, 可言通过nsqadmin查看

nsq通过以下措施来减少”垃圾”的产生:

  • 避免[]byte到string的转换(在很多框架, 这样的转换随处可见, 这种转换会产生中间变量)
  • 重用缓冲区或对象(对象池), 使用sync.Pool(比如第一点的buf就用到了, 之前看grpc的源码, 对于序列化和编码的对象大量使用了对象池得到技术来避免频繁的创建销毁)
  • 切片/map预分配(假如知道大小, 可以预先分配内存, 避免不必要的扩容, 扩容会引起内存的拷贝, 旧内存就会被等待回收)
  • 避免装箱/拆箱, interface{}(这个概念在c++的STL也存在, 就是要避免用interface{}转换类型)
  • 避免使用defer, 特别是在循环中(增加栈分配)

3.自定义TCP通信协议

[x][x][x][x][x][x][x][x][x][x][x][x]...
|  (int32) ||  (int32) || (binary)
|  4-byte  ||  4-byte  || N-byte
------------------------------------...
    size      frame ID     data

这样数据包更加紧凑, 用json的话会有额外的开销

4.hack方式的类型转换

func getMessageID(p []byte) (*MessageID, error) {
        ...
        return (*MessageID)(unsafe.Pointer(&p[0])), nil
}

从套接字读取的数据类型是[]byte, 而不是string, 并且为了避免从切片复制到MessageID数组产生中间变量, 所以通过unsafe直接强制指针类型转换 这样做其实是不是真正安全的, 使用不当会引起指针越界的问题

5.http统计信息

  • 1.支持pprof的方式查看性能问题
    • go tool pprof http://127.0.0.1:4151/debug/pprof/profile
  • 2.支持通过/stats接口查看性能统计信息

6.节点健康监测(心跳和超时检测)

nsq是push型,消费者连上后,不会马上推消息,消费者初始的RDY为0,nsqd根据RDY来推送消息,从而实现客户端流控的目的. nsqd会定期发送心跳来啊更新RDY的值, 从而进行发送速率的控制, 客户端可以设置心跳的时间间隔 连接的维持是依赖心跳的, 心跳检测失败超时, 会直接断开连接, 避免没用的连接占据资源

7.goroutine的管理

type WaitGroupWrapper struct {
        sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
        w.Add(1)
        go func() {
                cb()
                w.Done()
        }()
}

// can be used as follows:
wg := WaitGroupWrapper{}
wg.Wrap(func() { n.idPump() })
...
wg.Wait()

通过包装 sync.WaitGroup , 实现Add()和Done()在在同一个函数, 并且通过defer的特性来保证goroutine的完成回收 这样做的好处是保证了goroutine的创建和销毁, 不会造成goroutine泄漏