vertx入门:什么是Event Loop(3)

verx系列教程:
vertx入门:为什么同步API是垃圾(1)
vertx入门:使用NIO进行异步编程(2)
vertx入门:什么是Event Loop(3)


有一个流行的异步事件处理的线程模型叫做event-loop。不像上一节(NIO)那样从已经到达的事件中拉(pull)自己感兴趣的事件做处理,在event-loop中我们都是将事件往处理器(handler)里面推(push)。
如图所示,当事件到达时会被放到队列里面。他们可以是io事件也可以是一个没被socket完全写完的buffer,或者是任何的其他时间,比如定时器。有一个单线程的event loop专门处理各种非阻塞或者执行时间不长的事件。如果线程被阻塞,那么就违反了event loop设计的初衷。event loop相当流行比如在浏览器中的js代码都是基于event loop线程模型来运行的。还有Java Swing也是一样基于它。
实现一个event loop相当简单。下面看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
mian方法创建了一个EventLoop对象,让后我们后面开启了几个线程不断的向EventLoop中投递事件。
*/
public static void main(String[] args) {
EventLoop eventLoop = new EventLoop();
//开启一个新线程,每秒投递一次
new Thread(() -> {
for (int n = 0; n < 6; n++) {
delay(1000);
eventLoop.dispatch(new EventLoop.Event("tick", n));
}
eventLoop.dispatch(new EventLoop.Event("stop", null));
}).start();
//开启一个新线程,隔2.5秒和3.3秒投递一次
new Thread(() -> {
delay(2500);
eventLoop.dispatch(new EventLoop.Event("hello", "beautiful world"));
delay(800);
eventLoop.dispatch(new EventLoop.Event("hello", "beautiful universe"));
}).start();
eventLoop.dispatch(new EventLoop.Event("hello", "world!"));
eventLoop.dispatch(new EventLoop.Event("foo", "bar"))
//注册不同的事件对应的处理器
eventLoop
.on("hello", s -> System.out.println("hello " + s))
.on("tick", n -> System.out.println("tick #" + n))
.on("stop", v -> eventLoop.stop())
.run();
System.out.println("Bye!");
}
//模拟阻塞
private static void delay(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

运行上面的语句之后,我们将得到如下的结果:

1
2
3
4
5
6
7
8
9
10
11
hello world!
No handler for key foo
tick #0
tick #1
hello beautiful world
tick #2
hello beautiful universe
tick #3
tick #4
tick #5
Bye!

接下来我们看下EventLoop是怎么实现的吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public final class EventLoop {
//这个容器来盛装待处理的事件
private final ConcurrentLinkedDeque<Event> events = new ConcurrentLinkedDeque<>();
//保存事件处理器和名称的关系
private final ConcurrentHashMap<String, Consumer<Object>> handlers = new ConcurrentHashMap<>();
//注册不同的事件对应的处理器,key是处理器的名称,handler就是具体处理器的实现。
public EventLoop on(String key, Consumer<Object> handler) {
handlers.put(key, handler);
return this;
}
//将要处理的事件保存进入队列。
public void dispatch(Event event) { events.add(event); }
//停止event-loop
public void stop() { Thread.currentThread().interrupt(); }
//运行event-loop
public void run() {
while (!(events.isEmpty() && Thread.interrupted())) {
if (!events.isEmpty()) {
Event event = events.pop();
if (handlers.containsKey(event.key)) {
handlers.get(event.key).accept(event.data);
} else {
System.err.println("No handler for key " + event.key);
}
}
}
}
}
//事件
public static final class Event {
private final String key;
private final Object data;
public Event(String key, Object data) {
this.key = key;
this.data = data;
}
}

从上面的实现我们可以看到,event-loop线程会不断的循环自己,如果有事件处理时间叫长,那么将会导致后面的事件堆积。

原文地址:https://www.jdkdownload.com/vertx/vertx_event_loop.html