Go源码分析(1) - net.http

Go源码分析(1) - net.http

Golang 中 net/http 包下是非常关键的源码,gin 中的框架也是实现了 Golang 中的handler接口的SeverHttp方法才能够适配,可以见的该包是网络编程的核心,我针对于其中一部分源码进行了分析,希望能够帮助大家更好的理解 Golang 网络编程。

一、分析过程

通过自顶向下的分析方式来分析整个源码,不关心其他实现,只关心整个调用的核心过程。

二、编写一个正常运行的例子

随便写一个例子,查看其调用链 ,http.NewServeMux()返回一个多路复用的服务,其结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import "net/http"

func main() {
mux := http.NewServeMux()

mux.HandleFunc("/a", func(writer http.ResponseWriter, request *http.Request) {
writer.Write([]byte("aaaaaa"))
})
mux.HandleFunc("/b", func(writer http.ResponseWriter, request *http.Request) {
writer.Write([]byte("bbbbbb"))
})
http.ListenAndServe(":3000",mux)
}

m map[string]muxEntry包含了请求路径对应的映射方法, 内部过程是并发的,需要加锁。

1
2
3
4
5
6
type ServeMux struct {
mu sync.RWMutex
m map[string]muxEntry
es []muxEntry // slice of entries sorted from longest to shortest.
hosts bool // whether any patterns contain hostnames
}

看一下mux.HandleFunc这个方法,将路径与处理该路径的方法对应其内部调用了mux.Handle(pattern, HandlerFunc(handler)),其作用也不言而喻,将路由器表以及对应的处理方法注册到多路服务中。

1
2
3
4
5
6
7
// HandleFunc registers the handler function for the given pattern.
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
if handler == nil {
panic("http: nil handler")
}
mux.Handle(pattern, HandlerFunc(handler))
}

http.ListenAndServe(":3000",mux)监听了3000端口,传入创建好的多路复用服务,其内部又包装了一个Server,然后通过server.ListenAndServe()开启内部调用过程。

1
2
3
4
5
6
7
8
9
10
11
// ListenAndServe listens on the TCP network address addr and then calls
// Serve with handler to handle requests on incoming connections.
// Accepted connections are configured to enable TCP keep-alives.
//
// The handler is typically nil, in which case the DefaultServeMux is used.
//
// ListenAndServe always returns a non-nil error.
func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}

三、内部调用过程

ListenAndServe()方法中使用ln, err := net.Listen("tcp", addr)打开了tcp连接(这个过程在后序的文章会进行分析),最后调用了srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (srv *Server) ListenAndServe() error {
if srv.shuttingDown() {
return ErrServerClosed
}
addr := srv.Addr
if addr == "" {
addr = ":http"
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}

Serve(l net.Listener) error方法是一个监听端口消息的处理器。其过程可以简化描述为首先监听请求,如果当前有错误信息,直接关闭服务,如果没有错误信息,创建新的连接去处理这个请求。

接受Listener l上的传入连接,创建一个每个新服务goroutine。服务goroutines读取请求和调用srv.Handler来回复他们的请求。整个过程最重要的就是整个for循环,他做了什么呢?

  1. rw, e := l.Accept() Accept等待并返回与侦听器的下一个连接
  2. <-srv.getDoneChan()如果收到结束的通知,就停止监听
  3. if ne, ok := e.(net.Error); ok && ne.Temporary()如果发生错误就进行重试
    1. tempDelay = 5 * time.Millisecond重试时间最初设置为5毫秒
    2. tempDelay *= 2每次重试时间为2的指数级
    3. max := 1 * time.Second ;tempDelay = max重试时间最大为1秒钟
    4. time.Sleep(tempDelay)执行休眠的过程(非阻塞)
  4. c := srv.newConn(rw)建立连接
  5. c.setState(c.rwc, StateNew) // before Serve can return设置连接状态为StateNew
  6. go c.serve(ctx)创建一个协程去处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// Serve accepts incoming connections on the Listener l, creating a
// new service goroutine for each. The service goroutines read requests and
// then call srv.Handler to reply to them.
//
// HTTP/2 support is only enabled if the Listener returns *tls.Conn
// connections and they were configured with "h2" in the TLS
// Config.NextProtos.
//
// Serve always returns a non-nil error and closes l.
// After Shutdown or Close, the returned error is ErrServerClosed.
func (srv *Server) Serve(l net.Listener) error {
if fn := testHookServerServe; fn != nil {
fn(srv, l) // call hook with unwrapped listener
}

l = &onceCloseListener{Listener: l}
defer l.Close()

if err := srv.setupHTTP2_Serve(); err != nil {
return err
}

if !srv.trackListener(&l, true) {
return ErrServerClosed
}
defer srv.trackListener(&l, false)

var tempDelay time.Duration // how long to sleep on accept failure
baseCtx := context.Background() // base is always background, per Issue 16220
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, e := l.Accept()
if e != nil {
select {
case <-srv.getDoneChan():
return ErrServerClosed
default:
}
if ne, ok := e.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
time.Sleep(tempDelay)
continue
}
return e
}
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew) // before Serve can return
go c.serve(ctx)
}
}

传入是一个tcpKeepAliveListener,所以会设置keepAlive,其默认设置为设置为3分钟保持活动之间的时间间隔,保持tcp连接不会马上中断。

1
2
3
4
5
6
7
8
9
func (ln tcpKeepAliveListener) Accept() (net.Conn, error) {
tc, err := ln.AcceptTCP()
if err != nil {
return nil, err
}
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(3 * time.Minute)
return tc, nil
}

处理整个过程的关键又到了c.serve(ctx)这个方法,核心就是将c.server的参数传给一个实现了Handler接口的类去处理这件事情。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Serve a new connection.
func (c *conn) serve(ctx context.Context) {
c.remoteAddr = c.rwc.RemoteAddr().String()
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
defer func() {
if err := recover(); err != nil && err != ErrAbortHandler {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
c.server.logf("http: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
}
if !c.hijacked() {
c.close()
c.setState(c.rwc, StateClosed)
}
}()

if tlsConn, ok := c.rwc.(*tls.Conn); ok {
if d := c.server.ReadTimeout; d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
}
if d := c.server.WriteTimeout; d != 0 {
c.rwc.SetWriteDeadline(time.Now().Add(d))
}
if err := tlsConn.Handshake(); err != nil {
// If the handshake failed due to the client not speaking
// TLS, assume they're speaking plaintext HTTP and write a
// 400 response on the TLS conn's underlying net.Conn.
if re, ok := err.(tls.RecordHeaderError); ok && re.Conn != nil && tlsRecordHeaderLooksLikeHTTP(re.RecordHeader) {
io.WriteString(re.Conn, "HTTP/1.0 400 Bad Request\r\n\r\nClient sent an HTTP request to an HTTPS server.\n")
re.Conn.Close()
return
}
c.server.logf("http: TLS handshake error from %s: %v", c.rwc.RemoteAddr(), err)
return
}
c.tlsState = new(tls.ConnectionState)
*c.tlsState = tlsConn.ConnectionState()
if proto := c.tlsState.NegotiatedProtocol; validNPN(proto) {
if fn := c.server.TLSNextProto[proto]; fn != nil {
h := initNPNRequest{tlsConn, serverHandler{c.server}}
fn(c.server, tlsConn, h)
}
return
}
}

// HTTP/1.x from here on.

ctx, cancelCtx := context.WithCancel(ctx)
c.cancelCtx = cancelCtx
defer cancelCtx()

c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)

for {
w, err := c.readRequest(ctx)
if c.r.remain != c.server.initialReadLimitSize() {
// If we read any bytes off the wire, we're active.
c.setState(c.rwc, StateActive)
}
if err != nil {
const errorHeaders = "\r\nContent-Type: text/plain; charset=utf-8\r\nConnection: close\r\n\r\n"

if err == errTooLarge {
// Their HTTP client may or may not be
// able to read this if we're
// responding to them and hanging up
// while they're still writing their
// request. Undefined behavior.
const publicErr = "431 Request Header Fields Too Large"
fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
c.closeWriteAndWait()
return
}
if isCommonNetReadError(err) {
return // don't reply
}

publicErr := "400 Bad Request"
if v, ok := err.(badRequestError); ok {
publicErr = publicErr + ": " + string(v)
}

fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
return
}

// Expect 100 Continue support
req := w.req
if req.expectsContinue() {
if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
// Wrap the Body reader with one that replies on the connection
req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
}
} else if req.Header.get("Expect") != "" {
w.sendExpectationFailed()
return
}

c.curReq.Store(w)

if requestBodyRemains(req.Body) {
registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
} else {
w.conn.r.startBackgroundRead()
}

// HTTP cannot have multiple simultaneous active requests.[*]
// Until the server replies to this request, it can't read another,
// so we might as well run the handler in this goroutine.
// [*] Not strictly true: HTTP pipelining. We could let them all process
// in parallel even if their responses need to be serialized.
// But we're not going to implement HTTP pipelining because it
// was never deployed in the wild and the answer is HTTP/2.
serverHandler{c.server}.ServeHTTP(w, w.req)
w.cancelCtx()
if c.hijacked() {
return
}
w.finishRequest()
if !w.shouldReuseConnection() {
if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
c.closeWriteAndWait()
}
return
}
c.setState(c.rwc, StateIdle)
c.curReq.Store((*response)(nil))

if !w.conn.server.doKeepAlives() {
// We're in shutdown mode. We might've replied
// to the user without "Connection: close" and
// they might think they can send another
// request, but such is life with HTTP/1.1.
return
}

if d := c.server.idleTimeout(); d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
if _, err := c.bufr.Peek(4); err != nil {
return
}
}
c.rwc.SetReadDeadline(time.Time{})
}
}

serverHandler{c.server}.ServeHTTP(w, w.req) 调用ServeHTTP方法.

1
2
3
type serverHandler struct {
srv *Server
}

这个结构体,实现了Handler接口的ServeHTTP(ResponseWriter, *Request)方法

1
2
3
4
5
6
7
8
9
10
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
handler := sh.srv.Handler
if handler == nil {
handler = DefaultServeMux
}
if req.RequestURI == "*" && req.Method == "OPTIONS" {
handler = globalOptionsHandler{}
}
handler.ServeHTTP(rw, req)
}

然后就将c.server传入的参数sh.srv.Handler直接调用handler.ServeHTTP(rw, req)

1
2
3
4
5
6
7
8
9
10
11
12
13
// ServeHTTP dispatches the request to the handler whose
// pattern most closely matches the request URL.
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
if r.RequestURI == "*" {
if r.ProtoAtLeast(1, 1) {
w.Header().Set("Connection", "close")
}
w.WriteHeader(StatusBadRequest)
return
}
h, _ := mux.Handler(r)
h.ServeHTTP(w, r)
}

首先会调用h, _ := mux.Handler(r)方法,对请求对数据进行查找路径,重定向,加工之类对操作,调用mux.handler(host, r.URL.Path)在路由表里查找路径对应对方法,返回的一个对应该请求的处理handlerhandler再调用ServeHTTP方法进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string) {

// CONNECT requests are not canonicalized.
if r.Method == "CONNECT" {
// If r.URL.Path is /tree and its handler is not registered,
// the /tree -> /tree/ redirect applies to CONNECT requests
// but the path canonicalization does not.
if u, ok := mux.redirectToPathSlash(r.URL.Host, r.URL.Path, r.URL); ok {
return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
}

return mux.handler(r.Host, r.URL.Path)
}

// All other requests have any port stripped and path cleaned
// before passing to mux.handler.
host := stripHostPort(r.Host)
path := cleanPath(r.URL.Path)

// If the given path is /tree and its handler is not registered,
// redirect for /tree/.
if u, ok := mux.redirectToPathSlash(host, path, r.URL); ok {
return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
}

if path != r.URL.Path {
_, pattern = mux.handler(host, path)
url := *r.URL
url.Path = path
return RedirectHandler(url.String(), StatusMovedPermanently), pattern
}

return mux.handler(host, r.URL.Path)
}

最终处理流程还是交给了HandlerFunc去处理了这些请求。

1
2
3
4
// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}

四、总结

整个过程我们在总结一下,我用一个流程符号来表示一下整个调用过程吧

http.ListenAndServe(":3000",mux) -> 
server.ListenAndServe()  -> 
srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)}) ->  
c.serve(ctx) ->
serverHandler{c.server}.ServeHTTP(w, w.req) ->         
h, _ := mux.Handler(r) -> 
h.ServeHTTP(w, r)

对于调用过程,我的理解是一共分成了三层,第一层为server这一层对请求进行处理,处理错误请求,是否需要创建连接。第二层 为connect创建http请求,处理安全认证.第三层为serverHandler,进行路由,查找对应的方法,调用最终的处理。每一层的职责进行处理,分工明确,采用委托的方式来实现自定义的处理。如果文中有错误或者可以补充的,不吝指教。

评论

`
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×