Elizabeth (Engineer)

EPollSelectorImpl of Java NIO

Posted time: Sep 19, 2016 9:31 AM
This article briefly describes the JDK 1.7 NIO implementation on the Linux platform. It won't go into the core concepts of Java NIO, such as Selector, Channel and Buffer; for details about these concepts, see the JDK documentation. The implementation class of the JDK 1.7 NIO Selector on Linux is sun.nio.ch.EPollSelectorImpl, which implements NIO through the epoll family of system calls under Linux.
Before introducing the implementation of this class, let's first introduce Linux epoll. epoll is an improved version of the poll/select system calls that performs IO event detection and dispatching with higher performance (mainly thanks to epoll's event callback mechanism, which will be detailed later). It consists of the following three system calls:
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);


In the above functions, epoll_create creates an epoll instance for detecting IO events. The size argument is only a hint to the operating system about the expected size of the event set; in the Linux 2.6.32 kernel this argument is ignored. epoll_ctl manages the event set of file descriptors: with this function, you can register, modify and delete one or more events. epoll_wait detects events. For detailed descriptions of the three functions, see the epoll manual pages.
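To make the division of labor among the three calls concrete, here is a minimal usage sketch (a sketch only: error handling is omitted, and listen_fd stands for a non-blocking socket assumed to be prepared elsewhere):
#include <sys/epoll.h>

#define MAX_EVENTS 64

/* Minimal event loop: one epoll instance, one registered descriptor. */
void event_loop(int listen_fd)
{
    struct epoll_event ev, events[MAX_EVENTS];
    int epfd, n, i;

    epfd = epoll_create(256);                /* size is only a hint */

    ev.events = EPOLLIN;                     /* watch for read readiness */
    ev.data.fd = listen_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

    for (;;) {
        /* Block until a registered fd is ready or 1000 ms elapse */
        n = epoll_wait(epfd, events, MAX_EVENTS, 1000);
        for (i = 0; i < n; i++) {
            /* events[i].data.fd is ready for IO here */
        }
    }
}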
The main functions of the Java class sun.nio.ch.EPollSelectorImpl are delegated to sun.nio.ch.EPollArrayWrapper (the Java code below is decompiled from jdk_1.7.0_17/lib/rt.jar for Linux):
package sun.nio.ch;

import java.io.IOException;

class EPollArrayWrapper {
    // Thin wrappers over epoll_create, epoll_ctl and epoll_wait
    private native int epollCreate();
    private native void epollCtl(int paramInt1, int paramInt2, int paramInt3, int paramInt4);
    private native int epollWait(long paramLong1, int paramInt1, long paramLong2, int paramInt2) throws IOException;
}


The implementation of the three native methods of EPollArrayWrapper can be found in openjdk7/jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c. From it, we can see that the three native methods are thin wrappers around the epoll system calls described above. (The implementation code of other JDKs may vary, but it is ultimately a wrapper over the epoll system calls.)
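For reference, the epollCreate wrapper in the OpenJDK 7 source looks roughly like this (a simplified excerpt; the other two wrappers follow the same pattern of invoking the system call and throwing an IOException on failure):
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
    /* The size argument is only a hint to the kernel; 256 is arbitrary */
    int epfd = epoll_create(256);
    if (epfd < 0)
        JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    return epfd;
}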
The EPollSelectorImpl.implRegister method (the concrete implementation behind Selector registration) registers events with the epoll instance by calling epoll_ctl:
protected void implRegister(SelectionKeyImpl paramSelectionKeyImpl) {
    if (this.closed)
        throw new ClosedSelectorException();
    SelChImpl localSelChImpl = paramSelectionKeyImpl.channel;
    // Remember the fd -> SelectionKey mapping, then register with epoll
    this.fdToKey.put(Integer.valueOf(localSelChImpl.getFDVal()), paramSelectionKeyImpl);
    this.pollWrapper.add(localSelChImpl);
    this.keys.add(paramSelectionKeyImpl);
}


Besides registering events with the epoll instance, the method above also records the relationship between the registered file descriptor (fd) and its SelectionKey in fdToKey, a map that maintains the fd-to-SelectionKey mapping. Each time a Channel is registered to the Selector, a record is added to the map; when Channel.close or SelectionKey.cancel is called, the SelectionKey associated with the Channel's fd is removed from fdToKey. The detailed code can be found in the EPollSelectorImpl.implDereg method:
protected void implDereg(SelectionKeyImpl paramSelectionKeyImpl) throws IOException {
    assert (paramSelectionKeyImpl.getIndex() >= 0);
    SelChImpl localSelChImpl = paramSelectionKeyImpl.channel;
    int i = localSelChImpl.getFDVal();
    this.fdToKey.remove(Integer.valueOf(i));   // drop the fd -> SelectionKey mapping
    this.pollWrapper.release(localSelChImpl);  // removes the fd from the epoll instance
    paramSelectionKeyImpl.setIndex(-1);
    this.keys.remove(paramSelectionKeyImpl);
    this.selectedKeys.remove(paramSelectionKeyImpl);
    deregister(paramSelectionKeyImpl);
    SelectableChannel localSelectableChannel = paramSelectionKeyImpl.channel();
    if ((!localSelectableChannel.isOpen()) && (!localSelectableChannel.isRegistered()))
        ((SelChImpl)localSelectableChannel).kill();
}


EPollSelectorImpl.doSelect (the implementation of the Selector.select method) performs event detection by calling epoll_wait:
protected int doSelect(long paramLong) throws IOException {
    if (this.closed)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        this.pollWrapper.poll(paramLong);   // blocks in epoll_wait
    } finally {
        end();
    }
    processDeregisterQueue();
    int i = updateSelectedKeys();
    if (this.pollWrapper.interrupted()) {
        // Clear the wakeup pipe used by Selector.wakeup
        this.pollWrapper.putEventOps(this.pollWrapper.interruptedIndex(), 0);
        synchronized (this.interruptLock) {
            this.pollWrapper.clearInterrupted();
            IOUtil.drain(this.fd0);
            this.interruptTriggered = false;
        }
    }
    return i;
}


The main procedure of this method can be summarized as follows:
1. Call epoll_wait (this.pollWrapper.poll) to get the ready file descriptor set
2. Find the SelectionKey for each ready file descriptor through fdToKey and update it. The code for updating the SelectionKey can be found in EPollSelectorImpl.updateSelectedKeys:
private int updateSelectedKeys() {
    int i = this.pollWrapper.updated;
    int j = 0;
    for (int k = 0; k < i; k++) {
        int m = this.pollWrapper.getDescriptor(k);
        SelectionKeyImpl localSelectionKeyImpl = (SelectionKeyImpl)this.fdToKey.get(Integer.valueOf(m));
        if (localSelectionKeyImpl != null) {
            int n = this.pollWrapper.getEventOps(k);
            if (this.selectedKeys.contains(localSelectionKeyImpl)) {
                if (localSelectionKeyImpl.channel.translateAndSetReadyOps(n, localSelectionKeyImpl))
                    j++;
            } else {
                localSelectionKeyImpl.channel.translateAndSetReadyOps(n, localSelectionKeyImpl);
                if ((localSelectionKeyImpl.nioReadyOps() & localSelectionKeyImpl.nioInterestOps()) != 0) {
                    this.selectedKeys.add(localSelectionKeyImpl);
                    j++;
                }
            }
        }
    }
    return j;
}


Several questions about fdToKey:
I. Why does fdToKey become very large? From the above code, we can see two reasons for fdToKey growing large:
1. Many channels are registered to the selector; for example, a persistent-connection server may maintain hundreds of thousands of connections at the same time;
2. Stale or expired channels are not closed in time, so their records accumulate in fdToKey.
II. Why are reads from fdToKey always serial? Records in fdToKey are read inside the select method, and the select method is generally called from a single thread (Selector is not thread-safe).
III. Does TCP send congestion increase the size of fdToKey? Generally, no: fdToKey only tracks channels registered to the selector and has nothing to do with the data transmission process. That said, if send congestion defeats the idle-connection detection of the IO framework, so that idle connections cannot be identified and closed in time, the size of fdToKey may grow as well.
Next, let's look at the implementation of the epoll system calls themselves. The implementation code is in fs/eventpoll.c (Linux 2.6.32.65). Because the full code paths are long, the kernel code below covers only the main flow; error handling and less important details such as parameter checks and concurrency control are skipped. Let's first take a look at the implementation of epoll_create:
fs/eventpoll.c
SYSCALL_DEFINE1(epoll_create, int, size)
{
    if (size <= 0)
        return -EINVAL;
    return sys_epoll_create1(0);
}


SYSCALL_DEFINE1 is a macro for defining a system call function that takes one parameter. Expanding the macro above yields roughly:
asmlinkage long sys_epoll_create(int size)
This is the entry point of epoll_create. As to why a macro is used instead of a direct declaration: system calls have strict limits on the number of parameters and on how parameters are passed, with six parameters at most, and SYSCALL_DEFINE2 through SYSCALL_DEFINE6 define system calls with two to six parameters respectively. From the code above, we can see that epoll_create ultimately calls sys_epoll_create1 to do the actual work, and that the size parameter is ignored. The main code of epoll_create1 is as follows (again skipping error handling and less important details such as parameter checks):
fs/eventpoll.c
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    int error, fd;
    struct eventpoll *ep = NULL;
    struct file *file;

    error = ep_alloc(&ep);    /* allocate the eventpoll instance */
    /* (allocation of the fd itself is skipped in this excerpt) */
    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                              O_RDWR | (flags & O_CLOEXEC));
    fd_install(fd, file);     /* bind the fd to the anonymous inode file */
    ep->file = file;
    return fd;
}


The above code mainly allocates a struct eventpoll instance and assigns the file descriptor associated with the instance. Subsequent epoll_ctl and epoll_wait calls use this file descriptor to reference the instance. The structure of struct eventpoll is as follows:
fs/eventpoll.c
struct eventpoll {
    spinlock_t lock;
    struct mutex mtx;
    wait_queue_head_t wq;             /* wait queue used by epoll_wait() */
    wait_queue_head_t poll_wait;      /* wait queue used when the epoll fd is itself polled */
    struct list_head rdllist;         /* list of ready file descriptors */
    struct rb_root rbr;               /* red-black tree of registered file descriptors */
    struct epitem *ovflist;           /* overflow list used while transferring ready events */
    struct user_struct *user;
    struct file *file;
    int visited;
    struct list_head visited_list_link;
};


The key parts of this data structure are:
1. A wait queue (wq). epoll implements its event callback mechanism through this wait queue.
2. A ready list (rdllist). This list saves the ready file descriptors in the form of a doubly linked list.
3. A red-black tree (rbr). It saves the registered file descriptors; registering the same file descriptor twice returns an error.
The wait queue is the core mechanism of the epoll system calls (and not only of epoll: most event notification and callback mechanisms under Linux rely on the wait queue). Before proceeding to the implementation of epoll_ctl and epoll_wait, let's first take a look at the wait queue. A wait queue puts a group of processes/threads to sleep while they wait for an event; when the event happens, the kernel wakes the sleeping processes/threads. Note: the text below does not differentiate between processes and threads, since Linux schedules them identically (scheduling here covers switching, sleeping and waking processes). The mechanism can be compared to the wait and notify/notifyAll methods of java.lang.Object: the wait method puts a thread to sleep, and notify/notifyAll wakes one or all of the sleeping threads. The wait queue mainly involves two data structures:
include/linux/wait.h
struct __wait_queue_head {
    spinlock_t lock;
    struct list_head task_list;   /* list of waiters */
};

struct __wait_queue {
    unsigned int flags;
#define WQ_FLAG_EXCLUSIVE 0x01
    void *private;                /* usually the waiting task_struct */
    wait_queue_func_t func;       /* callback run on wake-up */
    struct list_head task_list;   /* element linking into the queue head */
};


The struct __wait_queue_head is the queue head structure. Its task_list holds the elements added to the queue, and struct list_head is the standard Linux doubly linked list, defined as follows:
include/linux/list.h
struct list_head {
struct list_head *next, *prev;
};


Note: this structure can represent either the head of a doubly linked list or an element in it, and because it is embedded in a containing structure, its next and prev pointers can effectively link arbitrary data structures together.
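As a small illustrative sketch of the pattern (the names my_item, my_list and demo are invented for the example; fs/eventpoll.c links epitem instances into rdllist through an embedded list_head in exactly this way):
#include <linux/list.h>

struct my_item {
    int value;
    struct list_head link;    /* embedded list element */
};

static LIST_HEAD(my_list);    /* declare an empty list head */

static void demo(struct my_item *item)
{
    struct my_item *pos;

    list_add_tail(&item->link, &my_list);       /* append to the tail */
    list_for_each_entry(pos, &my_list, link) {  /* walk the payload structures */
        /* use pos->value */
    }
}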
The struct __wait_queue is the element structure of the wait queue. Its member func is the callback function executed when the waiting process is woken; its type is defined as follows:
include/linux/wait.h
typedef int (*wait_queue_func_t)(wait_queue_t *wait, unsigned mode, int flags, void *key);

The task_list member of struct __wait_queue is an element of the linked list, used to link the structure into a struct __wait_queue_head (unlike the task_list member of the head structure, which is the head of the list, this one is an element in it). The private member usually points to the task_struct instance of the waiting process (that structure has many members which will not be listed here; it is enough to know that every process under Linux corresponds to one task_struct instance).
In usage, the wait queue mainly involves the following functions (or macros):
include/linux/wait.h
void __add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait);
#define wait_event(wq, condition)
#define wake_up_xxx(x,…)
The __add_wait_queue function adds a process to the wait queue. The wait_event macro waits for an event, putting the waiting process to sleep until the event occurs. The wake_up_xxx family of macros, including wake_up, wake_up_all, wake_up_locked and wake_up_interruptible, wakes up one or a group of processes sleeping on an event. The specific implementation of the wait queue touches many subsystems (process scheduling, interrupt handling and so on), so it is not covered in detail here. You can compare __add_wait_queue and wait_event to the wait method of java.lang.Object, and wake_up to the notify/notifyAll methods.
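A minimal sketch of the pattern (kernel-module style code against the 2.6.x API; the names my_wq, data_ready, consumer and producer are invented for the example):
#include <linux/wait.h>
#include <linux/sched.h>

static DECLARE_WAIT_QUEUE_HEAD(my_wq);
static int data_ready = 0;

/* Waiting side: sleeps until data_ready becomes non-zero
   (compare java.lang.Object.wait). */
static void consumer(void)
{
    wait_event(my_wq, data_ready != 0);
    /* ... consume the data ... */
}

/* Waking side: publishes the data, then wakes all sleepers
   (compare java.lang.Object.notifyAll). */
static void producer(void)
{
    data_ready = 1;
    wake_up_all(&my_wq);
}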
With the wait queue introduced, we can move on to the implementation of epoll_ctl. The core of its implementation is:
fs/eventpoll.c
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
                struct epoll_event __user *, event)
{
    if (!tfile->f_op || !tfile->f_op->poll)
        goto error_tgt_fput;
    switch (op) {
    case EPOLL_CTL_ADD:
        error = ep_insert(ep, &epds, tfile, fd);
        break;
    case EPOLL_CTL_DEL:
        error = ep_remove(ep, epi);
        break;
    case EPOLL_CTL_MOD:
        error = ep_modify(ep, epi, &epds);
        break;
    }
    return error;
}


What kind of file descriptors can be registered? From the if check, we can see that only files whose implementation provides the poll method can be registered. In general, all character device files implement this method, and network sockets implement it as well, while ordinary files on block-device file systems such as ext2/ext3/ext4 do not. Files that implement the poll method correspond to java.nio.channels.SelectableChannel in Java NIO, which is why only a SelectableChannel can be registered to a Selector.
The ep_insert, ep_remove and ep_modify functions correspond to event registration, deletion and modification respectively. Taking ep_insert as an example of the event registration process, the key code is as follows:
fs/eventpoll.c
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
                     struct file *tfile, int fd)
{
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    revents = tfile->f_op->poll(tfile, &epq.pt);
    ep_rbtree_insert(ep, epi);
    if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
        list_add_tail(&epi->rdllink, &ep->rdllist);
        wake_up_locked(&ep->wq);
    }
}


The above code serves to:
1. Bind the wait queue callback function ep_ptable_queue_proc;
2. Call the poll method of the corresponding file instance. Implementations vary greatly, but most register the caller on the file's wait queue through the passed-in poll_table and return the set of currently ready events, as the socket implementation does (the code is in the tcp_poll method in net/ipv4/tcp.c and is not detailed here); a sketch of a typical poll method follows this list;
3. If the registered event has already occurred, insert the ready file descriptor into the ready list of the eventpoll instance (list_add_tail(&epi->rdllink, &ep->rdllist);) and wake any sleeping processes (wake_up_locked(&ep->wq)).
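As referenced in step 2, here is a sketch of what a typical poll implementation looks like (mydev and its members are invented for illustration; real implementations such as tcp_poll are more involved but have the same shape). It registers the caller on the device's wait queue via poll_wait without blocking, then reports which events are ready right now:
#include <linux/poll.h>

struct mydev {
    wait_queue_head_t readq;   /* woken when new data arrives */
    int data_available;
};

static unsigned int mydev_poll(struct file *file, poll_table *wait)
{
    struct mydev *dev = file->private_data;
    unsigned int mask = 0;

    poll_wait(file, &dev->readq, wait);   /* enqueue the caller; never blocks */
    if (dev->data_available)
        mask |= POLLIN | POLLRDNORM;      /* readable right now */
    return mask;
}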
The callback function ep_ptable_queue_proc bound in the first step runs during the poll call in the second step: it registers another callback, ep_poll_callback, on the file's wait queue. When the awaited event later occurs, ep_poll_callback is executed and inserts the ready file descriptor into the ready list of the eventpoll instance:
fs/eventpoll.c
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    if (!ep_is_linked(&epi->rdllink))
        list_add_tail(&epi->rdllink, &ep->rdllist);
}


Finally, let's look at the implementation of epoll_wait. Thanks to the ready list, the implementation is relatively simple: it just checks whether the ready list is empty and, if it is, sleeps or waits as necessary:
fs/eventpoll.c
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
                int, maxevents, int, timeout)
{
    int error;
    struct file *file;
    struct eventpoll *ep;

    file = fget(epfd);
    ep = file->private_data;
    error = ep_poll(ep, events, maxevents, timeout);
    return error;
}


This function eventually calls ep_poll to do the main work:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                   int maxevents, long timeout)
{
retry:
    if (list_empty(&ep->rdllist)) {
        /* No ready events yet: enqueue the current process on ep->wq */
        init_waitqueue_entry(&wait, current);
        wait.flags |= WQ_FLAG_EXCLUSIVE;
        __add_wait_queue(&ep->wq, &wait);

        for (;;) {
            set_current_state(TASK_INTERRUPTIBLE);
            if (!list_empty(&ep->rdllist) || !jtimeout)
                break;
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }
            spin_unlock_irqrestore(&ep->lock, flags);
            jtimeout = schedule_timeout(jtimeout);   /* sleep until woken or timed out */
            spin_lock_irqsave(&ep->lock, flags);
        }
        __remove_wait_queue(&ep->wq, &wait);
        set_current_state(TASK_RUNNING);
    }
    eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
    spin_unlock_irqrestore(&ep->lock, flags);

    if (!res && eavail &&
        !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
        goto retry;

    return res;
}


The above code checks whether the ready list is empty. If it is, the process adds itself to the wait queue (__add_wait_queue) and sleeps for up to the configured timeout (jtimeout = schedule_timeout(jtimeout);).
In summary, the epoll system calls are built on the wait queue mechanism. The time complexity of its event check (the epoll_wait system call) is O(n), where n is the number of active file descriptors; "active" means the descriptor has frequent read and write events. The poll or select system calls (implemented in fs/select.c) are also O(n), but there n is the total number of registered file descriptors. Therefore, epoll is the better fit when active descriptors make up a small proportion of all registered descriptors, for example a persistent-connection server that maintains hundreds of thousands of connections of which only a few are active at any moment.