我对于 Golang 设计模式中不同的理解

我对于 Golang 设计模式中不同的理解

设计模式,自打我开始学习编程起,这就是一个津津乐道的话题。怎么编写好味道的代码,如何写出合适的设计代码帮助项目更容易理解,代码更简洁,这是我在项目中常常思考的问题。在golang 中有和其他语言不同的区别。例如函数作为一等公民,goroutine , chan 等特性,对于这些特性我思考如何能够编写适用于 golang 中的设计模式,而不是一味的套用着老思想的设计模式,以下是针对开源项目 go-patterns 的理解。

习惯型

1. 函数可选项

允许使用默认设置和惯用替代创建干净的API

选项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type Options struct {
UID int
GID int
Flags int
Contents string
Permissions os.FileMode
}

type Option func(*Options)

func UID(userID int) Option {
return func(args *Options) {
args.UID = userID
}
}

func GID(groupID int) Option {
return func(args *Options) {
args.GID = groupID
}
}

func Contents(c string) Option {
return func(args *Options) {
args.Contents = c
}
}

func Permissions(perms os.FileMode) Option {
return func(args *Options) {
args.Permissions = perms
}
}

构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func New(filepath string, setters ...Option) error {
// Default Options
args := &Options{
UID: os.Getuid(),
GID: os.Getgid(),
Contents: "",
Permissions: 0666,
Flags: os.O_CREATE | os.O_EXCL | os.O_WRONLY,
}

for _, setter := range setters {
setter(args)
}

f, err := os.OpenFile(filepath, args.Flags, args.Permissions)
if err != nil {
return err
} else {
defer f.Close()
}

if _, err := f.WriteString(args.Contents); err != nil {
return err
}

return f.Chown(args.UID, args.GID)
}

调用一下:

1
2
3
4
5
6
7
8
9
emptyFile, err := file.New("/tmp/empty.txt")
if err != nil {
panic(err)
}

fillerFile, err := file.New("/tmp/file.txt", file.UID(1000), file.Contents("Lorem Ipsum Dolor Amet"))
if err != nil {
panic(err)
}

上面给出了一个关于函数选项模式的例子,这样做有什么好处呢?使程序看起来更加简洁,假如我们不按照这样的方式写,会是什么情况?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func New(filepath string,uid,filepath,Flags string ........... ) error {
// Default Options
args := &Options{
UID: os.Getuid(),
GID: os.Getgid(),
Contents: "",
Permissions: 0666,
Flags: os.O_CREATE | os.O_EXCL | os.O_WRONLY,
}
f, err := os.OpenFile(filepath, args.Flags, args.Permissions)
if err != nil {
return err
} else {
defer f.Close()
}

if _, err := f.WriteString(args.Contents); err != nil {
return err
}

return f.Chown(args.UID, args.GID)
}

可以看出来,函数可选项模式,使调用函数的参数列表更加简洁,并且参数列表的设置顺序不再是按照函数的形参列表的顺序,传入选项的顺序与函数已经无关了。并且通过选项形式,我们不用再去一一对应我们这个地方需要放什么参数,我门只需要调用可选项里面的函数即可,函数名轻而易举的就可以告诉我们这个参数是什么作用。

使用场景:对配置进行初始化的时候(当我们认为需要配置的参数特别多的时候,我认为这种模式应该会非常有帮助)

行为型

2. 策略模式

通过策略行为设计模式,可以在运行时选择算法的行为。

下面是一个例子对整数进行操作的可互换运算符对象的实现

操作符接口

1
2
3
4
5
6
7
8
9
10
11
type Operator interface {
Apply(int, int) int
}

type Operation struct {
Operator Operator
}

func (o *Operation) Operate(leftValue, rightValue int) int {
return o.Operator.Apply(leftValue, rightValue)
}

加法运算符

1
2
3
4
5
type Addition struct{}

func (Addition) Apply(lval, rval int) int {
return lval + rval
}
1
2
add := Operation{Addition{}}
add.Operate(3, 5) // 8

乘法运算符

1
2
3
4
5
type Multiplication struct{}

func (Multiplication) Apply(lval, rval int) int {
return lval * rval
}
1
2
3
mult := Operation{Multiplication{}}

mult.Operate(3, 5) // 15

调用的过程应该就是向一个Operation类传入一个实现了Operator接口的类,然后调用Operation对象的方法,Operation类 本身也实现Operator接口,相当于采用委托的方式对子类进行动态选择。

策略模式与模板模式相似,但粒度不同,策略模式的变化是在类级别,模版方法是在方法级别。

使用场景:在程序运行过程中可以动态的选择执行的类。

3. 观察者模式

提供特定以通知事件/数据更改

事件本身,观察者,通知者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type (
// Event defines an indication of a point-in-time occurrence.
Event struct {
// Data in this case is a simple int, but the actual
// implementation would depend on the application.
Data int64
}

// Observer defines a standard interface for instances that wish to list for
// the occurrence of a specific event.
Observer interface {
// OnNotify allows an event to be "published" to interface implementations.
// In the "real world", error handling would likely be implemented.
OnNotify(Event)
}

// Notifier is the instance being observed. Publisher is perhaps another decent
// name, but naming things is hard.
Notifier interface {
// Register allows an instance to register itself to listen/observe
// events.
Register(Observer)
// Deregister allows an instance to remove itself from the collection
// of observers/listeners.
Deregister(Observer)
// Notify publishes new events to listeners. The method is not
// absolutely necessary, as each implementation could define this itself
// without losing functionality.
Notify(Event)
}
)

事件观察者的的实现,事件通知者的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type (
eventObserver struct{
id int
}

eventNotifier struct{
// Using a map with an empty struct allows us to keep the observers
// unique while still keeping memory usage relatively low.
observers map[Observer]struct{}
}
)

func (o *eventObserver) OnNotify(e Event) {
fmt.Printf("*** Observer %d received: %d\n", o.id, e.Data)
}

func (o *eventNotifier) Register(l Observer) {
o.observers[l] = struct{}{}
}

func (o *eventNotifier) Deregister(l Observer) {
delete(o.observers, l)
}

func (p *eventNotifier) Notify(e Event) {
for o := range p.observers {
o.OnNotify(e)
}
}

观察者应该具有一定的结构,逻辑,将观察者注册到其所对应到通知者对象上,采用遍历到方式,对观察者者进行遍历,观察者被通知到后调用自身的业务逻辑方法。

使用如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
// Initialize a new Notifier.
n := eventNotifier{
observers: map[Observer]struct{}{},
}

// Register a couple of observers.
n.Register(&eventObserver{id: 1})
n.Register(&eventObserver{id: 2})

// A simple loop publishing the current Unix timestamp to observers.
stop := time.NewTimer(10 * time.Second).C
tick := time.NewTicker(time.Second).C
for {
select {
case <- stop:
return
case t := <-tick:
n.Notify(Event{Data: t.UnixNano()})
}
}
}

将观察者全部都注册到通知者上,使用for select语句对所有所有已经被注册到通知者对象上进行通知消息。

应用场景:通知者会在符合某种条件下去通知观察者,观察者根据消息产生一些系列的逻辑。

并发型

4. 生成器模式

一次产生一个值序列

生成器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func Count(start int, end int) chan int {
ch := make(chan int)

go func(ch chan int) {
for i := start; i <= end ; i++ {
// Blocks on the operation
ch <- i
}

close(ch)
}(ch)

return ch
}

通过 一个goroutine异步创建好需要的对象,减少创建过程中带来的性能损耗

使用

1
2
3
4
5
6
7
8
9
10
11
fmt.Println("No bottles of beer on the wall")

for i := range Count(1, 99) {
fmt.Println("Pass it around, put one up,", i, "bottles of beer on the wall")
// Pass it around, put one up, 1 bottles of beer on the wall
// Pass it around, put one up, 2 bottles of beer on the wall
// ...
// Pass it around, put one up, 99 bottles of beer on the wall
}

fmt.Println(100, "bottles of beer on the wall")

打印 1-99 的数。

应用场景:创建对象需要耗费大量资源,或者解耦创建过程与调用过程,开发者不需要关心创建了什么,只需要做创建过程。

5. 并发模式

并发完成大量独立任务

信息结果

1
2
3
4
5
6
// A md5Result is the product of reading and summing a file using MD5.
type md5Result struct {
path string
sum [md5.Size]byte
err error
}

并发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// sumFiles starts goroutines to walk the directory tree at root and digest each
// regular file. These goroutines send the results of the digests on the md5Result
// channel and send the md5Result of the walk on the error channel. If done is
// closed, sumFiles abandons its work.
func sumFiles(done <-chan struct{}, root string) (<-chan md5Result, <-chan error) {
// For each regular file, start a goroutine that sums the file and sends
// the md5Result on c. Send the md5Result of the walk on errc.
c := make(chan md5Result)
errc := make(chan error, 1)
go func() { // HL
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
go func() { // HL
data, err := ioutil.ReadFile(path)
select {
case c <- md5Result{path, md5.Sum(data), err}: // HL
case <-done: // HL
}
wg.Done()
}()
// Abort the walk if done is closed.
select {
case <-done: // HL
return errors.New("walk canceled")
default:
return nil
}
})
// Walk has returned, so all calls to wg.Add are done. Start a
// goroutine to close c once all the sends are done.
go func() { // HL
wg.Wait()
close(c) // HL
}()
// No select needed here, since errc is buffered.
errc <- err // HL
}()
return c, errc
}

并发模式的关键要素就是使用changoroutine 配合使用,创建一个goroutine运行需要执行的作业内容,遍历作业,对其每个耗时较长的任务进行异步处理,并判断是否接收到了中断信息,如果有中断信息就直接返回错误,并将错误传入到errc管道中, 这样作业内容就可以停止。对作业内容结果进行处理填入到chan中,并用WaitGroup记录其作业状态,当作业内容全部执行了,就关闭chan,不允许可写。

针对上面代码进行简化操作的逻辑就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func Walk(jobs Job,walkFn WalkFunc) error {
for job range jobs {
err = walkFn(job)
if err != nil{
break
}
}
return err
}

func work(done <-chan struct{},jobs Job) (<-chan result, <-chan error) {
// For each regular file, start a goroutine that sums the file and sends
// the md5Result on c. Send the md5Result of the walk on errc.
c := make(chan result)
errc := make(chan error, 1)
go func() { // HL
var wg sync.WaitGroup
for job range jobs
err := Walk(jobs,func() error {
if err != nil {
return err
}
wg.Add(1)
go func() { // HL
data, err := doJob()
select {
case c <- data // HL
//case <-done: // HL
}
wg.Done()
}()
// Abort the walk if done is closed.
select {
case <-done: // HL
return errors.New("walk canceled")
default:
return nil
}
})
// Walk has returned, so all calls to wg.Add are done. Start a
// goroutine to close c once all the sends are done.
go func() { // HL
wg.Wait()
close(c) // HL
}()
// No select needed here, since errc is buffered.
errc <- err // HL
}()
return c, errc
}

抽出一个通用逻辑来,应该就是这样的,适用于各种场景。

应用场景: io操作,网络操作 等等耗时事件长的任务。

6. 固定并发模式

在资源有限的情况下完成大量独立任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// walkFiles starts a goroutine to walk the directory tree at root and send the
// path of each regular file on the string channel. It sends the result of the
// walk on the error channel. If done is closed, walkFiles abandons its work.
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() { // HL
// Close the paths channel after Walk returns.
defer close(paths) // HL
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path: // HL
case <-done: // HL
return errors.New("walk canceled")
}
return nil
})
}()
return paths, errc
}

// A result is the product of reading and summing a file using MD5.
type result struct {
path string
sum [md5.Size]byte
err error
}

// digester reads path names from paths and sends digests of the corresponding
// files on c until either paths or done is closed.
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths { // HLpaths
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error. In that case,
// MD5All does not wait for inflight read operations to complete.
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)

paths, errc := walkFiles(done, root)

// Start a fixed number of goroutines to read and digest files.
c := make(chan result) // HLc
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c) // HLc
wg.Done()
}()
}
go func() {
wg.Wait()
close(c) // HLc
}()
// End of pipeline. OMIT

m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// Check whether the Walk failed.
if err := <-errc; err != nil { // HLerrc
return nil, err
}
return m, nil
}

和上面并发模式有点像,区别是固定量并发数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var paths = make(chan string)
paths = getJobs()
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c) // HLc
wg.Done()
}()
}
go func() {
wg.Wait()
close(c) // HLc
}()

抽出主要逻辑,清晰的可以看出来其用途为固定量并发 goroutine 数为 20 个。

应用场景: cpu 操作,io操作,网络操作 等等耗时事件长的任务,以及为了防止任务吞掉系统全部资源,限制并发量。

创造型

7 .建造者模式

使用简单对象构建复杂对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package car

type Speed float64

const (
MPH Speed = 1
KPH = 1.60934
)

type Color string

const (
BlueColor Color = "blue"
GreenColor = "green"
RedColor = "red"
)

type Wheels string

const (
SportsWheels Wheels = "sports"
SteelWheels = "steel"
)

type Builder interface {
Color(Color) Builder
Wheels(Wheels) Builder
TopSpeed(Speed) Builder
Build() Car
}

type Car interface {
Drive() error
Stop() error
}

使用

1
2
3
4
5
6
7
assembly := car.NewBuilder().Paint(car.RedColor)

familyCar := assembly.Wheels(car.SportsWheels).TopSpeed(50 * car.MPH).Build()
familyCar.Drive()

sportsCar := assembly.Wheels(car.SteelWheels).TopSpeed(150 * car.MPH).Build()
sportsCar.Drive()

应用场景: 构建复杂对象。

7. 工厂模式

将对象的实例化推迟到用于创建实例的专用功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package data

type StorageType int

const (
DiskStorage StorageType = 1 << iota
TempStorage
MemoryStorage
)

func NewStore(t StorageType) Store {
switch t {
case MemoryStorage:
return newMemoryStorage( /*...*/ )
case DiskStorage:
return newDiskStorage( /*...*/ )
default:
return newTempStorage( /*...*/ )
}
}

使用

1
2
3
4
5
s, _ := data.NewStore(data.MemoryStorage)
f, _ := s.Open("file")

n, _ := f.Write([]byte("data"))
defer f.Close()

使用工厂方法,用户可以指定所需的存储类型。工厂模式就是对开闭原则的一个最好诠释,只增加工厂类,不修改原类。

使用场景:对一类问题产生变化的可能性,有多种变化,选择。

7. 对象池模式

例化并维护一组相同类型的对象实例

对象池

1
2
3
4
5
6
7
8
9
10
11
12
13
package pool

type Pool chan *Object

func New(total int) *Pool {
p := make(Pool, total)

for i := 0; i < total; i++ {
p <- new(Object)
}

return &p
}

使用

1
2
3
4
5
6
7
8
9
10
11
p := pool.New(2)

select {
case obj := <-p:
obj.Do( /*...*/ )

p <- obj
default:
// No more objects left — retry later or fail
return
}

再 Golang 中已经有专门的sync.Pool封装好了 ,不需要自己单独写。

应用场景:对象初始化很多的情况下,由于对象已预先初始化,因此可以对象不需要再进行初始化,对耗时较长的创建对象是一个好的选择。,如果需要性能,而不是节省资源,维护对象池,不是一个好的选择。

8. 单利模式

将类型的实例化限制为一个对象

单利模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package singleton

type singleton map[string]string

var (
once sync.Once

instance singleton
)

func New() singleton {
once.Do(func() {
instance = make(singleton)
})

return instance
}

使用

1
2
3
4
5
6
7
8
s := singleton.New()

s["this"] = "that"

s2 := singleton.New()

fmt.Println("This is ", s2["this"])
// This is that

使用once语义可以保证内容只被一次,从而保证对象是单利的。

应用场景:单利类在初始创建后,就不应该被再操作,所以我认为单利类只是一堆包含数据的结构体,初始后状态不可变,可以被重复使用,而不用产生大量的对象,减少 GC 压力。

建造模式

8. 装饰器模式

静态地或动态地向对象添加行为

装饰

1
2
3
4
5
6
7
8
9
10
11
12
13
type Object func(int) int

func LogDecorate(fn Object) Object {
return func(n int) int {
log.Println("Starting the execution with the integer", n)

result := fn(n)

log.Println("Execution is completed with the result", result)

return result
}
}

使用

1
2
3
4
5
6
7
func Double(n int) int {
return n * 2
}

f := LogDecorate(Double)

f(5)

Double这个函数作为被装饰对象,使用LogDecorate装饰要被装饰的函数,老实说,我看着这个函数有点像 Spring 里面的 AOP 代理模式,感觉和代理模式也很像,但是是函数级别,并且处理结果可以再被处理,所以说是装饰模式也可以。

应用场景:拓展一个方法原有的功能

8. 代理模式

提供对象的代理对象以控制其动作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// To use proxy and to object they must implement same methods
type IObject interface {
ObjDo(action string)
}

// Object represents real objects which proxy will delegate data
type Object struct {
action string
}

// ObjDo implements IObject interface and handel's all logic
func (obj *Object) ObjDo(action string) {
// Action behavior
fmt.Printf("I can, %s", action)
}

// ProxyObject represents proxy object with intercepts actions
type ProxyObject struct {
object *Object
}

// ObjDo are implemented IObject and intercept action before send in real Object
func (p *ProxyObject) ObjDo(action string) {
if p.object == nil {
p.object = new(Object)
}
if action == "Run" {
p.object.ObjDo(action) // Prints: I can, Run
}
}

简单的用一句话来说就是将被代理对象传入代理对象,然后执行被代理对象的方法,并且执行代理对象自身的一些操作。

同步模式

9. 信号量模式

允许控制对公共资源的访问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package semaphore

var (
ErrNoTickets = errors.New("semaphore: could not aquire semaphore")
ErrIllegalRelease = errors.New("semaphore: can't release the semaphore without acquiring it first")
)

// Interface contains the behavior of a semaphore that can be acquired and/or released.
type Interface interface {
Acquire() error
Release() error
}

type implementation struct {
sem chan struct{}
timeout time.Duration
}

func (s *implementation) Acquire() error {
select {
case s.sem <- struct{}{}:
return nil
case <-time.After(s.timeout):
return ErrNoTickets
}
}

func (s *implementation) Release() error {
select {
case _ = <-s.sem:
return nil
case <-time.After(s.timeout):
return ErrIllegalRelease
}

return nil
}

func New(tickets int, timeout time.Duration) Interface {
return &implementation{
sem: make(chan struct{}, tickets),
timeout: timeout,
}
}

信号量模式也好理解,就是同一时刻只有一个任务可以被执行,所以chan是一个阻塞chan, 当任务结束就是释放chan里面的元素,保持活跃,可以接受的状态。

应用场景:在该模式下,接收请求和执行下游依赖在同一个线程内完成,不存在线程上下文切换所带来的性能开销。

性能分析型

10. 记时模式

包装函数并记录执行

1
2
3
4
5
6
7
8
9
10
11
12
package profile

import (
"time"
"log"
)

func Duration(invocation time.Time, name string) {
elapsed := time.Since(invocation)

log.Printf("%s lasted %s", name, elapsed)
}

使用

1
2
3
4
5
6
7
8
9
10
11
12
func BigIntFactorial(x big.Int) *big.Int {
// Arguments to a defer statement is immediately evaluated and stored.
// The deferred function receives the pre-evaluated values when its invoked.
defer profile.Duration(time.Now(), "IntFactorial")

y := big.NewInt(1)
for one := big.NewInt(1); x.Sign() > 0; x.Sub(x, one) {
y.Mul(y, x)
}

return x.Set(y)
}

通过defer的妙用,将当前函数的开始时间可以记住,然后通过defer在函数执行完后才会调用defer函数。这样就达成了记时的作用,可以统计这段函数花费了多长时间,感觉很妙。

应用场景:进行基准测试的时候可以使用。

消息型

11. 扇入消息传递模式

扇入是一种消息传递模式,用于为工作(客户端:源,服务器:目标)之间的工作创建漏斗。

我们可以使用Go channel 对扇入进行建模。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Merge different channels in one channel
func Merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup

out := make(chan int)

// Start an send goroutine for each input channel in cs. send
// copies values from c to out until c is closed, then calls wg.Done.
send := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}

wg.Add(len(cs))
for _, c := range cs {
go send(c)
}

// Start a goroutine to close out once all the send goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}

简单的说就是用多个goroutine异步写入多个chan里面的消息到另一个chan,所以叫Merge过程。

应用场景:对数据进行整合。

12. 扇入消息传递模式

扇出是一种消息传递模式,用于在工(生产者:源,消费者:目的地)之间分配工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Split a channel into n channels that receive messages in a round-robin fashion.
func Split(ch <-chan int, n int) []<-chan int {
cs := make([]chan int)
for i := 0; i < n; i++ {
cs = append(cs, make(chan int))
}

// Distributes the work in a round robin fashion among the stated number
// of channels until the main channel has been closed. In that case, close
// all channels and return.
distributeToChannels := func(ch <-chan int, cs []chan<- int) {
// Close every channel when the execution ends.
defer func(cs []chan<- int) {
for _, c := range cs {
close(c)
}
}(cs)

for {
for _, c := range cs {
select {
case val, ok := <-ch:
if !ok {
return
}

c <- val
}
}
}
}

go distributeToChannels(ch, cs)

return cs
}

将一个chan的内容拆分到多个chan里面 (通过 goroutine 并发模式)

应用场景:拆分一堆任务成多个任务。

13. 发布和订阅消息传递模式

发布-订阅是一种消息传递模式,用于在消息之间进行消息传递不同的组件,而这些组件不了解彼此的身份。

消息,发布者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Message struct {
// Contents
}


type Subscription struct {
ch chan<- Message

Inbox chan Message
}

func (s *Subscription) Publish(msg Message) error {
if _, ok := <-s.ch; !ok {
return errors.New("Topic has been closed")
}

s.ch <- msg

return nil
}

订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Topic struct {
Subscribers []Session
MessageHistory []Message
}

func (t *Topic) Subscribe(uid uint64) (Subscription, error) {
// Get session and create one if it's the first

// Add session to the Topic & MessageHistory

// Create a subscription
}

func (t *Topic) Unsubscribe(Subscription) error {
// Implementation
}

func (t *Topic) Delete() error {
// Implementation
}
1
2
3
4
5
6
7
8
9
type User struct {
ID uint64
Name string
}

type Session struct {
User User
Timestamp time.Time
}

观察者和发布订阅模式的区别是,观察者模式的通知对象只有一个,而发布订阅模式,一个订阅着可以订阅多个发布者对象。发布订阅模式需要进行手动消费,而观察者模式则自动触发操作。

应用场景:发布订阅者模式更适合多对多消息变化的应用场景。

稳定模式

14. 熔断器模式

类似于电熔丝,可防止在连接电路电流过大时引发火灾,这导致电线加热并燃烧,断路器的设计模式是“故障优先”关闭电路,请求/响应关系或机制的机制在软件开发的情况下提供服务,以防止更大的失败。

操作计数器

circuit.Counter是一个简单的计数器,用于记录成功或失败的状态电路和时间戳,并计算连续的失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package circuit

import (
"time"
)

type State int

const (
UnknownState State = iota
FailureState
SuccessState
)

type Counter interface {
Count(State)
ConsecutiveFailures() uint32
LastActivity() time.Time
Reset()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package circuit

import (
"time"
)

type State int

const (
UnknownState State = iota
FailureState
SuccessState
)

type Counter interface {
Count(State)
ConsecutiveFailures() uint32
LastActivity() time.Time
Reset()
}

使用`circuit.Breaker’闭包来包装Circuit,该闭包保留内部操作计数器。如果电路连续故障超过指定阈值,它将返回快速错误。一段时间后,它重试该请求并记录下来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package circuit

import (
"context"
"time"
)

type Circuit func(context.Context) error

func Breaker(c Circuit, failureThreshold uint32) Circuit {
// 一个计数器
cnt := NewCounter()

return func(ctx context) error {
// 计数器超过阀值
if cnt.ConsecutiveFailures() >= failureThreshold {
// 判断是否可以重试的条件
canRetry := func(cnt Counter) {
backoffLevel := Cnt.ConsecutiveFailures() - failureThreshold

// Calculates when should the circuit breaker resume propagating requests
// to the service
shouldRetryAt := cnt.LastActivity().Add(time.Seconds * 2 << backoffLevel)

return time.Now().After(shouldRetryAt)
}

// 不返回错误,继续执行。
if !canRetry(cnt) {
// Fails fast instead of propagating requests to the circuit since
// not enough time has passed since the last failure to retry
return ErrServiceUnavailable
}
}

// Unless the failure threshold is exceeded the wrapped service mimics the
// old behavior and the difference in behavior is seen after consecutive failures
if err := c(ctx); err != nil {
cnt.Count(FailureState)
return err
}

cnt.Count(SuccessState)
return nil
}
}

上面这个例子太简单了,概括为熔断器超过阀值时(可提供一定的尝试次数),不直接执行方法,直接返回错误,防止更大的破坏,等过一段时间后自动恢复。

应用场景:微服务之间的调用,为了下游服务阻塞上游服务卡住,直接将下游服务断开,快速失败,避免其他服务也不能正常使用。

评论

`
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×