首页 > golang > golang中redis对redigo的发布订阅机制的使用
2022
01-11

golang中redis对redigo的发布订阅机制的使用

redigo 对redis的订阅机制放在pubsub.go里面

PubSubConn封装Conn以实现订阅者提供简便方法。Subscribe,PSubscribe,Unsubscribe和PUnsubscribe方法发送和刷新订阅。receive方法将推送的消息转换对应的类型

// Receive returns a pushed message as a Subscription, Message, Pong or error.
// The return value is intended to be used directly in a type switch as
// illustrated in the PubSubConn example

func (c PubSubConn) Receive() interface{
    } {
    return c.receiveInternal(c.Conn.Receive())
}

返回的是一个空接口类型 interface{},由于空接口没有方法,因此所有类型都实现了空接口,也就是说可以返回任意类型。


返回类型

具体会返回哪些类型在receiveInternal()里面可以看到,
目前返回的三种Message、Subscription、Pong都定义在了pubsub.go 里面。

func (c PubSubConn) receiveInternal(replyArg interface{
    }, errArg error) interface{
    } {

    reply, err := Values(replyArg, errArg)
    if err != nil {

        return err
    }

    var kind string
    reply, err = Scan(reply, &kind)
    if err != nil {

        return err
    }

    switch kind {

    case "message":
        var m Message
        if _, err := Scan(reply, &m.Channel, &m.Data); err != nil {

            return err
        }
        return m
    case "pmessage":
        var m Message
        if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil {

            return err
        }
        return m
    case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
        s := Subscription{
    Kind: kind}
        if _, err := Scan(reply, &s.Channel, &s.Count); err != nil {

            return err
        }
        return s
    case "pong":
        var p Pong
        if _, err := Scan(reply, &p.Data); err != nil {

            return err
        }
        return p
    }
    r

订阅示例

package main

import(
    //"github.com/go-redis/redis"
  "fmt"
  "time"
  //"reflect"
  "unsafe"
    "github.com/gomodule/redigo/redis"
    log "github.com/astaxie/beego/logs"
)

type SubscribeCallback func (channel, message string)

type Subscriber struct {

  client redis.PubSubConn
  cbMap map[string]SubscribeCallback
}

func (c *Subscriber) Connect(ip string, port uint16) {

  conn, err := redis.Dial("tcp", "127.0.0.1:6379")
  if err != nil {

      log.Critical("redis dial failed.")
  }

  c.client = redis.PubSubConn{
    conn}
  c.cbMap = make(map[string]SubscribeCallback)

  go func() {

    for {

        log.Debug("wait...")
        //读取channel消息 并检测收到的消息类型
        switch res := c.client.Receive().(type) {
          case redis.Message:
              channel := (*string)(unsafe.Pointer(&res.Channel))
              message := (*string)(unsafe.Pointer(&res.Data))
              c.cbMap[*channel](*channel, *message)
          case redis.Subscription:
              //订阅成功,同时订阅多个时,这地方会收到多次
              fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
          case error:
            log.Error("error handle...")
            //如果是长连接项目,这里最好return,否则客户端断开时,这里可能会造成死循环
            return
        }
    }
  }()

}

func (c *Subscriber) Close() {

  err := c.client.Close()
  if err != nil{

    log.Error("redis close error.")
  }
}

func (c *Subscriber) Subscribe(channel interface{
    }, cb SubscribeCallback) {

  err := c.client.Subscribe(channel)
  if err != nil{

    log.Critical("redis Subscribe error.")
  }

  c.cbMap[channel.(string)] = cb
}

func TestCallback1(chann, msg string){

  log.Debug("TestCallback1 channel : ", chann, " message : ", msg)
}

func TestCallback2(chann, msg string){

  log.Debug("TestCallback2 channel : ", chann, " message : ", msg)
}

func TestCallback3(chann, msg string){

  log.Debug("TestCallback3 channel : ", chann, " message : ", msg)
}

func main() {


  log.Info("===========main start============")

  var sub Subscriber
  sub.Connect("127.0.0.1", 6397)
  sub.Subscribe("test_chan1", TestCallback1)
  sub.Subscribe("test_chan2", TestCallback2)
  sub.Subscribe("test_chan3", TestCallback3)

  for{

   time.Sleep(1 * time.Second)
  }
}


Alt text

常见问题

这地方如果用redis的连接池的话,要注意不要设置读取数据的超时时间,否则到了超时时间,就会断开连接了,报错如下:

2022/07/12 15:43:14 wait...
test1: subscribe 1
2022/07/12 15:43:14 wait...
[GIN] 2022/07/12 - 15:43:26 | 200 |    1.129741ms |  222.128.58.255 | GET      "
/gotest/pub"
2022/07/12 15:43:26 {test1  [104 101 108 108 111]}
2022/07/12 15:43:26 0xc00009e540 0xc00009e560
2022/07/12 15:43:26 wait...
2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637
9: i/o timeout
2022/07/12 15:43:46 wait...
2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637
9: use of closed network connection
2022/07/12 15:43:46 wait...
2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637
9: use of closed network connection
2022/07/12 15:43:46 wait...
2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637
9: use of closed network connection
2022/07/12 15:43:46 wait...

如果是长连接项目,case error 的情况下这里最好return,否则客户端断开时,这里可能会造成死循环(break只会退出switch,不会退出外边的for)


发布示例

发布直接使用默认的Conn来Send “Publish“ 命令即可.
redigo的管道的使用方法设计到三个函数,Do函数也是下面这三个函数的合并:

c.Send("SUBSCRIBE", "example")
c.Flush()
for {
    reply, err := c.Receive()
    if err != nil {
        return err
    }
    // process pushed message
}

send()方法把命令写到输出缓冲区,Flush()把缓冲区的命令刷新到redis服务器,Receive()函数接收redis给予的响应,三个操作共同完成一套命令流程。

package main

import(
  //"github.com/go-redis/redis"
  "github.com/gomodule/redigo/redis"
  log "github.com/astaxie/beego/logs"
)

func main() {


  client, err := redis.Dial("tcp", "127.0.0.1:6379")
  if err != nil {

      log.Critical("redis dial failed.")
  }
  defer client.Close()

  _, err = client.Do("Publish", "test_chan1", "hello")
  if err != nil {

    log.Critical("redis Publish failed.")
  }

  _, err = client.Do("Publish", "test_chan2", "hello")
  if err != nil {

    log.Critical("redis Publish failed.")
  }

  _, err = client.Do("Publish", "test_chan3", "hello")
  if err != nil {

    log.Critical("redis Publish failed.")
  }

}

Alt text


PubSubConn

定义

type PubSubConn struct {    
    Conn Conn
}


提供的方法:

1.Close 关闭连接

func (c PubSubConn) Close() error


2.PSubscribe 订阅channel支持通配符匹配

func (c PubSubConn) PSubscribe(channel ...interface{}) error


3.PUnsubscribe 取消发布, 如果没有给定, 则取消所有

func (c PubSubConn) PUnsubscribe(channel ...interface{}) error



4.Ping 指定的数据向服务器发送PING 调用此方法时,连接必须至少订阅一个通道或模式

func (c PubSubConn) Ping(data string) error


5.Receive 获取消息

func (c PubSubConn) Receive() interface{}


6.ReceiveWithTimeout 带有超时时间的获取消息函数

func (c PubSubConn) ReceiveWithTimeout(timeout time.Duration) interface{}



7.Subscribe 订阅

func (c PubSubConn) Subscribe(channel ...interface{}) error


8.Unsubscribe 取消订阅

func (c PubSubConn) Unsubscribe(channel ...interface{}) error


go redis发布订阅常用函数

Subscribe - 订阅channel

PSubscribe - 订阅channel支持通配符匹配

Publish - 将信息发送到指定的channel。

PubSub Channels - 查询活跃的channel

127.0.0.1:6379> pubsub channels
 1) "forum_38"
 2) "forum_99"
 3) "forum_79"

PubSub NumSub - 查询指定的channel有多少个订阅者

127.0.0.1:6379> pubsub numsub forum_38 forum_10
1) "forum_38"
2) (integer) 1
3) "forum_10"
4) (integer) 1



本文》有 0 条评论

留下一个回复