type (
    // Event defines an indication of a point-in-time occurrence.
    Event struct {
        // Data in this case is a simple int, but the actual
        // implementation would depend on the application.
        Data int64
    }

    // Observer defines a standard interface for instances that wish to listen
    // for the occurrence of a specific event.
    Observer interface {
        // OnNotify allows an event to be "published" to interface implementations.
        // In the "real world", error handling would likely be implemented.
        OnNotify(Event)
    }

    // Notifier is the instance being observed. Publisher is perhaps another decent
    // name, but naming things is hard.
    Notifier interface {
        // Register allows an instance to register itself to listen/observe
        // events.
        Register(Observer)
        // Deregister allows an instance to remove itself from the collection
        // of observers/listeners.
        Deregister(Observer)
        // Notify publishes new events to listeners. The method is not
        // absolutely necessary, as each implementation could define this itself
        // without losing functionality.
        Notify(Event)
    }
)
type (
    // eventObserver is a basic Observer implementation identified by an id.
    eventObserver struct {
        id int
    }

    // eventNotifier is a basic Notifier implementation.
    eventNotifier struct {
        // Using a map with an empty struct allows us to keep the observers
        // unique while still keeping memory usage relatively low.
        observers map[Observer]struct{}
    }
)
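The concrete methods behind these interfaces are not shown above; a minimal sketch of what they could look like, given how main uses them below (the print format in OnNotify is an assumption):

// OnNotify reacts to a published event; here it simply prints it.
func (o *eventObserver) OnNotify(e Event) {
    fmt.Printf("*** Observer %d received: %d\n", o.id, e.Data)
}

// Register adds an observer to the set of listeners.
func (n *eventNotifier) Register(l Observer) {
    n.observers[l] = struct{}{}
}

// Deregister removes an observer from the set of listeners.
func (n *eventNotifier) Deregister(l Observer) {
    delete(n.observers, l)
}

// Notify fans an event out to every registered observer.
func (n *eventNotifier) Notify(e Event) {
    for o := range n.observers {
        o.OnNotify(e)
    }
}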
func main() {
    // Initialize a new Notifier.
    n := eventNotifier{
        observers: map[Observer]struct{}{},
    }

    // Register a couple of observers.
    n.Register(&eventObserver{id: 1})
    n.Register(&eventObserver{id: 2})

    // A simple loop publishing the current Unix timestamp to observers.
    stop := time.NewTimer(10 * time.Second).C
    tick := time.NewTicker(time.Second).C
    for {
        select {
        case <-stop:
            return
        case t := <-tick:
            n.Notify(Event{Data: t.UnixNano()})
        }
    }
}
func Count(start int, end int) chan int {
    ch := make(chan int)

    go func(ch chan int) {
        for i := start; i <= end; i++ {
            // Blocks on the operation
            ch <- i
        }

        close(ch)
    }(ch)

    return ch
}
A goroutine creates the needed values asynchronously, reducing the performance cost incurred during their creation.
Usage
fmt.Println("No bottles of beer on the wall")
for i := range Count(1, 99) {
    fmt.Println("Pass it around, put one up,", i, "bottles of beer on the wall")
    // Pass it around, put one up, 1 bottles of beer on the wall
    // Pass it around, put one up, 2 bottles of beer on the wall
    // ...
    // Pass it around, put one up, 99 bottles of beer on the wall
}
// sumFiles starts goroutines to walk the directory tree at root and digest each
// regular file. These goroutines send the results of the digests on the md5Result
// channel and send the result of the walk on the error channel. If done is
// closed, sumFiles abandons its work.
func sumFiles(done <-chan struct{}, root string) (<-chan md5Result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the md5Result on c. Send the result of the walk on errc.
    c := make(chan md5Result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- md5Result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done. Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        errc <- err
    }()
    return c, errc
}
// Walk applies walkFn to each job in jobs and stops at the first error.
// Job and WalkFunc (assumed to be func(Job) error) are application-defined.
func Walk(jobs []Job, walkFn WalkFunc) error {
    var err error
    for _, job := range jobs {
        err = walkFn(job)
        if err != nil {
            break
        }
    }
    return err
}
// work starts a goroutine for each job and sends the results on c. The result
// of walking the jobs is sent on errc. If done is closed, work abandons its work.
func work(done <-chan struct{}, jobs []Job) (<-chan result, <-chan error) {
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := Walk(jobs, func(job Job) error {
            wg.Add(1)
            go func() {
                // doJob and the result struct are application-defined here;
                // result is assumed to wrap the job's output and its error,
                // mirroring md5Result above.
                data, err := doJob(job)
                select {
                case c <- result{data: data, err: err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done. Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        errc <- err
    }()
    return c, errc
}
// walkFiles starts a goroutine to walk the directory tree at root and send the
// path of each regular file on the string channel. It sends the result of the
// walk on the error channel. If done is closed, walkFiles abandons its work.
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}
// A result is the product of reading and summing a file using MD5.
type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}
// digester reads path names from paths and sends digests of the corresponding
// files on c until either paths or done is closed.
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error. In that case,
// MD5All does not wait for inflight read operations to complete.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    paths, errc := walkFiles(done, root)

    // Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()
    // End of pipeline.

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}
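For completeness, a minimal sketch of a caller, assuming the root directory arrives as the first command-line argument and that printing the sums sorted by path is all that is needed:

func main() {
    // Calculate the MD5 sum of all files under the given directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}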
This is similar to the concurrency pattern above; the difference is that the number of concurrent goroutines is fixed.
// getJobs is assumed to return the channel of work items to be processed.
paths := getJobs()

const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
    go func() {
        digester(done, paths, c)
        wg.Done()
    }()
}
go func() {
    wg.Wait()
    close(c)
}()
Extracting the core logic makes the intent clear: bounded concurrency, with the number of goroutines fixed at 20.
Typical use cases are long-running tasks such as CPU-bound work, I/O, and network calls, where limiting the amount of concurrency keeps the work from swallowing all of the system's resources.
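The same cap can be expressed without the pipeline plumbing; a minimal sketch that limits in-flight goroutines with a buffered channel used as a counting semaphore (processJob and the job count are illustrative):

// processJob stands in for any expensive CPU-bound, I/O-bound or network task.
func processJob(id int) {
    time.Sleep(100 * time.Millisecond)
    fmt.Println("finished job", id)
}

func main() {
    const maxConcurrent = 20 // cap on simultaneously running goroutines

    sem := make(chan struct{}, maxConcurrent) // counting semaphore
    var wg sync.WaitGroup

    for id := 0; id < 100; id++ {
        wg.Add(1)
        sem <- struct{}{} // acquire a slot; blocks while 20 jobs are in flight
        go func(id int) {
            defer wg.Done()
            defer func() { <-sem }() // release the slot
            processJob(id)
        }(id)
    }
    wg.Wait()
}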
// ProxyObject represents a proxy object that intercepts actions.
type ProxyObject struct {
    object *Object
}
// ObjDo implements IObject and intercepts the action before passing it on to
// the real Object.
func (p *ProxyObject) ObjDo(action string) {
    if p.object == nil {
        p.object = new(Object)
    }
    if action == "Run" {
        p.object.ObjDo(action) // Prints: I can, Run
    }
}
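The IObject interface and the real Object that the proxy wraps are not shown above; a minimal sketch of what they could look like, inferred from the "Prints: I can, Run" comment (the names and output format are assumptions):

// IObject is the interface shared by the real object and its proxy.
type IObject interface {
    ObjDo(action string)
}

// Object is the real subject the proxy delegates to.
type Object struct{}

// ObjDo performs the real action.
func (obj *Object) ObjDo(action string) {
    fmt.Printf("I can, %s\n", action)
}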
var (
    ErrNoTickets      = errors.New("semaphore: could not acquire semaphore")
    ErrIllegalRelease = errors.New("semaphore: can't release the semaphore without acquiring it first")
)
// Interface contains the behavior of a semaphore that can be acquired and/or released.
type Interface interface {
    Acquire() error
    Release() error
}
type implementation struct {
    sem     chan struct{}
    timeout time.Duration
}
func (s *implementation) Acquire() error {
    select {
    case s.sem <- struct{}{}:
        return nil
    case <-time.After(s.timeout):
        return ErrNoTickets
    }
}
func (s *implementation) Release() error {
    select {
    case <-s.sem:
        return nil
    case <-time.After(s.timeout):
        return ErrIllegalRelease
    }
}
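The snippets above never construct an implementation; a minimal constructor sketch, assuming the number of tickets maps directly to the buffered channel's capacity:

// New creates a semaphore with the given ticket count and operation timeout.
func New(tickets int, timeout time.Duration) Interface {
    return &implementation{
        sem:     make(chan struct{}, tickets),
        timeout: timeout,
    }
}

With tickets set to the maximum number of concurrent users of a resource, Acquire blocks for at most timeout waiting for a free slot, and Release frees one.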
func Duration(invocation time.Time, name string) {
    elapsed := time.Since(invocation)

    log.Printf("%s lasted %s", name, elapsed)
}
Usage
func BigIntFactorial(x big.Int) *big.Int {
    // Arguments to a defer statement are immediately evaluated and stored.
    // The deferred function receives the pre-evaluated values when it's invoked.
    defer profile.Duration(time.Now(), "IntFactorial")

    y := big.NewInt(1)
    for one := big.NewInt(1); x.Sign() > 0; x.Sub(&x, one) {
        y.Mul(y, &x)
    }

    return y
}
// Merge different channels in one channel
func Merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup

    out := make(chan int)

    // Start a send goroutine for each input channel in cs. send
    // copies values from c to out until c is closed, then calls wg.Done.
    send := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }

    wg.Add(len(cs))
    for _, c := range cs {
        go send(c)
    }

    // Start a goroutine to close out once all the send goroutines are
    // done. This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
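A short usage sketch, reusing the Count generator from earlier to produce the input channels; ordering across the two sources is not guaranteed:

func main() {
    // Fan-in: values from both generators arrive on a single channel.
    for v := range Merge(Count(1, 3), Count(4, 6)) {
        fmt.Println(v)
    }
}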
// Split a channel into n channels that receive messages in a round-robin fashion.
func Split(ch <-chan int, n int) []<-chan int {
    cs := make([]chan int, n)
    out := make([]<-chan int, n)
    for i := 0; i < n; i++ {
        cs[i] = make(chan int)
        out[i] = cs[i]
    }

    // Distributes the work in a round-robin fashion among the stated number
    // of channels until the main channel has been closed. In that case, close
    // all channels and return.
    distributeToChannels := func(ch <-chan int, cs []chan int) {
        // Close every channel when the execution ends.
        defer func(cs []chan int) {
            for _, c := range cs {
                close(c)
            }
        }(cs)

        for {
            for _, c := range cs {
                val, ok := <-ch
                if !ok {
                    return
                }
                c <- val
            }
        }
    }

    go distributeToChannels(ch, cs)

    return out
}
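A matching usage sketch for the splitter; each output channel gets its own consumer, since the round-robin sender blocks until the current channel is read:

func main() {
    var wg sync.WaitGroup
    for i, c := range Split(Count(1, 10), 3) {
        wg.Add(1)
        go func(i int, c <-chan int) {
            defer wg.Done()
            for v := range c {
                fmt.Printf("worker %d got %d\n", i, v)
            }
        }(i, c)
    }
    wg.Wait()
}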
        // Calculates when the circuit breaker should resume propagating requests
        // to the service.
        shouldRetryAt := cnt.LastActivity().Add(time.Second * 2 << backoffLevel)

        return time.Now().After(shouldRetryAt)
    }

    // If retrying is allowed, no error is returned and execution continues.
    if !canRetry(cnt) {
        // Fails fast instead of propagating requests to the circuit since
        // not enough time has passed since the last failure to retry.
        return ErrServiceUnavailable
    }
}

// Unless the failure threshold is exceeded the wrapped service mimics the
// old behavior; the difference in behavior is seen after consecutive failures.
if err := c(ctx); err != nil {
    cnt.Count(FailureState)
    return err
}
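These lines are fragments from inside a circuit-breaker wrapper; for orientation, a hedged sketch of what that wrapper could look like, where the Circuit type and the counter helpers (NewCounter, ConsecutiveFailures, LastActivity, Count) are assumptions based on the identifiers used above:

// Circuit is the signature of the operation being protected (assumed).
type Circuit func(context.Context) error

// Breaker wraps a Circuit and fails fast after failureThreshold consecutive
// failures, backing off exponentially before allowing another attempt.
func Breaker(c Circuit, failureThreshold uint32) Circuit {
    cnt := NewCounter() // assumed failure/success counter

    return func(ctx context.Context) error {
        if cnt.ConsecutiveFailures() >= failureThreshold {
            canRetry := func(cnt Counter) bool {
                backoffLevel := cnt.ConsecutiveFailures() - failureThreshold
                // Backoff check, as in the excerpt above.
                shouldRetryAt := cnt.LastActivity().Add(time.Second * 2 << backoffLevel)
                return time.Now().After(shouldRetryAt)
            }
            if !canRetry(cnt) {
                return ErrServiceUnavailable
            }
        }

        // Call the wrapped service and record the outcome.
        if err := c(ctx); err != nil {
            cnt.Count(FailureState)
            return err
        }

        cnt.Count(SuccessState)
        return nil
    }
}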