package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
var a sync.WaitGroup
var ch = make(chan int)
func test(n int){
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
//fmt.Println(n)
a.Done()
ch <- n
}
func main() {
for i :=0;i<100;i++ {
a.Add(1)
go test(i)
}
a.Wait()
//close(ch)
//for i:=0;i<100;i++ {
for x := range ch {
//x:= <-ch
//fmt.Println(len(ch))
fmt.Println(x)
}
}
这样会死锁;a.wait()后 关闭会报错 panic: send on closed channel
搜索都说 用 range 取管道要关闭,这个关闭到底在哪关。。。
需求就是想控制并发数量,获取 test 方法的返回值,求解这个代码要咋改
然后想问下,生产实践中 并发并取协程返回值 用的哪个 包 /库,大佬们求指路~ 不胜感激
1
linvon 2021-03-13 15:59:25 +08:00
defer a.Done() 你的代码 test 还没执行完,刚进来就 done 了
range 会阻塞获取一直到 channel 关闭,所以可以改成协程来跑,你的代码只需要将获取结果和执行 test 两部分分开就行了 一般这种简单的扇入扇出操作手写就可以了,可以去看看 GitHub 上搜下关键词 fan in fan out 看有没有好的 |
2
wateryessence 2021-03-13 16:28:18 +08:00
|
3
chotow 2021-03-13 16:34:09 +08:00 1
这样子不叫死锁,具体原因是 channel 未关闭,所以 range 阻塞。
a.Done 在 ch <- n 之前,所以 Wait 后进行 close 会 panic 。 关闭 channel 的话,先按一楼这位朋友所说,给 Done 加 defer,然后在外层用 goroutine 进行 Wait 和 close:go func () {a.Wait(); close(ch)}() 控制并发数量可以用带缓冲的 channel,goroutine 开启前 ch <- struct{}{},任务完成后 <- ch 。 |
5
garyox64 OP @chotow 前两个点明白了,感谢🙏
第三个,为啥需要在外层用 goroutine 进行 wait 和 close,这块不是很清楚,感觉跟 groutine 的调度有关系… |
6
seth19960929 2021-03-13 18:44:02 +08:00
你这控制并发没必要用 WaitGroup, 直接一个带缓冲的 channel,
然后在 for 循环下面一开始写入 channel, 然后 goroutine 写出, 这样就能达到了. 最简单的方式 |
7
garyox64 OP @seth19960929 是的,我傻了。。。我一开始写的 waitgroup,然后往里加 channel 。。。中毒了
|
8
seth19960929 2021-03-13 18:48:06 +08:00
package main
import "fmt" import "time" var ch = make(chan int, 10) func main() { for i := 0; i < 30; i ++ { ch <- i go test(i) } } func test(n int) { fmt.Println(n) time.Sleep(time.Second * 3) <- ch } http://www.dooccn.com/go/#id/7bc611ea7a0d9a73ec159daace8cd6e7 |
9
garyox64 OP @seth19960929 感谢!!
另:原来教程里都有,5555555 https://gobyexample.com/closing-channels 我的需求总结起来就是 能控制并发量、并且拿到返回的,并发池 原来教程里也有: https://gobyexample.com/worker-pools --- 以上感谢诸位,祝周末愉快! |
10
seth19960929 2021-03-13 20:10:07 +08:00
上面的代码有点误人子弟, 进程结束了, 还有 goroutine 没跑完. 还是改一下.
``` package main import ( "fmt" "sync" "time" ) var ( count = 10 ch = make(chan int, count) ) func main() { wg := sync.WaitGroup{} for i := 0; i < count * 3; i ++ { ch <- i wg.Add(1) go test(i, &wg) } wg.Wait() } func test(i int, wg *sync.WaitGroup) { // TODO fmt.Printf("run task %d\n", i) time.Sleep(time.Second * 3) <- ch wg.Done() } ``` http://www.dooccn.com/go/#id/6e2ac7e2e4c73ce14fef393c93d4c80f |