简介

Java NIO是一个可以替代标准Java IO API的IO API,主要有以下三个核心组件:

  • Channels
  • Buffers
  • Selectors

NIO与IO的主要区别在于:

  • IO是面向流的,NIO是面向缓冲区的。
  • IO是阻塞的,NIO是非阻塞的。
  • NIO有选择器,允许一个单独的线程来管理多个输入通道。

Channel

Java NIO的通道类似流,主要区别在于:

  • 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。
  • 通道可以异步地读写。
  • 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。

NIO中有以下几个重要的通道实现。

FileChannel

FileChannel是一个连接到文件的通道。可以通过文件通道读写文件。

注意:FileChannel无法设置为非阻塞模式,它总是运行在阻塞模式下。

1
2
3
4
5
6
7
8
9
10
11
12
13
//打开FileChannel
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();

//在FileChannel的某个特定位置进行数据的读/写操作
long pos = channel.position();
channel.position(pos +123);

//可以使用FileChannel.truncate()方法截取一个文件,指定长度后面的部分将被删除
channel.truncate(1024);

//出于性能方面的考虑,操作系统会将数据缓存在内存中,所以无法保证写入到FileChannel里的数据一定会即时写到磁盘上,而通过FileChannel.force()方法则可以将通道里尚未写入磁盘的数据强制写到磁盘上
channel.force(true);

SocketChannel

SocketChannel是一个连接到TCP网络套接字的通道。可以通过以下2种方式创建SocketChannel:

  • 打开一个SocketChannel并连接到互联网上的某台服务器。
  • 一个新连接到达ServerSocketChannel时,会创建一个SocketChannel。

SocketChannel是可以设置为非阻塞模式的,设置之后,就可以在异步模式下调用connect(), read()write()了。

1
2
3
4
5
6
7
8
9
10
//打开并连接
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));

//非阻塞模式可以调用finishConnect()的方法确定连接是否建立
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));
while(! socketChannel.finishConnect() ){
//wait, or do something else...
}

ServerSocketChannel

ServerSocketChannel是一个可以监听新进来的TCP连接的通道。在打开了ServerSocketChannel之后,可以通过accept()方法监听新进来的连接,当accept()方法返回的时候,它返回一个包含新进来的连接的SocketChannel。因此,accept()方法会一直阻塞到有新连接到达。

ServerSocketChannel也可以设置成非阻塞模式。在非阻塞模式下,accept()方法会立刻返回,如果还没有新进来的连接,返回的将是null

1
2
3
4
5
6
7
//打开通道并监听新进来的连接
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
//do something with socketChannel...
}

DatagramChannel

DatagramChannel是一个能收发UDP包的通道。因为UDP是无连接的网络协议,所以不能像其它通道那样读取和写入。它发送和接收的是数据包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//打开DatagramChannel
DatagramChannel channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(9999));

//通过receive()方法从DatagramChannel接收数据到指定的Buffer,如果Buffer容不下收到的数据,多出的数据将被丢弃
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
channel.receive(buf);

//通过send()方法从DatagramChannel发送数据
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com", 80));

Buffer

Buffer本质是一块可以写入数据,并可以从中读取数据的内存,用于和通道进行交互,过程如下:

  • 写入数据到Buffer
  • 调用flip()方法
  • 从Buffer中读取数据
  • 调用clear()方法或者compact()方法

其中flip()方法是将Buffer从写模式切换到读模式,clear()方法会清空整个缓冲区,compact()方法只会清除已经读过的数据并将所有未读的数据拷贝到Buffer起始处。

Buffer有三个属性:

  • capacity
  • position:当写数据到Buffer中时,position表示当前的位置;当从Buffer读取数据时,也是从当前位置开始读;将Buffer从写模式切换到读模式,position会被重置为0。
  • limit:在写模式下,limit表示你最多能往Buffer里写多少数据,此时limit等于capacity;在读模式下,limit表示最多能读到多少数据,因此,当切换Buffer到读模式时,limit会被设置成写模式下的position值。

Buffer有以下几种类型,它们代表了不同的数据类型:

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

Buffer的分配

要想获得一个Buffer对象首先要进行分配。每一个Buffer类都有一个allocate方法:

1
ByteBuffer buf = ByteBuffer.allocate(48);

向Buffer写数据

写数据到Buffer有两种方式:

  • 从Channel写到Buffer。
  • 通过Buffer的put()方法写到Buffer里。
1
2
3
int bytesRead = inChannel.read(buf); //read into buffer.

buf.put(127);

从Buffer读数据

从Buffer中读取数据有两种方式:

  • 从Buffer读取数据到Channel。
  • 使用get()方法从Buffer中读取数据。
1
2
3
int bytesWritten = inChannel.write(buf);

byte aByte = buf.get();

rewind()方法

rewind()将position设回0,所以可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少个元素。

Selector

Selector是NIO中能够检测一到多个通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

创建并注册

与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。

1
2
3
4
5
6
7
//Selector的创建
Selector selector = Selector.open();

//向Selector注册通道
channel.configureBlocking(false);
SelectionKey key = channel.register(selector,
Selectionkey.OP_READ);

注意register()方法的第二个参数。这是一个interest集合,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件,用SelectionKey的四个常量来表示:

  • SelectionKey.OP_CONNECT:某个channel成功连接到另一个服务器称为“连接就绪”
  • SelectionKey.OP_ACCEPT:一个server socket channel准备好接收新进入的连接称为“接收就绪”
  • SelectionKey.OP_READ:一个有数据可读的通道可以说是“读就绪”
  • SelectionKey.OP_WRITE:等待写数据的通道可以说是“写就绪”。

如果对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

SelectionKey

当向Selector注册Channel时,register()法会返回一个SelectionKey对象,这个对象包含以下几个属性:

  • terest集合
  • ready集合
  • Channel
  • Selector
  • 附加的对象(可选)

interest集合

可以用位与操作interest集合和给定的SelectionKey常量来确定某个事件是否在interest集合中。

1
2
3
4
5
6
int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

ready集合

ready集合是通道已经准备就绪的操作的集合。可以用像检测interest集合那样的方法,来检测channel中什么事件或操作已经就绪,也可以通过如下的方法:

1
2
3
4
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

Channel Selector

1
2
Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();

附加的对象

可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。使用方法如下:

1
2
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

选择通道

一旦向Selector注册了一或多个通道,就可以调用select()方法选择就绪的通道,方法返回已经就绪的通道数目。select()阻塞到至少有一个通道在你注册的事件上就绪了,而selectNow()不会阻塞。

一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,就可以通过调用selectedKeys()方法返回就绪通道的集合:

1
Set selectedKeys = selector.selectedKeys();

之后就可以通过遍历这个集合来访问就绪的通道:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}

注意每次迭代末尾的keyIterator.remove()调用。Selector不会自己从集合中移除SelectionKey实例,必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入集合中。

关闭

用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效,但通道本身并不会关闭。

Scatter/Gather

Java NIO支持scatter/gather,scatter是指数据从一个channel读取到多个buffer中,而gather则是指数据从多个buffer写入到同一个channel。scatter/gather经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样可以方便的处理消息头和消息体。

1
2
3
4
5
6
7
8
9
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.read(bufferArray);

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.write(bufferArray);

注意,Scattering Reads在移动下一个buffer前,必须填满当前的buffer,这意味着它不适用于动态消息,而Gathering Writes只有position和limit之间的数据才会被写入,因此能较好的处理动态消息。

通道之间的数据传输

如果两个通道中有一个是FileChannel,那么可以直接将数据从一个通道传输到另外一个通道。

transferFrom()

transferFrom()方法可以将数据从源channel传输到FileChannel中:

1
2
3
4
5
6
7
8
9
10
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();

RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel();

long position = 0;
long count = fromChannel.size();

toChannel.transferFrom(position, count, fromChannel);

transferTo()

transferTo()方法将数据从FileChannel传输到其他的channel中。

1
2
3
4
5
6
7
8
9
10
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();

RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel();

long position = 0;
long count = fromChannel.size();

fromChannel.transferTo(position, count, toChannel);

Path

Path接口表示的是一个与平台无关的路径,既可以是绝对路径也可以是相对路径,文件和目录都用Path表示。

创建Path

可以使用Paths工具类的工厂方法创建一个Path对象:

1
2
3
4
5
6
Path path = Paths.get("C:\\DATA\\test.txt");

Path projects = Paths.get("d:\\data", "projects");

Path currentDir = Paths.get(".");
Path parentDir = Paths.get("..");

normalize()

normalize()方法可以标准化路径,它会处理路径中的相对路径,去除. ..

1
2
3
4
5
6
7
8
9
Path path = Paths.get("c:/Z_DATA/./test.txt");
System.out.println("path = " + path);

path = path.normalize();
System.out.println("path = " + path);

//输出
path = c:\Z_DATA\.\test.txt
path = c:\Z_DATA\test.txt

Files

Files工具类封装提供了一些操作文件系统中文件的工具方法,往往和和Path一起使用。

exists()

exists()可以判断一个Path在文件系统中是否存在。

createDirctory()

在调用创建方法前最好先检查是否存在,如果已经存在会抛出FileAlreadyExistsException异常。

1
2
3
4
5
6
7
8
Path newDir = Paths.get("c:/Z_DATA/newDir");
try {
if(!Files.exists(newDir)) {
Files.createDirectory(newDir);
}
} catch (IOException e) {
e.printStackTrace();
}

copy()

copy()只能复制到不存在的路径,如果复制的目标文件已存在则会抛出异常。强制覆盖已存在文件也是可以的,需要增加相应参数:

1
2
3
4
5
6
7
Path sourcePath = Paths.get(classPath,"nio-data.txt");
Path targetPath = Paths.get(classPath,"nio-data-copy.txt");
try {
Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); // 复制并覆盖已有文件
} catch (IOException e) {
e.printStackTrace();
}

move()

Java NIO Files类同样提供了移动文件的方法。

1
2
3
4
5
6
7
Path sourcePath = Paths.get(classPath,"nio-data.txt");
Path targetPath = Paths.get(classPath,"nio-data-copy.txt");
try {
Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); // 移动并覆盖已有文件
} catch (IOException e) {
e.printStackTrace();
}

delete()

删除文件或目录

1
Files.delete(targetPath);

如果删除的文件或目录不存在会抛出IOException异常。

AsynchronousFileChannel

使用AsynchronousFileChannel可以实现异步地读取和写入文件数据。

创建

我们可以使用AsynchronousFileChannel提供的静态方法open()创建它。

1
2
3
Path path = Paths.get("data/test.xml");
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(path, StandardOpenOption.READ);

读取数据

AsynchronousFileChannel提供了两种读取数据的方式,都是调用它本身的read()方法。

使用Futrue读取数据

第一种方式是调用AsynchronousFileChannel的read()方法,该方法返回一个Future类型的对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
AsynchronousFileChannel fileChannel = 
AsynchronousFileChannel.open(path, StandardOpenOption.READ);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

Future<Integer> operation = fileChannel.read(buffer, position);

while(!operation.isDone());

buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
System.out.println(new String(data));
buffer.clear();

以上代码read()方法会立即返回,即使整个读的过程还没有完全结束。我们可以通过Future.isDone()来检查读取是否完成。

使用CompletionHandler读取数据

第二种读取数据的方式是调用AsynchronousFileChannel的另一个read()方法,该方法需要一个CompletionHandler作为参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fileChannel.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("result = " + result);

attachment.flip();
byte[] data = new byte[attachment.limit()];
attachment.get(data);
System.out.println(new String(data));
attachment.clear();
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {

}
});

一旦读取操作完成,CompletionHandlercomplete()方法将会被调用。它的第一个参数是个Integer类型,表示读取的字节数。第二个参数attachmentByteBuffer类型的,用来存储读取的数据(ByteBuffer也可以换成其他合适的对象方便数据写入)。它其实就是由read()方法的第三个参数。读取失败的时候,CompletionHandlerfailed()方法会被调用。

写入数据

就像读取一样,我们同样有两种方式向AsynchronousFileChannel写入数据。

使用Future读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Path path = Paths.get("data/test-write.txt");
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

buffer.put("test data".getBytes());
buffer.flip();

Future<Integer> operation = fileChannel.write(buffer, position);
buffer.clear();

while(!operation.isDone());

System.out.println("Write done");

注意,写入目标文件要提前创建好,如果它不存在的话,write()方法会抛出一个 NoSuchFileException

使用CompletionHandler写入数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Path path = Paths.get("data/test-write.txt");
if(!Files.exists(path)){
Files.createFile(path);
}
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

buffer.put("test data".getBytes());
buffer.flip();

fileChannel.write(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {

@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("bytes written: " + result);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("Write failed");
exc.printStackTrace();
}
});

当写入程序完成时,CompletionHandler的completed()方法将会被调用,相反的如果写入失败则会调用failed()方法。

参考资料