Etcd 中 Raft 协议源码的不完全分析(1)

Etcd 中 Raft 协议源码的不完全分析(1)

etcd 是 CoreOS 团队于2013年6月发起的开源项目,它的目标是构建一个高可用的分布式键值(key-value)数据库。etcd 内部采用 raft 协议作为一致性, 现在就来看看 raft 在 etcd 中是如何实现的。必须先阅读 raft 协议的论文,再对协议论文有深刻理解的情况下,理解实现会更加轻松。

一、分析流程

阅读readMe.md文件

运行单节点raftexample

首先启动一个raftexample的单成员集群:

1
raftexample --id 1 --cluster http://127.0.0.1:12379 --port 12380

每个raftexample进程都维护一个raft实例和一个键值服务器。
进程的逗号分隔对等体(–cluster)列表,其对等列表(–id)的raft ID索引和http键值服务器端口(–port)通过命令行传递。

接下来,将值(“hello”)存储到键(“my-key”):

1
curl -L http://127.0.0.1:12380/my-key -XPUT -d hello

最后,检索存储的密钥:

1
curl -L http://127.0.0.1:12380/my-key

运行本地群集

首先安装[goreman](https://github.com/mattn/goreman),它管理基于Procfile的应用程序。

[Procfile脚本](./ Procfile)将设置本地示例集群。从以下开始:

1
goreman start

这将带来三个raftexample实例。

现在可以将键值对写入集群的任何成员,并同样从任何成员中检索它。

容错

要测试群集恢复,首先启动群集并写入值“foo”:

1
2
goreman start
curl -L http://127.0.0.1:12380/my-key -XPUT -d foo

接下来,删除节点并将值替换为“bar”以检查群集可用性:

1
2
3
goreman run stop raftexample2
curl -L http://127.0.0.1:12380/my-key -XPUT -d bar
curl -L http://127.0.0.1:32380/my-key

最后,重新启动节点并使用更新后的值“bar”验证它是否恢复:

1
2
goreman run start raftexample2
curl -L http://127.0.0.1:22380/my-key

动态集群重新配置

可以使用对REST API的请求将节点添加到正在运行的集群中或从中删除节点。

例如,假设我们有一个使用命令启动的3节点集群:

1
2
3
raftexample --id 1 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 12380
raftexample --id 2 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 22380
raftexample --id 3 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 32380

可以通过发出POST来添加ID为4的第四个节点:

1
curl -L http://127.0.0.1:12380/4 -XPOST -d http://127.0.0.1:42379

然后使用–join选项可以像其他节点一样启动新节点:

1
raftexample --id 4 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379,http://127.0.0.1:42379 --port 42380 --join

新节点应加入群集,并能够为密钥/值请求提供服务。

我们可以使用DELETE请求删除节点:

1
curl -L http://127.0.0.1:12380/3 -XDELETE

一旦集群处理了此请求,节点3就应该自行关闭。

设计

raftexample由三个组件组成:一个由raft支持的键值存储,一个REST API服务器和一个基于etcd的raft实现的raft共识服务器。

支持raft的键值存储是一个键值映射,它包含所有已提交的键值。该存储桥接了raft服务器和REST服务器之间的通信。键值更新通过存储发送到raft服务器。一旦raft报告提交更新,存储就会更新其地图。

REST服务器通过访问raft支持的键值存储来公开当前的raft共识。GET命令在存储中查找键并返回值(如果有)。键值PUT命令向存储发出更新提议。

raft服务器与其集群对等方达成共识。当REST服务器提交提议时,raft服务器将提议发送给其对等方。当raft达成共识时,服务器通过提交通道发布所有已提交的更新。对于raftexample,此提交通道由键值存储使用。

二、介绍raft库代码结构及核心数据结构

为什么要首先介绍核心数据结构,如果不介绍核心数据结构就可能看不懂后面我的分析,每一个Msg具体代表什么含义,所以先看核心数据结构,这里只需要大概浏览一遍就可以了,忘记了可以在这里看。

MsgHup消息

成员 类型 作用
type MsgHup 不用于节点间通信,仅用于发送给本节点让本节点进行选举
to uint64 消息接收者的节点ID
from uint64 本节点ID

MsgBeat消息

成员 类型 作用
type MsgBeat 不用于节点间通信 ,仅用于leader节点在heartbeat定时器到期时向集群中其他节点发送心跳消息
to uint64 消息接收者的节点ID
from uint64 本节点ID

MsgProp消息

成员 类型 作用
type MsgProp raft库使用者提议(propose)数据
to uint64 消息接收者的节点ID
from uint64 本节点ID
entries Entry 日志条目数组

MsgApp/MsgSnap消息

MsgApp消息

成员 类型 作用
type MsgApp 用于leader向集群中其他节点同步数据的消息
to uint64 消息接收者的节点ID
from uint64 本节点ID
entries Entry 日志条目数组
logTerm uint64 日志所处的任期ID
index uint64 索引ID

MsgSnap消息

成员 类型 作用
type MsgSnap 用于leader向follower同步数据用的快照消息
to uint64 消息接收者的节点ID
from uint64 本节点ID
snapshot Snapshot 快照数据

MsgAppResp消息

成员 类型 作用
type MsgAppResp 集群中其他节点针对leader的MsgApp/MsgSnap消息的应答消息
to uint64 消息接收者的节点ID
from uint64 本节点ID
index uint64 日志索引ID,用于节点向leader汇报自己已经commit的日志数据ID
reject bool 是否拒绝同步日志的请求
rejectHint uint64 拒绝同步日志请求时返回的当前节点日志ID,用于被拒绝方快速定位到下一次合适的同步日志位置

MsgVote/MsgPreVote消息

成员 类型 作用
type MsgVote/MsgPreVote 节点投票给自己以进行新一轮的选举
to uint64 消息接收者的节点ID
from uint64 本节点ID
term uint64 任期ID
index uint64 日志索引ID,用于节点向leader汇报自己已经commit的日志数据ID
logTerm uint64 日志所处的任期ID
context bytes 上下文数据

MsgVoteResp/MsgPreVoteResp消息

成员 类型 作用
type MsgVoteResp/MsgPreVoteResp 投票应答消息
to uint64 消息接收者的节点ID
from uint64 本节点ID
reject bool 是否拒绝

三、分析源码

按照上面README.md文档开始分析,首先把单个实例跑起来。然后根据源码的顺序一个一个的看。

main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {
// 用于节点之间通信
cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
id := flag.Int("id", 1, "node ID")
// 用于 key value
kvport := flag.Int("port", 9121, "key-value server port")
join := flag.Bool("join", false, "join an existing cluster")
flag.Parse()

proposeC := make(chan string)
defer close(proposeC)
confChangeC := make(chan raftpb.ConfChange)
defer close(confChangeC)
var kvs *kvstore
getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}
  1. proposeC创建一个提议channel,用于提议请求的通信
  2. confChangeC创建一个配置更改的channel,用于配置更改的通信
  3. newRaftNode创建raft协议的node节点,然后返回一个提交的channel,和一个错误通知的channel
  4. snapshotterReady用于等待快照创建完毕,然后执行newKVStore方法创建一个key存储容器
  5. serveHttpKVAPI创建一个http服务

main.newKVStore

1
2
3
4
5
6
7
8
func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore {
s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter}
// 回应日志进入到key value map
s.readCommits(commitC, errorC)
//从raft读取提交到kvStore映射直到错误
go s.readCommits(commitC, errorC)
return s
}

创建一个KV存储对象,读取提交信息到kv存储容器中

1
2
3
4
5
6
type kvstore struct {
proposeC chan<- string // channel for proposing updates
mu sync.RWMutex
kvStore map[string]string // current committed key-value pairs
snapshotter *snap.Snapshotter
}

proposeC是一个提议channel,sync.RWMutex因为是在多个 go 协程中运行的,所以需要加锁,存储实际上就是一个map类型

1
2
3
4
type Snapshotter struct {
lg *zap.Logger
dir string
}

Snapshotter类型则是一个zap类型的日志,会持久化到磁盘中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
for data := range commitC {
if data == nil {
snapshot, err := s.snapshotter.Load()
if err == snap.ErrNoSnapshot {
return
}
if err != nil {
log.Panic(err)
}
log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
log.Panic(err)
}
continue
}

var dataKv kv
dec := gob.NewDecoder(bytes.NewBufferString(*data))
if err := dec.Decode(&dataKv); err != nil {
log.Fatalf("raftexample: could not decode message (%v)", err)
}
s.mu.Lock()
s.kvStore[dataKv.Key] = dataKv.Val
s.mu.Unlock()
}
if err, ok := <-errorC; ok {
log.Fatal(err)
}
}

根据提交然后加载快照中的信息,从快照中的信息恢复到kv存储容器中,然后再从commit日志中恢复信息

main.serveHttpKVAPI

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.
func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
// httpKAPi 因为实现了serverhttp方法所以可以传入作为handler
srv := http.Server{
Addr: ":" + strconv.Itoa(port),
// 只需要关心httpKVAPI的实现就可以了
Handler: &httpKVAPI{
store: kv,
confChangeC: confChangeC,
},
}
go func() {
// 打开监听端口
if err := srv.ListenAndServe(); err != nil {
log.Fatal(err)
}
}()
// exit when raft goes down
if err, ok := <-errorC; ok {
log.Fatal(err)
}
}

开启http服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := r.RequestURI
defer r.Body.Close()
//HttpServer主循环:
//接收用户提交的数据:
//如果是PUT请求:
//将数据写入到proposeC中
//如果是POST请求:
//将配置变更数据写入到confChangeC中

switch {
// 如果方式put方法 更新某个key
case r.Method == "PUT":
v, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("Failed to read on PUT (%v)\n", err)
http.Error(w, "Failed on PUT", http.StatusBadRequest)
return
}
// 建议 != 事实
// 在这里调用了key value 的更新建议
h.store.Propose(key, string(v))

//乐观的无需等Raft的确认。值尚未提交,因此键上的后续GET可能返回旧值
w.WriteHeader(http.StatusNoContent)
// 如果方式get方法 查找某个key
case r.Method == "GET":
if v, ok := h.store.Lookup(key); ok {
// 查看某一个key的配置
w.Write([]byte(v))
} else {
http.Error(w, "Failed to GET", http.StatusNotFound)
}
case r.Method == "POST":
url, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("Failed to read on POST (%v)\n", err)
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}

nodeId, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {
log.Printf("Failed to convert ID for conf change (%v)\n", err)
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}

cc := raftpb.ConfChange{
// 删除传入的添加
Type: raftpb.ConfChangeAddNode,
NodeID: nodeId,
Context: url,
}
// 更新配置
h.confChangeC <- cc

//如上所述,乐观地认为raft会应用变化
w.WriteHeader(http.StatusNoContent)
case r.Method == "DELETE":
nodeId, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {
log.Printf("Failed to convert ID for conf change (%v)\n", err)
http.Error(w, "Failed on DELETE", http.StatusBadRequest)
return
}

cc := raftpb.ConfChange{
// 删除传入的nodeid
Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeId,
}

// 删除配置
h.confChangeC <- cc

//如上,乐观地认为筏将应用conf更改
w.WriteHeader(http.StatusNoContent)
default:
w.Header().Set("Allow", "PUT")
w.Header().Add("Allow", "GET")
w.Header().Add("Allow", "POST")
w.Header().Add("Allow", "DELETE")
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}

可以分为两类,一类是属性信息,第二类是节点配置的信息

  • 1.属性信息
    • PUT通过h.store.Propose(key, string(v))该方法往s.proposeC中提交建议
    • GET通过h.store.Lookup(key)v, ok := s.kvStore[key]集合中查找数据,然后返回
  • 2.节点配置的信息
    • POST 因为设置的Type: raftpb.ConfChangeAddNode信息为添加类型,再由h.confChangeC <- cc传输到配置更改的channel,实现添加节点
    • DELTE因为设置的Type: raftpb.ConfChangeRemoveNode信息为添加类型,再由h.confChangeC <- cc传输到配置更改的channel,实现删除节点

main.newRaftNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {

commitC := make(chan *string)
errorC := make(chan error)

rc := &raftNode{
proposeC: proposeC,
confChangeC: confChangeC,
commitC: commitC,
errorC: errorC,
id: id,
peers: peers,
join: join,
waldir: fmt.Sprintf("raftexample-%d", id),
snapdir: fmt.Sprintf("raftexample-%d-snap", id),
getSnapshot: getSnapshot,
snapCount: defaultSnapshotCount,
stopc: make(chan struct{}),
httpstopc: make(chan struct{}),
httpdonec: make(chan struct{}),

snapshotterReady: make(chan *snap.Snapshotter, 1),
// WAL回应后填充的其余结构
}
go rc.startRaft()
return commitC, errorC, rc.snapshotterReady
}

创建一个raft的node节点,然后启用协程开启调用startRaft(),启动raft协议,后面的方法会等待snapshotterReady快照信息准备完毕

main.newRaftNode.startRaft

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
func (rc *raftNode) startRaft() {
// 创建快照目录
if !fileutil.Exist(rc.snapdir) {
if err := os.Mkdir(rc.snapdir, 0750); err != nil {
log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
}
}
rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
rc.snapshotterReady <- rc.snapshotter

// 是否存在wal日志
oldwal := wal.Exist(rc.waldir)
rc.wal = rc.replayWAL()

rpeers := make([]raft.Peer, len(rc.peers))
// 为遍历每一个peer节点都设置id 从1开始设置
for i := range rpeers {
rpeers[i] = raft.Peer{ID: uint64(i + 1)}
}
// 创建配置文件 弹性时间戳为10 心跳为1 用内存都方式存储 节点之间最大消息大小为1024*1024字节 乐观复制中最大追加消息数为256条 未提交的最大条目数量
c := &raft.Config{
ID: uint64(rc.id),
ElectionTick: 10,
HeartbeatTick: 1,
Storage: rc.raftStorage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
MaxUncommittedEntriesSize: 1 << 30,
}

// 如果存在wal日志
if oldwal {
// 就根据之前的配置来恢复这个节点
rc.node = raft.RestartNode(c)
} else {
// 否则就启动node节点
startPeers := rpeers
// 如果是参加节点,那么当前节点就不启动
if rc.join {
startPeers = nil
}
// 启动node节点
rc.node = raft.StartNode(c, startPeers)
}

// raft传输
rc.transport = &rafthttp.Transport{
Logger: zap.NewExample(),
ID: types.ID(rc.id),
ClusterID: 0x1000,
Raft: rc,
ServerStats: stats.NewServerStats("", ""),
LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
ErrorC: make(chan error),
}
// 传输服务启动
rc.transport.Start()
// 添加传输的对等节点,如果不是本身节点,就添加
for i := range rc.peers {
if i+1 != rc.id {
rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
}
}
// 启动HTTP服务
go rc.serveRaft()
// 开始监听各个channel然后消费
go rc.serveChannels()
}
  1. 是否本地时候存在快照信息
    • 如果有快照就读取之前快照信息
    • 否则就在本地初始化创建一个快照
  2. 是否本地存在wal日志,
    • 如果存在wal日志,就根据之前的配置来恢复这个节点
    • 否则就创建node节点,启动node节点
  3. 启动传输服务启动,并将对等节点放入到要传入的服务中
  4. 启动节点之间需要的tcp连接
  5. 监听各个节点给他发送的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (t *Transport) Start() error {
var err error
// stream 流 一般是维护节点状态,以及心跳
t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
if err != nil {
return err
}
// pipeline流 通常传输较大到数据,例如快照
t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
if err != nil {
return err
}
t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer)
t.pipelineProber = probing.NewProber(t.pipelineRt)
t.streamProber = probing.NewProber(t.streamRt)

if t.DialRetryFrequency == 0 {
t.DialRetryFrequency = rate.Every(100 * time.Millisecond)
}
return nil
}

创建一个传输服务,分为stream类型,和pipeline类型, 其中stream类型是一个长链接,而pipeline类型是一个短链接类型。

1
2
streamRt   http.RoundTripper // roundTripper used by streams
pipelineRt http.RoundTripper // roundTripper used by pipelines
  1. stream类型
    • 用来发送心跳和日志的信息
  2. pipeline类型
    • 用来传输快照
    • 用来发送心跳和日志的信息(仅当stream类型不可用是,才会使用)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (rc *raftNode) serveRaft() {
// 解析节点地址
url, err := url.Parse(rc.peers[rc.id-1])
if err != nil {
log.Fatalf("raftexample: Failed parsing URL (%v)", err)
}

// 打开可以停止的tcp连接
ln, err := newStoppableListener(url.Host, rc.httpstopc)
if err != nil {
log.Fatalf("raftexample: Failed to listen rafthttp (%v)", err)
}
//transport.Handler方法放回一个实现来net.http包下的handler接口,调用其服务Serve接口
err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)
select {
case <-rc.httpstopc:
default:
log.Fatalf("raftexample: Failed to serve rafthttp (%v)", err)
}
close(rc.httpdonec)
}

时间这个方法只是打开了一个tcp连接,然后将rc.transport.Handler()传入

1
2
3
4
5
6
7
8
9
10
11
func (t *Transport) Handler() http.Handler {
pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
mux := http.NewServeMux()
mux.Handle(RaftPrefix, pipelineHandler)
mux.Handle(RaftStreamPrefix+"/", streamHandler)
mux.Handle(RaftSnapshotPrefix, snapHandler)
mux.Handle(ProbingPrefix, probing.NewHandler())
return mux
}

传入了四种handler,分别对应四种类型

1
2
3
4
RaftPrefix         = "/raft"
ProbingPrefix = path.Join(RaftPrefix, "probing")
RaftStreamPrefix = path.Join(RaftPrefix, "stream")
RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot")

管道类型,探测类型,流类型,快照类型,至于每一个类型handler的httpServer实现,我这里就不讲了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
func (rc *raftNode) serveChannels() {
// 拿到快照
snap, err := rc.raftStorage.Snapshot()
if err != nil {
panic(err)
}
rc.confState = snap.Metadata.ConfState
rc.snapshotIndex = snap.Metadata.Index
rc.appliedIndex = snap.Metadata.Index

defer rc.wal.Close()

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

//通过raft发送提案
go func() {
confChangeCount := uint64(0)

for rc.proposeC != nil && rc.confChangeC != nil {
select {
// 消费数据
case prop, ok := <-rc.proposeC:
if !ok {
rc.proposeC = nil
} else {
//阻止直到被raft状态机接受
// 调用node的建议操作
rc.node.Propose(context.TODO(), []byte(prop))
}
// 配置变更
case cc, ok := <-rc.confChangeC:
if !ok {
rc.confChangeC = nil
} else {
confChangeCount++
cc.ID = confChangeCount
rc.node.ProposeConfChange(context.TODO(), cc)
}
}
}
//客户关闭渠道;如果还没有关掉raft
close(rc.stopc)
}()

//在raft状态机更新时的事件循环
for {
select {
case <-ticker.C:
rc.node.Tick()

//将raft条目存储到wal,然后通过提交通道发布
case rd := <-rc.node.Ready():
rc.wal.Save(rd.HardState, rd.Entries)
// 如果快照不为空
if !raft.IsEmptySnap(rd.Snapshot) {
// 保存快照状态
rc.saveSnap(rd.Snapshot)
// 应用快照状态
rc.raftStorage.ApplySnapshot(rd.Snapshot)
// 发布快照
rc.publishSnapshot(rd.Snapshot)
}
// 添加条目到本地存储
rc.raftStorage.Append(rd.Entries)
// 发送消息
rc.transport.Send(rd.Messages)
// 发布条目
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
// 如果没有成功就停止节点
rc.stop()
return
}
//可能触发快照
rc.maybeTriggerSnapshot()
// 通知进行下一步
rc.node.Advance()

case err := <-rc.transport.ErrorC:
rc.writeError(err)
return

case <-rc.stopc:
rc.stop()
return
}
}
}

其中有有两个 for 循环,一个是来接收提议和配置更改的信息 ,一个是类处理node节点之间的信息

  1. 接收提议和配置更改的信息的for循环
    • case prop, ok := <-rc.proposeC: 然后调用rc.node.Propose(context.TODO(), []byte(prop))发送提议
    • case cc, ok := <-rc.confChangeC: 然后调用rc.node.ProposeConfChange(context.TODO(), cc)发送配置更改
  2. 处理node节点之间的信息的 for 循环
    • case <-ticker.C:使用rc.node.Tick()用来处理超时和心跳的逻辑
    • rd := <-rc.node.Ready():返回当前时间点状态的通道,将条目存储到wal,然后通过提交通道发布,然后调用Advance它准备节点返回下一个可用的Ready。
    • case err := <-rc.transport.ErrorC:使用rc.writeError(err)产生错误就把错误写到通道里
    • case <-rc.stopc: 停止该节点
1
2
3
4
5
6
7
8
func RestartNode(c *Config) Node {
r := newRaft(c)

n := newNode()
n.logger = c.Logger
go n.run(r)
return &n
}

如果重启节点,就从配置中恢复节点的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// StartNode 返回一个新的Node给定配置和一个raft对等列表。
//它将每个给定对等体的ConfChangeAddNode条目附加到初始日志。
func StartNode(c *Config, peers []Peer) Node {
// 创建一个raft协议
r := newRaft(c)
// 成为第1任期的追随者并应用第1任期的初始配置条目
r.becomeFollower(1, None)
for _, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
d, err := cc.Marshal()
if err != nil {
panic("unexpected marshal error")
}
e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
r.raftLog.append(e)
}
//将这些初始条目标记为已提交。
// TODO(bdarnell):这些条目仍然不稳定;我们需要保留吗?
//提交<unstable的不变量? 持久化存储和非持久化存储,它们之间的分界线就是lastIndex
r.raftLog.committed = r.raftLog.lastIndex()
//现在应用它们,主要是为了让应用程序可以调用Campaign
//在测试中的StartNode之后立即执行。请注意,这些节点将
//被添加到raft两次:此处和应用程序准备就绪
//循环调用ApplyConfChange。必须追求对addNode的调用
//所有对raftLog.append的调用都是在这些之后设置progress.next
// bootstrapping条目(如果我们尝试附加这些条目,则会出错
//条目,因为它们已经提交了)。
//我们没有设置raftLog.applied,所以应用程序就可以了
//通过Ready.CommittedEntries观察所有conf更改。
for _, peer := range peers {
// 添加节点
r.addNode(peer.ID)
}
n := newNode()
n.logger = c.Logger
// 运行raft协议
go n.run(r)
return &n
}
  1. 首先创建一个raft协议,成为第1任期的追随者并应用第1任期的初始配置条目,将领导者设置为空
  2. 并将条目添到wal中
  3. 添加对等节点
  4. 运行run方法,调用处理过程

main.newRaftNode.startRaft.run

该方法是一个相当重要的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
func (n *node) run(r *raft) {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var prevLastUnstablei, prevLastUnstablet uint64
var havePrevLastUnstablei bool
var prevSnapi uint64
var applyingToI uint64
var rd Ready

// None 是没有领导者时使用的占位符节点ID。
lead := None
prevSoftSt := r.softState()
prevHardSt := emptyState

// raft Node
//raftNode结构体主循环:
//如果proposeC中有数据写入:
//调用node.Propose向raft库提交数据
//如果confChangeC中有数据写入:
//调用node.Node.ProposeConfChange向raft库提交配置变更数据
//如果tick定时器到期:
//调用node.Tick函数进行raft库的定时操作
//如果node.Ready()函数返回的Ready结构体channel有数据变更:
//依次处理Ready结构体中各成员数据
//处理完毕之后调用node.Advance函数进行收尾处理
for {
if advancec != nil {
readyc = nil
} else {
// 这里做一个准备,msg 是从这里开始创建的
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
}

if lead != r.lead {
// 如果有leader节点
if r.hasLeader() {
if lead == None {
r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
} else {
r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
}
// 处理消息结果集
propc = n.propc
// 如果没有leader节点
} else {
r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
propc = nil
}
// 设置当前的leader鸡诶单
lead = r.lead
}


select {
// TODO:如果存在配置,可能缓冲配置建议(方式
// 在raft文中描述)
// 目前它在静默中被丢弃。
// 从处理结果集中拿到消息
case pm := <-propc:
m := pm.m
m.From = r.id
// 修改状态机的状态
err := r.Step(m)
if pm.result != nil {
pm.result <- err
close(pm.result)
}
// 从接受的消息
case m := <-n.recvc:
// 从未知发件人中筛选出响应消息。
if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
// 修改状态机状态
r.Step(m)
}
// 从配置消息中
case cc := <-n.confc:
if cc.NodeID == None {
select {
case n.confstatec <- pb.ConfState{
Nodes: r.nodes(),
Learners: r.learnerNodes()}:
case <-n.done:
}
break
}
switch cc.Type {
// 添加节点
case pb.ConfChangeAddNode:
r.addNode(cc.NodeID)
// 添加学习者节点
case pb.ConfChangeAddLearnerNode:
r.addLearner(cc.NodeID)
// 移除节点
case pb.ConfChangeRemoveNode:
// 删除本地节点时阻止传入的建议
if cc.NodeID == r.id {
propc = nil
}
r.removeNode(cc.NodeID)
// 更新节点
case pb.ConfChangeUpdateNode:
default:
panic("unexpected conf type")
}
select {
case n.confstatec <- pb.ConfState{
Nodes: r.nodes(),
Learners: r.learnerNodes()}:
case <-n.done:
}
// 心跳和选举的timeout
case <-n.tickc:
r.tick()
// Ready是各种准备好的变更
case readyc <- rd:
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
}
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
prevHardSt = rd.HardState
}
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
if index := rd.appliedCursor(); index != 0 {
applyingToI = index
}

r.msgs = nil
r.readStates = nil
r.reduceUncommittedSize(rd.CommittedEntries)
advancec = n.advancec
// 确认Ready已经处理完的
case <-advancec:
if applyingToI != 0 {
r.raftLog.appliedTo(applyingToI)
applyingToI = 0
}
if havePrevLastUnstablei {
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
havePrevLastUnstablei = false
}
r.raftLog.stableSnapTo(prevSnapi)
advancec = nil
// 状态变更的消息
case c := <-n.status:
c <- getStatus(r)
// 是否有停止节点的消息
case <-n.stop:
close(n.done)
return
}
}
}

raftNode结构体主循环:

  1. 如果proposeC中有数据写入(外部通信):调用状态机进行处理
  2. 如果recvc中有数据写入(内部通信),调用状态机进行处理
  3. 如果confChangeC中有数据写入:调用node.Node.ProposeConfChange向raft库提交配置变更数据
  4. 如果tick定时器到期:,调用node.Tick函数进行raft库的定时操作
  5. 如果node.Ready()函数返回的Ready结构体channel有数据变更:依次处理Ready结构体中各成员数据
  6. 处理完毕之后调用node.Advance函数,进行持久化或者快照操作
  7. 如果状态有变更就变更状态
  8. 监听节点是否停止的消息

main.newRaftNode.startRaft.Step

状态机器处理过程,这是raft的核心逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// 状态机器理过程
func (r *raft) Step(m pb.Message) error {
//处理消息任期,这可能导致我们踩到追随者。
switch {
// 如果任期为0
case m.Term == 0:
// local message
// 如果传入消息的任期大于当前节点的任期
//1.首先该函数会判断msg.Term是否大于本节点的Term,如果消息的任期号更大则说明是一次新的选举。这种情况下将根据msg.Context是否等于“CampaignTransfer”字符串来确定是不是一次由于leader迁移导致的强制选举过
// 程。同时也会根据当前的electionElapsed是否小于electionTimeout来确定是否还在租约期以内。如果既不是强制leader选举又在租约期以内,那么节点将忽略该消息的处理,在论文4.2.3部分论述这样做的原因,是为了避免
// 已经离开集群的节点在不知道自己已经不在集群内的情况下,仍然频繁的向集群内节点发起选举导致耗时在这种无效的选举流程中。如果以上检查流程通过了,说明可以进行选举了,如果消息类型还不是MsgPreVote类型,那么此时节
// 点会切换到follower状态且认为发送消息过来的节点msg.From是新的leader。
case m.Term > r.Term:
// 如果是 投票或者 与投票类型
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
// 当context为campaignTransfer时表示强制要求进行竞选
force := bytes.Equal(m.Context, []byte(campaignTransfer))
// 判断当前是否在租约期以内,判断的条件包括:checkQuorum为true,当前节点保存的leader不为空,没有到选举超时,前面这三个条件同时满足。
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease {
// 如果非强制,而且又在租约期以内,就不做任何处理
// 非强制又在租约期内可以忽略选举消息,见论文的4.2.3,这是为了阻止已经离开集群的节点再次发起投票请求
// If a server receives a RequestVote request within the minimum election timeout
// of hearing from a current leader, it does not update its term or grant its vote
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
return nil
}
}
// 如果是与投票类型
switch {
case m.Type == pb.MsgPreVote:
// 在应答一个prevote消息时不对任期term做修改
// Never change our term in response to a PreVote
case m.Type == pb.MsgPreVoteResp && !m.Reject:
//我们将在未来发送带有期限的投票前请求。如果
//投票前获得批准,当我们获得投票时,我们将增加我们的期限
//法定人数如果不是,则该任期来自节点
//拒绝了我们的投票,所以我们应该成为新的追随者期限
default:
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
// 如果是日志复制复制,或者心跳,或者快照信息 认为发送过来的节点是新的leader
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
// 成为发送消息方等的跟随者
r.becomeFollower(m.Term, m.From)
} else {
// 当前节点领导者设置为空,并更新任期为传入消息的任期,因为传入消息任期是大的
r.becomeFollower(m.Term, None)
}
}
// 如果传入消息的任期小于当前节点的任期
case m.Term < r.Term:
// check quorum eader 向集群的所有节点发起广播,如果还能收到大多数节点的响应,处理读请求。
if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
//我们收到了来自低任期领导的消息。有可能的
//这些消息只是在网络中被延迟了,但是这可以
//也表示此节点在网络中提升了其任期编号
//分区,它现在无法赢得选举或重新加入
//旧词的多数。如果checkQuorum为false,则为
//通过递增任期编号来响应MsgVote来处理
//更高的任期,但如果checkQuorum为真,我们可能无法推进该任期
// MsgVote并且必须生成其他消息以推进该任期。互联网
//这两个功能的结果是最小化由中断引起的
//已从群集配置中删除的节点:a
//删除的节点将发送将被忽略的MsgVotes(或MsgPreVotes),
//但它不会收到MsgApp或MsgHeartbeat,所以它不会创建
//通过通知领导者此节点的活动性来增加破坏性任期。
//以上评论也适用于预投票
//
//当追随者被隔离时,很快就会开始选举结束
//比起领导者更高的学期,虽然不会得到足够的
//投票赢得大选。当它重新获得连接时,这种反应
//使用更高级别的“pb.MsgAppResp”会迫使领导者下台。
//但是,这种中断是不可避免的
//新选举这可以通过预投票阶段来预防。
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
} else if m.Type == pb.MsgPreVote {
//在预投票启用之前,可能有更高期限的候选人,
//但更少的日志。更新到Pre-Vote后,群集可能会死锁
//我们删除较低期限的邮件。
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
} else {
//忽略其他情况
r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
}
return nil
}

switch m.Type {
// 选举信息
case pb.MsgHup:
if r.state != StateLeader {
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return nil
}

r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
if r.preVote {
r.campaign(campaignPreElection)
} else {
r.campaign(campaignElection)
}
} else {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
}
//2.在raft.Step函数的后面,会判断消息类型是MsgVote或者MsgPreVote来进一步进行处理。其判断条件是以下两个条件同时成立:

//2.1.当前没有给任何节点进行过投票(r.Vote == None ),或者消息的任期号更大(m.Term > r.Term ),或者是之前已经投过票的节点(r.Vote == m.From))。这个条件是检查是否可以还能给该节点投票。
//2.2.同时该节点的日志数据是最新的(r.raftLog.isUpToDate(m.Index, m.LogTerm) )。这个条件是检查这个节点上的日志数据是否足够的新。 只有在满足以上两个条件的情况下,节点才投票给这个消息节点,将修改raft.Vote为消息发送者ID。如果不满足条件,将应答msg.Reject=true,拒绝该节点的投票消息。
case pb.MsgVote, pb.MsgPreVote:
//如果是在续租之内那么就忽视
if r.isLearner {
// TODO:学习者可能需要投票,如果节点在交换时失败。
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
return nil
}
//之前已经投过票的节点
canVote := r.Vote == m.From ||
// 当前没有给任何节点进行过投票,并且没有领导者......
(r.Vote == None && r.lead == None) ||
// 消息i和一个预投票,并且消息的任期号更大
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// r.raftLog.isUpToDate该节点的日志数据是最新的
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
r.electionElapsed = 0

r.Vote = m.From
}
} else {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
// 否则拒绝投票
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
}

default:
err := r.step(r, m)
if err != nil {
return err
}
}
return nil
}

当条件是:

  1. 如果任期为0,不做处理
  2. 如果消息任期大于当前节点的任期
    • 如果是预投票或者和投票类型
      • 如果非强制,而且又在租约期以内,就不做任何处理(见论文的4.2.3,这是为了阻止已经离开集群的节点再次发起投票请求) ⚠️结束
    • 如果是MsgPreVote类型,在应答一个prevote消息时不对任期term做修改 (防止分区导致的,领导人重新选举)
    • 如果是MsgPreVoteResp类型并且没有拒绝
    • 如果上面两者都不是的话
      • 如果是领导者给跟随者发的消息或者收到了心跳或者收到了快照信息,就将当前节点,设置为跟随者。(根据论文 5.2 领导人选举 这一节)
      • 否则就将领导者设置为空,因为不满足上面的条件不应当处理
  3. 如果传入消息的任期小于当前节点的任期
    • 在等待投票的时候,候选人可能会从其他的服务器接收到声明它是领导人的附加日志项 RPC。如果这个领导人的任期号(包含在此次的 RPC中)不小于候选人当前的任期号,那么候选人会承认领导人合法并回到跟随者状态 (根据论文 5.2 领导人选举 这一节)
    • 如果是MsgPreVote类型,会拒绝,因为候选人的任期没有当前节点的任期大,日志不是最新的。
    • 否则忽略其他类型 ⚠️结束
  4. 如果是选举类型 (该类型只会由本节点发送给自己)
    • 如果状态不是领导者
      • 如果待处理的配置更改要应用,因此无法进行选举 ⚠️结束
      • 如果状态是预投票,就开始预选举
      • 如果状态是正式投票,就开始正式选举
    • 如果是领导者就是忽略这条消息
  5. 如果是预投票或者和投票类型
    • 如果该节点是学习者,学习者不能投票 ⚠️结束
    • 如果 (1.)之前已经投过票的节点 或者 2.)当前没有给任何节点进行过投票,并且没有领导者 或者 3.)消息是预投票,并且消息的任期号更大)并且(该节点的日志数据是最新的) 就投票给这个消息节点,将修改raft.Vote为消息发送者ID
    • 否则 将应答msg.Reject=true,拒绝该节点的投票消息
    1. 如果上面两种状态都不是,就进入特有身份处理步骤中
    • stepLeader
    • stepCandidate
    • stepFollower
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
func stepLeader(r *raft, m pb.Message) error {
//这些消息类型不需要m.From的任何进展。
switch m.Type {
case pb.MsgBeat:
r.bcastHeartbeat()
return nil
// leader的定时器函数,在超过选举时间时,如果当前打开了raft.checkQuorum开关,那么leader将给自己发送一条MsgCheckQuorum消息,对该消息的处理是:
// 检查集群中所有节点的状态,如果超过半数的节点都不活跃了,那么leader也切换到follower状态。
case pb.MsgCheckQuorum:
if !r.checkQuorumActive() {
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
r.becomeFollower(r.Term, None)
}
return nil

// raft库的使用者向raft库propose数据时,最后会封装成这个类型的消息来进行提交,不同类型的节点处理还不尽相同。
case pb.MsgProp:
if len(m.Entries) == 0 {
r.logger.Panicf("%x stepped empty MsgProp", r.id)
}
if _, ok := r.prs[r.id]; !ok {
//如果我们当前不是范围的成员(即此节点)
//作为领导者从配置中删除了),
//删除任何新提案。
return ErrProposalDropped
}
if r.leadTransferee != None {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return ErrProposalDropped
}

for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
if r.pendingConfIndex > r.raftLog.applied {
r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
e.String(), r.pendingConfIndex, r.raftLog.applied)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
}
}
}

if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
return nil
// 其中,entries数组只会有一条数据,带上的是应用层此次请求的标识数据,在follower收到MsgReadIndex消息进行应答时,同样需要把这个数据原样带回返回给leader,详细的线性读一致性的实现在后面展开分析。

case pb.MsgReadIndex:
if r.quorum() > 1 {
// 首先如果该leader在成为新的leader之后没有提交过任何值,那么会直接返回不做处理。
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
//当此领导者未在其任期内提交任何日志条目时,拒绝只读请求。
return nil
}

//思考:使用一个内部定义的上下文而不是用户给定的上下文。
//我们可以用任期和索引来表示,而不是用户提供的值。
//这将允许多次读取捎带在同一条消息上。
switch r.readOnly.option {
case ReadOnlySafe:
// 保存该MsgreadIndex请求到来时的commit索引。
r.readOnly.addRequest(r.raftLog.committed, m)
// 向集群中所有其他节点广播一个心跳消息MsgHeartbeat,并且在其中带上该读请求的唯一标识。
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
case ReadOnlyLeaseBased:
ri := r.raftLog.committed
if m.From == None || m.From == r.id { // from local member
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
}
}
} else { // there is only one voting member (the leader) in the cluster
if m.From == None || m.From == r.id { // from leader itself
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
} else { //来自学习者成员
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: r.raftLog.committed, Entries: m.Entries})
}
}

return nil
}

// All other message types require a progress for m.From (pr).
pr := r.getProgress(m.From)
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
}
switch m.Type {
//在节点收到leader的MsgApp/MsgSnap消息时,可能出现leader上的数据与自身节点数据不一致的情况,这种情况下会返回reject为true的MsgAppResp消息,同时rejectHint字段是本节点raft最后一条日志的索引ID。
//而index字段则返回的是当前节点的日志索引ID,用于向leader汇报自己已经commit的日志数据ID,这样leader就知道下一次同步数据给这个节点时,从哪条日志数据继续同步了。
//leader节点在收到MsgAppResp消息的处理流程大体如下(stepLeader函数中MsgAppResp case的处理流程)。
//首先,收到节点的MsgAppResp消息,说明该节点是活跃的,因此保存节点状态的RecentActive成员置为true。
//接下来,再根据msg.Reject的返回值,即节点是否拒绝了这次数据同步,来区分两种情况进行处理。

//msg.Reject为true的情况

//如果msg.Reject为true,说明节点拒绝了前面的MsgApp/MsgSnap消息,根据msg.RejectHint成员回退leader上保存的关于该节点的日志记录状态。比如leader前面认为从日志索引为10的位置开始向节点A同步数据,但是节点A拒绝了这次数据同步,同时返回RejectHint为2,说明节点A告知leader在它上面保存的最大日志索引ID为2,这样下一次leader就可以直接从索引为2的日志数据开始同步数据到节点A。而如果没有这个RejectHint成员,leader只能在每次被拒绝数据同步后都递减1进行下一次数据同步,显然这样是低效的。
//
//1.因为上面节点拒绝了这次数据同步,所以节点的状态可能存在一些异常,此时如果leader上保存的节点状态为ProgressStateReplicate,那么将切换到ProgressStateProbe状态(关于这几种状态,下面会谈到)。
//
//2.前面已经按照msg.RejectHint修改了leader上关于该节点日志状态的索引数据,接着再次尝试按照这个新的索引数据向该节点再次同步数据。

//msg.Reject为false的情况

//这种情况说明这个节点通过了leader的这一次数据同步请求,这种情况下根据msg.Index来判断在leader中保存的该节点日志数据索引是否发生了更新,如果发生了更新那么就说明这个节点通过了新的数据,这种情况下会做以下的几个操作。
//
//1.修改节点状态
//
//1.1如果该节点之前在ProgressStateProbe状态,说明之前处于探测状态,此时可以切换到ProgressStateReplicate,开始正常的接收leader的同步数据了。
//1.2如果之前处于ProgressStateSnapshot状态,即还在同步副本,说明节点之前可能落后leader数据比较多才采用了接收副本的状态。这里还需要多做一点解释,因为在节点落后leader数据很多的情况下,可能leader会多次通过snapshot同步数据给节点,而当 pr.Match >= pr.PendingSnapshot的时候,说明通过快照来同步数据的流程完成了,这时可以进入正常的接收同步数据状态了,这就是函数Progress.needSnapshotAbort要做的判断。
//1.3.如果之前处于ProgressStateReplicate状态,此时可以修改leader关于这个节点的滑动窗口索引,释放掉这部分数据索引,好让节点可以接收新的数据了。关于这个滑动窗口设计,见下面详细解释。
//2.判断是否有新的数据可以提交(commit)了。因为raft的提交数据的流程是这样的:首先节点将数据提议(propose)给leader,leader在将数据写入到自己的日志成功之后,再通过MsgApp把这些提议的数据广播给集群中的其他节点,在某一条日志数据收到超过半数(qurom)的节点同意之后,才认为是可以提交(commit)的。因此每次leader节点在收到一条MsgAppResp类型消息,同时msg.Reject又是false的情况下,都需要去检查当前有哪些日志是超过半数的节点同意的,再将这些可以提交(commit)的数据广播出去。而在没有数据可以提交的情况下,如果之前节点处于暂停状态,那么将继续向该节点同步数据。
//
//3.最后还要做一个跟leader迁移相关的操作。如果该消息节点是准备迁移过去的新leader节点(raft.leadTransferee == msg.From),而且此时该节点上的Match索引已经跟旧的leader的日志最大索引一致,说明新旧节点的日志数据已经同步,可以正式进行集群leader迁移操作了。
case pb.MsgAppResp:
pr.RecentActive = true

if m.Reject {
r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
if pr.maybeDecrTo(m.Index, m.RejectHint) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
r.sendAppend(m.From)
}
} else {
oldPaused := pr.IsPaused()
if pr.maybeUpdate(m.Index) {
switch {
case pr.State == ProgressStateProbe:
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
// move to replicating state, that would only happen with
// the next round of appends (but there may not be a next
// round for a while, exposing an inconsistent RaftStatus).
pr.becomeProbe()
pr.becomeReplicate()
case pr.State == ProgressStateReplicate:
pr.ins.freeTo(m.Index)
}

if r.maybeCommit() {
r.bcastAppend()
} else if oldPaused {
// If we were paused before, this node may be missing the
// latest commit index, so send it.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
for r.maybeSendAppend(m.From, false) {
}
// Transfer leadership is in progress.
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
r.sendTimeoutNow(m.From)
}
}
}
//leader中会定时向集群中其他节点发送心跳消息,该消息的作用除了探测节点的存活情况之外,还包括:
//
//commit成员:leader选择min[节点上的Match,leader日志最大提交索引],用于告知节点哪些日志可以进行提交(commit)。
//context:与线性一致性读相关,后面会进行解释。
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.resume()

// free one slot for the full inflights window to allow progress.
if pr.State == ProgressStateReplicate && pr.ins.full() {
pr.ins.freeFirstOne()
}
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}
// leader在接收到MsgHeartbeatResp消息后,如果其中有ctx字段,说明该MsgHeartbeatResp消息对应的MsgHeartbeat消息,是收到ReadIndex时leader消息为了确认自己还是集群leader发送的心跳消息
if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
}
// 首先会调用r.readOnly.recvAck(m)函数,根据消息中的ctx字段,到全局的pendingReadIndex中查找是否有保存该ctx的带处理的readIndex请求,如果有就在acks map中记录下该follower已经进行了应答。
ackCount := r.readOnly.recvAck(m)
// 当ack数量超过了集群半数时,意味着该leader仍然还是集群的leader,此时调用r.readOnly.advance(m)函数
if ackCount < r.quorum() {
return nil
}

// 将该readIndex之前的所有readIndex请求都认为是已经成功进行确认的了,所有成功确认的readIndex请求,将会加入到readStates数组中,同时leader也会向follower发送MsgReadIndexResp。
rss := r.readOnly.advance(m)
for _, rs := range rss {
req := rs.req
if req.From == None || req.From == r.id { // from local member
r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
} else {
r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
}
}

//仅leader处理这类消息:
//1.如果reject为false:表示接收快照成功,将切换该节点状态到探测状态。
//2.否则接收失败。
case pb.MsgSnapStatus:
if pr.State != ProgressStateSnapshot {
return nil
}
if !m.Reject {
pr.becomeProbe()
r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
} else {
pr.snapshotFailure()
pr.becomeProbe()
r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
//如果快照完成,请在发送之前等待来自远程节点的msgAppResp
//淘汰下一个msgApp。
//如果快照失败,请在下次尝试之前等待心跳间隔
pr.pause()
// 仅leader才处理这类消息,leader如果判断该节点此时处于正常接收数据的状态(ProgressStateReplicate),那么就切换到探测状态。
case pb.MsgUnreachable:
//在乐观复制期间,如果远程无法访问,
// MsgApp很有可能丢失。
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
//3.这类消息follower将转发给leader处理,因为follower并没有修改集群配置状态的权限。
//leader在收到这类消息时,是以下的处理流程。
//3.1.如果当前的raft.leadTransferee成员不为空,说明有正在进行的leader迁移流程。此时会判断是否与这次迁移是同样的新leader ID,如果是则忽略该消息直接返回;否则将终止前面还没有完毕的迁移流程。
//3.2.如果这次迁移过去的新节点,就是当前的leader ID,也直接返回不进行处理。
//到了这一步就是正式开始这一次的迁移leader流程了,一个节点能成为一个集群的leader,其必要条件是上面的日志与当前leader的一样多,所以这里会判断是否满足这个条件,如果满足那么发送MsgTimeoutNow消息给新的leader通知该节点进行leader迁移,否则就先进行日志同步操作让新的leader追上旧leader的日志数据。
case pb.MsgTransferLeader:
// 如果是学习者 就不能进行转发给leader
if pr.IsLearner {
r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
return nil
}
leadTransferee := m.From
lastLeadTransferee := r.leadTransferee
if lastLeadTransferee != None {
// 判断是否已经有相同节点的leader转让流程在进行中
if lastLeadTransferee == leadTransferee {
r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
r.id, r.Term, leadTransferee, leadTransferee)
// 如果是,直接返回
return nil
}
// 否则中断之前的转让流程
r.abortLeaderTransfer()
r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
}

// 判断是否转让过来的leader是否本节点,如果是也直接返回,因为本节点已经是leader了
if leadTransferee == r.id {
r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
return nil
}
//将领导权转移给第三方。
r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
//转移领导应该在一个electionTimeout中完成,所以重置r.electionElapsed。
r.electionElapsed = 0
r.leadTransferee = leadTransferee
if pr.Match == r.raftLog.lastIndex() {
// 如果日志已经匹配了,那么就发送timeoutnow协议过去
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
// 否则继续追加日志
r.sendAppend(leadTransferee)
}
}
return nil
}
  1. case pb.MsgBeat:向所有跟随者,广播心跳,⚠️结束
  2. case pb.MsgCheckQuorum: 检查是否有一半以上的跟随者在自己的状态机中处于活跃,⚠️结束
  3. case pb.MsgProp:raft库的使用者向raft库propose数据时,最后会封装成这个类型的消息来进行提交,不同类型的节点处理还不尽相同。⚠️结束
  4. case pb.MsgReadIndex:
    1. 如果总节点人数大于一个,也就是除了自己还有其他节点
      1. 首先如果该leader在成为新的leader之后没有提交过任何值,那么会直接返回不做处理。
      2. 然后检查只读类型是ReadOnlySafe还是ReadOnlyLeaseBased,
        • 如果是ReadOnlySafe, 保存该MsgreadIndex请求到来时的commit索引,向集群中所有其他节点广播一个心跳消息MsgHeartbeat,并且在其中带上该读请求的唯一标识。
        • 如果是ReadOnlyLeaseBased
          • 如果消息是当前成员,如果没有提交过任何数据,那么在它所在的这个任期(term)内的commit索引当时是并不知道的,因此在成为leader之后,需要马上提交一个no-op的空日志,这样拿到该任期的第一个commit索引。
          • 否则就发送消息回应给跟随者
    2. 否则
      • 如果消息是当前成员,如果没有提交过任何数据,那么在它所在的这个任期(term)内的commit索引当时是并不知道的,因此在成为leader之后,需要马上提交一个no-op的空日志,这样拿到该任期的第一个commit索引。
      • 否则就回应该消息,因为是因为是来自学习者
        1. ⚠️结束
  5. 获取跟随者的进度
  6. case pb.MsgAppResp:
    1. msg.Reject为true的情况,说明节点拒绝了前面的MsgApp/MsgSnap消息,根据msg.RejectHint成员回退leader上保存的关于该节点的日志记录状态。比如leader前面认为从日志索引为10的位置开始向节点A同步数据,但是节点A拒绝了这次数据同步,同时返回RejectHint为2,说明节点A告知leader在它上面保存的最大日志索引ID为2,这样下一次leader就可以直接从索引为2的日志数据开始同步数据到节点A。而如果没有这个RejectHint成员,leader只能在每次被拒绝数据同步后都递减1进行下一次数据同步,显然这样是低效的。
      1. 因为上面节点拒绝了这次数据同步,所以节点的状态可能存在一些异常,此时如果leader上保存的节点状态为ProgressStateReplicate,那么将切换到ProgressStateProbe状态(关于这几种状态,下面会谈到)。
      2. 前面已经按照msg.RejectHint修改了leader上关于该节点日志状态的索引数据,接着再次尝试按照这个新的索引数据向该节点再次同步数据。
    2. msg.Reject为false的情况
      1. 更新进度,如果不是过时的
        1. 如果该节点之前在ProgressStateProbe状态,说明之前处于探测状态,此时可以切换到ProgressStateReplicate,开始正常的接收leader的同步数据了。
        2. 如果之前处于ProgressStateSnapshot状态,即还在同步副本,说明节点之前可能落后leader数据比较多才采用了接收副本的状态。这里还需要多做一点解释,因为在节点落后leader数据很多的情况下,可能leader会多次通过snapshot同步数据给节点,而当 pr.Match >= pr.PendingSnapshot的时候,说明通过快照来同步数据的流程完成了,这时可以进入正常的接收同步数据状态了,这就是函数Progress.needSnapshotAbort要做的判断。
        3. 如果之前处于ProgressStateReplicate状态,此时可以修改leader关于这个节点的滑动窗口索引,释放掉这部分数据索引,好让节点可以接收新的数据了。关于这个滑动窗口设计,见下面详细解释。
    3. 判断是否有新的数据可以提交(commit)了。因为raft的提交数据的流程是这样的:首先节点将数据提议(propose)给leader,leader在将数据写入到自己的日志成功之后,再通过MsgApp把这些提议的数据广播给集群中的其他节点,在某一条日志数据收到超过半数(qurom)的节点同意之后,才认为是可以提交(commit)的。因此每次leader节点在收到一条MsgAppResp类型消息,同时msg.Reject又是false的情况下,都需要去检查当前有哪些日志是超过半数的节点同意的,再将这些可以提交(commit)的数据广播出去。而在没有数据可以提交的情况下,如果之前节点处于暂停状态,那么将继续向该节点同步数据。
    4. 最后还要做一个跟leader迁移相关的操作。如果该消息节点是准备迁移过去的新leader节点(raft.leadTransferee == msg.From),而且此时该节点上的Match索引已经跟旧的leader的日志最大索引一致,说明新旧节点的日志数据已经同步,可以正式进行集群leader迁移操作了。
  7. case pb.MsgHeartbeatResp:
    1. 将进度设置为活跃
    2. 为完整滑动窗口释放一个插槽以允许进度
    3. 如果消息节点是日志索引是落后的就发送追加
    4. leader在接收到MsgHeartbeatResp消息后,如果其中有ctx字段,说明该MsgHeartbeatResp消息对应的MsgHeartbeat消息,是收到ReadIndex时leader消息为了确认自己还是集群leader发送的心跳消息
    5. 通知raft状态机收到的只读结构对只读请求附加的心跳的确认上下文,根据消息中的ctx字段,到全局的pendingReadIndex中查找是否有保存该ctx的带处理的readIndex请求,如果有就在acks map中记录下该follower已经进行了应答
    6. 当ack数量超过了集群半数时,意味着该leader仍然还是集群的leader,此时调用r.readOnly.advance(m)函数
    7. 将该readIndex之前的所有readIndex请求都认为是已经成功进行确认的了,所有成功确认的readIndex请求,将会加入到readStates数组中,同时leader也会向follower发送MsgReadIndexResp。
  8. case pb.MsgSnapStatus:仅leader处理这类消息
    1. 如果reject为false:表示接收快照成功,将切换该节点状态到探测状态。
    2. 否则接收失败,将切换该节点状态到探测状态。
    3. 当Paused为true时,raft应暂停向此对等方发送复制消息。
  9. case pb.MsgUnreachable:在乐观复制期间,如果远程无法访问,MsgApp很有可能丢失。
    1. 如果远程节点状态变为复制状态,就变为探测状态
  10. case pb.MsgTransferLeader:这类消息follower将转发给leader处理,因为follower并没有修改集群配置状态的权限。

    1. 如果是学习者 就不能进行转发给leader
    2. 如果当前的raft.leadTransferee成员不为空,说明有正在进行的leader迁移流程。此时会判断是否与这次迁移是同样的新leader ID,如果是则忽略该消息直接返回;否则将终止前面还没有完毕的迁移流程。
    3. 判断是否转让过来的leader是否本节点,如果是也直接返回,因为本节点已经是leader了
    4. 如果日志已经匹配了,那么就发送timeoutnow协议过去
    5. 否则继续追加日志到新的领导者
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      // stepCandidate 由StateCandidate和StatePreCandidate共享;不同的是
      //它们是否响应MsgVoteResp或MsgPreVoteResp。
      func stepCandidate(r *raft, m pb.Message) error {
      // 只处理与我们的候选资格相对应的投票回复 (当在StateCandidate, 在这个任期中,我们可能会收到陈旧的MsgPreVoteResp消息从我们的 pre-candidate 状态).
      var myVoteRespType pb.MessageType
      if r.state == StatePreCandidate {
      myVoteRespType = pb.MsgPreVoteResp
      } else {
      myVoteRespType = pb.MsgVoteResp
      }
      switch m.Type {
      case pb.MsgProp:
      r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
      return ErrProposalDropped
      case pb.MsgApp:
      r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
      r.handleAppendEntries(m)
      case pb.MsgHeartbeat:
      r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
      r.handleHeartbeat(m)
      case pb.MsgSnap:
      r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
      r.handleSnapshot(m)
      case myVoteRespType:
      //1.节点调用raft.poll函数,其中传入msg.Reject参数表示发送者是否同意这次选举,根据这些来计算当前集群中有多少节点给这次选举投了同意票。
      //2.如果有半数的节点同意了,如果选举类型是PreVote,那么进行Vote状态正式进行一轮选举;否则该节点就成为了新的leader,调用raft.becomeLeader函数切换状态,然后开始同步日志数据给集群中其他节点了。
      //3.而如果半数以上的节点没有同意,那么重新切换到follower状态。

      // 计算当前集群中有多少节点给自己投了票
      gr := r.poll(m.From, m.Type, !m.Reject)
      r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
      switch r.quorum() {
      case gr: // 如果进行投票的节点数量正好是半数以上节点数量
      //如果选举类型是PreVote,那么进行Vote状态正式进行一轮选举;
      if r.state == StatePreCandidate {
      r.campaign(campaignElection)
      //vote状态正式的一轮选举
      } else {
      // 变成leader
      r.becomeLeader()
      r.bcastAppend()
      }
      case len(r.votes) - gr: // 如果是半数以上节点拒绝了投票
      // 变成follower
      // pb.MsgPreVoteResp包含未来候选人的期限
      // m.Term > r.Term; reuse r.Term
      r.becomeFollower(r.Term, None)
      }
      case pb.MsgTimeoutNow:
      r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
      }
      return nil
      }
  11. pb.MsgProp:如果是提议属性消息,那么就直接放弃,因为在候选人阶段是不能够添加日志。

  12. pb.MsgApp:如果收到领导人消息,直接将当前节点转为跟随者,并且向领导人发送当前的commitid
  13. pb.MsgHeartbeat:如果收到心跳,也变为跟随者,然后处理心跳
  14. pb.MsgSnap:如果收到快照,也变为跟随者,然后处理快照
  15. case myVoteRespType:
    1. 节点调用raft.poll函数,其中传入msg.Reject参数表示发送者是否同意这次选举,根据这些来计算当前集群中有多少节点给这次选举投了同意票。
    2. 如果有半数的节点同意了,如果选举类型是PreVote,那么进行Vote状态正式进行一轮选举;否则该节点就成为了新的leader,调用raft.becomeLeader函数切换状态,然后开始同步日志数据给集群中其他节点了。
    3. 而如果半数以上的节点没有同意,那么重新切换到follower状态。
  16. case pb.MsgTimeoutNow:忽略这条信息,因为状态不对。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgProp:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
} else if r.disableProposalForwarding {
r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
return ErrProposalDropped
}
m.To = r.lead
r.send(m)
case pb.MsgApp:
r.electionElapsed = 0
r.lead = m.From
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.electionElapsed = 0
r.lead = m.From
r.handleHeartbeat(m)
case pb.MsgSnap:
r.electionElapsed = 0
r.lead = m.From
r.handleSnapshot(m)
case pb.MsgTransferLeader:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
return nil
}
m.To = r.lead
r.send(m)
// 新的leader节点,在还未迁移之前仍然是follower,在收到这条消息后,就可以进行迁移了,此时会调用前面分析MsgVote时说过的campaign函数,传入的参数是campaignTransfer,表示这是一次由于迁移leader导致的选举流程。
case pb.MsgTimeoutNow:
if r.promotable() {
r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
// Leadership transfers never use pre-vote even if r.preVote is true; we
// know we are not recovering from a partition so there is no need for the
// extra round trip.
r.campaign(campaignTransfer)
} else {
r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From)
}
case pb.MsgReadIndex:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
return nil
}
m.To = r.lead
r.send(m)
case pb.MsgReadIndexResp:
if len(m.Entries) != 1 {
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
return nil
}
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
}
return nil
}
  1. case pb.MsgProp:将消息转发给领导人
  2. case pb.MsgApp:收到了领导人了消息,重置弹性超时时间,并且添加日志
  3. case pb.MsgHeartbeat:收到心跳,重置弹性超时时间,处理心跳
  4. case pb.MsgSnap:收到快照,重置弹性超时时间,处理快照
  5. case pb.MsgTransferLeader:转移领导人
  6. case pb.MsgTimeoutNow:新的leader节点,在还未迁移之前仍然是follower,在收到这条消息后,就可以进行迁移了,此时会调用前面分析MsgVote时说过的campaign函数,传入的参数是campaignTransfer,表示这是一次由于迁移leader导致的选举流程。
  7. case pb.MsgReadIndex:像领导人发送读请求
  8. case pb.MsgReadIndexResp:从远程条目里面,添加到本地条目到读状态数组

四、总结

源码还是比较难的,还有一些地方我还需要仔细分析,在后面会慢慢加上去,目前就先分析主要流程。最核心的代码依然是在状态机中,Step()函数,以及三个身份的步骤的状态函数。

评论

`
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×