golang channel与sync.WaitGroup同步

golang中有2种方式同步程序,一种使用channel,另一种使用sync.WaitGroup。最近在使用golang写一个比较简单的功能 ---- host1主机需要先在本机起一个TCP监听,起来后给host2主机发送指令,让其主动给host1主机监听的端口进行连接。最终使用了sync.WaitGroup实现了该功能。本篇就结合一些示例来看下两者的使用。

一、channel并行同步

比如有三个需要取数据的程序同时进行,但是终需要同步并返回数据。我们可以按如下代码操作:

package main
import (
    "fmt"
    "time"
)
func main() {
    messages := make(chan int)
    go func() {
        time.Sleep(time.Second * 3)
        messages <- 1
    }()
    go func() {
        time.Sleep(time.Second * 2)
        messages <- 2
    }()
    go func() {
        time.Sleep(time.Second * 1)
        messages <- 3
    }()
    go func() {
        for i := range messages {
            fmt.Println(i)
        }
    }()
    time.Sleep(time.Second * 5)
}

最终取回的结果是3 2 1 ,但是如果该代码中如果不加time.sleep 5秒的动作,程序执行时会出现主进程还未等各个进程执行完成就结束了。因为go函数可以简单理论为shell里的&操作。当然遇到这样的问题,使用sync.WaitGroup是可以解决的。但如果不用sync.WaitGroup,还是使用channel去处理能不能解决呢?

当然是可以的,我们可以再创建一个无缓存的channel,由于该channel是阻塞的,在所有的数据未取出前,主程序就不退出。具体做法如下:

package main
import (
    "fmt"
    "time"
)
func main() {
    messages := make(chan int)
    // Use this channel to follow the execution status
    // of our goroutines 😀
    done := make(chan bool)
    go func() {
        time.Sleep(time.Second * 3)
        messages <- 1
        done <- true
    }()
    go func() {
        time.Sleep(time.Second * 2)
        messages <- 2
        done <- true
    }()
    go func() {
        time.Sleep(time.Second * 1)
        messages <- 3
        done <- true
    }()
    go func() {
        for i := range messages {
            fmt.Println(i)
        }
    }()
    for i := 0; i < 3; i++ {
        <-done
    }
}

这里上面每个channel执行完成后,会向done这样一个channel里向true,在true的结果没有取出之前,程序就会一直阻塞,直接所有的程序都完成。可能聪明的同学会觉得不需要这么麻烦,只需要把示例1中的代码最后一个go func()去掉,而且把sleep 5秒也去掉,直接改为如下循环取出就可以:

for i := range messages {
    fmt.Println(i)
}

实际执行的时候呢?看下图:

channel-deadlock

二、sync.WaitGroup并行同步处理

sync包提供了基本同步和互持锁。其可以操作的类型有Cond、Locker、Map、Mutex、Once、Pool、RWMutex、WailtGroup。这里只说WaitGroup,WaitGroup提供了三个方法:Add()用来添加计数。Done()用来在操作结束时调用,使计数减一。Wait()用来等待所有的操作结束,即计数变为0,该函数会在计数不为0时等待,在计数为0时立即返回。同样是上面的示例,使用sync.WailtGroup解决比较容易,如下:

package main
import (
    "fmt"
    "sync"
    "time"
)
func main() {
    messages := make(chan int)
    var wg sync.WaitGroup
    // you can also add these one at
    // a time if you need to
    wg.Add(3)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 3)
        messages <- 1
    }()
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 2)
        messages <- 2
    }()
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages <- 3
    }()
    go func() {
        for i := range messages {
            fmt.Println(i)
        }
    }()
    wg.Wait()
}

在一个wait组里我们增加了三个计数器,每完成一个减1,直到为0时,wait组结束。其同样适用于多线程采集:

package main
import (
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "sync"
)
func main() {
    urls := []string{
        "http://api.douban.com/v2/book/isbn/9787218087351",
        "http://ip.taobao.com/service/getIpInfo.php?ip=202.101.172.35",
        "https://jsonplaceholder.typicode.com/todos/1",
    }
    jsonResponses := make(chan string)
    var wg sync.WaitGroup
    wg.Add(len(urls))
    for _, url := range urls {
        go func(url string) {
            defer wg.Done()
            res, err := http.Get(url)
            if err != nil {
                log.Fatal(err)
            } else {
                defer res.Body.Close()
                body, err := ioutil.ReadAll(res.Body)
                if err != nil {
                    log.Fatal(err)
                } else {
                    jsonResponses <- string(body)
                }
            }
        }(url)
    }
    go func() {
        for response := range jsonResponses {
            fmt.Println(response)
        }
    }()
    wg.Wait()
}

上面是采集3个json数据的返回结果。当然也可以参看下官方的示例,官方的示例和这里略有差别,这个是一次通过len增加了n个wait任务,官方的每处理前就先增加一个。

package main
import (
	"sync"
)
type httpPkg struct{}
func (httpPkg) Get(url string) {}
var http httpPkg
func main() {
	var wg sync.WaitGroup
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}
	for _, url := range urls {
		// Increment the WaitGroup counter.
		wg.Add(1)
		// Launch a goroutine to fetch the URL.
		go func(url string) {
			// Decrement the counter when the goroutine completes.
			defer wg.Done()
			// Fetch the URL.
			http.Get(url)
		}(url)
	}
	// Wait for all HTTP fetches to complete.
	wg.Wait()
}

参考页面:how-to-wait-for-all-goroutines-to-finish-without-using-time-sleep




本站的发展离不开您的资助,金额随意,欢迎来赏!

You can donate through PayPal.
My paypal id: itybku@139.com
Paypal page: https://www.paypal.me/361way

  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.