V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
volvo007
V2EX  ›  Go 编程语言

小白学完 channel 马上就不会了 —— 现在会了(大概)

  •  
  •   volvo007 · 2022-11-07 13:45:50 +08:00 · 1804 次点击
    这是一个创建于 803 天前的主题,其中的信息可能已经有所发展或是发生改变。

    之前的一篇主题发表于 180 天前 都说 go 简单 小白学完 channel 马上就不会了

    原来主题的主要目标是遍历一个文件夹内的所有文件,并根据不同的文件后缀进行归类。我也去爆栈问过,但是回答都和我这样操作文件不安全相关,并没有正面解决这个问题。

    言之种种,在我做了一些针对 channel 的练习之后,算是大概搞清楚了这个例子要怎么写。可能不是 best practice ,但希望能帮到大家,特别是之前一直关注那个帖子的朋友。(有 18 个收藏,所以我一定要把它搞懂不然对不起这 18 个收藏)

    ======== 在开始之前,我们稍微回顾一下之前的逻辑:

    『函数 1 』 是一个简单、耗时很短的功能,它会不断生成一些中间量等待下游处理;『函数 2 』 则是一个复杂、耗时长的功能。如果单线程运行,那么『函数 1 』会长时间等待浪费很多时间。 我们期待通过 goroutine 来并发处理『函数 2 』以达到提升处理性能的目的。

    我用的文件处理案例,根据大佬们所说会有线程安全问题 (多个线程可能会同时创建文件夹,应当加写锁),我们先忽略这个问题,主要还是把握后面的方法论哈

    1 函数 getInfo (f []fs.FileInfo, c chan<- string) 通过遍历 []fs.FileInfo 结构,进行了一些简单判断后(例如忽略文件夹、.DS_store 这种),不断将文件名写入 c 这个 string channel

    func getInfo(f []fs.FileInfo, c chan string) {
    	for _, fs := range f {
    		// 这个 if 只是用来简单忽略掉 文件夹或者 .DS_store 这种
    		if fs.IsDir() || strings.HasPrefix(fs.Name(), ".") {
    			continue
    		} else {
    			c <- fs.Name()
    		}
    	}
    }
    

    2 函数 dealInfo (path string, typeDict map[string]int, c <-chan string) 通过 range 方法,不断获取 c 之前保存的文件名,截取后缀之后,要么转入对应文件夹,要么创建新文件夹再转入

    func dealInfo(path string, typeDict map[string]int, c chan string) {
    	for name := range c {
    		sp := strings.Split(name, ".")
    		suffix := sp[len(sp)-1]
    
    		if _, ok := typeDict[suffix]; ok {
    			MoveFile(name, path, suffix)
    		} else {
    			CreateFolder(path, suffix)
    			MoveFile(name, path, suffix)
    			typeDict[suffix] = 1
    
    			fmt.Println(name)
    		}
    	}
    }
    

    ======== 到这里其实思路上是没有什么问题的,这里最关键的是没有注意到简单练习里不会提到的一个知识点:用 range 遍历 channel 的时候,需要主动 close channel. 否则 range 会阻塞 channel 直到 deadlock panic. 尽管所有 channel 会在 main channel 结束的时候被强制结束. (大概因为 range 遍历 channel 的时候没有错误处理?)

    如果不用 range 的方式来遍历的话,我们需要写一个 if name, ok := <- c; ok { ... } 这样的东西放到一个死循环里面,也就是每次循环都要来手动判断一次 c 里面还有没有东西,没东西了我就跳出循环呗。显然 range 遍历的方式更优雅,但要考虑 close(c) 的时机。

    第二个点则是如何 “并发” 处理 函数 2 。如果只用 go func(),最多只能实现两个 goroutine 之间的通信,所以我们引入了线程池 sync 库来解决这个问题——我们需要给每个 goroutine 加入到线程池里面,但在某个线程工作结束的时候又要把它从池子里面拿掉。最后,还需要一个 wait 函数来通知主线程等待这些线程工作结束。

    具体来说,我们需要改写一下前面的『函数 1 』、『函数 2 』 了:

    对于『函数 1 』,原始伪代码:

    func getInfo(f []fs.FileInfo, c chan<- string){
    	遍历 f { 处理后的 fineName 写入 c }
    }
    

    现在应当改写为:

    func getInfo(f []fs.FileInfo, c chan<- string){
    	// 后面要用 sync.Add 加入池子,所以这里要减去。加入和减去要匹配, 重要!
    	defer wg.Done()
        
    	遍历 f { 处理后的 fineName 写入 c }
    
    	// 后面其他函数会用 range 来遍历,所以一定要 close ,重要!
    	close(c)
    }
    

    对于『函数 2 』,由于会用多个 goroutine 并发,那么每一次都需要一个 wg.Add(1) 来加入线程池,所以每一次我们还要从『函数 2 』里减去这个线程

    原函数 2 伪代码:

    func dealInfo(path string, typeDict map[string]int, c <-chan string){
    	for _, filename := range c {
    		判断文件;
    		处理文件;
    	}
    }
    

    现在改写为:

    func dealInfo(path string, typeDict map[string]int, c <-chan string){
    	defer wg.Done()
    
    	for filename := range c {
    		判断文件;
    		处理文件;
    	}
    }
    

    非常简单,就是在循环前加一个 defer wg.Done() 就可以了。

    最后,我们来写主函数的伪代码:

    import ("sync", "fmt", "time", ... )
    
    var wg sync.WaitGroup // 为了创建多线程并发,准备线程池
    
    func getInfo( ... ) // 实现 func1
    
    func dealInfo( ... ) // 实现 func2
    
    func main(){
    	c := make(chan string, 1000)
    	start := time.Now()
    
    	wg.Add(1)
    	go getInfo(...)
    
    	for i:=0; i<16; i++ {
    		wg.Add(1)
    		go dealInfo(...)
    	}
    
    	wg.Wait()
        
    	fmt.Println("time: ", time.Since(start))
    }
    

    这里应该就能充分暴露前面改写过程中加入的奇怪东西的目的了 😄

    可以发现,wg.Add(1) 之后,一定会紧跟一个带有 defer wg.Done() 的函数,来实现线程加减的匹配

    而对于比较复杂的『函数 2 』 ,我们通过一个循环来加入 Ngoroutine 线程。wg.Add(1) 放在循环里面,同时每个 wg.Add() 都必然对应一个 defer wg.Done() 来匹配

    最后,别忘了放一个 wg.Wait() 来通知主线程等待所有 wg 的线程执行完毕——它靠的就是不断 Add ,之后又不断 Done ,直到池子里线程归零的那一瞬来判断任务全部结束的。所以 AddDone 必须匹配

    另外一个之前没有提到的小改动是,我们建立 c (chan string) 的时候,还给了它一些缓存。这样,由于 getInfo 处理得很快,就可以预存一些结果到 c 里面,在面对 16 个 go dealInfo 的时候,就能保证每个 dealInfo 总是能拿到东西来处理,就不会空闲等待了。这个 N ,我在哪看到资料说是最大 10000 个,好像可以通过配置修改。不过对于大部分的场景,如果要修改这个参数,不如优化代码才是正道

    还有一个地方是,我们在循环加入『函数 2 』 goroutine 的时候,wg.Add(1) 放在了循环里面。由于我知道这里的循环会创建 16 个 goroutine ,所以我们也可以一开始就在循环外面 wg.Add(16) 把它一口气全加进去。由于每个循环有一个 defer wg.Done() ,所以最后线程池还是可以归零的。只是这样写如果后期要扩充数量的话会有点不好维护,还是每个循环 +1 ,N 则通过配置文件来提供更妥当。

    通过这个例子,感觉自己算是摸到了一点 channel 使用的门路。也体会到了一些 『不要通过共享内存来通信,而应该通过通信来共享内存』的设计思路。

    这里还有一个不错的例子,是关于并行获取 < N 的所有素数的。它用到了 3 个 channel 来处理 写入、计算、读取打印。通过这里例子,应该能对 close(channel) 的时机有更好的理解。例子,实现不是很严谨

    对于 go 小白如我,这里也是班门弄斧。只是希望能够帮助到之前收藏我文章的朋友,或者其他入门 go 的小伙伴。

    3 条回复    2022-11-07 23:57:34 +08:00
    zjj19950716
        1
    zjj19950716  
       2022-11-07 14:01:56 +08:00
    这东西不叫线程池吧
    volvo007
        2
    volvo007  
    OP
       2022-11-07 14:16:34 +08:00
    @zjj19950716 嗯嗯,好的,是叫 『等待组』吗?
    volvo007
        3
    volvo007  
    OP
       2022-11-07 23:57:34 +08:00
    补充一下,WaitGroup 是一个计数信号量,可以用来记录并维护运行的 goroutine ,不是线程池。高手们看看笑笑就好,新手们记得自己脑子里做下替换哈

    此外文章里还有些概念、描述上的问题(毕竟不是专业的)。我将测试过的完整代码发到这里大家可以自取

    https://pastebin.com/aHCGYfEr

    已知问题:由于一开始会有若干线程同时尝试创建文件夹,所以会有几个 "file existed" 错误。捕获打印之后可以继续执行代码
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2907 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 07:39 · PVG 15:39 · LAX 23:39 · JFK 02:39
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.