在过去,传统的网络编程模型是多线程模型,在主线程中开启一个网络监听,然后每次有一个客户端进行连接,就会单独开启一个线程来处理这个客户端请求。

然而,如果并发量比较大,服务端就会创建大量的线程,而且会有大量的线程阻塞在网络IO上,频繁的线程上下文切换会占用大量的cpu时间片,严重影响服务性能,而且大量的线程也需要占用大量的系统资源

这样就引出著名的C10K问题,如何在单台服务器上支持并发10K量级的连接

我们知道,虽然同一时间有大量的并发连接,但是同一时刻,只有少数的连接是可读/写的,我们完全可以只使用一个线程来服务提供服务,这也是目前解决C10K问题的主要思路,对应的解决方案叫做IO多路复用,现在主流的高性能网络服务器/框架都是基于该网络模型,比如nginxredis或者netty网络库等。

说到这,就不得不提epoll,这是linux内核提供的用于实现IO多路复用的系统调用,其他操作系统上也有类似的接口,关于epoll具体内容网上有一大堆的资料,这里就不重复介绍了

IO多路复用模型,也可以称作是事件驱动模型,虽然能够有效解决C10K问题,但是相对传统的多线程模型也带来了一点复杂性。比如说,在多线程模型下,每个连接独占一个线程,而线程本身有自己的上下文;而如果是IO多路复用模型,需要在一个线程中处理多个连接,而每个需要有自己的上下文,需要开发者手动管理。比如服务端还没有接收到一个完整的协议报文时,我们需要把先前接收的部分内容保存到当前连接上下文中,等到下次其余内容到底时再一起处理。

今天,我们主要来看一下go中的网络模型。

go中我们可以像传统的多线程模型那样为每个网络连接单独使用一个goroutine来提供服务,但是goroutine的资源占用相比系统级线程来说非常小,而且其切换在运行在用户态的,并且只需要交换很少的寄存器,因此goroutine的上下文切换代价也是极小的,更重要的是,其底层也是基于epoll(linux系统下)来实现事件通知的,因此只需要占用很少的系统级线程。

很明显可以看出,go中的网络IO模型是传统多线程模型和IO多路复用模型的结合,既有前者的易用性,又有后者的效率,因此使用go可以很容易地开发高性能服务器。

今天我们就来看一下,go中的网络IO模型是如何实现的。

一切从创建Listener开始

我们从创建Listener开始说起。

先看下面代码:

1
ln,_ :=net.Listen("tcp",":80")

我们使用Listen来创建一个Listener,那么底层具体会发生什么呢?让我们一步一步来揭开

首先查看net.Listen方法

1
2
3
4
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}

可以看到实际上工作的是ListenConfig.Listen,我们继续往下看:

1
2
3
4
5
6
7
8
9
10
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
...
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
l, err = sl.listenTCP(ctx, la)
...
return l, nil
}

因为我们创建的是tcp连接,这里我们只关注sl.listenTCP方法,继续往下

1
2
3
4
5
6
7
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
if err != nil {
return nil, err
}
return &TCPListener{fd}, nil
}

我们看函数第一行,调用了internetSocket,很明显里面就是创建实际socket的逻辑了,继续往下走

1
2
3
4
5
6
7
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
if (runtime.GOOS == "windows" || runtime.GOOS == "openbsd" || runtime.GOOS == "nacl") && mode == "dial" && raddr.isWildcard() {
raddr = raddr.toLocal(net)
}
family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

这里我们只看linux的情况,因此继续看socket方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// 这里是实际创建socket的代码
s, err := sysSocket(family, sotype, proto)
if err != nil {
return nil, err
}
// 设置socket选项
if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
poll.CloseFunc(s)
return nil, err
}
// 根据socket创建netFD,netFD是net包对底层socket的封装
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}

if laddr != nil && raddr == nil {
switch sotype {
// 看上面的参数,我们传入的sotype是SOCK_STREAM,因此会走这个分支
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
if err := fd.listenStream(laddr, listenerBacklog, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
case syscall.SOCK_DGRAM:
if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
}
if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}

我们先来看sysSocket方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func sysSocket(family, sotype, proto int) (int, error) {
// 这里的socketFunc实际上是创建socket的系统调用
// socketFunc func(int, int, int) (int, error) = syscall.Socket
// 注意这里传入的SOCK_NONBLOCK,表明我们创建的是非阻塞的socket
// 这里的SOCK_CLOEXEC表明在执行fork系统调用时,当执行exec时需要关闭从父进程继承的文件设备
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
switch err {
case nil:
return s, nil
default:
return -1, os.NewSyscallError("socket", err)
// 低版本内核不支持创建时指定SOCK_NONBLOCK或者SOCK_CLOEXEC
// 这时候需要分两步,先创建socket,然后再设置flag
case syscall.EPROTONOSUPPORT, syscall.EINVAL:
}

// 这里需要加锁,与fork操作互斥,防止在创建socket而没有设置`SOCK_CLOEXEC`时执行了fork和exec
syscall.ForkLock.RLock()
// 创建socket
s, err = socketFunc(family, sotype, proto)
if err == nil {
// 设置SOCK_COLEXEC
syscall.CloseOnExec(s)
}
syscall.ForkLock.RUnlock()
if err != nil {
return -1, os.NewSyscallError("socket", err)
}
// 设置非阻塞IO
if err = syscall.SetNonblock(s, true); err != nil {
poll.CloseFunc(s)
return -1, os.NewSyscallError("setnonblock", err)
}
return s, nil
}

sysSocket主要通过系统调用创建了socket同时设置了SOCK_NONBLOCK标志位,这点非常重要,这里要明确,我们在go中使用的网络连接一般都是非阻塞的。关于阻塞IO和非阻塞IO的区别网上有一大堆的资料,这里就不重复说明了。使用非阻塞IO的主要的原因是,在go中,当使用阻塞系统调用时,当前goroutine对应的底层系统级线程就会被占用,无法与当前g解绑为其他g提供服务,这样当需要执行其他g时就需要创建新的线程来执行

接着来看netFd.listenStream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
...
// 为socket绑定监听的ip和端口
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
// listenFunc func(int, int) error = syscall.Listen
// 这里的listenFunc实际上是系统调用Listen
// 开始监听
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
// 执行初始化操作
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}

这里就是常规的绑定监听地址和端口,然后开始监听,这里重要的是netFD.init函数,先来看netFD的结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// Network file descriptor.
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
// 对应上面的poll.FD
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex // 执行read/write时的互斥锁

// System file descriptor. Immutable until Close.
Sysfd int // open系统调用返回的文件描述符fd

// I/O poller.
pd pollDesc

// Writev cache.
iovecs *[]syscall.Iovec

// Semaphore signaled when file is closed.
csema uint32

// Non-zero if this file has been set to blocking mode.
isBlocking uint32

// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool

// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool

// Whether this is a file rather than a network socket.
isFile bool
}

接着看上面的netFD.init函数:

1
2
3
4
func (fd *netFD) init() error {
// 这里的pfd实际上就是poll.FD,用来表示一个网络连接或者打开的系统文件
return fd.pfd.Init(fd.net, true)
}

我们来看一下pollFD.Init

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (fd *FD) Init(net string, pollable bool) error {
// We don't actually care about the various network types.
if net == "file" {
fd.isFile = true
}

if !pollable {
fd.isBlocking = 1
return nil
}
// 这里又有个init,这里的pd是pollDesc类型
// 只有pollable才会调用该方法
err := fd.pd.init(fd)
if err != nil {
// If we could not initialize the runtime poller,
// assume we are using blocking mode.
fd.isBlocking = 1
}
return err
}

可以看到上面又有个init函数,我们先来看一下fd.pd对应的pollDesc类型:

1
2
3
type pollDesc struct {
runtimeCtx uintptr // 这个运行时上下文很重要
}

我们来看一下init函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
var serverInit sync.Once

func (pd *pollDesc) init(fd *FD) error {
// 保证runtime_pollServerInit只会执行一次
// 从命名很容易看出来该方法在runtime包中实现
serverInit.Do(runtime_pollServerInit)
// 执行runtime_pollOpen
// 只有文件是pollable的时候,才会走到这里
// 该方法实际上是将fd加入到epoll中,该方法在runtime包中实现
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
// 保存pollOpen返回的上下文
pd.runtimeCtx = ctx
return nil
}

上面这个函数才是关键所在,这里涉及到了runtime_pollServerInitruntime_pollOpen两个函数,从命名可以很容易看出这两个函数是在runtime包中实现的,然后在链接器链接过来的

先来看一下runtime_pollServerInit实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func poll_runtime_pollServerInit() {
netpollinit()
atomic.Store(&netpollInited, 1)
}

func netpollinit() {
// 执行系统调用创建epoll
// 先尝试使用create1系统调用
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd >= 0 {
return
}
// 这边的1024是历史原因,只要大于0就好了
// 原先epoll底层使用hash表实现,需要传入一个size指定hash表的大小,后面基于rb-tree实现,因此这个参数没有实际意义了,大于0即可
epfd = epollcreate(1024)
if epfd >= 0 {
closeonexec(epfd)
return
}
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}

很简单,就是创建了一个epoll

再来看一下runtime_pollOpen的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
// 分配一个pollDesc,这个pollDesc是runtime的pollDesc,和上面的pollDesc不是同一个东西,但是他们之间又有关联
pd := pollcache.alloc()
lock(&pd.lock)
if pd.wg != 0 && pd.wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
if pd.rg != 0 && pd.rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
pd.closing = false
pd.seq++
pd.rg = 0
pd.rd = 0
pd.wg = 0
pd.wd = 0
unlock(&pd.lock)

var errno int32
errno = netpollopen(fd, pd)
// 这里返回了pd的地址,也就是poll.pollDesc中的runtimeCtx实际上保存的就是runtime.pollDesc的地址
return pd, int(errno)
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
// 设置需要通知的实际类型,这里设置了边缘触发模式,关于epoll的边缘触发和水平触发模式可以网上有一堆的资料
// 边缘触发和水平触发的本质区别,就是水平触发的话,当事件从epoll的readyList拷贝到用户空间时,会重新加入到readyList,这样下次执行epoll_wait的话,readyList还会有该事件存在(epoll_wait会重新执行file->operations中的poll方法确定是否有事件可以消费)
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
// 可以看到,这里把pollDesc的地址存到了ev.Data中
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
// 执行epollctl系统调用,添加socket到epoll中
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

至此一个net.Listener就创建完成了,总结一下主要的逻辑:

  1. 创建一个非阻塞socket,并执行bindlisten
  2. 如果没有初始化过runtime包的epoll,则执行初始化,创建一个epoll
  3. 以边缘触发模式将socket添加到epoll
  4. 返回封装后的net.Listener

runtime包中的一些注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue) must define the following functions:
// func netpollinit() // to initialize the poller
// func netpollopen(fd uintptr, pd *pollDesc) int32 // to arm edge-triggered notifications
// and associate fd with pd.
// An implementation must call the following function to denote that the pd is ready.
// func netpollready(gpp **g, pd *pollDesc, mode int32)

// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// pdReady - io readiness notification is pending;
// a goroutine consumes the notification by changing the state to nil.
// pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
// the goroutine commits to park by changing the state to G pointer,
// or, alternatively, concurrent io notification changes the state to READY,
// or, alternatively, concurrent timeout/close changes the state to nil.
// G pointer - the goroutine is blocked on the semaphore;
// io notification or timeout/close changes the state to READY or nil respectively
// and unparks the goroutine.
// nil - nothing of the above.

pollDescrgwg字段,可能的取值情况:

  • pdReadyrg表示当前有可读事件,wg表示可写
  • pdWait:表示即将进入等待
  • G的指针:需要先进入pdWait,然后调用gopark,设置等待事件类型,如果是等待读,则设置rg,等待写则设置wg为当前G的指针,然后挂起;当事件到达,runtime会将对应的G唤醒
  • 0:其他情况

Accept又是如何执行的呢

接下来我们来看一下执行Accept时会发生什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
return newTCPConn(fd), nil
}

我们上面创建的是一个TcpListener,因此自然是执行对应的Accept,可以看到是调用netFD.Accept

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (fd *netFD) accept() (netfd *netFD, err error) {
// 执行poll.FD的Accept方法,获取新的客户端连接
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}
// 封装netFD
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
// 这里的netFD.init上面分析过了,就是将新的socket加入到epoll中
if err = netfd.init(); err != nil {
fd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}

接下来看一下poll.FDAccept方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
// 尝试加锁
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()

if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
/// 首先尝试直接获取客户端连接
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil { // 获取成功,直接返回
return s, rsa, "", err
}
switch err {
// 因为我们创建的socket是非阻塞的,当没有新的连接可以accept时会直接返回EAGAIN而不是阻塞
case syscall.EAGAIN:
// 如果是可轮询的,表明可以等到epoll事件通知
if fd.pd.pollable() {
//
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}

func accept(s int) (int, syscall.Sockaddr, string, error) {
// var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4
// 首先使用系统调用accept4获取一个非阻塞的socket
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
switch err {
case nil:
return ns, sa, "", nil
default: // errors other than the ones listed
return -1, sa, "accept4", err
case syscall.ENOSYS: // syscall missing
case syscall.EINVAL: // some Linux use this instead of ENOSYS
case syscall.EACCES: // some Linux use this instead of ENOSYS
case syscall.EFAULT: // some Linux use this instead of ENOSYS
}
// 有些内核不支持accept4
ns, sa, err = AcceptFunc(s)
if err == nil {
syscall.CloseOnExec(ns)
}
if err != nil {
return -1, nil, "accept", err
}
// 设置非阻塞模式
if err = syscall.SetNonblock(ns, true); err != nil {
CloseFunc(ns)
return -1, nil, "setnonblock", err
}
return ns, sa, "", nil
}

接着来看pollDesc.waitRead实现:

1
2
3
4
5
6
7
8
9
10
11
12
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
// 又是一个runtime包的方法
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}

接着看一下runtime_pollWait实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris uses level-triggered IO.
if GOOS == "solaris" {
netpollarm(pd, mode)
}
// 实际干活的是netpollblock
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return 0
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// 这里如果是'r'模式,则gpp是&pd.rg
// 'w'模式则是'&pd.wg'
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

// cas操作,设置gpp为pdwait
for {
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}

// 这里直接执行gopark,将当前协程挂起 ^-^
if waitio || netpollcheckerr(pd, mode) == 0 {
// 这里netpollblockcommit会被调用,把当前g的引用保存到gpp中,也就是pollDesc的rg或者wg中
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent READY notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}

至此,Accept的流程也很清晰了:

  1. 首先直接尝试通过socket执行accept来获取可能的客户端连接
  2. 如果此时客户端没有连接,因为socket是非阻塞模式,会直接返回EAGAIN
  3. 调用runtime.poll_runtime_pollWait将当前协程挂起,并且根据是等待读还是等待写将当前g的引用保存到pollDesc中的rg或者wg
  4. 当有新的客户端连接到来时,epoll会通知将当前阻塞的协程恢复,然后重新执行第一步

那么epoll的wait又是什么时候调用的呢

我们可以在协程的调度逻辑中看到这样一段代码段:

1
2
3
4
5
6
7
8
9
10
11
12
13
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
// 这里的netpoll的参数false表示不阻塞
if gp := netpoll(false); gp != nil {
// 这里获取的可能是一个列表,将后面多余的g加入调度队列,这里调度一次只能调度一个
injectglist(gp.schedlink.ptr())
// 设置g为runnable
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}

我们来看一下netpoll的执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
func netpoll(block bool) *g {
if epfd == -1 {
return nil
}
waitms := int32(-1)
// 调度逻辑中传入的是0
if !block {
waitms = 0
}
var events [128]epollevent
retry:
// 执行epoll_wait系统调用
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
goto retry
}
// 这里gp是一个链表
var gp guintptr
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 从ev.data取出pollDesc,还记得上面分析过,在加入epoll时会把对应的pollDesc保存到ev.Data中,而协程阻塞时会把g指针保存在pollDesc中的rg或者wg中
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
// 这里执行netpollready,把对应阻塞的g加到gp链表头部
netpollready(&gp, pd, mode)
}
}
if block && gp == 0 {
goto retry
}
return gp.ptr()
}

func netpollready(gpp *guintptr, pd *pollDesc, mode int32) {
var rg, wg guintptr
if mode == 'r' || mode == 'r'+'w' {
// 这里调用了netpollunblock,获取对应的g
rg.set(netpollunblock(pd, 'r', true))
}
if mode == 'w' || mode == 'r'+'w' {
wg.set(netpollunblock(pd, 'w', true))
}
// 链表设置,将新的g添加到链表头部
if rg != 0 {
rg.ptr().schedlink = *gpp
*gpp = rg
}
if wg != 0 {
wg.ptr().schedlink = *gpp
*gpp = wg
}
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
// 如果是等待读则rg是阻塞的g的引用
// 如果是等待写则wg是阻塞的g的引用
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

for {
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// 状态为ready
if atomic.Casuintptr(gpp, old, new) {
if old == pdReady || old == pdWait {
old = 0
}
return (*g)(unsafe.Pointer(old))
}
}
}

可以看到,在执行协程的调度时,会执行epoll_wait系统调用,获取已经准备好的socket,并唤醒对应的goroutine

除了在调度时会执行epoll_wait,在后台线程sysmon中也会定时执行epoll_wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func sysmon() {
...
for {
...
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
gp := netpoll(false) // non-blocking - returns list of goroutines
if gp != nil {
incidlelocked(-1)
injectglist(gp)
incidlelocked(1)
}
}
...
}
}

大同小异的读写操作

那么接下来,我们来看一下Read操作,实际上Read最后会执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}

最后到了poll.FDRead方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (fd *FD) Read(p []byte) (int, error) {
// 这里执行对应的加锁操作
...
for {
// 首先尝试直接读,如果无可读内容,因为是非阻塞模式,会返回EAGAIN
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
// 这里的waitRead有没有似曾相识?这个方法在accept流程的时候已经分析过了,最后会将当前协程挂起
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}

// On MacOS we can see EINTR here if the user
// pressed ^Z. See issue #22838.
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}

再来看一下写过程,最后会执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func (fd *FD) Write(p []byte) (int, error) {
// 这里执行对应的加锁操作
...
// 记录已经写入字节数
var nn int
for {
max := len(p)
if fd.IsStream && max-nn > maxRW {
max = nn + maxRW
}
n, err := syscall.Write(fd.Sysfd, p[nn:max])
if n > 0 {
nn += n
}
// 写入方法与读方法的区别在于,读方法只要读取到内容就会返回
// 而写入需要将传入的字节切片全部写入才返回
if nn == len(p) {
return nn, err
}
// 这里的waitWrite和上面的waitRead类似
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitWrite(fd.isFile); err == nil {
continue
}
}
if err != nil {
return nn, err
}
if n == 0 {
return nn, io.ErrUnexpectedEOF
}
}
}

// 其实最后都是调用的pd.wait
func (pd *pollDesc) waitWrite(isFile bool) error {
return pd.wait('w', isFile)
}

// 最终调用runtime_pollWait将当前协程挂起
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}

差点被遗忘的close

接着来看一下Close方法,实际执行的是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
func (c *conn) Close() error {
if !c.ok() {
return syscall.EINVAL
}
// 这里执行netFD.Close
err := c.fd.Close()
if err != nil {
err = &OpError{Op: "close", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return err
}

func (fd *netFD) Close() error {
// 清除finalizer
runtime.SetFinalizer(fd, nil)
// 调用poll.FD的Close方法
return fd.pfd.Close()
}


func (fd *FD) Close() error {
if !fd.fdmu.increfAndClose() {
return errClosing(fd.isFile)
}

// 这里evict方法唤醒所有阻塞读写的g
fd.pd.evict()
// 减少引用,如果引用为0则关闭
err := fd.decref()

if fd.isBlocking == 0 {
runtime_Semacquire(&fd.csema)
}

return err
}

func (pd *pollDesc) evict() {
if pd.runtimeCtx == 0 {
return
}
runtime_pollUnblock(pd.runtimeCtx)
}

func poll_runtime_pollUnblock(pd *pollDesc) {
lock(&pd.lock)
if pd.closing {
throw("runtime: unblock on closing polldesc")
}
pd.closing = true
pd.seq++
var rg, wg *g
atomicstorep(unsafe.Pointer(&rg), nil)
// 获取阻塞的g
rg = netpollunblock(pd, 'r', false)
wg = netpollunblock(pd, 'w', false)
if pd.rt.f != nil {
deltimer(&pd.rt)
pd.rt.f = nil
}
if pd.wt.f != nil {
deltimer(&pd.wt)
pd.wt.f = nil
}
unlock(&pd.lock)
if rg != nil {
// 调用goready唤醒g
netpollgoready(rg, 3)
}
if wg != nil {
// 唤醒g
netpollgoready(wg, 3)
}
}


func (fd *FD) decref() error {
// 减少引用,如果引用为0,则返回true
if fd.fdmu.decref() {
// 关闭连接
return fd.destroy()
}
return nil
}

func (fd *FD) destroy() error {
// 调用runtime_pollClose方法
fd.pd.close()
// var CloseFunc func(int) error = syscall.Close
// 这里的CloseFunc就是系统调用close
err := CloseFunc(fd.Sysfd)
fd.Sysfd = -1
runtime_Semrelease(&fd.csema)
return err
}

func (pd *pollDesc) close() {
if pd.runtimeCtx == 0 {
return
}
runtime_pollClose(pd.runtimeCtx)
pd.runtimeCtx = 0
}

func poll_runtime_pollClose(pd *pollDesc) {
if !pd.closing {
throw("runtime: close polldesc w/o unblock")
}
if pd.wg != 0 && pd.wg != pdReady {
throw("runtime: blocked write on closing polldesc")
}
if pd.rg != 0 && pd.rg != pdReady {
throw("runtime: blocked read on closing polldesc")
}
// 从epoll中删除fd
netpollclose(pd.fd)
// 释放pollDesc
pollcache.free(pd)
}

func netpollclose(fd uintptr) int32 {
var ev epollevent
// 系统调用epoll_ctl删除对应的fd
return -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev)
}

综上,关闭一个连接时:

  1. 设置pollDesc相关flag为已关闭,唤醒该连接上阻塞的协程
  2. 减少对应poll.FD的引用,如果引用为0,则只需真正的关闭
  3. 执行关闭操作,先从epoll删除对应的fd,然后执行close系统调用关闭

最后

可以看到,go使用非阻塞IO来防止大量系统线程阻塞带来的上下文切换,取而代之的是让轻量级的协程阻塞在IO事件上,然后通过epoll来实现IO事件通知,唤醒阻塞的协程。