golang中有2种方式同步程序,一种使用channel,另一种使用sync.WaitGroup。最近在使用golang写一个比较简单的功能 —- host1主机需要先在本机起一个TCP监听,起来后给host2主机发送指令,让其主动给host1主机监听的端口进行连接。最终使用了sync.WaitGroup实现了该功能。本篇就结合一些示例来看下两者的使用。
一、channel并行同步
比如有三个需要取数据的程序同时进行,但是终需要同步并返回数据。我们可以按如下代码操作:
package main import ( "fmt" "time" ) func main() { messages := make(chan int) go func() { time.Sleep(time.Second * 3) messages <- 1 }() go func() { time.Sleep(time.Second * 2) messages <- 2 }() go func() { time.Sleep(time.Second * 1) messages <- 3 }() go func() { for i := range messages { fmt.Println(i) } }() time.Sleep(time.Second * 5) }
最终取回的结果是3 2 1 ,但是如果该代码中如果不加time.sleep 5秒的动作,程序执行时会出现主进程还未等各个进程执行完成就结束了。因为go函数可以简单理论为shell里的&操作。当然遇到这样的问题,使用sync.WaitGroup是可以解决的。但如果不用sync.WaitGroup,还是使用channel去处理能不能解决呢?
当然是可以的,我们可以再创建一个无缓存的channel,由于该channel是阻塞的,在所有的数据未取出前,主程序就不退出。具体做法如下:
package main import ( "fmt" "time" ) func main() { messages := make(chan int) // Use this channel to follow the execution status // of our goroutines :D done := make(chan bool) go func() { time.Sleep(time.Second * 3) messages <- 1 done <- true }() go func() { time.Sleep(time.Second * 2) messages <- 2 done <- true }() go func() { time.Sleep(time.Second * 1) messages <- 3 done <- true }() go func() { for i := range messages { fmt.Println(i) } }() for i := 0; i < 3; i++ { <-done } }
这里上面每个channel执行完成后,会向done这样一个channel里向true,在true的结果没有取出之前,程序就会一直阻塞,直接所有的程序都完成。可能聪明的同学会觉得不需要这么麻烦,只需要把示例1中的代码最后一个go func()去掉,而且把sleep 5秒也去掉,直接改为如下循环取出就可以:
for i := range messages { fmt.Println(i) }
实际执行的时候呢?看下图:
二、sync.WaitGroup并行同步处理
sync包提供了基本同步和互持锁。其可以操作的类型有Cond、Locker、Map、Mutex、Once、Pool、RWMutex、WailtGroup。这里只说WaitGroup,WaitGroup提供了三个方法:Add()用来添加计数。Done()用来在操作结束时调用,使计数减一。Wait()用来等待所有的操作结束,即计数变为0,该函数会在计数不为0时等待,在计数为0时立即返回。同样是上面的示例,使用sync.WailtGroup解决比较容易,如下:
package main import ( "fmt" "sync" "time" ) func main() { messages := make(chan int) var wg sync.WaitGroup // you can also add these one at // a time if you need to wg.Add(3) go func() { defer wg.Done() time.Sleep(time.Second * 3) messages <- 1 }() go func() { defer wg.Done() time.Sleep(time.Second * 2) messages <- 2 }() go func() { defer wg.Done() time.Sleep(time.Second * 1) messages <- 3 }() go func() { for i := range messages { fmt.Println(i) } }() wg.Wait() }
在一个wait组里我们增加了三个计数器,每完成一个减1,直到为0时,wait组结束。其同样适用于多线程采集:
package main import ( "fmt" "io/ioutil" "log" "net/http" "sync" ) func main() { urls := []string{ "http://api.douban.com/v2/book/isbn/9787218087351", "http://ip.taobao.com/service/getIpInfo.php?ip=202.101.172.35", "https://jsonplaceholder.typicode.com/todos/1", } jsonResponses := make(chan string) var wg sync.WaitGroup wg.Add(len(urls)) for _, url := range urls { go func(url string) { defer wg.Done() res, err := http.Get(url) if err != nil { log.Fatal(err) } else { defer res.Body.Close() body, err := ioutil.ReadAll(res.Body) if err != nil { log.Fatal(err) } else { jsonResponses <- string(body) } } }(url) } go func() { for response := range jsonResponses { fmt.Println(response) } }() wg.Wait() }
上面是采集3个json数据的返回结果。当然也可以参看下官方的示例,官方的示例和这里略有差别,这个是一次通过len增加了n个wait任务,官方的每处理前就先增加一个。
package main import ( "sync" ) type httpPkg struct{} func (httpPkg) Get(url string) {} var http httpPkg func main() { var wg sync.WaitGroup var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", } for _, url := range urls { // Increment the WaitGroup counter. wg.Add(1) // Launch a goroutine to fetch the URL. go func(url string) { // Decrement the counter when the goroutine completes. defer wg.Done() // Fetch the URL. http.Get(url) }(url) } // Wait for all HTTP fetches to complete. wg.Wait() }
参考页面:how-to-wait-for-all-goroutines-to-finish-without-using-time-sleep
《golang channel与sync.WaitGroup同步》有1条评论