BIO,NIO,AIO写法记录

写法详解~

BIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class BIOPlainEchoServer {
public void serve(int port) throws IOException {
//将ServerSocket绑定到指定的端口里
final ServerSocket socket = new ServerSocket(port);
while (true) {
//阻塞直到收到新的客户端连接
final Socket clientSocket = socket.accept();
System.out.println("Accepted connection from " + clientSocket);
//创建一个子线程去处理客户端的请求
new Thread(new Runnable() {
@Override
public void run() {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()))) {
PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);
//从客户端读取数据并原封不动回写回去
while (true) {
writer.println(reader.readLine());
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}

public void improvedServe(int port) throws IOException {
//将ServerSocket绑定到指定的端口里
final ServerSocket socket = new ServerSocket(port);
//创建一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(6);
while (true) {
//阻塞直到收到新的客户端连接
final Socket clientSocket = socket.accept();
System.out.println("Accepted connection from " + clientSocket);
//将请求提交给线程池去执行
executorService.execute(() -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()))) {
PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(),true);
//从客户端读取数据并原封不动回写回去
while (true) {
writer.println(reader.readLine());
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
}

NIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class NIOPlainEchoServer {
public void serve(int port) throws IOException {
System.out.println("Listening for connections on port " + port);
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
//将ServerSocket绑定到指定的端口里
ss.bind(address);
serverChannel.configureBlocking(false);
Selector selector = Selector.open();
//将channel注册到Selector里,并说明让Selector关注的点,这里是关注建立连接这个事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
try {
//阻塞等待就绪的Channel,即没有与客户端建立连接前就一直轮询
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
//代码省略的部分是结合业务,正确处理异常的逻辑
break;
}
//获取到Selector里所有就绪的SelectedKey实例,每将一个channel注册到一个selector就会产生一个SelectedKey
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
//将就绪的SelectedKey从Selector中移除,因为马上就要去处理它,防止重复执行
iterator.remove();
try {
//若SelectedKey处于Acceptable状态
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
//接受客户端的连接
SocketChannel client = server.accept();
System.out.println("Accepted connection from " + client);
client.configureBlocking(false);
//像selector注册socketchannel,主要关注读写,并传入一个ByteBuffer实例供读写缓存
client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100));
}
//若SelectedKey处于可读状态
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
//从channel里读取数据存入到ByteBuffer里面
client.read(output);
}
//若SelectedKey处于可写状态
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
output.flip();
//将ByteBuffer里的数据写入到channel里
client.write(output);
output.compact();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
}
}
}
}
}
}

AIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public class AIOPlainEchoServer {
public void serve(int port) throws IOException {
System.out.println("Listening for connections on port " + port);
final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(port);
// 将ServerSocket绑定到指定的端口里
serverChannel.bind(address);
final CountDownLatch latch = new CountDownLatch(1);
// 开始接收新的客户端请求. 一旦一个客户端请求被接收, CompletionHandler 就会被调用.
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(final AsynchronousSocketChannel channel, Object attachment) {
// 一旦完成处理,再次接收新的客户端请求
serverChannel.accept(null, this);
ByteBuffer buffer = ByteBuffer.allocate(100);
// 在channel里植入一个读操作EchoCompletionHandler,一旦buffer有数据写入,EchoCompletionHandler 便会被唤醒
channel.read(buffer, buffer, new EchoCompletionHandler(channel));
}

@Override
public void failed(Throwable throwable, Object attachment) {
try {
// 若遇到异常,关闭channel
serverChannel.close();
} catch (IOException e) {
// ingnore on close
} finally {
latch.countDown();
}
}
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private final class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel channel;

EchoCompletionHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}

@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
// 在channel里植入一个读操作CompletionHandler,一旦channel有数据写入,CompletionHandler 便会被唤醒
channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
// 如果buffer里还有内容,则再次触发写入操作将buffer里的内容写入channel
channel.write(buffer, buffer, this);
} else {
buffer.compact();
// 如果channel里还有内容需要读入到buffer里,则再次触发写入操作将channel里的内容读入buffer
channel.read(buffer, buffer, EchoCompletionHandler.this);
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
});
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
}
}