PUBLISHER 代码:
func main() {
nc,err := stan.Connect("test-cluster","idc",stan.NatsURL("nats://127.0.0.1:4222"))
if err != nil{
panic(err)
}
fmt.Println("connect succ")
for i:=0;i<10;i++{
fmt.Println("publishing:",i)
err := nc.Publish("tp1",[]byte(strconv.Itoa(i)))
if err != nil{
panic(err)
}
}
nc.Close()
}
QueueSubscriber 代码:
func main() {
nc,err := stan.Connect("test-cluster","subscriber",stan.NatsURL("nats://localhost:4222"))
if err != nil{
panic(err)
}
defer nc.Close()
subs := make([]stan.Subscription,3)
for i:=0;i<3;i++{
workername := "worker"+strconv.Itoa(i)
fmt.Println(fmt.Sprintf("QueueSubscribe %s start",workername))
sub,err := nc.QueueSubscribe("tp1","ch1", func(msg *stan.Msg) {
fmt.Println(workername,"get msg:",string(msg.Data),"start doing something")
time.Sleep(1*time.Second)
},stan.DurableName("subscriber"),stan.AckWait(time.Hour*24))
if err != nil{
panic(err)
}
subs[i] = sub
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, os.Kill)
select{
case <- c:
fmt.Println("Subscriber CLOSE")
for i,_ := range subs{
subs[i].Close()
}
nc.Close()
fmt.Println("quit")
}
}
Publisher 输出:
connect succ
publishing: 0
publishing: 1
publishing: 2
publishing: 3
publishing: 4
publishing: 5
publishing: 6
publishing: 7
publishing: 8
publishing: 9
QueueSubscriber 输出:
QueueSubscribe worker0 start
QueueSubscribe worker1 start
worker0 get msg: 0 start doing something
QueueSubscribe worker2 start
worker0 get msg: 1 start doing something
worker0 get msg: 2 start doing something
worker0 get msg: 3 start doing something
worker0 get msg: 4 start doing something
worker0 get msg: 5 start doing something
worker0 get msg: 6 start doing something
worker0 get msg: 7 start doing something
worker0 get msg: 8 start doing something
worker0 get msg: 9 start doing something
请问朋友们是否有遇到过一样的问题呢?谢谢大家
1
freestyle 2019-08-30 12:44:52 +08:00 via iPhone
这是特性,queue 模式. 如果想每个订阅者都收到,设置不同的 queue 名字或普通方式订阅就行. 可以看下我的博客 https://imhanjm.com/2018/02/17/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3nats%20&%20nats%20streaming/
|
2
heavyrainn OP @freestyle 额…不是,我的意思是,为啥其他的 worker 不工作,只有 worker0 在工作
|
3
freestyle 2019-08-31 07:16:06 +08:00 via iPhone
@heavyrainn 你这是同一个连接啊 一般 queueSub 是不同的进程即不同的连接 你试试给每个 worker 创建一个连接.
|
4
heavyrainn OP @freestyle 我搞清楚了…是因为没有设置 MaxInflight 值的问题,派出的任务都最先启动的 worker 给接收了。设置 MaxInflight 值为 1 实现了正常分 worker 执行。谢谢啦
|