vertx入门:使用NIO进行异步编程(2)

verx系列教程:
vertx入门:为什么同步API是垃圾(1)
vertx入门:使用NIO进行异步编程(2)
vertx入门:什么是Event Loop(3)


为了讲清楚vertx,我们必须先明白现在的NIO是怎么样的。

为了不必傻傻等待IO完成,我们可以切换到NIO来进行编程。NIO实际上就是请求执行一项操作(阻塞),接着并不等待这项操作的完成,而是转而去做其他事情,直到有人通知他之前请求的那个操作有结果了,它才去进行后续的处理。比如一个socket需要读取256kb的数据,执行线程并不会一直卡在那里(因为网络的原因),而是会做其他的事情。直到系统通知它要读取的内容已经准备好了,他才会去读。

java早就提供了NIO的api来让我们操作网络和文件。现在我们来使用NIO来实现一个Echo的程序。你发什么给服务端,服务端返回同样的内容回来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel =ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(3000));
//设置成非阻塞
serverSocketChannel.configureBlocking(false);
//注册通知事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
//获取所有请求,上面注册了SelectionKey.OP_ACCEPT事件,如果由新的请求,这里就会获取到。
selector.select();
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
//遍历处理所有请求(如果有的话)
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isAcceptable()) {
//处理新的请求
newConnection(selector, key);
} else if (key.isReadable()) {
//socket收到消息,进行读取
echo(key);
} else if (key.isWritable()) {
//socket准备再次写消息了
continueEcho(selector, key);
}
//处理了这次通知之后,需要手动从selector中移除,防止重复处理。
it.remove();
}
}

上面的代码监听了一个3000端口,并且使用了nio来处理所有的链接。注意,这里有一个死循环,这意味着他会不断的去查看当前是否发生了自己感兴趣的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

//该类用于保存会话信息
private static class Context {
private final ByteBuffer nioBuffer = ByteBuffer.allocate(512);
private String currentLine = "";
private boolean terminating = false;
}

//每一个channel对应一个Context容器保存对话内容
private static final HashMap<SocketChannel, Context> contexts = new HashMap<>();
//接受新的消息
private static void newConnection(Selector selector, SelectionKey key) throws IOException {
//获取到对应的channel
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
//服务端接受请求
SocketChannel socketChannel = serverSocketChannel.accept();
//设置为非阻塞
socketChannel.configureBlocking(false)
//向selector注册“准备”读事件
.register(selector, SelectionKey.OP_READ);
//将当前的会话信息和channel保存到map中
contexts.put(socketChannel, new Context());
}

上面代码简单的接受了一个新的Connection,并且把新的channel缓存了起来,方便后面使用。特别注意的时这里让selector监听了 SelectionKey.OP_READ事件,和下面读数据的代码又关联起来了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//匹配退出的字符串
private static final Pattern QUIT = Pattern.compile("(\\r)?(\\n)?/quit$");
private static void echo(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
//获取channel对应的context
Context context = contexts.get(socketChannel);
try {
//把socket中的信息读到context.nioBuffer中。
socketChannel.read(context.nioBuffer);
context.nioBuffer.flip();
//拼凑信息
context.currentLine = context.currentLine + Charset.defaultCharset().decode(context.nioBuffer);
if (QUIT.matcher(context.currentLine).find()) {
//如果匹配就退出
context.terminating = true;
} else if (context.currentLine.length() > 16) {
context.currentLine = context.currentLine.substring(8);
}
context.nioBuffer.flip();
//准备将内容写回去
int count = socketChannel.write(context.nioBuffer);
//如果此时不是所有的数据都可以被写出,则等待下次可以写的时候继续写。
if (count < context.nioBuffer.limit()) {
key.cancel();
//注册下次可以写的时候通知事件
socketChannel.register(key.selector(), SelectionKey.OP_WRITE);
} else {
//写完了。
context.nioBuffer.clear();
if (context.terminating) {
cleanup(socketChannel);
}
}
} catch (IOException err) {
err.printStackTrace();
cleanup(socketChannel);
}
}

private static void cleanup(SocketChannel socketChannel) throws IOException {
socketChannel.close();
contexts.remove(socketChannel);
}

上面包括了读数据和写数据2个部分,写数据可能有一次不能写完的情况,所以当socket再次可写时,就会触发下面的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//续写,当上次写操作未全部完成,此时继续写
private static void continueEcho(Selector selector, SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
Context context = contexts.get(socketChannel);
try {
int remainingBytes = context.nioBuffer.limit() - context.nioBuffer.position();
int count = socketChannel.write(context.nioBuffer);
if (count == remainingBytes) {
context.nioBuffer.clear();
key.cancel();
if (context.terminating) {
cleanup(socketChannel);
} else {
socketChannel.register(selector, SelectionKey.OP_READ);
}
}
} catch (IOException err) {
err.printStackTrace();
cleanup(socketChannel);
}
}

上面的例子展示了如何使用nio处理简单的读写需求,但是这样的写法相对于BIO来说太过于复杂。而且指的注意的是java api也就是java.nio包下只关注了底层的IO,并没有包括http等高级协议。另外就是NIO并没有一个提前规划好的线程模型,这对合理的利用多核cpu、处理io事件和编写业务逻辑极为重要。

而vertx有一个厉害的线程模型来处理各种io事件,这将会在下一节中讲到。

本文链接:https://www.jdkdownload.com/vertx/asynchronous_programming_nio.html