GYM8-NIO和AIO

圣人不死,大盗不止——庄子·胠箧

NIO

简介

New I/O的简称,与旧式的基于流的I/O方法相对。

  • NIO是基于块(Block)的,它以块为基本单位处理数据
  • 为所有的原始类型提供缓存(Buffer)支持
  • 增加通道(Channel)对象,作为新的原始I/O抽象
  • 支持文件锁(.lock文件作为一把锁)和内存映射文件的文件访问接口
  • 提供了基于Selector的异步网络I/O

    Buffer和Channel

    file->channel->buffer->byte
    file->channel->buffer->channel->file
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public void nioCopyFile(String resource, String destination) throws Exception{
    FileInputStream inputStream = new FileInputStream(resource);
    FileOutputStream outputStream = new FileOutputStream(destination);
    FileChannel inputChannel = inputStream.getChannel();
    FileChannel outputChannel = outputStream.getChannel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int len;
    while((len = inputChannel.read(buffer)) != -1){
    buffer.flip();
    outputChannel.write(buffer);
    buffer.clear();
    }
    inputChannel.close();
    outputChannel.close();
    }

Buffer三个参数

  • position
  • capacity
  • limit
参数 写模式 读模式
位置 当前缓冲区的位置,将从position的下一个位置写数据 当前缓冲区读取的位置,将从此位置后,读取数据
容量 缓冲区上限 缓冲区上限
上限 通常和容量相等 可读取的总容量和上次写入的数据量相等

Buffer操作

Buffer.flip:通常将buffer从写模式转化为读模式
Buffer.rewind:将position设置为0,并且清楚标志位(mark)
Buffer.clear:将position设置0,同时将limit设置为capacity的大小,并清除标志位mark

文件映射到内存

1
2
3
4
5
6
7
8
9
10
public void nioFileToRandom(String resource) throws Exception{
RandomAccessFile raf = new RandomAccessFile(resource,"rw");
FileChannel fc = raf.getChannel();
MappedByteBuffer mappedByteBuffer = fc.map(FileChannel.MapMode.READ_WRITE,0,raf.length());
while ((mappedByteBuffer.hasRemaining())){
System.out.println((char)mappedByteBuffer.get());
}
mappedByteBuffer.put(0,(byte)98);
raf.close();
}

网络编程

SocketServer编写简单服务器

缺点:

  • 为每一个客户端使用一个线程,如果客户端出现延时等异常,线程可能会被占用很长时间。因为数据的准备和读取都在这个线程中。
  • 此时,如果客户端数量众多,可能会消耗大量的系统资源。

Selector

select() 无数据阻塞,有数据返回selectionKey。SelectionKey便是Selector和Channel的关系。从SelectionKey中得到Channel(数据已经准备),并读取数据。
selectNow() 没有数据返回0,
一个线程监控大量的客户端线程,减少数据准备对业务处理线程影响

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

/**
* Created by baobing on 2016/8/23.
*/
public class MultiThreadNio {

public static Map<Socket,Long> timeStat = new HashMap<>();
class EchoClient{
private LinkedList<ByteBuffer> outputQueue = new LinkedList<>();
public LinkedList getOutputQueue(){
return outputQueue;
}
public void enqueue(ByteBuffer byteBuffer){
outputQueue.addFirst(byteBuffer);
}
}

class HandlerMsg implements Runnable{
SelectionKey selectionKey;
ByteBuffer byteBuffer;

public HandlerMsg(SelectionKey selectionKey, ByteBuffer byteBuffer) {
this.selectionKey = selectionKey;
this.byteBuffer = byteBuffer;
}

@Override
public void run() {
EchoClient echoClient = (EchoClient) selectionKey.attachment();
echoClient.enqueue(byteBuffer);
//修改兴趣事件
selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
selector.wakeup();

}
}
private Selector selector;
private ExecutorService executorService = Executors.newCachedThreadPool();
private void startServer() throws Exception{
selector = SelectorProvider.provider().openSelector();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//非阻塞 需要监听事件
serverSocketChannel.configureBlocking(false);
InetSocketAddress address = new InetSocketAddress(8000);
serverSocketChannel.socket().bind(address);
//感兴趣的事件进行注册
SelectionKey selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
while (true){
selector.select();
Set readyKeys = selector.selectedKeys();
Iterator iterator = readyKeys.iterator();
long e = 0;
while(iterator.hasNext()){
SelectionKey key = (SelectionKey) iterator.next();
iterator.remove();
if(selectionKey.isAcceptable()){
doAccept(selectionKey);
} else if(selectionKey.isValid() && selectionKey.isReadable()){
doRead(selectionKey);
} else if(selectionKey.isValid() && selectionKey.isWritable()){
doWrite(selectionKey);
}
}
}
}
private void doAccept(SelectionKey selectionKey){
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
SocketChannel clientChannel ;
try {
clientChannel = server.accept();
clientChannel.configureBlocking(false);
//注册读取事件
SelectionKey clientKey = clientChannel.register(selector,SelectionKey.OP_READ);
clientKey.attach(new EchoClient());
} catch (IOException e){

}
}
private void doRead(SelectionKey selectionKey){
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
try {
int len = socketChannel.read(byteBuffer);
if(len == -1) return ;
}catch (Exception e){

}
byteBuffer.flip();
executorService.execute(new HandlerMsg(selectionKey,byteBuffer));
}
private void doWrite(SelectionKey selectionKey){
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
EchoClient echoClient = (EchoClient) selectionKey.attachment();
LinkedList<ByteBuffer> outputQueue = echoClient.getOutputQueue();
ByteBuffer byteBuffer = outputQueue.getLast();
try {
int len = socketChannel.write(byteBuffer);
if(len == -1){
disconnet();
return;
}
if(byteBuffer.remaining() == 0){
outputQueue.removeLast();
}
} catch (IOException e){

}
if(outputQueue.size() == 0){
selectionKey.interestOps(SelectionKey.OP_READ);
}
}

public static void main(String[] args) {
try{
MultiThreadNio server = new MultiThreadNio();
server.startServer();
}catch (Exception e){
System.out.println("exception caught,program exiting...");
e.printStackTrace();
}
}

}

网络编程AIO

特性

  • 数据读取完毕通知业务线程,使用回调函数的形式
  • AIO不加快IO的读写速度,只是改变线程对IO的处理方式,从而提升性能。

    示例代码