// Network file descriptor. type netFD struct { pfd poll.FD
// immutable until Close family int sotype int isConnected bool// handshake completed or use of association with peer net string laddr Addr raddr Addr }
// FD is a file descriptor. The net and os packages use this type as a // field of a larger type representing a network connection or OS file. // 对应上面的poll.FD type FD struct { // Lock sysfd and serialize access to Read and Write methods. fdmu fdMutex // 执行read/write时的互斥锁
// System file descriptor. Immutable until Close. Sysfd int// open系统调用返回的文件描述符fd
// I/O poller. pd pollDesc
// Writev cache. iovecs *[]syscall.Iovec
// Semaphore signaled when file is closed. csema uint32
// Non-zero if this file has been set to blocking mode. isBlocking uint32
// Whether this is a streaming descriptor, as opposed to a // packet-based descriptor like a UDP socket. Immutable. IsStream bool
// Whether a zero byte read indicates EOF. This is false for a // message based socket connection. ZeroReadIsEOF bool
// Whether this is a file rather than a network socket. isFile bool }
func(fd *FD)Init(net string, pollable bool)error { // We don't actually care about the various network types. if net == "file" { fd.isFile = true } if !pollable { fd.isBlocking = 1 returnnil } // 这里又有个init,这里的pd是pollDesc类型 // 只有pollable才会调用该方法 err := fd.pd.init(fd) if err != nil { // If we could not initialize the runtime poller, // assume we are using blocking mode. fd.isBlocking = 1 } return err }
可以看到上面又有个init函数,我们先来看一下fd.pd对应的pollDesc类型:
1 2 3
// pollDesc is the poll descriptor stored in FD.pd.
type pollDesc struct {
	// runtimeCtx is the runtime context. Author's note: this value is
	// important to the rest of the walkthrough.
	runtimeCtx uintptr
}
// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue) must define the following functions:
// func netpollinit()                                 // to initialize the poller
// func netpollopen(fd uintptr, pd *pollDesc) int32   // to arm edge-triggered notifications
//                                                    // and associate fd with pd.
// An implementation must call the following function to denote that the pd is ready.
// func netpollready(gpp **g, pd *pollDesc, mode int32)
// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// pdReady - io readiness notification is pending;
//           a goroutine consumes the notification by changing the state to nil.
// pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
//          the goroutine commits to park by changing the state to G pointer,
//          or, alternatively, concurrent io notification changes the state to pdReady,
//          or, alternatively, concurrent timeout/close changes the state to nil.
// G pointer - the goroutine is blocked on the semaphore;
//             io notification or timeout/close changes the state to pdReady or nil
//             respectively and unparks the goroutine.
// nil - nothing of the above.
func(fd *FD)Accept()(int, syscall.Sockaddr, string, error) { // 尝试加锁 if err := fd.readLock(); err != nil { return-1, nil, "", err } defer fd.readUnlock() if err := fd.pd.prepareRead(fd.isFile); err != nil { return-1, nil, "", err } for { /// 首先尝试直接获取客户端连接 s, rsa, errcall, err := accept(fd.Sysfd) if err == nil { // 获取成功,直接返回 return s, rsa, "", err } switch err { // 因为我们创建的socket是非阻塞的,当没有新的连接可以accept时会直接返回EAGAIN而不是阻塞 case syscall.EAGAIN: // 如果是可轮询的,表明可以等到epoll事件通知 if fd.pd.pollable() { // if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } case syscall.ECONNABORTED: // This means that a socket on the listen // queue was closed before we Accept()ed it; // it's a silly error, so try again. continue } return-1, nil, errcall, err } }
funcaccept(s int)(int, syscall.Sockaddr, string, error) { // var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4 // 首先使用系统调用accept4获取一个非阻塞的socket ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC) switch err { casenil: return ns, sa, "", nil default: // errors other than the ones listed return-1, sa, "accept4", err case syscall.ENOSYS: // syscall missing case syscall.EINVAL: // some Linux use this instead of ENOSYS case syscall.EACCES: // some Linux use this instead of ENOSYS case syscall.EFAULT: // some Linux use this instead of ENOSYS } // 有些内核不支持accept4 ns, sa, err = AcceptFunc(s) if err == nil { syscall.CloseOnExec(ns) } if err != nil { return-1, nil, "accept", err } // 设置非阻塞模式 if err = syscall.SetNonblock(ns, true); err != nil { CloseFunc(ns) return-1, nil, "setnonblock", err } return ns, sa, "", nil }
funcpoll_runtime_pollWait(pd *pollDesc, mode int)int { err := netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // As for now only Solaris uses level-triggered IO. if GOOS == "solaris" { netpollarm(pd, mode) } // 实际干活的是netpollblock for !netpollblock(pd, int32(mode), false) { err = netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // Can happen if timeout has fired and unblocked us, // but before we had a chance to run, timeout has been reset. // Pretend it has not happened and retry. } return0 }
// cas操作,设置gpp为pdwait for { old := *gpp if old == pdReady { *gpp = 0 returntrue } if old != 0 { throw("runtime: double wait") } if atomic.Casuintptr(gpp, 0, pdWait) { break } }
// 这里直接执行gopark,将当前协程挂起 ^-^ if waitio || netpollcheckerr(pd, mode) == 0 { // 这里netpollblockcommit会被调用,把当前g的引用保存到gpp中,也就是pollDesc的rg或者wg中 gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5) } // be careful to not lose concurrent READY notification old := atomic.Xchguintptr(gpp, 0) if old > pdWait { throw("runtime: corrupted polldesc") } return old == pdReady }
for { old := *gpp if old == pdReady { returnnil } if old == 0 && !ioready { // Only set READY for ioready. runtime_pollWait // will check for timeout/cancel before waiting. returnnil } varnewuintptr if ioready { new = pdReady } // 状态为ready if atomic.Casuintptr(gpp, old, new) { if old == pdReady || old == pdWait { old = 0 } return (*g)(unsafe.Pointer(old)) } } }
func(fd *FD)Read(p []byte)(int, error) { // 这里执行对应的加锁操作 ... for { // 首先尝试直接读,如果无可读内容,因为是非阻塞模式,会返回EAGAIN n, err := syscall.Read(fd.Sysfd, p) if err != nil { n = 0 if err == syscall.EAGAIN && fd.pd.pollable() { // 这里的waitRead有没有似曾相识?这个方法在accept流程的时候已经分析过了,最后会将当前协程挂起 if err = fd.pd.waitRead(fd.isFile); err == nil { continue } }
// On MacOS we can see EINTR here if the user // pressed ^Z. See issue #22838. if runtime.GOOS == "darwin" && err == syscall.EINTR { continue } } err = fd.eofError(n, err) return n, err } }
func(fd *FD)Write(p []byte)(int, error) { // 这里执行对应的加锁操作 ... // 记录已经写入字节数 var nn int for { max := len(p) if fd.IsStream && max-nn > maxRW { max = nn + maxRW } n, err := syscall.Write(fd.Sysfd, p[nn:max]) if n > 0 { nn += n } // 写入方法与读方法的区别在于,读方法只要读取到内容就会返回 // 而写入需要将传入的字节切片全部写入才返回 if nn == len(p) { return nn, err } // 这里的waitWrite和上面的waitRead类似 if err == syscall.EAGAIN && fd.pd.pollable() { if err = fd.pd.waitWrite(fd.isFile); err == nil { continue } } if err != nil { return nn, err } if n == 0 { return nn, io.ErrUnexpectedEOF } } }