pipe io
struct uv_pipe_t
Note: uv_pipe_t is a subclass of uv_stream_t, which is itself a subclass of uv_handle_t.
pipe socket and sockaddr
/* Excerpt: create an AF_UNIX stream socket with uv__socket() and fill in a
 * sockaddr_un that names the filesystem path pipe_fname. */
int sockfd = uv__socket(AF_UNIX, SOCK_STREAM, 0);
const char* pipe_fname = "/tmp/uv-test-sock";
struct sockaddr_un saddr;
/* Zero the whole address structure before setting individual fields. */
memset(&saddr, 0, sizeof(saddr));
saddr.sun_family = AF_UNIX;
/* Copy at most sizeof(sun_path) - 1 bytes and then terminate explicitly:
 * strncpy() does not NUL-terminate the destination when the source is
 * truncated. */
strncpy(saddr.sun_path, pipe_fname, sizeof(saddr.sun_path) - 1);
saddr.sun_path[sizeof(saddr.sun_path) - 1] = '\0';
tcp socket and sockaddr
/* Excerpt: create an AF_INET stream socket with uv__socket() and fill in a
 * sockaddr_in for the address ip:port. */
int sockfd = uv__socket(AF_INET, SOCK_STREAM, 0);
const char* ip = "0.0.0.0";
int port = 9123;
struct sockaddr_in addr;
/* Zero the whole address structure before setting individual fields. */
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
/* sin_port is stored in network byte order, hence htons(). */
addr.sin_port = htons(port);
/* inet_addr() converts the dotted-decimal string to a network-order address. */
addr.sin_addr.s_addr = inet_addr(ip);
pipe() and socketpair()
- pipe(): creates a pipe, a unidirectional data channel that can be used for interprocess communication.
- socketpair(): creates a pair of connected sockets.
File descriptors opened when a struct uv_loop_t is created
1. Two pipes are opened:
I. The static int uv__signal_lock_pipefd[2] in signal.c, created via uv__signal_global_once_init() -> uv__signal_global_init(void) -> uv__make_pipe(uv__signal_lock_pipefd, 0).
II. loop->signal_pipefd, created via uv_signal_init(loop, &loop->child_watcher) -> uv__signal_loop_once_init(loop) -> uv__make_pipe(loop->signal_pipefd, UV__F_NONBLOCK).
2. An eventfd is opened on Linux: loop->async_watcher.io_watcher.fd, created via uv_async_init(loop, &loop->wq_async, uv__work_done) -> uv__async_start(loop, &loop->async_watcher, uv__async_event) -> uv__async_eventfd().
3. An epoll instance is opened on Linux: loop->backend_fd, created via uv__platform_loop_init(loop) -> uv__epoll_create1(UV__EPOLL_CLOEXEC).
Note: when debugging under GDB or Eclipse, running "ls -l /proc/<pid>/fd" to list the file descriptors the process has opened shows that some additional pipes are open as well. See the Google forum thread "Where are these pipes created?".
AF_UNIX
NAME
unix - sockets for local interprocess communication
SYNOPSIS
#include <sys/socket.h>
#include <sys/un.h>
unix_socket = socket(AF_UNIX, type, 0);
error = socketpair(AF_UNIX, type, 0, int *sv);
DESCRIPTION
The AF_UNIX (also known as AF_LOCAL) socket family is used to
communicate between processes on the same machine efficiently.
Traditionally, UNIX domain sockets can be either unnamed, or bound to
a filesystem pathname (marked as being of type socket). Linux also
supports an abstract namespace which is independent of the
filesystem.
Valid socket types in the UNIX domain are: SOCK_STREAM, for a stream-
oriented socket; SOCK_DGRAM, for a datagram-oriented socket that
preserves message boundaries (as on most UNIX implementations, UNIX
domain datagram sockets are always reliable and don't reorder
datagrams); and (since Linux 2.6.4) SOCK_SEQPACKET, for a sequenced-
packet socket that is connection-oriented, preserves message
boundaries, and delivers messages in the order that they were sent.
UNIX domain sockets support passing file descriptors or process
credentials to other processes using ancillary data.
Address format
A UNIX domain socket address is represented in the following structure:
struct sockaddr_un {
sa_family_t sun_family; /* AF_UNIX */
char sun_path[108]; /* pathname */
};
The sun_family field always contains AF_UNIX. On Linux sun_path is
108 bytes in size.
Sending file descriptors over pipes
- Write a file descriptor
/* Excerpt: attach one file descriptor to an outgoing message as SCM_RIGHTS
 * ancillary data and hand it to sendmsg(). The names req, iov, iovcnt,
 * stream and n are declared outside this excerpt. */
struct msghdr msg;
struct cmsghdr *cmsg;
int fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
/* Zero-filled buffer that backs the control message (msg_control).
 * NOTE(review): this is a plain char array, so its alignment for
 * struct cmsghdr is not guaranteed by the type - confirm. */
char scratch[64] = {0};
assert(fd_to_send >= 0);
/* No destination address is supplied in the message header. */
msg.msg_name = NULL;
msg.msg_namelen = 0;
/* Regular payload: the caller's iovec array. */
msg.msg_iov = iov;
msg.msg_iovlen = iovcnt;
msg.msg_flags = 0;
/* Ancillary data: room for exactly one control message carrying one int. */
msg.msg_control = (void*) scratch;
msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS; /* Transfer file descriptors. */
cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));
/* silence aliasing warning */
{
/* Store the descriptor in the control message payload, going through a
 * void* instead of casting CMSG_DATA() directly to int*. */
void* pv = CMSG_DATA(cmsg);
int* pi = pv;
*pi = fd_to_send;
}
/* Send payload and ancillary data together. The while (...) condition that
 * closes this do-loop is not part of this excerpt. */
do {
n = sendmsg(uv__stream_fd(stream), &msg, 0);
}
- Recv a file descriptor
/* Excerpt: read from the stream. The names is_ipc, buf, msg, cmsg_space,
 * stream and nread are declared outside this excerpt. */
if (!is_ipc) {
/* Non-IPC stream: plain read(), repeated while it fails with EINTR. */
do {
nread = read(uv__stream_fd(stream), buf.base, buf.len);
}
while (nread < 0 && errno == EINTR);
} else {
/* ipc uses recvmsg */
msg.msg_flags = 0;
/* A single buffer receives the regular payload. */
msg.msg_iov = (struct iovec*) &buf;
msg.msg_iovlen = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
/* Set up to receive a descriptor even if one isn't in the message */
msg.msg_controllen = sizeof(cmsg_space);
msg.msg_control = cmsg_space;
/* uv__recvmsg() returns a negative value on failure; the retry test then
 * inspects errno for EINTR. */
do {
nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
}
while (nread < 0 && errno == EINTR);
}
/*
 * recvmsg() wrapper that takes care of file descriptors arriving as
 * SCM_RIGHTS ancillary data.
 *
 * On Linux the call is first attempted with MSG_CMSG_CLOEXEC added to the
 * flags. If that attempt fails with EINVAL, the call is repeated without the
 * flag and uv__cloexec(fd, 1) is invoked on every received descriptor
 * instead (presumably marking it close-on-exec). On other platforms the
 * plain call plus uv__cloexec() path is always used.
 *
 *   fd     socket to receive from
 *   msg    caller-prepared message header, passed through to recvmsg()
 *   flags  recvmsg() flags
 *
 * Returns the value of recvmsg() on success, or the negated errno value on
 * failure.
 */
ssize_t uv__recvmsg(int fd, struct msghdr* msg, int flags) {
  struct cmsghdr* hdr;
  ssize_t nread;
  int* cur;
  int* stop;

#if defined(__linux__)
  /* Becomes 1 once the flagged attempt has failed with EINVAL and the plain
   * retry has succeeded; later calls then skip the flagged attempt. */
  static int no_msg_cmsg_cloexec;

  if (no_msg_cmsg_cloexec != 0) {
    nread = recvmsg(fd, msg, flags);
  } else {
    nread = recvmsg(fd, msg, flags | 0x40000000);  /* MSG_CMSG_CLOEXEC */
    if (nread != -1) {
      return nread;
    }
    if (errno != EINVAL) {
      return -errno;
    }

    /* The flagged call was rejected with EINVAL: retry without the flag. */
    nread = recvmsg(fd, msg, flags);
    if (nread == -1) {
      return -errno;
    }
    no_msg_cmsg_cloexec = 1;
  }
#else
  nread = recvmsg(fd, msg, flags);
#endif

  if (nread == -1) {
    return -errno;
  }

  /* No ancillary data was received, so there are no descriptors to fix up. */
  if (msg->msg_controllen == 0) {
    return nread;
  }

  /* Visit every control message; for SCM_RIGHTS ones, walk the int array
   * that starts at CMSG_DATA() and ends cmsg_len bytes after the header. */
  hdr = CMSG_FIRSTHDR(msg);
  while (hdr != NULL) {
    if (hdr->cmsg_type == SCM_RIGHTS) {
      cur = (int*) CMSG_DATA(hdr);
      stop = (int*) ((char*) hdr + hdr->cmsg_len);
      while (cur < stop) {
        uv__cloexec(*cur, 1);
        cur += 1;
      }
    }
    hdr = CMSG_NXTHDR(msg, hdr);
  }

  return nread;
}
- 需要注意的是传递描述符并不是传递一个 int 型的描述符编号,而是在接收进程中创建一个新的描述符,并且在内核的文件表中,它与发送进程发送的描述符指向相同的项。
- 在进程之间可以传递任意类型的描述符,比如可以是 pipe , open , mkfifo 或 socket , accept 等函数返回的描述符,而不限于套接字。
- 一个描述符在传递过程中(从调用 sendmsg 发送到调用 recvmsg 接收),内核会将其标记为“在飞行中”( in flight )。在这段时间内,即使发送方试图关闭该描述符,内核仍会为接收进程保持打开状态。发送描述符会使其引用计数加 1 。
- 描述符是通过辅助数据发送的(结构体 msghdr 的 msg_control 成员),在发送和接收描述符时,总是发送至少 1 个字节的数据,即使这个数据没有任何实际意义。否则当接收返回 0 时,接收方将不能区分这意味着“没有数据”(但辅助数据可能有套接字)还是“文件结束符”。
- 具体实现时, msghdr 的 msg_control 缓冲区必须与 cmsghdr 结构对齐,可以看到后面代码的实现使用了一个 union 结构来保证这一点。