How Go Handles an HTTP Request, Explained in One Article

2023-10-07 09:19:47

1. Server startup

First, start an HTTP server:
```go
func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}
```
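Two parameters matter here: the listen address and the handler that processes HTTP requests. The handler is an interface with a single method, ServeHTTP.
This is what makes custom request handling possible: any type that implements ServeHTTP can act as the handler for HTTP requests.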
```go
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
```
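As a minimal sketch of that idea, the following program defines a hypothetical greeter type (the name and its output format are assumptions for illustration) and drives it in memory with httptest, so no network is involved:

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// greeter is a hypothetical handler type; any type with a ServeHTTP
// method of this signature satisfies http.Handler.
type greeter struct {
	name string
}

func (g greeter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "hello from %s, path=%s", g.name, r.URL.Path)
}

// call runs the handler against an in-memory recorder and returns
// the status code and body it produced.
func call(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, path, nil)
	h.ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := call(greeter{name: "demo"}, "/ping")
	fmt.Println(code, body)
}
```

The same greeter value could be passed as the handler argument of http.ListenAndServe.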
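This function creates an HTTP server and sets its two most important fields: the listen address and the Handler whose ServeHTTP method processes requests. The server then starts listening for and serving HTTP requests: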
```go
func (srv *Server) ListenAndServe() error {
if srv.shuttingDown() {
return ErrServerClosed
}
addr := srv.Addr
if addr == "" {
addr = ":http"
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(ln)
}
```
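HTTP still runs on top of TCP, so the flow is essentially the same as socket programming on Linux. The rest of this article starts from listen and walks through connection establishment and request serving.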
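The two steps inside ListenAndServe can also be done by hand. The sketch below is an illustration under assumptions (a loopback address with port 0, a trivial handler, and a helper named fetchOnce, none of which come from the article): it calls net.Listen, passes the listener to Serve, and issues one GET against it.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
)

// fetchOnce listens on an ephemeral loopback port, serves a handler
// through srv.Serve(ln), and issues a single GET against it.
func fetchOnce() (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0") // port 0: let the OS pick a free port
	if err != nil {
		return "", err
	}
	srv := &http.Server{Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "ok")
	})}
	go srv.Serve(ln) // blocks in the accept loop until the server is closed
	defer srv.Close()

	resp, err := http.Get("http://" + ln.Addr().String() + "/")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	b, err := io.ReadAll(resp.Body)
	return string(b), err
}

func main() {
	body, err := fetchOnce()
	fmt.Println(body, err)
}
```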

2. TCP connection establishment

First, the Listen function in the net package is called to start listening.
```go
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}
```
It calls the Listen method of ListenConfig, which listens on the given TCP address and port.
```go
// Listen announces on the local network address.
//
// See func Listen for a description of the network and address
// parameters.
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
if err != nil {
return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
}
sl := &sysListener{
ListenConfig: *lc,
network:      network,
address:      address,
}
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
l, err = sl.listenTCP(ctx, la)
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
default:
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
}
if err != nil {
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
}
return l, nil
}
```
This function returns a Listener interface. Methods such as Accept closely mirror their counterparts in Unix socket programming.
```go
// A Listener is a generic network listener for stream-oriented protocols.
//
// Multiple goroutines may invoke methods on a Listener simultaneously.
type Listener interface {
// Accept waits for and returns the next connection to the listener.
Accept() (Conn, error)
 
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close() error
 
// Addr returns the listener's network address.
Addr() Addr
}
```
Because the network argument is "tcp", the Listener returned by Listen is the *TCPListener produced by listenTCP in this branch:
```go
case *TCPAddr:
l, err = sl.listenTCP(ctx, la)
```
The TCPListener type:
```go
// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
fd *netFD
lc ListenConfig
}
```
Keep this Listener in mind, since it is used in many places below.
On the last line of ListenAndServe, the newly created server starts serving this Listener:
```go
srv.Serve(ln)
```

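3. HTTP connection establishment

Now step into the Serve method of the server created earlier. To keep the analysis manageable, only the important part of the code is shown: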
```go
func (srv *Server) Serve(l net.Listener) error {
if fn := testHookServerServe; fn != nil {
fn(srv, l) // call hook with unwrapped listener
}
 
origListener := l
l = &onceCloseListener{Listener: l}
defer l.Close()
 
if err := srv.setupHTTP2_Serve(); err != nil {
return err
}
 
if !srv.trackListener(&l, true) {
return ErrServerClosed
}
defer srv.trackListener(&l, false)
 
baseCtx := context.Background()
if srv.BaseContext != nil {
baseCtx = srv.BaseContext(origListener)
if baseCtx == nil {
panic("BaseContext returned a nil context")
}
}
 
var tempDelay time.Duration // how long to sleep on accept failure
 
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, err := l.Accept()
if err != nil {
if srv.shuttingDown() {
return ErrServerClosed
}
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
time.Sleep(tempDelay)
continue
}
return err
}
```
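Recall that the concrete value behind the parameter l is a TCPListener (wrapped here in onceCloseListener). Inside the for loop, its Accept method is called. The key point: this mirrors socket programming on Linux, where you first listen on a port for incoming connection requests and then call accept, which returns a newly established TCP connection for handling traffic. Here is TCPListener's Accept method: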
```go
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
```
This method ultimately calls accept, which returns a TCPConn:
```go
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
return newTCPConn(fd, ln.lc.KeepAlive, nil), nil
}
```
```go
// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
conn
}
```
Key point: this conn struct belongs to the net package:
```go
type conn struct {
fd *netFD
}
```
TCPConn embeds conn as an anonymous field, so conn's methods are promoted to TCPConn (Go has no inheritance; this is embedding). That matters for the Conn interface:
```go
// Conn is a generic stream-oriented network connection.
//
// Multiple goroutines may invoke methods on a Conn simultaneously.
type Conn interface {
Read(b []byte) (n int, err error)
Write(b []byte) (n int, err error)
Close() error
LocalAddr() Addr
RemoteAddr() Addr
SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
}
```
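Most of the Conn interface's methods are implemented on the conn struct, which holds the familiar socket file descriptor fd.
To summarize: calling Accept on a TCPListener returns a TCPConn, which gains conn's methods through embedding and therefore implements the Conn interface. Put simply, Accept returns a TCP connection whose concrete type is TCPConn, and Read and Write on it provide two-way I/O over that connection. This matches accept in socket programming, which creates a new fd dedicated to data transfer on the connection.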
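The listen, accept, read and write sequence can be tried without net/http at all. The following is a sketch under assumptions (loopback address, a single short message, and a helper named echoOnce that is not part of the article); it also reports the concrete types behind the Listener and Conn interfaces.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// echoOnce accepts one connection, reads one message and writes it back.
// It returns the reply seen by the client plus the concrete types behind
// the net.Listener and net.Conn interface values.
func echoOnce(msg string) (reply, lnType, connType string, err error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", "", "", err
	}
	defer ln.Close()
	lnType = fmt.Sprintf("%T", ln)

	types := make(chan string, 1)
	go func() {
		c, err := ln.Accept() // blocks until a client connects
		if err != nil {
			types <- ""
			return
		}
		defer c.Close()
		types <- fmt.Sprintf("%T", c)
		buf := make([]byte, 1024)
		n, _ := c.Read(buf) // assumes the short message arrives in one read
		c.Write(buf[:n])
	}()

	client, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return "", lnType, "", err
	}
	defer client.Close()
	if _, err = client.Write([]byte(msg)); err != nil {
		return "", lnType, "", err
	}
	buf := make([]byte, len(msg))
	if _, err = io.ReadFull(client, buf); err != nil {
		return "", lnType, "", err
	}
	return string(buf), lnType, <-types, nil
}

func main() {
	fmt.Println(echoOnce("ping"))
}
```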
After Accept returns:
```go
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return
go c.serve(connCtx)
```
The server uses the TCPConn returned by Accept to initialize a conn:
```go
// Create new connection from rwc.
func (srv *Server) newConn(rwc net.Conn) *conn {
c := &conn{
server: srv,
rwc:    rwc,
}
if debugServerConnections {
c.rwc = newLoggingConn("server", c.rwc)
}
return c
}
```
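Note that this conn is not the same type as the conn discussed earlier. This one belongs to the http package and represents a connection in the HTTP sense. Only the most important fields are shown: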
```go
// A conn represents the server side of an HTTP connection.
type conn struct {
server *Server // the server this HTTP connection belongs to
rwc net.Conn // the network connection underneath this HTTP connection, i.e. the TCP connection
}
```
Finally, a goroutine is started to handle the requests on this TCP connection:
```go
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return
go c.serve(connCtx)
```
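The http package's conn has two important fields:
1. server: identifies which server this conn belongs to.
2. rwc (a net.Conn): identifies which TCP connection this HTTP connection uses.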

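4. Connection summary

1. Listen returns a Listener interface whose concrete type is TCPListener, that is, a listening TCP socket.
2. The server calls Serve with this Listener as its argument and starts serving it.
3. Serve calls the Listener's Accept method, that is, TCPListener's Accept, which returns a Conn interface whose concrete type is TCPConn. In other words, a TCP connection has been established on the listening address and port.
4. That TCP connection is passed in to create a conn in the http package. Its two main fields identify the server the HTTP conn belongs to and the network (TCP) connection it uses.
5. The overall flow: listen first, then accept. When a connection request arrives, a TCP connection is established, and an HTTP-level connection is built on top of it.
6. In essence, execution only continues once accept has produced a new TCP connection, at which point the HTTP-level conn is initialized. So go c.serve really handles one TCP connection: one goroutine serves one TCP connection and every HTTP request on it. Clients should therefore reuse TCP connections where possible, because each new TCP connection costs the server one fd and one goroutine.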
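The last point can be observed from the server side. The sketch below is an illustration, not part of the original walkthrough: it uses the server's ConnState hook to count connections entering StateNew while one client sends several sequential requests. The helper name countNewConns and the trivial handler are assumptions.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// countNewConns sends n sequential requests through one client and
// reports how many TCP connections the server saw being opened.
func countNewConns(n int) (int, error) {
	var opened atomic.Int64
	ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "ok")
	}))
	// ConnState is called by the server whenever a connection changes state.
	ts.Config.ConnState = func(c net.Conn, s http.ConnState) {
		if s == http.StateNew {
			opened.Add(1)
		}
	}
	ts.Start()
	defer ts.Close()

	client := ts.Client()
	for i := 0; i < n; i++ {
		resp, err := client.Get(ts.URL)
		if err != nil {
			return 0, err
		}
		// Draining and closing the body lets the transport reuse the connection.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}
	return int(opened.Load()), nil
}

func main() {
	fmt.Println(countNewConns(3))
}
```

With keep-alive in effect, the three requests should share a single connection and therefore a single serving goroutine.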

5. HTTP request handling

This is where HTTP request handling begins.
```go
// Serve a new connection.
func (c *conn) serve(ctx context.Context) {
c.remoteAddr = c.rwc.RemoteAddr().String()
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
var inFlightResponse *response
```
Before the for loop there are a few key initializations:
```go
c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
```
First, a connReader is created with its conn field set to c, the HTTP-level conn created earlier. Then newBufioReader creates a bufio.Reader, a buffered reader, on top of it:
```go
func newBufioReader(r io.Reader) *bufio.Reader {
if v := bufioReaderPool.Get(); v != nil {
br := v.(*bufio.Reader)
br.Reset(r)
return br
}
// Note: if this reader size is ever changed, update
// TestHandlerBodyClose's assumptions.
return bufio.NewReader(r)
}
```
The parameter r is an interface (io.Reader); its concrete value here is a connReader:
```go
// connReader is the io.Reader wrapper used by *conn. It combines a
// selectively-activated io.LimitedReader (to bound request header
// read sizes) with support for selectively keeping an io.Reader.Read
// call blocked in a background goroutine to wait for activity and
// trigger a CloseNotifier channel.
type connReader struct {
conn *conn
 
mu      sync.Mutex // guards following
hasByte bool
byteBuf [1]byte
cond    *sync.Cond
inRead  bool
aborted bool  // set true before conn.rwc deadline is set to past
remain  int64 // bytes remaining
}
```
 
```go
// NewReader returns a new Reader whose buffer has the default size.
func NewReader(rd io.Reader) *Reader {
return NewReaderSize(rd, defaultBufSize)
}
 
// NewReaderSize returns a new Reader whose buffer has at least the specified
// size. If the argument io.Reader is already a Reader with large enough
// size, it returns the underlying Reader.
func NewReaderSize(rd io.Reader, size int) *Reader {
// Is it already a Reader?
b, ok := rd.(*Reader)
if ok && len(b.buf) >= size {
return b
}
if size < minReadBufferSize {
size = minReadBufferSize
}
r := new(Reader)
r.reset(make([]byte, size), rd)
return r
}
func (b *Reader) reset(buf []byte, r io.Reader) {
*b = Reader{
buf:          buf,
rd:           r,
lastByte:     -1,
lastRuneSize: -1,
}
}
```
After this chain of initialization, the rd field of conn.bufr ends up being c.r, of type *connReader. Remember this, because readRequest relies on it later.
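For readers less familiar with bufio, here is a small sketch of how a bufio.Reader wraps another io.Reader, with a strings.Reader standing in for connReader (an assumption made so the example runs without a network). It also shows that Peek, which the serve loop calls as c.bufr.Peek(4), does not consume data.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// peekThenRead peeks at the first 4 bytes and then reads everything,
// showing that Peek leaves the buffered data in place.
func peekThenRead(s string) (peeked, all string, err error) {
	br := bufio.NewReader(strings.NewReader(s)) // default buffer size
	p, err := br.Peek(4)
	if err != nil {
		return "", "", err
	}
	peeked = string(p) // copy now: p is only valid until the next read
	rest, err := io.ReadAll(br)
	return peeked, string(rest), err
}

func main() {
	peeked, all, err := peekThenRead("GET / HTTP/1.1\r\n")
	fmt.Printf("%q %q %v\n", peeked, all, err)
}
```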
 
Now for the key parts:
```go
for {
w, err := c.readRequest(ctx)
if c.r.remain != c.server.initialReadLimitSize() {
// If we read any bytes off the wire, we're active.
c.setState(c.rwc, StateActive, runHooks)
}
if err != nil {
const errorHeaders = "\r\nContent-Type: text/plain; charset=utf-8\r\nConnection: close\r\n\r\n"
 
switch {
case err == errTooLarge:
// Their HTTP client may or may not be
// able to read this if we're
// responding to them and hanging up
// while they're still writing their
// request. Undefined behavior.
const publicErr = "431 Request Header Fields Too Large"
fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
c.closeWriteAndWait()
return
 
case isUnsupportedTEError(err):
// Respond as per RFC 7230 Section 3.3.1 which says,
//      A server that receives a request message with a
//      transfer coding it does not understand SHOULD
//      respond with 501 (Unimplemented).
code := StatusNotImplemented
 
// We purposefully aren't echoing back the transfer-encoding's value,
// so as to mitigate the risk of cross side scripting by an attacker.
fmt.Fprintf(c.rwc, "HTTP/1.1 %d %s%sUnsupported transfer encoding", code, StatusText(code), errorHeaders)
return
 
case isCommonNetReadError(err):
return // don't reply
 
default:
if v, ok := err.(statusError); ok {
fmt.Fprintf(c.rwc, "HTTP/1.1 %d %s: %s%s%d %s: %s", v.code, StatusText(v.code), v.text, errorHeaders, v.code, StatusText(v.code), v.text)
return
}
publicErr := "400 Bad Request"
fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
return
}
}
 
// Expect 100 Continue support
req := w.req
if req.expectsContinue() {
if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
// Wrap the Body reader with one that replies on the connection
req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
w.canWriteContinue.Store(true)
}
} else if req.Header.get("Expect") != "" {
w.sendExpectationFailed()
return
}
 
c.curReq.Store(w)
 
if requestBodyRemains(req.Body) {
registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
} else {
w.conn.r.startBackgroundRead()
}
 
// HTTP cannot have multiple simultaneous active requests.[*]
// Until the server replies to this request, it can't read another,
// so we might as well run the handler in this goroutine.
// [*] Not strictly true: HTTP pipelining. We could let them all process
// in parallel even if their responses need to be serialized.
// But we're not going to implement HTTP pipelining because it
// was never deployed in the wild and the answer is HTTP/2.
inFlightResponse = w
serverHandler{c.server}.ServeHTTP(w, w.req)
inFlightResponse = nil
w.cancelCtx()
if c.hijacked() {
return
}
w.finishRequest()
c.rwc.SetWriteDeadline(time.Time{})
if !w.shouldReuseConnection() {
if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
c.closeWriteAndWait()
}
return
}
c.setState(c.rwc, StateIdle, runHooks)
c.curReq.Store(nil)
 
if !w.conn.server.doKeepAlives() {
// We're in shutdown mode. We might've replied
// to the user without "Connection: close" and
// they might think they can send another
// request, but such is life with HTTP/1.1.
return
}
 
if d := c.server.idleTimeout(); d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
} else {
c.rwc.SetReadDeadline(time.Time{})
}
 
// Wait for the connection to become readable again before trying to
// read the next request. This prevents a ReadHeaderTimeout or
// ReadTimeout from starting until the first bytes of the next request
// have been received.
if _, err := c.bufr.Peek(4); err != nil {
return
}
 
c.rwc.SetReadDeadline(time.Time{})
}
}
```
First, the HTTP request is read from the connection:
```go
w, err := c.readRequest(ctx)
```
Step into it for a closer look:
```go
// Read next request from connection.
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
if c.hijacked() {
return nil, ErrHijacked
}
 
var (
wholeReqDeadline time.Time // or zero if none
hdrDeadline      time.Time // or zero if none
)
t0 := time.Now()
if d := c.server.readHeaderTimeout(); d > 0 {
hdrDeadline = t0.Add(d)
}
if d := c.server.ReadTimeout; d > 0 {
wholeReqDeadline = t0.Add(d)
}
c.rwc.SetReadDeadline(hdrDeadline)
if d := c.server.WriteTimeout; d > 0 {
defer func() {
c.rwc.SetWriteDeadline(time.Now().Add(d))
}()
}
 
c.r.setReadLimit(c.server.initialReadLimitSize())
if c.lastMethod == "POST" {
// RFC 7230 section 3 tolerance for old buggy clients.
peek, _ := c.bufr.Peek(4) // ReadRequest will get err below
c.bufr.Discard(numLeadingCRorLF(peek))
}
req, err := readRequest(c.bufr)
if err != nil {
if c.r.hitReadLimit() {
return nil, errTooLarge
}
return nil, err
}
 
if !http1ServerSupportsRequest(req) {
return nil, statusError{StatusHTTPVersionNotSupported, "unsupported protocol version"}
}
 
c.lastMethod = req.Method
c.r.setInfiniteReadLimit()
 
hosts, haveHost := req.Header["Host"]
isH2Upgrade := req.isH2Upgrade()
if req.ProtoAtLeast(1, 1) && (!haveHost || len(hosts) == 0) && !isH2Upgrade && req.Method != "CONNECT" {
return nil, badRequestError("missing required Host header")
}
if len(hosts) == 1 && !httpguts.ValidHostHeader(hosts[0]) {
return nil, badRequestError("malformed Host header")
}
for k, vv := range req.Header {
if !httpguts.ValidHeaderFieldName(k) {
return nil, badRequestError("invalid header name")
}
for _, v := range vv {
if !httpguts.ValidHeaderFieldValue(v) {
return nil, badRequestError("invalid header value")
}
}
}
delete(req.Header, "Host")
 
ctx, cancelCtx := context.WithCancel(ctx)
req.ctx = ctx
req.RemoteAddr = c.remoteAddr
req.TLS = c.tlsState
if body, ok := req.Body.(*body); ok {
body.doEarlyClose = true
}
 
// Adjust the read deadline if necessary.
if !hdrDeadline.Equal(wholeReqDeadline) {
c.rwc.SetReadDeadline(wholeReqDeadline)
}
 
w = &response{
conn:          c,
cancelCtx:     cancelCtx,
req:           req,
reqBody:       req.Body,
handlerHeader: make(Header),
contentLength: -1,
closeNotifyCh: make(chan bool, 1),
 
// We populate these ahead of time so we're not
// reading from req.Header after their Handler starts
// and maybe mutates it (Issue 14940)
wants10KeepAlive: req.wantsHttp10KeepAlive(),
wantsClose:       req.wantsClose(),
}
if isH2Upgrade {
w.closeAfterReply = true
}
w.cw.res = w
w.w = newBufioWriterSize(&w.cw, bufferBeforeChunkingSize)
return w, nil
}
```
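readRequest above derives its read and write deadlines from settings on the server (readHeaderTimeout(), ReadTimeout, WriteTimeout), and the serve loop uses idleTimeout() between requests. A minimal sketch of setting the corresponding exported fields follows; the durations and the newServer helper are illustrative assumptions, not values from the article.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// newServer returns a server with explicit timeouts. The durations are
// illustrative placeholders, not recommendations.
func newServer(addr string, h http.Handler) *http.Server {
	return &http.Server{
		Addr:              addr,
		Handler:           h,
		ReadHeaderTimeout: 5 * time.Second,  // time allowed to read the request headers
		ReadTimeout:       30 * time.Second, // time allowed to read the whole request, body included
		WriteTimeout:      30 * time.Second, // time allowed before writes of the response time out
		IdleTimeout:       60 * time.Second, // time to wait for the next request on a keep-alive connection
	}
}

func main() {
	srv := newServer("127.0.0.1:8080", http.NotFoundHandler())
	fmt.Println(srv.ReadHeaderTimeout, srv.ReadTimeout, srv.WriteTimeout, srv.IdleTimeout)
}
```

Leaving these fields at zero means no timeout, in which case the deadlines set in readRequest are the zero time and never fire.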
Continuing with the key part:
```go
req, err := readRequest(c.bufr)
```
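This reads data from the connection, parses the HTTP protocol, and stores the result in a Request struct. The core steps are annotated in the comments: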
```go
func readRequest(b *bufio.Reader) (req *Request, err error) {
tp := newTextprotoReader(b) // create a textproto reader
defer putTextprotoReader(tp)
 
req = new(Request)
 
// First line: GET /index.html HTTP/1.0
var s string
if s, err = tp.ReadLine(); err != nil { // read the first line
return nil, err
}
defer func() {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
}()
 
var ok bool
req.Method, req.RequestURI, req.Proto, ok = parseRequestLine(s) // parse the method, request URI and protocol
if !ok {
return nil, badStringError("malformed HTTP request", s)
}
if !validMethod(req.Method) {
return nil, badStringError("invalid method", req.Method)
}
rawurl := req.RequestURI
if req.ProtoMajor, req.ProtoMinor, ok = ParseHTTPVersion(req.Proto); !ok {
return nil, badStringError("malformed HTTP version", req.Proto)
}
justAuthority := req.Method == "CONNECT" && !strings.HasPrefix(rawurl, "/")
if justAuthority {
rawurl = "http://" + rawurl
}
 
if req.URL, err = url.ParseRequestURI(rawurl); err != nil {
return nil, err
}
 
if justAuthority {
req.URL.Scheme = ""
}
 
// Subsequent lines: Key: value.
mimeHeader, err := tp.ReadMIMEHeader() // read the headers
if err != nil {
return nil, err
}
req.Header = Header(mimeHeader)
if len(req.Header["Host"]) > 1 {
return nil, fmt.Errorf("too many Host headers")
}
req.Host = req.URL.Host
if req.Host == "" {
req.Host = req.Header.get("Host")
}
 
fixPragmaCacheControl(req.Header)
 
req.Close = shouldClose(req.ProtoMajor, req.ProtoMinor, req.Header, false)
 
err = readTransfer(req, b) // set up the body and related fields
if err != nil {
return nil, err
}
 
if req.isH2Upgrade() {
// Because it's neither chunked, nor declared:
req.ContentLength = -1
req.Close = true
}
return req, nil
}
```
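The same parsing can be exercised through the exported http.ReadRequest, which as far as I know wraps this unexported readRequest. The sketch below feeds it a hand-written request; the raw text and the parseRaw helper are assumptions for illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

// parseRaw parses one HTTP/1.x request from raw text and returns a few
// of the fields that the parser fills in.
func parseRaw(raw string) (method, path, host, agent string, err error) {
	req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(raw)))
	if err != nil {
		return "", "", "", "", err
	}
	return req.Method, req.URL.Path, req.Host, req.Header.Get("User-Agent"), nil
}

func main() {
	raw := "GET /index.html?x=1 HTTP/1.1\r\nHost: example.com\r\nUser-Agent: demo\r\n\r\n"
	fmt.Println(parseRaw(raw))
}
```

The request line supplies the method and URL, the Host header ends up in req.Host, and the remaining headers land in req.Header.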
At this point parsing has produced req, stored in the http package's Request struct.
This req is then used to initialize a response struct:
```go
w = &response{
conn:          c,
cancelCtx:     cancelCtx,
req:           req,
reqBody:       req.Body,
handlerHeader: make(Header),
contentLength: -1,
closeNotifyCh: make(chan bool, 1),
 
// We populate these ahead of time so we're not
// reading from req.Header after their Handler starts
// and maybe mutates it (Issue 14940)
wants10KeepAlive: req.wantsHttp10KeepAlive(),
wantsClose:       req.wantsClose(),
}
```
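A few fields of this response are worth noting:
1. conn: the HTTP connection this response belongs to.
2. req: the parsed request.
3. reqBody: the body of the incoming request.
Continuing with the key part: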
```go
inFlightResponse = w
serverHandler{c.server}.ServeHTTP(w, w.req)
inFlightResponse = nil
w.cancelCtx()
if c.hijacked() {
return
}
```
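As mentioned earlier, ServeHTTP is the method of the Handler interface. Any value that implements it can be passed to the server to provide custom request handling. Only the handler built into Go's http package is analyzed here.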
```go
// serverHandler delegates to either the server's Handler or
// DefaultServeMux and also handles "OPTIONS *" requests.
type serverHandler struct {
srv *Server
}
 
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
handler := sh.srv.Handler
if handler == nil {
handler = DefaultServeMux
}
if !sh.srv.DisableGeneralOptionsHandler && req.RequestURI == "*" && req.Method == "OPTIONS" {
handler = globalOptionsHandler{}
}
 
if req.URL != nil && strings.Contains(req.URL.RawQuery, ";") {
var allowQuerySemicolonsInUse atomic.Bool
req = req.WithContext(context.WithValue(req.Context(), silenceSemWarnContextKey, func() {
allowQuerySemicolonsInUse.Store(true)
}))
defer func() {
if !allowQuerySemicolonsInUse.Load() {
sh.srv.logf("http: URL query contains semicolon, which is no longer a supported separator; parts of the query may be stripped when parsed; see golang.org/issue/25192")
}
}()
}
 
handler.ServeHTTP(rw, req)
}
```
This calls serverHandler's ServeHTTP. Inside, if the server's Handler is nil, DefaultServeMux is used; otherwise the custom handler is used. Looking back at server startup, the handler comes from ListenAndServe's argument:
```go
func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}
```
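The nil fallback can be checked with a short sketch: a server created with a nil handler should answer for routes registered through http.HandleFunc, which registers on DefaultServeMux. The route path and the viaDefaultMux helper are hypothetical names chosen for this example.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync"
)

var registerOnce sync.Once

// viaDefaultMux starts a test server whose Handler is nil and requests a
// path that was registered on http.DefaultServeMux.
func viaDefaultMux() (string, error) {
	registerOnce.Do(func() {
		// Registering the same pattern twice would panic, hence the Once.
		http.HandleFunc("/default-mux-demo", func(w http.ResponseWriter, r *http.Request) {
			io.WriteString(w, "served by DefaultServeMux")
		})
	})
	ts := httptest.NewServer(nil) // nil Handler: the server falls back to DefaultServeMux
	defer ts.Close()

	resp, err := ts.Client().Get(ts.URL + "/default-mux-demo")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	b, err := io.ReadAll(resp.Body)
	return string(b), err
}

func main() {
	fmt.Println(viaDefaultMux())
}
```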

6. Writing the response

For convenience, DefaultServeMux is used as the example.
```go
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
```
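Handler is an interface. Per the analysis above, its concrete value here is DefaultServeMux. The first parameter of ServeHTTP is a ResponseWriter: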
```go
type ResponseWriter interface {
Header() Header
Write([]byte) (int, error)
WriteHeader(statusCode int)
}
```
ResponseWriter is an interface as well. According to the code, its concrete value is the response returned by readRequest, whose key fields were described above. Next, DefaultServeMux itself:
```go
// DefaultServeMux is the default ServeMux used by Serve.
var DefaultServeMux = &defaultServeMux
 
var defaultServeMux ServeMux
```
Its ServeHTTP method:
```go
// ServeHTTP dispatches the request to the handler whose
// pattern most closely matches the request URL.
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
if r.RequestURI == "*" {
if r.ProtoAtLeast(1, 1) {
w.Header().Set("Connection", "close")
}
w.WriteHeader(StatusBadRequest)
return
}
h, _ := mux.Handler(r)
h.ServeHTTP(w, r)
}
```
The ServeMux built into the http package finds the handler by matching the request's URL path against the map of registered patterns.
```go
// If there is no registered handler that applies to the request,
// Handler returns a “page not found” handler and an empty pattern.
func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string) {
 
// CONNECT requests are not canonicalized.
if r.Method == "CONNECT" {
// If r.URL.Path is /tree and its handler is not registered,
// the /tree -> /tree/ redirect applies to CONNECT requests
// but the path canonicalization does not.
if u, ok := mux.redirectToPathSlash(r.URL.Host, r.URL.Path, r.URL); ok {
return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
}
 
return mux.handler(r.Host, r.URL.Path)
}
 
// All other requests have any port stripped and path cleaned
// before passing to mux.handler.
host := stripHostPort(r.Host)
path := cleanPath(r.URL.Path)
 
// If the given path is /tree and its handler is not registered,
// redirect for /tree/.
if u, ok := mux.redirectToPathSlash(host, path, r.URL); ok {
return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
}
 
if path != r.URL.Path {
_, pattern = mux.handler(host, path)
u := &url.URL{Path: path, RawQuery: r.URL.RawQuery}
return RedirectHandler(u.String(), StatusMovedPermanently), pattern
}
 
return mux.handler(host, r.URL.Path)
}
```
 
```go
// handler is the main implementation of Handler.
// The path is known to be in canonical form, except for CONNECT methods.
func (mux *ServeMux) handler(host, path string) (h Handler, pattern string) {
mux.mu.RLock()
defer mux.mu.RUnlock()
 
// Host-specific pattern takes precedence over generic ones
if mux.hosts {
h, pattern = mux.match(host + path)
}
if h == nil {
h, pattern = mux.match(path)
}
if h == nil {
h, pattern = NotFoundHandler(), ""
}
return
}
```
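A small sketch of this lookup from the outside, using a private ServeMux rather than the global one: a registered path is dispatched to its handler, and an unregistered path falls through to the not-found handler. The /hello route and the helper names are assumptions for illustration.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// newMux builds a private ServeMux with one hypothetical route.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "hi")
	})
	return mux
}

// status dispatches an in-memory GET through the mux and returns the
// status code of the reply.
func status(mux *http.ServeMux, path string) int {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	mux := newMux()
	fmt.Println(status(mux, "/hello"), status(mux, "/missing"))
}
```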
Take NotFoundHandler as an example:
```go
// NotFoundHandler returns a simple request handler
// that replies to each request with a “404 page not found” reply.
func NotFoundHandler() Handler { return HandlerFunc(NotFound) }
```
Now look at HandlerFunc:
```go
type HandlerFunc func(ResponseWriter, *Request)
 
// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}
```
HandlerFunc implements ServeHTTP, so the plain function NotFound can serve as a Handler:
 
```go
// NotFound replies to the request with an HTTP 404 not found error.
func NotFound(w ResponseWriter, r *Request) { Error(w, "404 page not found", StatusNotFound) }
 
// The error message should be plain text.
func Error(w ResponseWriter, error string, code int) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(code)
fmt.Fprintln(w, error)
}
```
In the end, Error calls Header().Set and WriteHeader on the response to write the headers, and fmt.Fprintln to write the body. The first parameter of fmt.Fprintln is an interface, io.Writer:
```go
// Implementations must not retain p.
type Writer interface {
Write(p []byte) (n int, err error)
}
```
From this you can see that the Write method it calls is ultimately the Write method of the response:
```go
// It returns the number of bytes written and any write error encountered.
func Fprintln(w io.Writer, a ...any) (n int, err error) {
p := newPrinter()
p.doPrintln(a)
n, err = w.Write(p.buf)
p.free()
return
}
```
At this point the handler has finished writing its headers and data.
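What the handler wrote can be inspected with an in-memory recorder in place of the real response. This is a sketch for illustration; httptest.ResponseRecorder is a stand-in for the server's response type, and notFoundReply is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// notFoundReply calls http.NotFound against a recorder and returns the
// status code, Content-Type header and body that were written.
func notFoundReply() (code int, contentType, body string) {
	rec := httptest.NewRecorder()
	http.NotFound(rec, httptest.NewRequest(http.MethodGet, "/missing", nil))
	res := rec.Result()
	return res.StatusCode, res.Header.Get("Content-Type"), rec.Body.String()
}

func main() {
	code, ct, body := notFoundReply()
	fmt.Printf("%d %q %q\n", code, ct, body)
}
```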
Then the HTTP response for the current request is completed:
```go
func (w *response) finishRequest() {
w.handlerDone.Store(true)
 
if !w.wroteHeader {
w.WriteHeader(StatusOK)
}
 
w.w.Flush()
putBufioWriter(w.w)
w.cw.close() // writes the terminating zero-length chunk when chunked encoding is in use
w.conn.bufw.Flush()
 
w.conn.r.abortPendingRead()
 
// Close the body (regardless of w.closeAfterReply) so we can
// re-use its bufio.Reader later safely.
w.reqBody.Close()
 
if w.req.MultipartForm != nil {
w.req.MultipartForm.RemoveAll()
}
}
```
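When chunked encoding is in use, close writes the zero-length chunk 0\r\n followed by a final \r\n to mark the end of the response body: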
```go
func (cw *chunkWriter) close() {
if !cw.wroteHeader {
cw.writeHeader(nil)
}
if cw.chunking {
bw := cw.res.conn.bufw // conn's bufio writer
// zero chunk to mark EOF
bw.WriteString("0\r\n")
if trailers := cw.res.finalTrailers(); trailers != nil {
trailers.Write(bw) // the writer handles noting errors
}
// final blank line after the trailers (whether
// present or not)
bw.WriteString("\r\n")
}
}
```
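The terminator can be seen on the wire. In the sketch below, a handler flushes before returning so that, as I understand the server's behavior, no Content-Length can be computed and an HTTP/1.1 reply is sent chunked. A raw TCP client then captures the response bytes. The helper names and the request text are assumptions.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
	"net/http/httptest"
	"strings"
)

// rawReply sends a hand-written HTTP/1.1 request over a plain TCP
// connection and returns the server's bytes exactly as received.
func rawReply() (string, error) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "hello")
		// Flushing before the handler returns sends the headers early,
		// without a Content-Length.
		if f, ok := w.(http.Flusher); ok {
			f.Flush()
		}
	}))
	defer ts.Close()

	conn, err := net.Dial("tcp", ts.Listener.Addr().String())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	req := "GET / HTTP/1.1\r\nHost: demo\r\nConnection: close\r\n\r\n"
	if _, err := io.WriteString(conn, req); err != nil {
		return "", err
	}
	b, err := io.ReadAll(conn) // the server closes the connection after replying
	return string(b), err
}

// endsWithZeroChunk reports whether raw is a chunked response that ends
// with the zero-length chunk and the final blank line.
func endsWithZeroChunk(raw string) bool {
	return strings.Contains(raw, "Transfer-Encoding: chunked") &&
		strings.HasSuffix(raw, "0\r\n\r\n")
}

func main() {
	raw, err := rawReply()
	fmt.Printf("%q %v %v\n", raw, endsWithZeroChunk(raw), err)
}
```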

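7. Summary

This article covered the full path of an HTTP request in Go:
1. TCP connection establishment
2. Request parsing
3. The handler that processes the request on the application side
4. How the handler's reply is written back to the client
This article only outlines the overall flow. Details such as TCP connection management and the various read and write buffers deserve a deeper look.
For handlers, only Go's built-in route registration was covered briefly. Route registration in common web frameworks such as gin will be covered in a later article.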
原创

一文讲清go的http请求处理过程

2023-10-07 09:19:47
11
0

1、服务启动

首先启动一个http服务器:
```go
func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}
```
这里有两个比较重要入参:地址和处理http请求的handler。这个handler是一个包含方法:ServeHTTP的interface。
这为我们自定义处理http请求的方法创造了可能。任何实现了ServeHTTP这个方法的结构体均可作为当前http请求的处理方。
```go
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
```
在这个函数中,创建了一个http的server,并为这个server指定了两个非常重要的参数:监听地址和http请求处理的方法ServeHTTP。随后server开始监听http请求并服务http请求:
```go
func (srv *Server) ListenAndServe() error {
if srv.shuttingDown() {
return ErrServerClosed
}
addr := srv.Addr
if addr == "" {
addr = ":http"
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(ln)
}
```
http协议还是依托的tcp协议,所以走的那一套和linux下socket编程那一套本质上是一样的。下面从listen开始分析整个连接建立和服务请求的全过程。

2、TCP连接建立

首先调用了net包下的Listen方法进行服务的监听。
```go
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}
```
调用ListenConfig结构体的Listen方法,监听tcp对应的address端口。
```go
// Listen announces on the local network address.
//
// See func Listen for a description of the network and address
// parameters.
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
if err != nil {
return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
}
sl := &sysListener{
ListenConfig: *lc,
network:      network,
address:      address,
}
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
l, err = sl.listenTCP(ctx, la)
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
default:
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
}
if err != nil {
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
}
return l, nil
}
```
这个函数返回的是一个Listener这样的interface,可以看到这里面的一些诸如Accept 的方法和之前unix中的socket编程很一致。
```go
// A Listener is a generic network listener for stream-oriented protocols.
//
// Multiple goroutines may invoke methods on a Listener simultaneously.
type Listener interface {
// Accept waits for and returns the next connection to the listener.
Accept() (Conn, error)
 
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close() error
 
// Addr returns the listener's network address.
Addr() Addr
}
```
根据传入的tcp这个参数,可以确定Listen返回的Listener实例是这里返回的listenTCP实例。
```go
case *TCPAddr:
l, err = sl.listenTCP(ctx, la)
```
TCPListener类型:
```go
// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
fd *netFD
lc ListenConfig
}
```
这里先记住这个Listener实例,后面会有很多地方用到。
在ListenAndServe函数的最后一行,新建出来的server开始为这个Listener服务:
```go
srv.Serve(ln)
```

3、HTTP连接建立

现在进入之前创建server的Serve方法,为了分析方便,只截取重要部分的代码分析:
```go
func (srv *Server) Serve(l net.Listener) error {
if fn := testHookServerServe; fn != nil {
fn(srv, l) // call hook with unwrapped listener
}
 
origListener := l
l = &onceCloseListener{Listener: l}
defer l.Close()
 
if err := srv.setupHTTP2_Serve(); err != nil {
return err
}
 
if !srv.trackListener(&l, true) {
return ErrServerClosed
}
defer srv.trackListener(&l, false)
 
baseCtx := context.Background()
if srv.BaseContext != nil {
baseCtx = srv.BaseContext(origListener)
if baseCtx == nil {
panic("BaseContext returned a nil context")
}
}
 
var tempDelay time.Duration // how long to sleep on accept failure
 
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, err := l.Accept()
if err != nil {
if srv.shuttingDown() {
return ErrServerClosed
}
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
time.Sleep(tempDelay)
continue
}
return err
}
```
这里会议一下入参l的实例,是一个TCPListener。进入for循环以后,调用了它的Accept方法。这里划重点提示一下:和linux中的socket编程,先listen某个端口,在这个连接上监听连接请求。然后再accept返回一个新建的tcp连接来处理tcp请求有类似之处。去看一下TCPListener的Accept方法:
```go
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
```
该方法最终调用accept方法返回了一个TCPConn:
```go
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
return newTCPConn(fd, ln.lc.KeepAlive, nil), nil
}
```
```go
// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
conn
}
```
划重点:这个conn结构体是net包下的:
```go
type conn struct {
fd *netFD
}
```
TCPConn结构体有一个匿名成员conn,所以TCPConn会继承conn的方法,通过分析源码,我们可以发现,Conn这个interface:
```go
// Conn is a generic stream-oriented network connection.
//
// Multiple goroutines may invoke methods on a Conn simultaneously.
type Conn interface {
Read(b []byte) (n int, err error)
Write(b []byte) (n int, err error)
Close() error
LocalAddr() Addr
RemoteAddr() Addr
SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
}
```
很多的实现,是在conn这个结构体中。conn中有大家熟悉的套接字fd。
这里再总结一下:TCPListener调用自己的方法Accept以后,返回的是一个TCPConn结构体,该结构体继承了conn结构体的方法,实现了interface Conn中的方法。简单点说:Accept方法后返回了一个tcp连接。它的实例是TCPConn,在这个连接上可以调用方法Read和Write进行tcp连接上的双向读与写。这与socket网络编程里面Accept类似,新建一个fd,来专门处理连接上的数据传输。
在Accept结束以后:
```go
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return
go c.serve(connCtx)
```
server用刚才Accept返回的TCPConn,初始化了一个conn
```go
// Create new connection from rwc.
func (srv *Server) newConn(rwc net.Conn) *conn {
c := &conn{
server: srv,
rwc:    rwc,
}
if debugServerConnections {
c.rwc = newLoggingConn("server", c.rwc)
}
return c
}
```
注意这里的conn与前文的conn不是一类型。这里conn是http包下的conn,代表了http意义上的连接。只截取比较重要的几个字段:
```go
// A conn represents the server side of an HTTP connection.
type conn struct {
server *Server //当前http连接所对应的server
rwc net.Conn //当前http连接所用的网络连接,也就是tcp连接。
}
```
最后go了一个协程来处理这个tcp连接的请求:
```go
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return
go c.serve(connCtx)
```
其实这个http包下的连接两个重要的参数:
1、server,标识了这个conn属于哪个server。
2、net.Conn,标识当前http连接用的哪个tcp连接。

4、连接小结

1、首先是Listen返回一个Listener的interface,该interface的实例是TCPListener,实际上是一个tcp连接的监听实例。
2、server调用Serve方法,以这个Listener为入参,启动服务于这个Listener。
3、调用这个Listener的Accept方法,也就是实例TCPListener的的Accept方法,返回一个Conn的interface,它的实例是TCPConn。也就是在监听这个端口地址上,建立了一个tcp连接。
4、用这个tcp连接作为入参,新建了一个http包下的连接。包含两个主要参数分别标识了当前http的conn所属的server以及所用到的网络连接,也就是tcp连接。
5、大致流程:先listen,然后再accept。再有连接请求的过来的情况下,建立tcp连接。用tcp连接,构建http意义上的连接。
6、从本质上看,在accept建立一条新的tcp连接后,才会继续往下执行,初始化新的http意义下的conn,所以go c.serve实际上处理的是一个tcp连接。也就是一个协程处理一个对应的tcp连接,以及这个tcp连接上的所有http请求。对于客户端来说,尽量做到tcp连接复用,因为客户端的每个tcp连接请求都会消耗服务端的一个fd和一个协程来处理建立的tcp连接。

5、HTTP请求处理

至此,进入http请求处理阶段。
```go
// Serve a new connection.
func (c *conn) serve(ctx context.Context) {
c.remoteAddr = c.rwc.RemoteAddr().String()
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
var inFlightResponse *response
```
Before entering the for loop, serve performs a few key initializations:
```go
c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
```
First, a connReader is created with its conn field set to c, the HTTP-level conn created earlier. Then newBufioReader creates a bufio.Reader, i.e. a buffered reader, on top of it:
```go
func newBufioReader(r io.Reader) *bufio.Reader {
if v := bufioReaderPool.Get(); v != nil {
br := v.(*bufio.Reader)
br.Reset(r)
return br
}
// Note: if this reader size is ever changed, update
// TestHandlerBodyClose's assumptions.
return bufio.NewReader(r)
}
```
The parameter r is an interface (io.Reader); its concrete value here is the connReader:
```go
// connReader is the io.Reader wrapper used by *conn. It combines a
// selectively-activated io.LimitedReader (to bound request header
// read sizes) with support for selectively keeping an io.Reader.Read
// call blocked in a background goroutine to wait for activity and
// trigger a CloseNotifier channel.
type connReader struct {
conn *conn
 
mu      sync.Mutex // guards following
hasByte bool
byteBuf [1]byte
cond    *sync.Cond
inRead  bool
aborted bool  // set true before conn.rwc deadline is set to past
remain  int64 // bytes remaining
}
```
 
```go
// NewReader returns a new Reader whose buffer has the default size.
func NewReader(rd io.Reader) *Reader {
return NewReaderSize(rd, defaultBufSize)
}
 
// NewReaderSize returns a new Reader whose buffer has at least the specified
// size. If the argument io.Reader is already a Reader with large enough
// size, it returns the underlying Reader.
func NewReaderSize(rd io.Reader, size int) *Reader {
// Is it already a Reader?
b, ok := rd.(*Reader)
if ok && len(b.buf) >= size {
return b
}
if size < minReadBufferSize {
size = minReadBufferSize
}
r := new(Reader)
r.reset(make([]byte, size), rd)
return r
}
func (b *Reader) reset(buf []byte, r io.Reader) {
*b = Reader{
buf:          buf,
rd:           r,
lastByte:     -1,
lastRuneSize: -1,
}
}
```
After this chain of initialization, the rd field of the conn's bufr ends up being c.r, of type *connReader. Keep this in mind, because readRequest relies on it later.
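To see what the bufio.Reader layer buys, the following sketch wraps a counting reader and reads one byte at a time. countingReader and readBytewise are hypothetical names introduced for this illustration; countingReader only stands in for connReader and has none of its locking or limit logic.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// countingReader is a hypothetical stand-in for connReader: it forwards
// reads to an underlying io.Reader and counts how often it is called.
type countingReader struct {
	r     io.Reader
	calls int
}

func (c *countingReader) Read(p []byte) (int, error) {
	c.calls++
	return c.r.Read(p)
}

// readBytewise reads every byte of s one at a time through a bufio.Reader
// and returns the bytes read plus the number of underlying Read calls.
func readBytewise(s string) (string, int) {
	src := &countingReader{r: strings.NewReader(s)}
	br := bufio.NewReader(src) // same constructor newBufioReader falls back to
	out := make([]byte, 0, len(s))
	for i := 0; i < len(s); i++ {
		b, err := br.ReadByte()
		if err != nil {
			break
		}
		out = append(out, b)
	}
	return string(out), src.calls
}

func main() {
	got, calls := readBytewise("GET / HTTP/1.1\r\n")
	fmt.Printf("%q read with %d underlying Read call(s)\n", got, calls)
}
```

The parser can consume the request line and headers in small steps while the underlying reader, which in the server ultimately reads from the TCP connection, is called far less often.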
 
Now for the key parts of the loop:
```go
for {
w, err := c.readRequest(ctx)
if c.r.remain != c.server.initialReadLimitSize() {
// If we read any bytes off the wire, we're active.
c.setState(c.rwc, StateActive, runHooks)
}
if err != nil {
const errorHeaders = "\r\nContent-Type: text/plain; charset=utf-8\r\nConnection: close\r\n\r\n"
 
switch {
case err == errTooLarge:
// Their HTTP client may or may not be
// able to read this if we're
// responding to them and hanging up
// while they're still writing their
// request. Undefined behavior.
const publicErr = "431 Request Header Fields Too Large"
fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
c.closeWriteAndWait()
return
 
case isUnsupportedTEError(err):
// Respond as per RFC 7230 Section 3.3.1 which says,
//      A server that receives a request message with a
//      transfer coding it does not understand SHOULD
//      respond with 501 (Unimplemented).
code := StatusNotImplemented
 
// We purposefully aren't echoing back the transfer-encoding's value,
// so as to mitigate the risk of cross side scripting by an attacker.
fmt.Fprintf(c.rwc, "HTTP/1.1 %d %s%sUnsupported transfer encoding", code, StatusText(code), errorHeaders)
return
 
case isCommonNetReadError(err):
return // don't reply
 
default:
if v, ok := err.(statusError); ok {
fmt.Fprintf(c.rwc, "HTTP/1.1 %d %s: %s%s%d %s: %s", v.code, StatusText(v.code), v.text, errorHeaders, v.code, StatusText(v.code), v.text)
return
}
publicErr := "400 Bad Request"
fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
return
}
}
 
// Expect 100 Continue support
req := w.req
if req.expectsContinue() {
if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
// Wrap the Body reader with one that replies on the connection
req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
w.canWriteContinue.Store(true)
}
} else if req.Header.get("Expect") != "" {
w.sendExpectationFailed()
return
}
 
c.curReq.Store(w)
 
if requestBodyRemains(req.Body) {
registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
} else {
w.conn.r.startBackgroundRead()
}
 
// HTTP cannot have multiple simultaneous active requests.[*]
// Until the server replies to this request, it can't read another,
// so we might as well run the handler in this goroutine.
// [*] Not strictly true: HTTP pipelining. We could let them all process
// in parallel even if their responses need to be serialized.
// But we're not going to implement HTTP pipelining because it
// was never deployed in the wild and the answer is HTTP/2.
inFlightResponse = w
serverHandler{c.server}.ServeHTTP(w, w.req)
inFlightResponse = nil
w.cancelCtx()
if c.hijacked() {
return
}
w.finishRequest()
c.rwc.SetWriteDeadline(time.Time{})
if !w.shouldReuseConnection() {
if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
c.closeWriteAndWait()
}
return
}
c.setState(c.rwc, StateIdle, runHooks)
c.curReq.Store(nil)
 
if !w.conn.server.doKeepAlives() {
// We're in shutdown mode. We might've replied
// to the user without "Connection: close" and
// they might think they can send another
// request, but such is life with HTTP/1.1.
return
}
 
if d := c.server.idleTimeout(); d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
} else {
c.rwc.SetReadDeadline(time.Time{})
}
 
// Wait for the connection to become readable again before trying to
// read the next request. This prevents a ReadHeaderTimeout or
// ReadTimeout from starting until the first bytes of the next request
// have been received.
if _, err := c.bufr.Peek(4); err != nil {
return
}
 
c.rwc.SetReadDeadline(time.Time{})
}
}
```
The first step is to read an HTTP request from the connection:
```go
w, err := c.readRequest(ctx)
```
Let's look at it in detail:
```go
// Read next request from connection.
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
if c.hijacked() {
return nil, ErrHijacked
}
 
var (
wholeReqDeadline time.Time // or zero if none
hdrDeadline      time.Time // or zero if none
)
t0 := time.Now()
if d := c.server.readHeaderTimeout(); d > 0 {
hdrDeadline = t0.Add(d)
}
if d := c.server.ReadTimeout; d > 0 {
wholeReqDeadline = t0.Add(d)
}
c.rwc.SetReadDeadline(hdrDeadline)
if d := c.server.WriteTimeout; d > 0 {
defer func() {
c.rwc.SetWriteDeadline(time.Now().Add(d))
}()
}
 
c.r.setReadLimit(c.server.initialReadLimitSize())
if c.lastMethod == "POST" {
// RFC 7230 section 3 tolerance for old buggy clients.
peek, _ := c.bufr.Peek(4) // ReadRequest will get err below
c.bufr.Discard(numLeadingCRorLF(peek))
}
req, err := readRequest(c.bufr)
if err != nil {
if c.r.hitReadLimit() {
return nil, errTooLarge
}
return nil, err
}
 
if !http1ServerSupportsRequest(req) {
return nil, statusError{StatusHTTPVersionNotSupported, "unsupported protocol version"}
}
 
c.lastMethod = req.Method
c.r.setInfiniteReadLimit()
 
hosts, haveHost := req.Header["Host"]
isH2Upgrade := req.isH2Upgrade()
if req.ProtoAtLeast(1, 1) && (!haveHost || len(hosts) == 0) && !isH2Upgrade && req.Method != "CONNECT" {
return nil, badRequestError("missing required Host header")
}
if len(hosts) == 1 && !httpguts.ValidHostHeader(hosts[0]) {
return nil, badRequestError("malformed Host header")
}
for k, vv := range req.Header {
if !httpguts.ValidHeaderFieldName(k) {
return nil, badRequestError("invalid header name")
}
for _, v := range vv {
if !httpguts.ValidHeaderFieldValue(v) {
return nil, badRequestError("invalid header value")
}
}
}
delete(req.Header, "Host")
 
ctx, cancelCtx := context.WithCancel(ctx)
req.ctx = ctx
req.RemoteAddr = c.remoteAddr
req.TLS = c.tlsState
if body, ok := req.Body.(*body); ok {
body.doEarlyClose = true
}
 
// Adjust the read deadline if necessary.
if !hdrDeadline.Equal(wholeReqDeadline) {
c.rwc.SetReadDeadline(wholeReqDeadline)
}
 
w = &response{
conn:          c,
cancelCtx:     cancelCtx,
req:           req,
reqBody:       req.Body,
handlerHeader: make(Header),
contentLength: -1,
closeNotifyCh: make(chan bool, 1),
 
// We populate these ahead of time so we're not
// reading from req.Header after their Handler starts
// and maybe mutates it (Issue 14940)
wants10KeepAlive: req.wantsHttp10KeepAlive(),
wantsClose:       req.wantsClose(),
}
if isH2Upgrade {
w.closeAfterReply = true
}
w.cw.res = w
w.w = newBufioWriterSize(&w.cw, bufferBeforeChunkingSize)
return w, nil
}
```
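The deadlines computed at the top of readRequest come straight from fields on http.Server. As a hedged sketch, a server that sets them explicitly might look like this. The function name newServer and all durations are assumptions chosen for illustration, not recommendations from the original article.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// newServer builds an http.Server with explicit timeouts. The concrete
// durations are illustrative assumptions.
func newServer(addr string, h http.Handler) *http.Server {
	return &http.Server{
		Addr:              addr,
		Handler:           h,
		ReadHeaderTimeout: 5 * time.Second,  // feeds hdrDeadline
		ReadTimeout:       30 * time.Second, // feeds wholeReqDeadline
		WriteTimeout:      30 * time.Second, // applied by the deferred SetWriteDeadline
		IdleTimeout:       60 * time.Second, // used by serve between keep-alive requests
	}
}

func main() {
	srv := newServer(":8080", nil)
	fmt.Println(srv.ReadHeaderTimeout, srv.ReadTimeout, srv.WriteTimeout, srv.IdleTimeout)
}
```

When ReadHeaderTimeout is left at zero, the server falls back to ReadTimeout for the header deadline, and if both are zero no read deadline is set at all.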
Again, focusing on the key step:
```go
req, err := readRequest(c.bufr)
```
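This reads data from the connection, parses it according to the HTTP protocol, and stores the result in a Request struct. The core flow is marked in the comments: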
```go
func readRequest(b *bufio.Reader) (req *Request, err error) {
tp := newTextprotoReader(b) // create a textproto.Reader
defer putTextprotoReader(tp)
 
req = new(Request)
 
// First line: GET /index.html HTTP/1.0
var s string
if s, err = tp.ReadLine(); err != nil { // read the first line
return nil, err
}
defer func() {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
}()
 
var ok bool
req.Method, req.RequestURI, req.Proto, ok = parseRequestLine(s) // parse method, request URI and protocol
if !ok {
return nil, badStringError("malformed HTTP request", s)
}
if !validMethod(req.Method) {
return nil, badStringError("invalid method", req.Method)
}
rawurl := req.RequestURI
if req.ProtoMajor, req.ProtoMinor, ok = ParseHTTPVersion(req.Proto); !ok {
return nil, badStringError("malformed HTTP version", req.Proto)
}
justAuthority := req.Method == "CONNECT" && !strings.HasPrefix(rawurl, "/")
if justAuthority {
rawurl = "http://" + rawurl
}
 
if req.URL, err = url.ParseRequestURI(rawurl); err != nil {
return nil, err
}
 
if justAuthority {
req.URL.Scheme = ""
}
 
// Subsequent lines: Key: value.
mimeHeader, err := tp.ReadMIMEHeader() // read the headers
if err != nil {
return nil, err
}
req.Header = Header(mimeHeader)
if len(req.Header["Host"]) > 1 {
return nil, fmt.Errorf("too many Host headers")
}
req.Host = req.URL.Host
if req.Host == "" {
req.Host = req.Header.get("Host")
}
 
fixPragmaCacheControl(req.Header)
 
req.Close = shouldClose(req.ProtoMajor, req.ProtoMinor, req.Header, false)
 
err = readTransfer(req, b) // set up Body, ContentLength, etc.
if err != nil {
return nil, err
}
 
if req.isH2Upgrade() {
// Because it's neither chunked, nor declared:
req.ContentLength = -1
req.Close = true
}
return req, nil
}
```
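The parsing steps above can be tried without a server, because the exported http.ReadRequest wraps the same unexported readRequest. The sketch below feeds it a hand-written request. The parsed struct, the parseRaw helper and the example.com host are assumptions made for this illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// parsed holds the handful of fields this sketch inspects.
type parsed struct {
	Method, Path, Proto, Host, ContentType, Body string
}

// parseRaw feeds raw request bytes through http.ReadRequest.
func parseRaw(raw string) (parsed, error) {
	req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(raw)))
	if err != nil {
		return parsed{}, err
	}
	defer req.Body.Close()
	body, err := io.ReadAll(req.Body)
	if err != nil {
		return parsed{}, err
	}
	return parsed{
		Method:      req.Method,
		Path:        req.URL.Path,
		Proto:       req.Proto,
		Host:        req.Host,
		ContentType: req.Header.Get("Content-Type"),
		Body:        string(body),
	}, nil
}

func main() {
	raw := "POST /submit?x=1 HTTP/1.1\r\n" +
		"Host: example.com\r\n" +
		"Content-Type: text/plain\r\n" +
		"Content-Length: 5\r\n" +
		"\r\n" +
		"hello"
	p, err := parseRaw(raw)
	fmt.Printf("%+v %v\n", p, err)
}
```

The request line fills Method, URL and Proto, the header block fills Header and Host, and readTransfer wires up Body according to Content-Length.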
At this point the request has been parsed and stored in the http package's Request struct.
The req is then used to initialize a response struct:
```go
w = &response{
conn:          c,
cancelCtx:     cancelCtx,
req:           req,
reqBody:       req.Body,
handlerHeader: make(Header),
contentLength: -1,
closeNotifyCh: make(chan bool, 1),
 
// We populate these ahead of time so we're not
// reading from req.Header after their Handler starts
// and maybe mutates it (Issue 14940)
wants10KeepAlive: req.wantsHttp10KeepAlive(),
wantsClose:       req.wantsClose(),
}
```
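A few fields of this response are worth pointing out:
1. conn: the HTTP connection the response belongs to.
2. req: the parsed request.
3. reqBody: the body of the incoming request.
Back in serve, the next key step is: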
```go
inFlightResponse = w
serverHandler{c.server}.ServeHTTP(w, w.req)
inFlightResponse = nil
w.cancelCtx()
if c.hijacked() {
return
}
```
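As mentioned earlier, ServeHTTP is the single method of the Handler interface. Any value that implements it can be passed to the server as a custom HTTP request handler. Only the handler that ships with Go's http library is analyzed here.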
```go
// serverHandler delegates to either the server's Handler or
// DefaultServeMux and also handles "OPTIONS *" requests.
type serverHandler struct {
srv *Server
}
 
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
handler := sh.srv.Handler
if handler == nil {
handler = DefaultServeMux
}
if !sh.srv.DisableGeneralOptionsHandler && req.RequestURI == "*" && req.Method == "OPTIONS" {
handler = globalOptionsHandler{}
}
 
if req.URL != nil && strings.Contains(req.URL.RawQuery, ";") {
var allowQuerySemicolonsInUse atomic.Bool
req = req.WithContext(context.WithValue(req.Context(), silenceSemWarnContextKey, func() {
allowQuerySemicolonsInUse.Store(true)
}))
defer func() {
if !allowQuerySemicolonsInUse.Load() {
sh.srv.logf("http: URL query contains semicolon, which is no longer a supported separator; parts of the query may be stripped when parsed; see golang.org/issue/25192")
}
}()
}
 
handler.ServeHTTP(rw, req)
}
```
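serve calls ServeHTTP on serverHandler. Inside, it checks the server's Handler: if it is nil, the default DefaultServeMux is used; otherwise the custom handler is used. The handler comes from the argument passed to ListenAndServe at server startup: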
```go
func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}
```
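As a small illustration of the custom-handler case, the sketch below defines a type with a ServeHTTP method and invokes it directly through an in-memory recorder from net/http/httptest. The helloHandler type and the call helper are hypothetical names for this example.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// helloHandler is a hypothetical handler type; any type with a ServeHTTP
// method of this signature satisfies http.Handler.
type helloHandler struct {
	greeting string
}

func (h helloHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "%s, %s", h.greeting, r.URL.Path)
}

// call invokes a handler directly with a recorded response, skipping the network.
func call(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	res := rec.Result()
	defer res.Body.Close()
	body, _ := io.ReadAll(res.Body)
	return res.StatusCode, string(body)
}

func main() {
	var h http.Handler = helloHandler{greeting: "hello"}
	fmt.Println(call(h, "/world"))
	// To serve it for real, pass it as the second argument:
	//   http.ListenAndServe(":8080", h)
}
```

Passing nil instead of h to ListenAndServe would leave Server.Handler unset, which is exactly the branch where serverHandler falls back to DefaultServeMux.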

6. Writing the response

For convenience, DefaultServeMux is used as the example.
```go
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
```
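Handler is an interface. Per the analysis above, its concrete value here is DefaultServeMux. Its first parameter is the ResponseWriter interface: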
```go
type ResponseWriter interface {
Header() Header
Write([]byte) (int, error)
WriteHeader(statusCode int)
}
```
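According to the code, the concrete value behind this ResponseWriter is the response returned by readRequest earlier; its key fields were described above. Next, look at how DefaultServeMux implements ServeHTTP: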
```go
// DefaultServeMux is the default ServeMux used by Serve.
var DefaultServeMux = &defaultServeMux
 
var defaultServeMux ServeMux
```
Its ServeHTTP method:
```go
// ServeHTTP dispatches the request to the handler whose
// pattern most closely matches the request URL.
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
if r.RequestURI == "*" {
if r.ProtoAtLeast(1, 1) {
w.Header().Set("Connection", "close")
}
w.WriteHeader(StatusBadRequest)
return
}
h, _ := mux.Handler(r)
h.ServeHTTP(w, r)
}
```
The handler that ships with the http library finds the route by looking up the request URL in a map of registered patterns:
```go
// If there is no registered handler that applies to the request,
// Handler returns a “page not found” handler and an empty pattern.
func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string) {
 
// CONNECT requests are not canonicalized.
if r.Method == "CONNECT" {
// If r.URL.Path is /tree and its handler is not registered,
// the /tree -> /tree/ redirect applies to CONNECT requests
// but the path canonicalization does not.
if u, ok := mux.redirectToPathSlash(r.URL.Host, r.URL.Path, r.URL); ok {
return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
}
 
return mux.handler(r.Host, r.URL.Path)
}
 
// All other requests have any port stripped and path cleaned
// before passing to mux.handler.
host := stripHostPort(r.Host)
path := cleanPath(r.URL.Path)
 
// If the given path is /tree and its handler is not registered,
// redirect for /tree/.
if u, ok := mux.redirectToPathSlash(host, path, r.URL); ok {
return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
}
 
if path != r.URL.Path {
_, pattern = mux.handler(host, path)
u := &url.URL{Path: path, RawQuery: r.URL.RawQuery}
return RedirectHandler(u.String(), StatusMovedPermanently), pattern
}
 
return mux.handler(host, r.URL.Path)
}
```
 
```go
// handler is the main implementation of Handler.
// The path is known to be in canonical form, except for CONNECT methods.
func (mux *ServeMux) handler(host, path string) (h Handler, pattern string) {
mux.mu.RLock()
defer mux.mu.RUnlock()
 
// Host-specific pattern takes precedence over generic ones
if mux.hosts {
h, pattern = mux.match(host + path)
}
if h == nil {
h, pattern = mux.match(path)
}
if h == nil {
h, pattern = NotFoundHandler(), ""
}
return
}
```
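The lookup rules above (exact match, subtree match, the /tree to /tree/ redirect, and the NotFoundHandler fallback) can be exercised on a private ServeMux. In the sketch below the patterns /api/users and /static/ and the helpers newMux and route are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newMux registers two illustrative patterns; the paths are assumptions.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/api/users", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "users")
	})
	mux.HandleFunc("/static/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "static")
	})
	return mux
}

// route sends a GET for path through mux and returns the status and body.
func route(mux *http.ServeMux, path string) (int, string) {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	mux := newMux()
	for _, p := range []string{"/api/users", "/static/css/a.css", "/static", "/missing"} {
		code, body := route(mux, p)
		fmt.Printf("%-20s %d %q\n", p, code, body)
	}
}
```

A pattern ending in a slash matches the whole subtree, a request for the same path without the trailing slash is redirected, and anything unmatched ends up in NotFoundHandler.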
Take NotFoundHandler as an example:
```go
// NotFoundHandler returns a simple request handler
// that replies to each request with a “404 page not found” reply.
func NotFoundHandler() Handler { return HandlerFunc(NotFound) }
```
Here is HandlerFunc:
```go
type HandlerFunc func(ResponseWriter, *Request)
 
// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}
```
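The adapter works for any function with the right signature. As a minimal sketch, the hypothetical function ping below becomes an http.Handler through a plain type conversion; serveOnce is likewise a helper invented for this example.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// ping is an ordinary function with the handler signature.
func ping(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, "pong")
}

// serveOnce converts fn to an http.Handler via http.HandlerFunc and runs it
// against an in-memory recorder.
func serveOnce(fn func(http.ResponseWriter, *http.Request)) string {
	var h http.Handler = http.HandlerFunc(fn) // type conversion, not a function call
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/ping", nil))
	return rec.Body.String()
}

func main() {
	fmt.Println(serveOnce(ping))
}
```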
HandlerFunc implements ServeHTTP by calling the function itself, so NotFound can serve as a Handler:
 
```go
// NotFound replies to the request with an HTTP 404 not found error.
func NotFound(w ResponseWriter, r *Request) { Error(w, "404 page not found", StatusNotFound) }
 
// The error message should be plain text.
func Error(w ResponseWriter, error string, code int) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(code)
fmt.Fprintln(w, error)
}
```
In the end, Error calls Header().Set and WriteHeader on the response instance to write the headers, and fmt.Fprintln to write the body. The first parameter of fmt.Fprintln is the io.Writer interface:
```go
// Implementations must not retain p.
type Writer interface {
Write(p []byte) (n int, err error)
}
```
As the code below shows, the Write it calls resolves to the Write method of the response instance:
```go
// It returns the number of bytes written and any write error encountered.
func Fprintln(w io.Writer, a ...any) (n int, err error) {
p := newPrinter()
p.doPrintln(a)
n, err = w.Write(p.buf)
p.free()
return
}
```
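The same order of operations (set headers, write the status, write the body) applies to any handler. The sketch below records the result in memory. The teapot handler, the record helper, the status code and the X-Too-Late header are arbitrary choices for this illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// teapot mirrors the order used by http.Error: headers first, then the
// status code, then the body. The status code and text are arbitrary.
func teapot(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	w.WriteHeader(http.StatusTeapot)
	// Headers changed after WriteHeader are not part of this response.
	w.Header().Set("X-Too-Late", "ignored")
	fmt.Fprintln(w, "short and stout") // ends up calling w.Write
}

// record runs h once and returns the status, two header values and the body.
func record(h http.HandlerFunc) (code int, contentType, late, body string) {
	rec := httptest.NewRecorder()
	h(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	res := rec.Result()
	defer res.Body.Close()
	return res.StatusCode, res.Header.Get("Content-Type"), res.Header.Get("X-Too-Late"), rec.Body.String()
}

func main() {
	code, ct, late, body := record(teapot)
	fmt.Printf("%d %q %q %q\n", code, ct, late, body)
}
```

The ordering matters because the header map is snapshotted when the status is written, so later changes to it have no effect on ordinary headers.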
At this point the handler's headers and body have been written.
serve then finishes the HTTP response for the current request:
```go
func (w *response) finishRequest() {
w.handlerDone.Store(true)
 
if !w.wroteHeader {
w.WriteHeader(StatusOK)
}
 
w.w.Flush()
putBufioWriter(w.w)
w.cw.close() // writes the terminating zero-length chunk when chunked encoding is in use
w.conn.bufw.Flush()
 
w.conn.r.abortPendingRead()
 
// Close the body (regardless of w.closeAfterReply) so we can
// re-use its bufio.Reader later safely.
w.reqBody.Close()
 
if w.req.MultipartForm != nil {
w.req.MultipartForm.RemoveAll()
}
}
```
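When chunked transfer encoding is in use, close writes the zero-length chunk 0\r\n, any trailers, and a final \r\n to mark the end of the response body: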
```go
func (cw *chunkWriter) close() {
if !cw.wroteHeader {
cw.writeHeader(nil)
}
if cw.chunking {
bw := cw.res.conn.bufw // conn's bufio writer
// zero chunk to mark EOF
bw.WriteString("0\r\n")
if trailers := cw.res.finalTrailers(); trailers != nil {
trailers.Write(bw) // the writer handles noting errors
}
// final blank line after the trailers (whether
// present or not)
bw.WriteString("\r\n")
}
}
```
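To see those bytes on the wire, the sketch below uses the exported httputil.NewChunkedWriter, which is a separate public helper and not the chunkWriter used by the server. The chunkedWire function is a hypothetical helper, and no trailers are written.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http/httputil"
)

// chunkedWire encodes parts as HTTP/1.1 chunks and terminates the body the
// same way chunkWriter.close does: a zero-length chunk, then a blank line.
func chunkedWire(parts ...string) string {
	var buf bytes.Buffer
	cw := httputil.NewChunkedWriter(&buf)
	for _, p := range parts {
		cw.Write([]byte(p)) // "<hex length>\r\n<data>\r\n"
	}
	cw.Close()              // writes the terminating "0\r\n"
	buf.WriteString("\r\n") // final blank line (no trailers in this sketch)
	return buf.String()
}

func main() {
	fmt.Printf("%q\n", chunkedWire("hello", ", world"))
}
```

As in chunkWriter.close, the zero-length chunk and the final blank line are two separate writes, which leaves room for trailers in between.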

7. Summary

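This article walked through the whole process by which Go handles an HTTP request:
1. Establishing the TCP connection.
2. Parsing the request.
3. Dispatching to the application handler that processes the request.
4. Writing the handler's response back to the client.
It only outlines the overall flow. Details such as TCP connection management and the management of the various read and write buffers deserve a deeper look.
For the application handler, only Go's built-in route registration was briefly covered. Route registration in common web frameworks such as gin will be covered in a later article.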