nsq

nsq集群和踩坑

Posted by Liangjf on June 5, 2019

nsq集群和踩坑

nsq集群和可靠性:

  • 多播。1个nsqd多个topic,多组channel,可以多播
  • 降级。消息处理失败,消费者会降级,不会损坏系统核心功能
  • 解耦。上游数据接收数据,写入nsq中,下游异步的取数据做处理。保证上下游的健壮,稳定
  • 重试。当发送消息失败,会发送REQ到nsqd,或者发送消息超过有效时间窗口时间,会重新入队再次等待发送,保证消息发送被消费。
  • 指数回退。当处理消息失败,会根据次数而指数的程度来延长消息接收处理时间。当处于回退状态并且被正确处理后,延长时间置0,即消息即刻响应。
  • 持久性。当消息堆积大于设置的最大消息,多余的消息会持久到本地硬盘。
  • 归档。nsq_to_file归档nsq的topic,用于备份回溯。
  • 归档通道。一个topic可以有多个channel的,每个channel都可以有相同的数据副本,所以即使有一个channel坏了,也不会影响 nsq_to_file 这个归档通道。
  • 分布式系统。容错性,必定前面有负载均衡模块。
  • 冗余。单独一台机器来做归档处理,或者加多一个。
  • 集群。nsqlookupd用于运行时集群中管理协助各个nsqd的状态
  • nsqlookupd。守护进程,在运行时记录和传播NSQ集群的状态。nsqd实例维护到nsqlookupd的持久TCP连接,并跨线路推送状态更改。具体地说,nsqd将自己注册为给定topic以及它所知道的所有channel的生产者。这允许使用者查询nsqlookupd来确定感兴趣的主题的生产者,而不是写死该配置。随着时间的推移,他们将了解到新生产商的存在,并能够跳过失败。
  • 生产者和消费者解耦。因为nsqlookupd现在充当两者之间的目录服务。
  • 一致性。nsqlookupd不相互协调或保持一致,只是保持最终一致性。消费者通常只需要一个nsqlookupd来提供他们需要的信息

使用踩坑

  • 当多个nsqd服务都有相同的topic的时候,consumer要修改默认设置config.MaxInFlight才能连接。
  • consumer与topic没有直接联系,而是通过具体的channel接受数据。如果consumer退出,channel不会自动删除。 如果不再需要,需要通过http端口删除channel,否则很可能会导致磁盘空间不足。

      config:=nsq.NewConfig() 
      //最大允许向两台NSQD服务器接受消息,默认是1,要特别注意 
      config.MaxInFlight=2
      c1, err1 := nsq.NewConsumer("test", "channel", config) // 新建一个消费者
    

nsq 消费消息有延迟,通过配置让 nsq 收到消息后,订阅的消费者能够立即收到消息?

  • 经过实际测试发现是在我们的使用过程中Consumer的默认MaxInFlight长度为1导致的。如果nsq是集群使用,当值为1,consumer仅处理一个nsqd单点消息(RDY 1),则会出现在另外的nsqd单点的消息需要很长时间被消费。这个值应该大于等于实际的nsqd节点。

测试demo放在了 github

通过gif看前后对比