forked from aofeng/JavaTutorial
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathReactor.java
More file actions
113 lines (94 loc) · 3.62 KB
/
Reactor.java
File metadata and controls
113 lines (94 loc) · 3.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package cn.aofeng.demo.reactor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Starts and stops the echo server and dispatches ACCEPT and READ events.
 *
 * <p>{@link #start()} runs the event loop on the calling thread and returns once
 * {@link #stop()} has been called or an I/O error occurs. When the loop ends, the
 * selector, the listening channel and every channel still registered with the
 * selector are closed.
 *
 * @author <a href="mailto:aofengblog@163.com">NieYong </a>
 */
public class Reactor {

    private final static Logger logger = Logger.getLogger(Reactor.class.getName());

    // Usage text logged when the command-line arguments are invalid.
    private final static String USAGE = "无效参数。使用示例:\n java cn.aofeng.demo.reactor.Reactor 9090";

    // Port the server listens on.
    private int _port;

    // Timeout, in milliseconds, of each Selector readiness check.
    private int _selectTimeout = 3000;

    // Running flag; volatile so that stop() may be called from another thread.
    private volatile boolean _isRun = true;

    /**
     * @param port port the server listens on.
     */
    public Reactor(int port) {
        this._port = port;
    }

    /**
     * Sets how long each selector readiness check may block.
     *
     * @param selectTimeout timeout in milliseconds. It also bounds how long the
     *        event loop may take to notice a call to {@link #stop()}.
     */
    public void setSelectTimeout(int selectTimeout) {
        this._selectTimeout = selectTimeout;
    }

    /**
     * Starts the server and runs the event loop until {@link #stop()} is called
     * or an {@link IOException} occurs. I/O errors are logged, not rethrown.
     * All channels and the selector are released before this method returns.
     */
    public void start() {
        ServerSocketChannel serverChannel = null;
        Selector selector = null;
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            ServerSocket serverSocket = serverChannel.socket();
            serverSocket.bind(new InetSocketAddress(_port));
            _isRun = true;
            if (logger.isLoggable(Level.INFO)) {
                logger.info("NIO echo网络服务启动完毕,监听端口:" +_port);
            }

            selector = Selector.open();
            serverChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(selector, serverChannel));

            while (_isRun) {
                int selectNum = selector.select(_selectTimeout);
                if (0 == selectNum) {
                    // Timed out with nothing ready; loop again to re-check the running flag.
                    continue;
                }

                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey selectionKey = it.next();
                    dispatch(selectionKey);
                    // The selector never clears the selected-key set itself, so
                    // handled keys must be removed explicitly.
                    it.remove();
                }
            }
        } catch (IOException e) {
            logger.log(Level.SEVERE, "处理网络连接出错", e);
        } finally {
            // Previously nothing was closed here, leaving the port bound and the
            // client connections open after the loop ended.
            releaseResources(selector, serverChannel);
        }
    }

    /**
     * Hands a ready key to its attachment: an Acceptor for ACCEPT events, a
     * Reader for READ events.
     */
    private void dispatch(SelectionKey selectionKey) throws IOException {
        // Accept a new socket connection.
        if (selectionKey.isValid() && selectionKey.isAcceptable()) {
            Acceptor acceptor = (Acceptor) selectionKey.attachment();
            acceptor.accept();
        }

        // Read and process socket data. Validity is re-checked because the
        // readiness query throws on a cancelled key.
        if (selectionKey.isValid() && selectionKey.isReadable()) {
            Reader reader = (Reader) selectionKey.attachment();
            reader.read();
        }
    }

    /**
     * Closes every channel registered with the selector, then the selector,
     * then the listening channel. Either argument may be null. Close failures
     * are logged and do not prevent the remaining resources from being closed.
     */
    private void releaseResources(Selector selector, ServerSocketChannel serverChannel) {
        if (null != selector && selector.isOpen()) {
            // Snapshot the key set so that closing channels cannot interfere
            // with the iteration.
            SelectionKey[] keys = selector.keys().toArray(new SelectionKey[0]);
            for (SelectionKey key : keys) {
                try {
                    key.channel().close();
                } catch (IOException e) {
                    logger.log(Level.WARNING, "Failed to close channel", e);
                }
            }
            try {
                selector.close();
            } catch (IOException e) {
                logger.log(Level.WARNING, "Failed to close selector", e);
            }
        }

        // Covers the case where the channel was opened but never registered
        // (for example, bind failed); closing an already closed channel is a no-op.
        if (null != serverChannel) {
            try {
                serverChannel.close();
            } catch (IOException e) {
                logger.log(Level.WARNING, "Failed to close server channel", e);
            }
        }
    }

    /**
     * Requests the event loop to stop. The loop exits after the current
     * readiness check returns, i.e. within at most the select timeout.
     */
    public void stop() {
        _isRun = false;
    }

    /**
     * Command-line entry point.
     *
     * @param args exactly one argument: the port to listen on.
     */
    public static void main(String[] args) {
        if (1 != args.length) {
            logger.severe(USAGE);
            System.exit(-1);
        }

        int port;
        try {
            port = Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            // A non-numeric port is treated like any other invalid invocation.
            logger.severe(USAGE);
            System.exit(-1);
            return;
        }

        int selectTimeout = 1000;
        Reactor reactor = new Reactor(port);
        reactor.setSelectTimeout(selectTimeout);
        reactor.start();
    }
}