BIO到NIO源码的一些事儿之NIO 下 之 Selector

前言

此系列文章会详细解读NIO的功能逐步丰满的路程,为Reactor-Netty 库的讲解铺平道路。

关于Java编程方法论-Reactor与Webflux的视频分享,已经完成了Rxjava 与 Reactor,b站地址如下:

Rxjava源码解读与分享:https://www.bilibili.com/video/av34537840

Reactor源码解读与分享:https://www.bilibili.com/video/av35326911

本系列源码解读基于JDK11 api细节可能与其他版本有所差别,请自行解决jdk版本问题。

本系列前几篇:

BIO到NIO源码的一些事儿之BIO

BIO到NIO源码的一些事儿之NIO 上

BIO到NIO源码的一些事儿之NIO 中

SelectionKey的引入

如我们在前面内容所讲,在学生确定之后,我们就要对其状态进行设定,然后再交由Selector进行管理,其状态的设定我们就通过SelectionKey来进行。

那这里我们先通过之前在Channel中并未仔细讲解的SelectableChannel下的register方法。我们前面有提到过, SelectableChannelchannel打造成可以通过Selector来进行多路复用。作为管理者,channel想要实现复用,就必须在管理者这里进行注册登记。所以,SelectableChannel下的register方法也就是我们值得二次关注的核心了,也是对接我们接下来内容的切入点,对于register方法的解读,请看我们之前的文章BIO到NIO源码的一些事儿之NIO 上赋予Channel可被多路复用的能力这一节的内容。

这里要记住的是SelectableChannel是对接channel特征(即SelectionKey)的关键所在,这有点类似于表设计,原本可以将特征什么的设定在一张表内,但为了操作更加具有针对性,即为了让代码功能更易于管理,就进行抽取并设计了第二张表,这个就有点像人体器官,整体上大家共同协作完成一件事,但器官内部自己专注于自己的主要特定功能,偶尔也具备其他器官的一些小功能。

由此,我们也就可以知道,SelectionKey表示一个SelectableChannelSelector关联的标记,可以简单理解为一个token。就好比是我们做权限管理系统用户登录后前台会从后台拿到的一个token一样,用户可以凭借此token来访问操作相应的资源信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
{ ...
synchronized (regLock) {
...
synchronized (keyLock) {
...
SelectionKey k = findKey(sel);
if (k != null) {
k.attach(att);
k.interestOps(ops);
} else {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
}
}

结合上下两段源码,在每次Selector使用register方法注册channel时,都会创建并返回一个SelectionKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);

// register (if needed) before adding to key set
implRegister(k);

// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}

我们在BIO到NIO源码的一些事儿之NIO 上赋予Channel可被多路复用的能力这一节的内容知道,一旦注册到Selector上,Channel将一直保持注册直到其被解除注册。在解除注册的时候会解除Selector分配给Channel的所有资源。
也就是SelectionKey在其调用SelectionKey#channel方法,或这个key所代表的channel 关闭,抑或此key所关联的Selector关闭之前,都是有效。我们在前面的文章分析中也知道,取消一个SelectionKey,不会立刻从Selector移除,它将被添加到SelectorcancelledKeys这个Set集合中,以便在下一次选择操作期间删除,我们可以通过java.nio.channels.SelectionKey#isValid判断一个SelectionKey是否有效。

SelectionKey包含四个操作集,每个操作集用一个Int来表示,int值中的低四位的bit 用于表示channel支持的可选操作种类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 
/**
* Operation-set bit for read operations.
*/
public static final int OP_READ = 1 << 0;

/**
* Operation-set bit for write operations.
*/
public static final int OP_WRITE = 1 << 2;

/**
* Operation-set bit for socket-connect operations.
*/
public static final int OP_CONNECT = 1 << 3;

/**
* Operation-set bit for socket-accept operations.
*/
public static final int OP_ACCEPT = 1 << 4;

interestOps

通过interestOps来确定了selector在下一个选择操作的过程中将测试哪些操作类别的准备情况,操作事件是否是channel关注的。interestOpsSelectionKey创建时,初始化为注册Selector时的ops值,这个值可通过sun.nio.ch.SelectionKeyImpl#interestOps(int)来改变,这点我们在SelectorImpl#register可以清楚的看到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
 //sun.nio.ch.SelectionKeyImpl
public final class SelectionKeyImpl
extends AbstractSelectionKey
{
private static final VarHandle INTERESTOPS =
ConstantBootstraps.fieldVarHandle(
MethodHandles.lookup(),
"interestOps",
VarHandle.class,
SelectionKeyImpl.class, int.class);

private final SelChImpl channel;
private final SelectorImpl selector;

private volatile int interestOps;
private volatile int readyOps;

// registered events in kernel, used by some Selector implementations
private int registeredEvents;

// index of key in pollfd array, used by some Selector implementations
private int index;

SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
channel = ch;
selector = sel;
}
...
}

readyOps

readyOps表示通过Selector检测到channel已经准备就绪的操作事件。在SelectionKey创建时(即上面源码所示),readyOps值为0,在Selectorselect操作中可能会更新,但是需要注意的是我们不能直接调用来更新。

SelectionKeyreadyOps表示一个channel已经为某些操作准备就绪,但不能保证在针对这个就绪事件类型的操作过程中不会发生阻塞,即该操作所在线程有可能会发生阻塞。在完成select操作后,大部分情况下会立即对readyOps更新,此时readyOps值最准确,如果外部的事件或在该channel有IO操作,readyOps可能不准确。所以,我们有看到其是volatile类型。

SelectionKey定义了所有的操作事件,但是具体channel支持的操作事件依赖于具体的channel,即具体问题具体分析。
所有可选择的channel(即SelectableChannel的子类)都可以通过SelectableChannel#validOps方法,判断一个操作事件是否被channel所支持,即每个子类都会有对validOps的实现,返回一个数字,仅标识channel支持的哪些操作。尝试设置或测试一个不被channel所支持的操作设定,将会抛出相关的运行时异常。
不同应用场景下,其所支持的Ops是不同的,摘取部分如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//java.nio.channels.SocketChannel#validOps
public final int validOps() {
//即1|4|8 1101
return (SelectionKey.OP_READ
| SelectionKey.OP_WRITE
| SelectionKey.OP_CONNECT);
}
//java.nio.channels.ServerSocketChannel#validOps
public final int validOps() {
// 16
return SelectionKey.OP_ACCEPT;
}
//java.nio.channels.DatagramChannel#validOps
public final int validOps() {
// 1|4
return (SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}

如果需要经常关联一些我们程序中指定数据到SelectionKey,比如一个我们使用一个object表示上层的一种高级协议的状态,object用于通知实现协议处理器。所以,SelectionKey支持通过attach方法将一个对象附加到SelectionKeyattachment上。attachment可以通过java.nio.channels.SelectionKey#attachment方法进行访问。如果要取消该对象,则可以通过该种方式:selectionKey.attach(null)

需要注意的是如果附加的对象不再使用,一定要人为清除,如果没有,假如此SelectionKey一直存在,由于此处属于强引用,那么垃圾回收器不会回收该对象,若不清除的话会成内存泄漏。

SelectionKey在由多线程并发使用时,是线程安全的。我们只需要知道,Selectorselect操作会一直使用在调用该操作开始时当前的interestOps所设定的值。

Selector探究

到现在为止,我们已经多多少少接触了Selector,其是一个什么样的角色,想必都很清楚了,那我们就在我们已经接触到的来进一步深入探究Selector的设计运行机制。

Selector的open方法

从命名上就可以知道 SelectableChannel对象是依靠Selector来实现多路复用的。
我们可以通过调用java.nio.channels.Selector#open来创建一个selector对象:

1
2
3
4
//java.nio.channels.Selector#open
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}

关于这个SelectorProvider.provider(),其使用了根据所在系统的默认实现,我这里是windows系统,那么其默认实现为sun.nio.ch.WindowsSelectorProvider,这样,就可以调用基于相应系统的具体实现了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//java.nio.channels.spi.SelectorProvider#provider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
//sun.nio.ch.DefaultSelectorProvider
public class DefaultSelectorProvider {

/**
* Prevent instantiation.
*/
private DefaultSelectorProvider() { }

/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
return new sun.nio.ch.WindowsSelectorProvider();
}

}

基于windows来讲,selector这里最终会使用sun.nio.ch.WindowsSelectorImpl来做一些核心的逻辑。

1
2
3
4
5
6
public class WindowsSelectorProvider extends SelectorProviderImpl {

public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}

这里,我们需要来看一下WindowsSelectorImpl的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//sun.nio.ch.WindowsSelectorImpl#WindowsSelectorImpl
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();

pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}

我们由Pipe.open()就可知道selector会保持打开的状态,直到其调用它的close方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
boolean open = selectorOpen.getAndSet(false);
if (!open)
return;
implCloseSelector();
}
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
wakeup();
synchronized (this) {
implClose();
synchronized (publicSelectedKeys) {
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
selectedKeys.remove(ski);
i.remove();
}
assert selectedKeys.isEmpty() && keys.isEmpty();
}
}
}
//sun.nio.ch.WindowsSelectorImpl#implClose
@Override
protected void implClose() throws IOException {
assert !isOpen();
assert Thread.holdsLock(this);

// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}

wakeupPipe.sink().close();
wakeupPipe.source().close();
pollWrapper.free();

// Make all remaining helper threads exit
for (SelectThread t: threads)
t.makeZombie();
startLock.startThreads();
}

可以看到,前面的wakeupPipe在close方法中关闭掉了。这里的close方法中又涉及了wakeupPipe.sink()wakeupPipe.source()的关闭与pollWrapper.free()的释放,此处也是我们本篇的难点所在,这里,我们来看看它们到底是什么样的存在。
首先,我们对WindowsSelectorImpl(SelectorProvider sp)这个构造函数做下梳理:

  • 创建一个PollArrayWrapper对象(pollWrapper);
  • Pipe.open()打开一个管道;
  • 拿到wakeupSourceFdwakeupSinkFd两个文件描述符;
  • 把pipe内Source端的文件描述符(wakeupSourceFd)放到pollWrapper里;

    Pipe.open()的解惑

    这里我们会有疑惑,为什么要创建一个管道,它是用来做什么的。

我们来看Pipe.open()源码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
//java.nio.channels.Pipe#open
public static Pipe open() throws IOException {
return SelectorProvider.provider().openPipe();
}
//sun.nio.ch.SelectorProviderImpl#openPipe
public Pipe openPipe() throws IOException {
return new PipeImpl(this);
}
//sun.nio.ch.PipeImpl#PipeImpl
PipeImpl(final SelectorProvider sp) throws IOException {
try {
AccessController.doPrivileged(new Initializer(sp));
} catch (PrivilegedActionException x) {
throw (IOException)x.getCause();
}
}
private class Initializer
implements PrivilegedExceptionAction<Void>
{

private final SelectorProvider sp;

private IOException ioe = null;

private Initializer(SelectorProvider sp) {
this.sp = sp;
}

@Override
public Void run() throws IOException {
LoopbackConnector connector = new LoopbackConnector();
connector.run();
if (ioe instanceof ClosedByInterruptException) {
ioe = null;
Thread connThread = new Thread(connector) {
@Override
public void interrupt() {}
};
connThread.start();
for (;;) {
try {
connThread.join();
break;
} catch (InterruptedException ex) {}
}
Thread.currentThread().interrupt();
}

if (ioe != null)
throw new IOException("Unable to establish loopback connection", ioe);

return null;
}

从上述源码我们可以知道,创建了一个PipeImpl对象, 在PipeImpl的构造函数里会执行AccessController.doPrivileged,在它调用后紧接着会执行Initializerrun方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//sun.nio.ch.PipeImpl.Initializer.LoopbackConnector
private class LoopbackConnector implements Runnable {

@Override
public void run() {
ServerSocketChannel ssc = null;
SocketChannel sc1 = null;
SocketChannel sc2 = null;

try {
// Create secret with a backing array.
ByteBuffer secret = ByteBuffer.allocate(NUM_SECRET_BYTES);
ByteBuffer bb = ByteBuffer.allocate(NUM_SECRET_BYTES);

// Loopback address
InetAddress lb = InetAddress.getLoopbackAddress();
assert(lb.isLoopbackAddress());
InetSocketAddress sa = null;
for(;;) {
// Bind ServerSocketChannel to a port on the loopback
// address
if (ssc == null || !ssc.isOpen()) {
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(lb, 0));
sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
}

// Establish connection (assume connections are eagerly
// accepted)
sc1 = SocketChannel.open(sa);
RANDOM_NUMBER_GENERATOR.nextBytes(secret.array());
do {
sc1.write(secret);
} while (secret.hasRemaining());
secret.rewind();

// Get a connection and verify it is legitimate
sc2 = ssc.accept();
do {
sc2.read(bb);
} while (bb.hasRemaining());
bb.rewind();

if (bb.equals(secret))
break;

sc2.close();
sc1.close();
}

// Create source and sink channels
source = new SourceChannelImpl(sp, sc1);
sink = new SinkChannelImpl(sp, sc2);
} catch (IOException e) {
try {
if (sc1 != null)
sc1.close();
if (sc2 != null)
sc2.close();
} catch (IOException e2) {}
ioe = e;
} finally {
try {
if (ssc != null)
ssc.close();
} catch (IOException e2) {}
}
}
}
}

这里即为创建pipe的过程,windows下的实现是创建两个本地的socketChannel,然后连接(连接的过程通过写一个随机数据做两个socket的连接校验),两个socketChannel分别实现了管道pipesourcesink端。
而我们依然不清楚这个pipe到底干什么用的,
假如大家熟悉系统调用的C/C++的话,就可以知道,一个阻塞在select上的线程有以下三种方式可以被唤醒:

  1. 有数据可读/写,或出现异常。
  2. 阻塞时间到,即time out
  3. 收到一个non-block的信号。可由killpthread_kill发出。

所以,Selector.wakeup()要唤醒阻塞的select,那么也只能通过这三种方法,其中:

  • 第二种方法可以排除,因为select一旦阻塞,无法修改其time out时间。
  • 而第三种看来只能在Linux上实现,Windows上没有这种信号通知的机制。

看来只有第一种方法了。假如我们多次调用Selector.open(),那么在Windows上会每调用一次,就会建立一对自己和自己的loopbackTCP连接;在Linux上的话,每调用一次,会开一对pipe(pipe在Linux下一般都成对打开),到这里,估计我们能够猜得出来——那就是如果想要唤醒select,只需要朝着自己的这个loopback连接发点数据过去,于是,就可以唤醒阻塞在select上的线程了。

我们对上面所述做下总结:在Windows下,Java虚拟机在Selector.open()时会自己和自己建立loopbackTCP连接;在Linux下,Selector会创建pipe。这主要是为了Selector.wakeup()可以方便唤醒阻塞在select()系统调用上的线程(通过向自己所建立的TCP链接和管道上随便写点什么就可以唤醒阻塞线程)。

PollArrayWrapper解读

WindowsSelectorImpl构造器最后,我们看到这一句代码:pollWrapper.addWakeupSocket(wakeupSourceFd, 0);,即把pipe内Source端的文件描述符(wakeupSourceFd)放到pollWrapper里。pollWrapper作为PollArrayWrapper的实例,它到底是什么,这一节,我们就来对其探索一番。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class PollArrayWrapper {

private AllocatedNativeObject pollArray; // The fd array

long pollArrayAddress; // pollArrayAddress

@Native private static final short FD_OFFSET = 0; // fd offset in pollfd
@Native private static final short EVENT_OFFSET = 4; // events offset in pollfd

static short SIZE_POLLFD = 8; // sizeof pollfd struct

private int size; // Size of the pollArray

PollArrayWrapper(int newSize) {
int allocationSize = newSize * SIZE_POLLFD;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
this.size = newSize;
}

...

// Access methods for fd structures
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}

void putEventOps(int i, int event) {
pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
...
// Adds Windows wakeup socket at a given index.
void addWakeupSocket(int fdVal, int index) {
putDescriptor(index, fdVal);
putEventOps(index, Net.POLLIN);
}
}

这里将wakeupSourceFdPOLLIN事件标识为pollArrayEventOps的对应的值,这里使用的是unsafe直接操作的内存,也就是相对于这个pollArray所在内存地址的偏移量SIZE_POLLFD * i + EVENT_OFFSET这个位置上写入Net.POLLIN所代表的值,即参考下面本地方法相关源码所展示的值。putDescriptor同样是这种类似操作。当sink端有数据写入时,source对应的文件描述符wakeupSourceFd就会处于就绪状态。

1
2
3
4
5
6
7
8
9
10
11
//java.base/windows/native/libnio/ch/nio_util.h
/* WSAPoll()/WSAPOLLFD and the corresponding constants are only defined */
/* in Windows Vista / Windows Server 2008 and later. If we are on an */
/* older release we just use the Solaris constants as this was previously */
/* done in PollArrayWrapper.java. */
#define POLLIN 0x0001
#define POLLOUT 0x0004
#define POLLERR 0x0008
#define POLLHUP 0x0010
#define POLLNVAL 0x0020
#define POLLCONN 0x0002

AllocatedNativeObject这个类的父类有大量的unsafe类的操作,这些都是直接基于内存级别的操作。从其父类的构造器中,我们能也清楚的看到pollArray是通过unsafe.allocateMemory(size + ps)分配的一块系统内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class AllocatedNativeObject                             // package-private
extends NativeObject
{
/**
* Allocates a memory area of at least {@code size} bytes outside of the
* Java heap and creates a native object for that area.
*/
AllocatedNativeObject(int size, boolean pageAligned) {
super(size, pageAligned);
}

/**
* Frees the native memory area associated with this object.
*/
synchronized void free() {
if (allocationAddress != 0) {
unsafe.freeMemory(allocationAddress);
allocationAddress = 0;
}
}

}
//sun.nio.ch.NativeObject#NativeObject(int, boolean)
protected NativeObject(int size, boolean pageAligned) {
if (!pageAligned) {
this.allocationAddress = unsafe.allocateMemory(size);
this.address = this.allocationAddress;
} else {
int ps = pageSize();
long a = unsafe.allocateMemory(size + ps);
this.allocationAddress = a;
this.address = a + ps - (a & (ps - 1));
}
}

至此,我们算是完成了对Selector.open()的解读,其主要任务就是完成建立Pipe,并把pipe source端的wakeupSourceFd放入pollArray中,这个pollArraySelector完成其角色任务的枢纽。本篇主要围绕Windows的实现来进行分析,即在windows下通过两个连接的socketChannel实现了Pipelinux下则直接使用系统的pipe即可。

SelectionKey在selector中的管理

SelectionKey在selector中注册

所谓的注册,其实就是将一个对象放到注册地对象内的一个容器字段上,这个字段可以是数组,队列,也可以是一个set集合,也可以是一个list。这里,同样是这样,只不过,其需要有个返回值,那么把这个要放入集合的对象返回即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);

// register (if needed) before adding to key set
implRegister(k);

// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
//sun.nio.ch.WindowsSelectorImpl#implRegister
@Override
protected void implRegister(SelectionKeyImpl ski) {
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
}

这段代码我们之前已经有看过,这里我们再次温习下。
首先会新建一个SelectionKeyImpl对象,这个对象就是对Channel的包装,不仅如此,还顺带把当前这个Selector对象给收了进去,这样,我们也可以通过SelectionKey的对象来拿到其对应的Selector对象。

接着,基于windows平台实现的implRegister,先通过ensureOpen()来确保该Selector是打开的。接着将这个SelectionKeyImpl加入到WindowsSelectorImpl内针对于新注册SelectionKey进行管理的newKeys之中,newKeys是一个ArrayDeque对象。对于ArrayDeque有不懂的,可以参考Java 容器源码分析之 Deque 与 ArrayDeque这篇文章。

然后再将此这个SelectionKeyImpl加入到sun.nio.ch.SelectorImpl#keys中去,这个Set<SelectionKey>集合代表那些已经注册到当前这个Selector对象上的SelectionKey集合。我们来看sun.nio.ch.SelectorImpl的构造函数:

1
2
3
4
5
6
7
8
//sun.nio.ch.SelectorImpl#SelectorImpl
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = ConcurrentHashMap.newKeySet();
selectedKeys = new HashSet<>();
publicKeys = Collections.unmodifiableSet(keys);
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}

也就是说,这里的publicKeys就来源于keys,只是publicKeys属于只读的,我们想要知道当前Selector对象上所注册的keys,就可以调用sun.nio.ch.SelectorImpl#keys来得到:

1
2
3
4
5
6
//sun.nio.ch.SelectorImpl#keys
@Override
public final Set<SelectionKey> keys() {
ensureOpen();
return publicKeys;
}

再回到这个构造函数中,selectedKeys,顾名思义,其属于已选择Keys,即前一次操作期间,已经准备就绪的Channel所对应的SelectionKey。此集合为keys的子集。通过selector.selectedKeys()获取。

1
2
3
4
5
6
//sun.nio.ch.SelectorImpl#selectedKeys
@Override
public final Set<SelectionKey> selectedKeys() {
ensureOpen();
return publicSelectedKeys;
}

我们看到其返回的是publicSelectedKeys,针对这个字段里的元素操作可以做删除,但不能做增加。
在前面的内容中,我们有涉及到SelectionKey的取消,所以,我们在java.nio.channels.spi.AbstractSelector方法内,是有定义cancelledKeys的,也是一个HashSet对象。其代表已经被取消但尚未取消注册(deregister)的SelectionKey。此Set集合无法直接访问,同样,它也是keys()的子集。

对于新的Selector实例,上面几个集合均为空。由上面展示的源码可知,通过channel.registerSelectionKey添加keys中,此为key的来源。
如果某个selectionKey.cancel()被调用,那么此key将会被添加到cancelledKeys这个集合中,然后在下一次调用selector select方法期间,此时canceldKeys不为空,将会触发此SelectionKeyderegister操作(释放资源,并从keys中移除)。无论通过channel.close()还是通过selectionKey.cancel(),都会导致SelectionKey被加入到cannceldKey中.

每次选择操作(select)期间,都可以将key添加到selectedKeys中或者将从cancelledKeys中移除。

Selector的select方法的解读

了解了上面的这些,我们来进入到select方法中,观察下它的细节。由Selector的api可知,select操作有两种形式,一种为
select(),selectNow(),select(long timeout);另一种为select(Consumer<SelectionKey> action, long timeout)select(Consumer<SelectionKey> action)selectNow(Consumer<SelectionKey> action)。后者为JDK11新加入的api,主要针对那些准备好进行I/O操作的channels在select过程中对相应的key进行的一个字的自定义的一个操作。
需要注意的是,有Consumer<SelectionKey> action参数的select操作是阻塞的,只有在选择了至少一个Channel的情况下,才会调用此Selector实例的wakeup方法来唤醒,同样,其所在线程被打断也可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//sun.nio.ch.SelectorImpl
@Override
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}

//sun.nio.ch.SelectorImpl
@Override
public final int select(Consumer<SelectionKey> action, long timeout)
throws IOException
{
Objects.requireNonNull(action);
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl#lockAndDoSelect
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}

我们可以观察,无论哪种,它们最后都落在了lockAndDoSelect这个方法上,最终会执行特定系统上的doSelect(action, timeout)实现。
这里我们以sun.nio.ch.WindowsSelectorImpl#doSelect为例来讲述其操作执行的步骤:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// sun.nio.ch.WindowsSelectorImpl#doSelect
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
assert Thread.holdsLock(this);
this.timeout = timeout; // set selector timeout
processUpdateQueue(); // <1>
processDeregisterQueue(); // <2>
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
subSelector.poll(); // <3>
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
// Main thread is out of poll(). Wakeup others and wait for them
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue(); // <4>
int updated = updateSelectedKeys(action); // <5>
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket(); // <6>
return updated;
}

processUpdateQueue解读

  1. 首先通过相应操作系统实现类(此处是WindowsSelectorImpl)的具体实现我们可以知道,通过<1> 处的 processUpdateQueue()获得关于每个剩余Channel(有些Channel取消了)的在此刻的interestOps,这里包括新注册的和updateKeys,并对其进行pollWrapper的管理操作。

    • 即对于新注册的SelectionKeyImpl,我们在相对于这个pollArray所在内存地址的偏移量SIZE_POLLFD * totalChannels + FD_OFFSETSIZE_POLLFD * totalChannels + EVENT_OFFSET分别存入SelectionKeyImpl的文件描述符fd与其对应的EventOps(初始为0)。

    • updateKeys,因为是其之前已经在pollArray的某个相对位置上存储过,这里我们还需要对拿到的key的有效性进行判断,如果有效,只需要将正在操作的这个SelectionKeyImpl对象的interestOps写入到在pollWrapper中的存放它的EventOps位置上。

    注意: 在对newKeys进行key的有效性判断之后,如果有效,会调用growIfNeeded()方法,这里首先会判断channelArray.length == totalChannels,此为一个SelectionKeyImpl的数组,初始容量大小为8。channelArray其实就是方便Selector管理在册SelectionKeyImpl数量的一个数组而已,通过判断它的数组长度大小,如果和totalChannels(初始值为1)相等,不仅仅是为了channelArray扩容,更重要的是为了辅助pollWrapper,让pollWrapper扩容才是这里的目的所在。
    而当totalChannels % MAX_SELECTABLE_FDS == 0时,则多开一个线程处理selectorwindowsselect系统调用有最大文件描述符限制,一次只能轮询1024个文件描述符,如果多于1024个,需要多线程进行轮询。通过ski.setIndex(totalChannels)选择键记录下在数组中的索引位置SelectionKeyImpl选择键的映射关系,以待后续使用。同时调用pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels)在相对于这个pollArray所在内存地址的偏移量SIZE_POLLFD * totalChannels + FD_OFFSET这个位置上写入wakeupSourceFd所代表的fdVal值。这样在新起的线程就可以通过MAX_SELECTABLE_FDS来确定这个用来监控的wakeupSourceFd

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
       /**
    * sun.nio.ch.WindowsSelectorImpl#processUpdateQueue
    * Process new registrations and changes to the interest ops.
    */
    private void processUpdateQueue() {
    assert Thread.holdsLock(this);

    synchronized (updateLock) {
    SelectionKeyImpl ski;

    // new registrations
    while ((ski = newKeys.pollFirst()) != null) {
    if (ski.isValid()) {
    growIfNeeded();
    channelArray[totalChannels] = ski;
    ski.setIndex(totalChannels);
    pollWrapper.putEntry(totalChannels, ski);
    totalChannels++;
    MapEntry previous = fdMap.put(ski);
    assert previous == null;
    }
    }

    // changes to interest ops
    while ((ski = updateKeys.pollFirst()) != null) {
    int events = ski.translateInterestOps();
    int fd = ski.getFDVal();
    if (ski.isValid() && fdMap.containsKey(fd)) {
    int index = ski.getIndex();
    assert index >= 0 && index < totalChannels;
    pollWrapper.putEventOps(index, events);
    }
    }
    }
    }

    //sun.nio.ch.PollArrayWrapper#putEntry
    // Prepare another pollfd struct for use.
    void putEntry(int index, SelectionKeyImpl ski) {
    putDescriptor(index, ski.getFDVal());
    putEventOps(index, 0);
    }
    //sun.nio.ch.WindowsSelectorImpl#growIfNeeded
    private void growIfNeeded() {
    if (channelArray.length == totalChannels) {
    int newSize = totalChannels * 2; // Make a larger array
    SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
    System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
    channelArray = temp;
    pollWrapper.grow(newSize);
    }
    if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
    pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
    totalChannels++;
    threadsCount++;
    }
    }
    // Initial capacity of the poll array
    private final int INIT_CAP = 8;
    // Maximum number of sockets for select().
    // Should be INIT_CAP times a power of 2
    private static final int MAX_SELECTABLE_FDS = 1024;

    // The list of SelectableChannels serviced by this Selector. Every mod
    // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
    // array, where the corresponding entry is occupied by the wakeupSocket
    private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
    // The number of valid entries in poll array, including entries occupied
    // by wakeup socket handle.
    private int totalChannels = 1;

    //sun.nio.ch.PollArrayWrapper#grow
    // Grows the pollfd array to new size
    void grow(int newSize) {
    PollArrayWrapper temp = new PollArrayWrapper(newSize);
    for (int i = 0; i < size; i++)
    replaceEntry(this, i, temp, i);
    pollArray.free();
    pollArray = temp.pollArray;
    this.size = temp.size;
    pollArrayAddress = pollArray.address();
    }

    // Maps file descriptors to their indices in pollArray
    private static final class FdMap extends HashMap<Integer, MapEntry> {
    static final long serialVersionUID = 0L;
    private MapEntry get(int desc) {
    return get(Integer.valueOf(desc));
    }
    private MapEntry put(SelectionKeyImpl ski) {
    return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski));
    }
    private MapEntry remove(SelectionKeyImpl ski) {
    Integer fd = Integer.valueOf(ski.getFDVal());
    MapEntry x = get(fd);
    if ((x != null) && (x.ski.channel() == ski.channel()))
    return remove(fd);
    return null;
    }
    }

    // class for fdMap entries
    private static final class MapEntry {
    final SelectionKeyImpl ski;
    long updateCount = 0;
    MapEntry(SelectionKeyImpl ski) {
    this.ski = ski;
    }
    }
    private final FdMap fdMap = new FdMap();
processDeregisterQueue解读
  1. 接着通过上面WindowsSelectorImpl#doSelect展示源码中<2> 处的 processDeregisterQueue()
    • cancelledKeys进行清除,遍历cancelledKeys,并对每个key进行deregister操作,然后从cancelledKeys集合中删除,从keys集合与selectedKeys中删除,以此来释放引用,方便gc回收,
    • 其内调用implDereg方法,将会从channelArray中移除对应的Channel代表的SelectionKeyImpl,调整totalChannels和线程数,从mapkeys中移除SelectionKeyImpl,移除Channel上的SelectionKeyImpl并关闭Channel
    • 同时还发现该processDeregisterQueue()方法在调用poll方法前后都进行调用,这是确保能够正确处理在调用poll方法阻塞的这一段时间之内取消的键能被及时清理。
    • 最后,还会判断这个cancelledKey所代表的channel是否打开和解除注册,如果关闭并解除注册,则应该将相应的文件描述符对应占用的资源给关闭掉。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
         /**
      * sun.nio.ch.SelectorImpl#processDeregisterQueue
      * Invoked by selection operations to process the cancelled-key set
      */
      protected final void processDeregisterQueue() throws IOException {
      assert Thread.holdsLock(this);
      assert Thread.holdsLock(publicSelectedKeys);

      Set<SelectionKey> cks = cancelledKeys();
      synchronized (cks) {
      if (!cks.isEmpty()) {
      Iterator<SelectionKey> i = cks.iterator();
      while (i.hasNext()) {
      SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
      i.remove();

      // remove the key from the selector
      implDereg(ski);

      selectedKeys.remove(ski);
      keys.remove(ski);

      // remove from channel's key set
      deregister(ski);

      SelectableChannel ch = ski.channel();
      if (!ch.isOpen() && !ch.isRegistered())
      ((SelChImpl)ch).kill();
      }
      }
      }
      }
      //sun.nio.ch.WindowsSelectorImpl#implDereg
      @Override
      protected void implDereg(SelectionKeyImpl ski) {
      assert !ski.isValid();
      assert Thread.holdsLock(this);

      if (fdMap.remove(ski) != null) {
      int i = ski.getIndex();
      assert (i >= 0);

      if (i != totalChannels - 1) {
      // Copy end one over it
      SelectionKeyImpl endChannel = channelArray[totalChannels-1];
      channelArray[i] = endChannel;
      endChannel.setIndex(i);
      pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);
      }
      ski.setIndex(-1);

      channelArray[totalChannels - 1] = null;
      totalChannels--;
      if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
      totalChannels--;
      threadsCount--; // The last thread has become redundant.
      }
      }
      }

      //sun.nio.ch.SocketChannelImpl#kill
      @Override
      public void kill() throws IOException {
      synchronized (stateLock) {
      if (state == ST_KILLPENDING) {
      state = ST_KILLED;
      nd.close(fd);
      }
      }
      }
      //C:/Program Files/Java/jdk-11.0.1/lib/src.zip!/java.base/sun/nio/ch/SocketChannelImpl.java:1126
      static {
      IOUtil.load();
      nd = new SocketDispatcher();
      }
      //sun.nio.ch.SocketDispatcher#close
      void close(FileDescriptor fd) throws IOException {
      close0(fd);
      }
adjustThreadsCount解读
  1. 接着我们来看到上面WindowsSelectorImpl#doSelect展示源码中adjustThreadsCount()方法的调用。
    • 前面有提到如果totalChannels % MAX_SELECTABLE_FDS == 0,则多开一个线程处理selector。这里就是根据分配的线程数量值来增加或减少线程,其实就是针对操作系统的最大select操作的文件描述符限制对线程个数进行调整。
    • 我们来观察所建线程做了什么事情,即观察SelectThreadrun方法实现。通过观察其源码可以看到它首先是while (true),通过startLock.waitForStart(this)来控制该线程是否运行还是等待,运行状态的话,会进而调用subSelector.poll(index)(这个我们后面内容详细解读),
    • 当此线程poll结束,而且相对于当前主线程假如有多条SelectThread子线程的话,当前这条SelectThread线程第一个结束poll的话,就调用finishLock.threadFinished()来通知主线程。在刚新建这个线程并调用其run方法的时候,此时lastRun = 0,在第一次启动的时候sun.nio.ch.WindowsSelectorImpl.StartLock#runsCounter同样为0,所以会调用startLock.wait()进而进入等待状态。

注意:

  • sun.nio.ch.WindowsSelectorImpl.StartLock同样会判断当前其所检测的线程是否废弃,废弃的话就返回true,这样被检测线程也就能跳出其内run方法的while循环从而结束线程运行。
  • 在调整线程的时候(调用adjustThreadsCount方法)与Selector调用close方法会间接调用到sun.nio.ch.WindowsSelectorImpl#implClose,这两个方法都会涉及到Selector线程的释放,即调用sun.nio.ch.WindowsSelectorImpl.SelectThread#makeZombie
  • finishLock.threadFinished()会调用wakeup()方法来通知主线程,这里,我们可以学到一个细节,如果线程正阻塞在select方法上,就可以调用wakeup方法会使阻塞的选择操作立即返回,通过Windows的相关实现,原理其实是向pipesink端写入了一个字节,source文件描述符就会处于就绪状态,poll方法会返回,从而导致select方法返回。而在其他solaris或者linux系统上其实采用系统调用pipe来完成管道的创建,相当于直接用了系统的管道。通过wakeup()相关实现还可以看出,调用wakeup会设置interruptTriggered的标志位,所以连续多次调用wakeup的效果等同于一次调用,不会引起无所谓的bug出现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
//sun.nio.ch.WindowsSelectorImpl#adjustThreadsCount
// After some channels registered/deregistered, the number of required
// helper threads may have changed. Adjust this number.
private void adjustThreadsCount() {
if (threadsCount > threads.size()) {
// More threads needed. Start more threads.
for (int i = threads.size(); i < threadsCount; i++) {
SelectThread newThread = new SelectThread(i);
threads.add(newThread);
newThread.setDaemon(true);
newThread.start();
}
} else if (threadsCount < threads.size()) {
// Some threads become redundant. Remove them from the threads List.
for (int i = threads.size() - 1 ; i >= threadsCount; i--)
threads.remove(i).makeZombie();
}
}

//sun.nio.ch.WindowsSelectorImpl.SelectThread
// Represents a helper thread used for select.
private final class SelectThread extends Thread {
private final int index; // index of this thread
final SubSelector subSelector;
private long lastRun = 0; // last run number
private volatile boolean zombie;
// Creates a new thread
private SelectThread(int i) {
super(null, null, "SelectorHelper", 0, false);
this.index = i;
this.subSelector = new SubSelector(i);
//make sure we wait for next round of poll
this.lastRun = startLock.runsCounter;
}
void makeZombie() {
zombie = true;
}
boolean isZombie() {
return zombie;
}
public void run() {
while (true) { // poll loop
// wait for the start of poll. If this thread has become
// redundant, then exit.
if (startLock.waitForStart(this))
return;
// call poll()
try {
subSelector.poll(index);
} catch (IOException e) {
// Save this exception and let other threads finish.
finishLock.setException(e);
}
// notify main thread, that this thread has finished, and
// wakeup others, if this thread is the first to finish.
finishLock.threadFinished();
}
}
}

// sun.nio.ch.WindowsSelectorImpl.FinishLock#threadFinished
// Each helper thread invokes this function on finishLock, when
// the thread is done with poll().
private synchronized void threadFinished() {
if (threadsToFinish == threads.size()) { // finished poll() first
// if finished first, wakeup others
wakeup();
}
threadsToFinish--;
if (threadsToFinish == 0) // all helper threads finished poll().
notify(); // notify the main thread
}

//sun.nio.ch.WindowsSelectorImpl#wakeup
@Override
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
setWakeupSocket();
interruptTriggered = true;
}
}
return this;
}
//sun.nio.ch.WindowsSelectorImpl#setWakeupSocket
// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);

JNIEXPORT void JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
jint scoutFd)
{
/* Write one byte into the pipe */
const char byte = 1;
send(scoutFd, &byte, 1, 0);
}
subSelector的poll方法解读
  1. subSelector.poll() 是select的核心,由native函数poll0实现,并把pollWrapper.pollArrayAddress作为参数传给poll0readFdswriteFdsexceptFds数组用来保存底层select的结果,数组的第一个位置都是存放发生事件的socket的总数,其余位置存放发生事件的socket句柄fd
    我们通过下面的代码可知:
    这个poll0()会监听pollWrapper中的FD有没有数据进出,这里会造成IO阻塞,直到有数据读写事件发生。由于pollWrapper中保存的也有ServerSocketChannelFD,所以只要ClientSocket发一份数据到ServerSocket,那么poll0()就会返回;又由于pollWrapper中保存的也有pipewrite端的FD,所以只要pipewrite端向FD发一份数据,也会造成poll0()返回;如果这两种情况都没有发生,那么poll0()就一直阻塞,也就是selector.select()会一直阻塞;如果有任何一种情况发生,那么selector.select()就会返回,所有在SelectThreadrun()里要用while (true) {},这样就可以保证在selector接收到数据并处理完后继续监听poll();

可以看出,NIO依然是阻塞式的IO,那么它和BIO的区别究竟在哪呢。
其实它的区别在于阻塞的位置不同,BIO是阻塞在read方法(recvfrom),而NIO阻塞在select方法。那么这样做有什么好处呢。如果单纯的改变阻塞的位置,自然是没有什么变化的,但epoll等的实现的巧妙之处就在于,它利用回调机制,让监听能够只需要知晓哪些socket上的数据已经准备好了,只需要处理这些线程上面的数据就行了。采用BIO,假设有1000个连接,需要开1000个线程,然后有1000read的位置在阻塞(我们在讲解BIO部分已经通过Demo体现),采用NIO编程,只需要1个线程,它利用select的轮询策略配合epoll的事件机制及红黑树数据结构,降低了其内部轮询的开销,同时极大的减小了线程上下文切换的开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//sun.nio.ch.WindowsSelectorImpl.SubSelector
private final class SubSelector {
private final int pollArrayIndex; // starting index in pollArray to poll
// These arrays will hold result of native select().
// The first element of each array is the number of selected sockets.
// Other elements are file descriptors of selected sockets.
// 保存发生read的FD
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
// 保存发生write的FD
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
//保存发生except的FD
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];

private SubSelector() {
this.pollArrayIndex = 0; // main thread
}

private SubSelector(int threadIndex) { // helper threads
this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
}

private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}

private int poll(int index) throws IOException {
// poll for helper threads
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}

private native int poll0(long pollAddress, int numfds,
int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
...
}
updateSelectedKeys解读
  1. 接下来将通过上面WindowsSelectorImpl#doSelect展示源码中<5> 处的 updateSelectedKeys(action)来处理每个channel准备就绪的信息。
    • 如果该通道的key尚未在selectedKeys中存在,则将其添加到该集合中。
    • 如果该通道的key已经存在selectedKeys中,即这个channel存在所支持的ReadyOps就绪操作中必须包含一个这种操作(由(ski.nioReadyOps() & ski.nioInterestOps()) != 0来确定),此时修改其ReadyOps为当前所要进行的操作。而我们之前看到的Consumer<SelectionKey>这个动作也是在此处进行。而由下面源码可知,先前记录在ReadyOps中的任何就绪信息在调用此action之前被丢弃掉,直接进行设定。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      //sun.nio.ch.WindowsSelectorImpl#updateSelectedKeys
      private int updateSelectedKeys(Consumer<SelectionKey> action) {
      updateCount++;
      int numKeysUpdated = 0;
      numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
      for (SelectThread t: threads) {
      numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
      }
      return numKeysUpdated;
      }
      //sun.nio.ch.SelectorImpl#processReadyEvents
      protected final int processReadyEvents(int rOps,
      SelectionKeyImpl ski,
      Consumer<SelectionKey> action) {
      if (action != null) {
      ski.translateAndSetReadyOps(rOps);
      if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
      action.accept(ski);
      ensureOpen();
      return 1;
      }
      } else {
      assert Thread.holdsLock(publicSelectedKeys);
      if (selectedKeys.contains(ski)) {
      if (ski.translateAndUpdateReadyOps(rOps)) {
      return 1;
      }
      } else {
      ski.translateAndSetReadyOps(rOps);
      if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
      selectedKeys.add(ski);
      return 1;
      }
      }
      }
      return 0;
      }
      //sun.nio.ch.WindowsSelectorImpl.SubSelector#processSelectedKeys
      private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
      int numKeysUpdated = 0;
      numKeysUpdated += processFDSet(updateCount, action, readFds,
      Net.POLLIN,
      false);
      numKeysUpdated += processFDSet(updateCount, action, writeFds,
      Net.POLLCONN |
      Net.POLLOUT,
      false);
      numKeysUpdated += processFDSet(updateCount, action, exceptFds,
      Net.POLLIN |
      Net.POLLCONN |
      Net.POLLOUT,
      true);
      return numKeysUpdated;
      }

      /**
      * sun.nio.ch.WindowsSelectorImpl.SubSelector#processFDSet
      * updateCount is used to tell if a key has been counted as updated
      * in this select operation.
      *
      * me.updateCount <= updateCount
      */
      private int processFDSet(long updateCount,
      Consumer<SelectionKey> action,
      int[] fds, int rOps,
      boolean isExceptFds)
      {
      int numKeysUpdated = 0;
      for (int i = 1; i <= fds[0]; i++) {
      int desc = fds[i];
      if (desc == wakeupSourceFd) {
      synchronized (interruptLock) {
      interruptTriggered = true;
      }
      continue;
      }
      MapEntry me = fdMap.get(desc);
      // If me is null, the key was deregistered in the previous
      // processDeregisterQueue.
      if (me == null)
      continue;
      SelectionKeyImpl sk = me.ski;

      // The descriptor may be in the exceptfds set because there is
      // OOB data queued to the socket. If there is OOB data then it
      // is discarded and the key is not added to the selected set.
      if (isExceptFds &&
      (sk.channel() instanceof SocketChannelImpl) &&
      discardUrgentData(desc))
      {
      continue;
      }
      //我们应该关注的
      int updated = processReadyEvents(rOps, sk, action);
      if (updated > 0 && me.updateCount != updateCount) {
      me.updateCount = updateCount;
      numKeysUpdated++;
      }
      }
      return numKeysUpdated;
      }

至此,关于Selector的内容就暂时告一段落,在下一篇中,我会针对Java NIO Buffer进行相关解读。

您的支持将鼓励我继续创作!