在 vmalert 的代码里学到了一种控制并发数量的方法：指定并发数为 concurrency，创建一个容量为 concurrency 的带缓冲 channel；每个 goroutine 在执行任务之前先往 channel 里塞一个空结构体，执行完之后再从 channel 里取出一个。当 channel 已满时，发送操作会阻塞，后续的 goroutine 只能等待，从而把同时执行的任务数限制在 concurrency 以内。
package concurrency
import (
"fmt"
"sync"
"time"
)
// Rule is a single unit of work that a Group executes.
type Rule struct {
	// ID identifies the rule; it is what the start/finish log lines print.
	ID int
}
// Group is a set of rules that are run with a bounded level of parallelism.
type Group struct {
	// concurrency is the maximum number of rules allowed to run at once.
	concurrency int
	// currentConcurrency counts the rules that have been reported as started
	// (PrintStart) but not yet as finished (PrintEnd).
	currentConcurrency int
	// mutex is held by PrintStart and PrintEnd while they update and print
	// currentConcurrency.
	mutex sync.Mutex
	// rules holds the rules executed by Start.
	rules []Rule
}
// NewGroup returns a Group limited to concurrency simultaneous rules and
// populated with ruleNum rules whose IDs are 0..ruleNum-1.
//
// The concurrency value is stored as given; it is not validated here.
// When ruleNum is zero or negative the group has no rules.
func NewGroup(concurrency int, ruleNum int) Group {
	// Allocate the slice once at its final size instead of growing it with
	// repeated appends. It stays nil when there is nothing to create, which
	// also keeps a negative ruleNum away from make.
	var rules []Rule
	if ruleNum > 0 {
		rules = make([]Rule, ruleNum)
		for id := range rules {
			rules[id].ID = id
		}
	}

	// Return a composite literal rather than a local variable so that no
	// already-constructed sync.Mutex value is copied on the way out.
	return Group{
		concurrency: concurrency,
		rules:       rules,
	}
}
// Run performs the rule's work. The body is a placeholder that simply blocks
// for one second; replace it with the real job.
func (r *Rule) Run() {
	const jobDuration = time.Second
	time.Sleep(jobDuration)
}
// PrintStart records that rule has begun running: it increments the group's
// running-rule counter and prints the current time, the rule ID and the new
// counter value. The mutex is held for the whole update-and-print step.
func (g *Group) PrintStart(rule Rule) {
	g.mutex.Lock()
	defer g.mutex.Unlock()

	g.currentConcurrency++
	running := g.currentConcurrency
	fmt.Printf("%v, start to run %v, concurrency num: %v\n", time.Now(), rule.ID, running)
}
// PrintEnd records that rule has finished running: it decrements the group's
// running-rule counter and prints the current time, the rule ID and the new
// counter value. The mutex is held for the whole update-and-print step.
func (g *Group) PrintEnd(rule Rule) {
	g.mutex.Lock()
	defer g.mutex.Unlock()

	g.currentConcurrency--
	running := g.currentConcurrency
	fmt.Printf("%v, finish to run %v, concurrency num: %v\n", time.Now(), rule.ID, running)
}
// Start runs every rule of the group in its own goroutine, lets at most
// g.concurrency of them execute at the same time, and returns once all rules
// have finished.
//
// A buffered channel is used as a counting semaphore: a goroutine sends an
// empty struct before running its rule and receives one back afterwards. When
// the buffer is full the send blocks, so the goroutine waits for a free slot.
//
// A concurrency below 1 is treated as 1. A capacity of 0 would give an
// unbuffered channel on which every send blocks forever, and a negative
// capacity makes make panic.
//
// NOTE(review): the receiver is a value, so the running-rule counter and the
// mutex used during this call belong to the call's own copy of the Group, not
// to the caller's variable.
func (g Group) Start() {
	limit := g.concurrency
	if limit < 1 {
		limit = 1
	}
	slots := make(chan struct{}, limit)

	var wg sync.WaitGroup
	for _, rule := range g.rules {
		wg.Add(1)
		go func(rule Rule) {
			defer wg.Done()

			// Acquire a slot first, and only then schedule its release, so a
			// slot is never released without having been taken.
			slots <- struct{}{}
			defer func() { <-slots }()

			g.PrintStart(rule)
			rule.Run()
			g.PrintEnd(rule)
		}(rule)
	}
	wg.Wait()
}