TCP I/O samples:

  1. echo-server: https://github.com/libuv/libuv/blob/v1.x/test/echo-server.c
  2. test-tcp-write-queue-order.c: https://github.com/libuv/libuv/blob/v1.x/test/test-tcp-write-queue-order.c

Use Flow:

  • Raw TCP API (BSD sockets):

  • libuv TCP API:
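
A rough sketch of the two call sequences (server side first; compare with the echo-server sample linked above):

/* Raw TCP (BSD sockets):
     server: socket() -> bind() -> listen() -> accept() -> read()/write() -> close()
     client: socket() -> connect() -> write()/read() -> close()

   libuv TCP (everything driven by uv_run()):
     server: uv_tcp_init() -> uv_tcp_bind() -> uv_listen()
             -> (connection_cb) uv_accept() -> uv_read_start()/uv_write() -> uv_close()
     client: uv_tcp_init() -> uv_tcp_connect()
             -> (connect_cb) uv_read_start()/uv_write() -> uv_close() */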

Internal implementation:

struct uv_handle_t

Notes:

  • uv_tcp_t represents a TCP stream or TCP server.
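
    uv_tcp_t gets its handle and stream behavior by embedding the generic field sets; roughly (a simplified paraphrase of include/uv.h, field macros abbreviated):

    struct uv_tcp_s {
      UV_HANDLE_FIELDS         /* data, loop, type, close_cb, handle_queue, flags, ... */
      UV_STREAM_FIELDS         /* write_queue_size, alloc_cb, read_cb; on Unix also the
                                  connect/shutdown reqs, write_queue, io_watcher, ... */
      UV_TCP_PRIVATE_FIELDS    /* platform specific */
    };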

struct uv_tcp_t

Notes:

https://github.com/libuv/libuv/blob/v1.x/src/unix/tcp.c

UV_EXTERN int uv_tcp_init(uv_loop_t*, uv_tcp_t* handle);
UV_EXTERN int uv_tcp_init_ex(uv_loop_t*, uv_tcp_t* handle, unsigned int flags);
UV_EXTERN int uv_tcp_open(uv_tcp_t* handle, uv_os_sock_t sock);
UV_EXTERN int uv_tcp_nodelay(uv_tcp_t* handle, int enable);
UV_EXTERN int uv_tcp_keepalive(uv_tcp_t* handle,
                               int enable,
                               unsigned int delay);
UV_EXTERN int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable);

UV_EXTERN int uv_tcp_bind(uv_tcp_t* handle,
                          const struct sockaddr* addr,
                          unsigned int flags);
UV_EXTERN int uv_tcp_getsockname(const uv_tcp_t* handle,
                                 struct sockaddr* name,
                                 int* namelen);
UV_EXTERN int uv_tcp_getpeername(const uv_tcp_t* handle,
                                 struct sockaddr* name,
                                 int* namelen);
UV_EXTERN int uv_tcp_connect(uv_connect_t* req,
                             uv_tcp_t* handle,
                             const struct sockaddr* addr,
                             uv_connect_cb cb);
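
A minimal client-side sketch of the calls above (error checks omitted; the address, port, and callback body are placeholders):

#include <uv.h>

static void on_connect(uv_connect_t* req, int status) {
  if (status < 0)
    return;                        /* connect failed; status is a UV_* error code */
  /* req->handle is the connected uv_tcp_t*; start uv_read_start()/uv_write() here. */
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  uv_tcp_t client;
  uv_connect_t connect_req;        /* must stay alive until on_connect runs */
  struct sockaddr_in addr;

  uv_tcp_init(loop, &client);
  uv_ip4_addr("127.0.0.1", 7000, &addr);
  uv_tcp_connect(&connect_req, &client, (const struct sockaddr*) &addr, on_connect);
  return uv_run(loop, UV_RUN_DEFAULT);
}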

https://github.com/libuv/libuv/blob/v1.x/src/unix/stream.c

UV_EXTERN int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb);
UV_EXTERN int uv_accept(uv_stream_t* server, uv_stream_t* client);
UV_EXTERN int uv_read_start(uv_stream_t*,
                            uv_alloc_cb alloc_cb,
                            uv_read_cb read_cb);
UV_EXTERN int uv_read_stop(uv_stream_t*);
UV_EXTERN int uv_write(uv_write_t* req,
                       uv_stream_t* handle,
                       const uv_buf_t bufs[],
                       unsigned int nbufs,
                       uv_write_cb cb);
UV_EXTERN int uv_write2(uv_write_t* req,
                        uv_stream_t* handle,
                        const uv_buf_t bufs[],
                        unsigned int nbufs,
                        uv_stream_t* send_handle,
                        uv_write_cb cb);
UV_EXTERN int uv_try_write(uv_stream_t* handle,
                           const uv_buf_t bufs[],
                           unsigned int nbufs);
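
The server-side counterpart, sketched with the stream calls above (a condensed, hedged version of what echo-server.c does; memory management is simplified, e.g. the per-client struct should really be freed in a close callback):

#include <stdlib.h>
#include <uv.h>

static void on_alloc(uv_handle_t* handle, size_t suggested, uv_buf_t* buf) {
  buf->base = malloc(suggested);           /* storage libuv will read into */
  buf->len = suggested;
}

static void on_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf) {
  if (nread < 0)                           /* UV_EOF or a read error */
    uv_close((uv_handle_t*) client, NULL);
  /* else: nread bytes are in buf->base (echo-server.c writes them back) */
  free(buf->base);
}

static void on_connection(uv_stream_t* server, int status) {
  uv_tcp_t* client;
  if (status < 0)
    return;
  client = malloc(sizeof(*client));
  uv_tcp_init(server->loop, client);
  if (uv_accept(server, (uv_stream_t*) client) == 0)
    uv_read_start((uv_stream_t*) client, on_alloc, on_read);
  else
    uv_close((uv_handle_t*) client, NULL);
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  uv_tcp_t server;
  struct sockaddr_in addr;

  uv_tcp_init(loop, &server);
  uv_ip4_addr("0.0.0.0", 7000, &addr);     /* example port */
  uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
  uv_listen((uv_stream_t*) &server, 128, on_connection);
  return uv_run(loop, UV_RUN_DEFAULT);
}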

struct uv_write_t

/* Size of object which can be written atomically.

   This macro has different values in different kernel versions.  The
   latest versions of the kernel use 1024 and this is good choice.  Since
   the C library implementation of readv/writev is able to emulate the
   functionality even if the currently running kernel does not support
   this large value the readv/writev call will not fail because of this.  */
#define UIO_MAXIOV  1024


/* Structure for scatter/gather I/O.  */
struct iovec
  {
    void *iov_base; /* Pointer to data.  */
    size_t iov_len; /* Length of data.  */
  };
  

/* Note: May be cast to struct iovec. See writev(2). */
typedef struct uv_buf_t {
  char* base;
  size_t len;
} uv_buf_t;
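
The cast mentioned in the comment is safe because, on Unix, uv_buf_t and struct iovec have the same layout; a compile-time check (C11, a sanity-check sketch rather than libuv code):

#include <stddef.h>
#include <sys/uio.h>
#include <uv.h>

/* base/len must line up with iov_base/iov_len for the cast in uv__write(). */
_Static_assert(sizeof(uv_buf_t) == sizeof(struct iovec), "size mismatch");
_Static_assert(offsetof(uv_buf_t, base) == offsetof(struct iovec, iov_base),
               "base offset mismatch");
_Static_assert(offsetof(uv_buf_t, len) == offsetof(struct iovec, iov_len),
               "len offset mismatch");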

uv_write(): the request is queued on the stream and the actual syscall happens in uv__write() (stream.c), whose core looks like this:

struct iovec* iov;
int iovmax;
int iovcnt;

iov = (struct iovec*) &(req->bufs[req->write_index]);
iovcnt = req->nbufs - req->write_index;
iovmax = uv__getiovmax();

/* Limit iov count to avoid EINVALs from writev() */
if (iovcnt > iovmax)
  iovcnt = iovmax;


/* One buffer left: plain write(); several buffers: scatter/gather writev(). */
if (iovcnt == 1) {
  n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len);
} else {
  n = writev(uv__stream_fd(stream), iov, iovcnt);
}
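
On the caller's side, uv_write() does not copy the data: whatever write()/writev() cannot send right away stays referenced from the stream's write_queue until a later io_poll round. So the request and the buffer must outlive the call; a common pattern is to heap-allocate both and release them in the write callback. A sketch (hypothetical helper names; needs <uv.h>, <stdlib.h>, <string.h>):

static void on_write_done(uv_write_t* req, int status) {
  free(req->data);                   /* the copy made in send_copy() below */
  free(req);
  /* status < 0 means the write failed (e.g. UV_EPIPE); close the stream then. */
}

static int send_copy(uv_stream_t* stream, const char* msg, size_t len) {
  uv_write_t* req = malloc(sizeof(*req));
  char* copy = malloc(len);
  uv_buf_t buf;

  memcpy(copy, msg, len);            /* keep our own copy alive until the callback */
  req->data = copy;                  /* stash it where on_write_done can find it */
  buf = uv_buf_init(copy, (unsigned int) len);
  return uv_write(req, stream, &buf, 1, on_write_done);
}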

Notes:

  1. If process S keeps calling uv_write() to send data to process C, but C never calls uv_read_start() (or C is stuck inside io_poll), the pending data first piles up in C's socket receive buffer (Recv-Q), then in S's socket send buffer (Send-Q), and finally gets buffered on the uv_stream_t write_queue, so S's memory footprint keeps growing. Remedy: add a heartbeat-like mechanism (normal application messages from C to S can double as heartbeats) plus a uv_timer_t for detecting timeouts, as sketched below.
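
A minimal sketch of the timeout side of that remedy (assumptions: a per-connection struct, a last_activity timestamp refreshed from the read callback, and arbitrary 30-second threshold / 5-second check interval):

typedef struct {
  uv_tcp_t tcp;                /* the connection */
  uv_timer_t timer;            /* periodic silence check */
  uint64_t last_activity;      /* set to uv_now(loop) whenever data arrives */
} conn_t;

static void on_heartbeat_check(uv_timer_t* timer) {
  conn_t* conn = timer->data;
  /* Nothing received (not even a heartbeat) for 30s: drop the peer so that
     write_queue and the socket buffers cannot grow without bound. */
  if (uv_now(timer->loop) - conn->last_activity > 30 * 1000) {
    uv_timer_stop(timer);
    uv_close((uv_handle_t*) &conn->tcp, NULL);
  }
}

/* Arm it right after uv_accept()/uv_tcp_connect() succeeds: */
static void arm_heartbeat(uv_loop_t* loop, conn_t* conn) {
  conn->last_activity = uv_now(loop);
  uv_timer_init(loop, &conn->timer);
  conn->timer.data = conn;
  uv_timer_start(&conn->timer, on_heartbeat_check, 5000, 5000);
}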

struct uv__io_t

Notes:

  1. struct uv__io_t is the glue between the layers: it ties the I/O handles (uv_tcp_t, uv_udp_t, uv_pipe_t, etc.) to struct uv_loop_t.
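
For reference, the watcher itself is small; roughly (a paraphrase of uv__io_s from the Unix headers, platform-specific fields omitted):

struct uv__io_s {
  uv__io_cb cb;              /* invoked by uv__io_poll() when events fire */
  void* pending_queue[2];
  void* watcher_queue[2];    /* links the watcher into loop->watcher_queue */
  unsigned int pevents;      /* events the handle wants on the next epoll_ctl */
  unsigned int events;       /* events currently registered with the kernel */
  int fd;
};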

struct uv_loop_t

The fields of struct uv_loop_t most relevant to io_poll:

struct uv_loop_s {
  ... ...
  unsigned long flags;
  int backend_fd; // the epoll fd on Linux (kqueue fd on the BSDs/macOS)
  ... ...
  void* watcher_queue[2]; // watchers added or modified since the last poll
  uv__io_t** watchers;    // all registered watchers, indexed by fd
  unsigned int nwatchers;
  unsigned int nfds;
  ... ...
  uint64_t time;          // cached loop time, refreshed by uv_update_time()
  ... ...
};

https://github.com/libuv/libuv/blob/v1.x/src/unix/loop.c

UV_EXTERN int uv_loop_init(uv_loop_t* loop);
UV_EXTERN int uv_loop_close(uv_loop_t* loop);

https://github.com/libuv/libuv/blob/v1.x/src/unix/core.c

UV_EXTERN int uv_loop_alive(const uv_loop_t* loop);
UV_EXTERN int uv_run(uv_loop_t*, uv_run_mode mode);
UV_EXTERN void uv_update_time(uv_loop_t*);
UV_EXTERN int uv_is_active(const uv_handle_t* handle);
/* core, src/unix/internal.h */  
int uv__nonblock(int fd, int set);
int uv__close(int fd);
int uv__cloexec(int fd, int set);
int uv__socket(int domain, int type, int protocol);
int uv__dup(int fd);
ssize_t uv__recvmsg(int fd, struct msghdr *msg, int flags);
void uv__make_close_pending(uv_handle_t* handle);
int uv__getiovmax(void);

void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd);
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events);
void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events);
void uv__io_close(uv_loop_t* loop, uv__io_t* w);
void uv__io_feed(uv_loop_t* loop, uv__io_t* w);
int uv__io_active(const uv__io_t* w, unsigned int events);
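
uv_run() ties these pieces together; one iteration in src/unix/core.c roughly looks like this (a paraphrased, simplified sketch, not a verbatim excerpt):

while (uv__loop_alive(loop) && !loop->stop_flag) {
  uv__update_time(loop);                 /* refresh loop->time */
  uv__run_timers(loop);                  /* expired uv_timer_t callbacks */
  uv__run_pending(loop);                 /* deferred I/O callbacks */
  uv__run_idle(loop);
  uv__run_prepare(loop);
  timeout = uv_backend_timeout(loop);    /* 0, -1, or ms until the next timer */
  uv__io_poll(loop, timeout);            /* epoll_wait() plus I/O callbacks */
  uv__run_check(loop);
  uv__run_closing_handles(loop);         /* close_cb of uv_close()d handles */
}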

https://github.com/libuv/libuv/blob/v1.x/src/unix/linux-core.c

int uv__io_check_fd(uv_loop_t* loop, int fd);
void uv__io_poll(uv_loop_t* loop, int timeout); /* in milliseconds or -1 */

/* platform specific */
uint64_t uv__hrtime(uv_clocktype_t type);
int uv__kqueue_init(uv_loop_t* loop);
int uv__platform_loop_init(uv_loop_t* loop);
void uv__platform_loop_delete(uv_loop_t* loop);
void uv__platform_invalidate_fd(uv_loop_t* loop, int fd);

Notes:

  • UV__EPOLL_CTL_ADD, UV__EPOLL_CTL_MOD and UV__EPOLL_CTL_DEL
    • uv__io_start() only queues and publishes the watcher:
        QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
        loop->watchers[w->fd] = w;
      later, uv__io_poll() drains watcher_queue and tells the kernel:
        if (w->events == 0) op = UV__EPOLL_CTL_ADD;
        else op = UV__EPOLL_CTL_MOD;
        uv__epoll_ctl(loop->backend_fd, op, w->fd, &e);
    • uv__io_stop() just clears the slot:
        loop->watchers[w->fd] = NULL;
      and when uv__io_poll() later sees an event for an fd with no watcher, it deletes it:
        w = loop->watchers[fd];
        if (w == NULL) { uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_DEL, fd, pe); continue; }
  • UV__EPOLLERR and UV__EPOLLHUP
    • The reported events (events) are narrowed down to what the uv__io_t asked for (pevents), i.e. UV__EPOLLIN and/or UV__EPOLLOUT. That way, when the file descriptor is in an error state the I/O callback still fires, and it is the accept/recv/write call made inside it that returns the error.
    struct uv__epoll_event* pe;
    uv__io_t* w;
    pe->events &= w->pevents | UV__EPOLLERR | UV__EPOLLHUP;
    if (pe->events == UV__EPOLLERR || pe->events == UV__EPOLLHUP)
        pe->events |= w->pevents & (UV__EPOLLIN | UV__EPOLLOUT);
    if (pe->events != 0) {
        w->cb(loop, w, pe->events);
        nevents++;
    }
  • Error handling
    • accept errors:
      • EINTR: loop and call accept again (on Linux libuv uses accept4);
      • EAGAIN or EWOULDBLOCK: not an error, simply return;
      • ECONNABORTED: loop and call accept again;
      • EMFILE or ENFILE: handled specially via loop->emfile_fd (a reserve fd that is closed temporarily so the pending connection can be accepted and then dropped).
    • recv returns EAGAIN or EWOULDBLOCK: re-register interest in UV__POLLIN and wait for the next io_poll.
    • write returns EAGAIN or EWOULDBLOCK: if the stream is blocking, retry the write from the beginning; otherwise re-register interest in UV__POLLOUT and wait for the next io_poll.
  • Misc:
    • libuv opens sockets in non-blocking, close-on-exec mode, atomically when the platform allows it (fallback sketched below):
      sockfd = socket(domain, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
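
When the kernel does not accept the combined flags, uv__socket() falls back to setting them one by one; roughly (a simplified paraphrase of src/unix/core.c):

int sockfd;
int err;

sockfd = socket(domain, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
if (sockfd != -1)
  return sockfd;
if (errno != EINVAL)                 /* EINVAL here usually means an old kernel */
  return -errno;

/* Fallback: plain socket(), then set the two flags separately (not atomic). */
sockfd = socket(domain, type, protocol);
if (sockfd == -1)
  return -errno;

err = uv__nonblock(sockfd, 1);       /* fcntl()-based helpers listed above */
if (err == 0)
  err = uv__cloexec(sockfd, 1);
if (err) {
  uv__close(sockfd);
  return err;
}
return sockfd;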