内容来自:java nio线程精讲

NIO简介

NIO与原来的IO有同样的作用和目的,但是使用的方式完全不同,NIO支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行文件读写操作

IO和NIO的区别
IO NIO
面向流 面向缓冲区
阻塞IO 非阻塞IO
选择器

NIO的核心在于:通道(channel)和缓冲区(buffer)。通道表示打开到IO设备的连接。如需要使用NIO系统,需要获取用于IO设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。

缓冲区
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/**
* 一、缓冲区(buffer):在java NIO中负责数据的存取。缓冲区就是数组。用于存储不同类型的数据
*
* 根据数据类型不同(除了boolean),提供了相应的缓冲区
* ByteBuffer
* CharBuffer
* ShortBuffer
* IntBuffer
* LongBuffer
* FloatBuffer
* DoubleBuffer
*
* 都是通过allocate()获取缓冲区
*
* 二、存取的两个核心方法
* put():存数据
* get():获取
*
* 三、缓冲区中的四个核心属性
* capacity:缓冲区最大容量,一旦声明不能改变。
* limit:界限,可以操作数据的大小,limit后的数据不能进行读写
* position:表示正在操作数据的位置
* mark:标记当前position的位置,可以通过reset()恢复到mark的位置
*
* 0<=mark<=position<=limit<=capacity
*
* 四、直接缓冲区与非直接缓冲区
* 非直接缓冲区:通过allocate()方法分配缓冲区,将缓冲区建立在JVM内存中
* 直接缓冲区:通过allocateDirect()方法分配缓冲区,建立在物理内存中,可以提高效率(资源消耗更高,不宜控制)
*
*/
public class TestBuffer {
@Test
public void test1() {
String str = "abcd";
//分配一个指定大小的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);

System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());

System.out.println("-----------------------");
//存入数据
buffer.put(str.getBytes());

System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println("-----------------------");

//切换读取模式
buffer.flip();
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println("-----------------------");

//读取
byte[] rs = new byte[buffer.limit()];
buffer.get(rs);
System.out.println(new String(rs));

System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println("-----------------------");

//rewind():可重复读数据
buffer.rewind();
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println("-----------------------");

//清空缓冲区,但是缓冲区数据还在,但是指针位置初始化
buffer.clear();
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println("-----------------------");
}

@Test
public void test2() {
String str = "abcd";
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put(str.getBytes());
buffer.flip();
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes, 0, 2);
System.out.println(new String(bytes));
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println("-----------------------");

//标记
buffer.mark();

buffer.get(bytes, 2, 2);
System.out.println(new String(bytes,2,2));
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println("-----------------------");

buffer.reset();
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println("-----------------------");

//判断缓冲区中是否还有剩余的数据
if (buffer.hasRemaining()) {

//获取缓冲区中可以操作的数量
System.out.println(buffer.remaining());
}
}
}
通道(Channel)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
/**
* 一、通道(Channel):用于源节点与目标节点的连接。在java NIO中负责缓冲区中数据的传输。Channel本身不存储数据,因此需要配合缓冲区传输
* <p>
* 二、主要实现类
* java.nio.channel.Channel接口:
* -FileChannel
* -SocketChannel
* -ServerSocketChannel
* -DatagramChannel
* <p>
* 三、获取通道
* 1.getChannel()方法
* 2.jdk1.7中方的NIO2正对各个通道提供了open()方法
* 3.jdk1.7中方的NIO2的Files工具类的newByteChannel()
* <p>
* 四、通道之间的数据传输
* transferFrom
* transferTo
* <p>
* 五、分散(scatter)于聚集(gather)
* 分散读取:将通道中的数据分散到多个缓冲区中
* 聚集写入:将多个缓冲区数据聚集到通道中
* <p>
* 六、字符集:Charset
* 编码:字符串->直接数组
* 解码:字符数组->字符串
*/
public class TestChannel {

/**
* 利用通道完成文件的复制(非直接缓冲区)
*/
@Test
public void test1() {
FileInputStream inputStream = null;
FileOutputStream outputStream = null;

FileChannel inChannel = null;
FileChannel outChannel = null;

try {
inputStream = new FileInputStream("D:\\IdeaProject\\Nio\\src\\main\\resources\\1.png");
outputStream = new FileOutputStream("D:\\IdeaProject\\Nio\\src\\main\\resources\\2.png");

//获取通道
inChannel = inputStream.getChannel();
outChannel = outputStream.getChannel();

//分配缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);

//通道中的数据存入缓冲区
while (inChannel.read(buffer) != -1) {
//切换读取模式
buffer.flip();
//将通道中的数据写入通道中
outChannel.write(buffer);
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (outChannel != null) {
try {
outChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (inChannel != null) {
try {
inChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

/**
* 直接缓冲区完成复制(内存映射文件)
*/
@Test
public void test2() {
FileChannel inChannel = null;
FileChannel outChannel = null;
try {
inChannel = FileChannel.open(Paths.get("D:\\IdeaProject\\Nio\\src\\main\\resources\\1.png"), StandardOpenOption.READ);
outChannel = FileChannel.open(Paths.get("D:\\IdeaProject\\Nio\\src\\main\\resources\\2.png"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);

//内存映射文件
MappedByteBuffer inBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());

//直接对缓冲区进行数据读写操作
byte[] rs = new byte[inBuffer.limit()];
inBuffer.get(rs);
outBuffer.put(rs);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (inChannel != null) {
try {
inChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (outChannel != null) {
try {
outChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

/**
* 通道之间的数据传输(直接缓冲区)
*/
@Test
public void test3() throws IOException {
FileChannel inChannel = FileChannel.open(Paths.get("D:\\IdeaProject\\Nio\\src\\main\\resources\\1.png"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("D:\\IdeaProject\\Nio\\src\\main\\resources\\2.png"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);

inChannel.transferTo(0, inChannel.size(), outChannel);
inChannel.close();
outChannel.close();
}

@Test
public void test4() throws IOException {
RandomAccessFile raf1 = new RandomAccessFile("D:\\IdeaProject\\Nio\\src\\main\\resources\\1.txt", "rw");

//获取通道
FileChannel channel = raf1.getChannel();

//分配缓冲区
ByteBuffer buf1 = ByteBuffer.allocate(1);
ByteBuffer buf2 = ByteBuffer.allocate(2);

//分散读取
ByteBuffer[] bufs = {buf1, buf2};
channel.read(bufs);

for (ByteBuffer byteBuffer : bufs) {
byteBuffer.flip();
}

System.out.println(new String(bufs[0].array(), 0, bufs[0].limit()));
System.out.println("------------------");
System.out.println(new String(bufs[1].array(), 0, bufs[1].limit()));


//聚集写入
RandomAccessFile raf2 = new RandomAccessFile("D:\\IdeaProject\\Nio\\src\\main\\resources\\2.txt", "rw");
FileChannel wChannel = raf2.getChannel();
wChannel.write(bufs);
}

/**
* 字符集
*/
@Test
public void test5() {
SortedMap<String, Charset> map = Charset.availableCharsets();
Set<Map.Entry<String, Charset>> set = map.entrySet();

for (Map.Entry<String, Charset> entry : set) {
System.out.println(entry.getKey() + "=" + entry.getValue());
}
}

@Test
public void test6() throws CharacterCodingException {
Charset gbk = Charset.forName("GBK");

//获取编码器
CharsetEncoder encoder = gbk.newEncoder();

//获取解码器
CharsetDecoder decoder = gbk.newDecoder();

CharBuffer charBuffer = CharBuffer.allocate(1024);
charBuffer.put("测试!");
charBuffer.flip();

//编码
ByteBuffer eBuf = encoder.encode(charBuffer);
for (int i = 0; i < 5; i++) {
System.out.println(eBuf.get());
}

//解码
eBuf.flip();
CharBuffer cbuf = decoder.decode(eBuf);
System.out.println(cbuf.toString());
}
}

阻塞式NIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Test
public void client() throws IOException {
//获取通道
SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
FileChannel inChannel = FileChannel.open(Paths.get("D:\\IdeaProject\\Nio\\src\\main\\resources\\1.png"), StandardOpenOption.READ);

//分配缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);

//读取本地文件,发送给服务端
while (inChannel.read(buffer) != -1) {
buffer.flip();
channel.write(buffer);
buffer.clear();
}

channel.shutdownOutput();

//接受服务端的反馈
int len = 0;
while ((len = channel.read(buffer)) != -1) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, len));
buffer.clear();
}

inChannel.close();
channel.close();
}

@Test
public void server() throws IOException {
//获取通道
ServerSocketChannel channel = ServerSocketChannel.open();
FileChannel outChannel = FileChannel.open(Paths.get("D:\\IdeaProject\\Nio\\src\\main\\resources\\2.png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);

//绑定连接
channel.bind(new InetSocketAddress(9999));

//获取客户端通道
SocketChannel acceptChannel = channel.accept();

//分配缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);

//接受客户端数据,保存到本地
while (acceptChannel.read(buffer) != -1) {
buffer.flip();
outChannel.write(buffer);
buffer.clear();
}

//发送反馈
buffer.put("接受成功".getBytes());
buffer.flip();
acceptChannel.write(buffer);

acceptChannel.close();
outChannel.close();
channel.close();
}

非阻塞式NIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
//客户端
@Test
public void client() throws IOException {
//获取通道
SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));

//切换为非阻塞模式
channel.configureBlocking(false);

//分配指定大小的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);

//发送数据给服务端
Scanner scan = new Scanner(System.in);

while (scan.hasNext()) {
String str = scan.next();
buffer.put((new Date().toString() + "\n" + str).getBytes());
buffer.flip();
channel.write(buffer);
buffer.clear();
}

//关闭通道
channel.close();
}

//服务端
@Test
public void server() throws IOException {
//获取通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();

//切换非阻塞模式
serverChannel.configureBlocking(false);

//绑定链接
serverChannel.bind(new InetSocketAddress(9999));

//获取到选择器
Selector selector = Selector.open();

//将通道注册到选择器上,并指定监听事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

//轮询式的获取选择器上已经准备就绪的事件
while (selector.select() > 0) {

//获取当前选择器中所有注册的已就绪的监听事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

while (iterator.hasNext()) {
//获取装备就绪的事件
SelectionKey key = iterator.next();

//判断具体是什么事件
if (key.isAcceptable()) {
//若接收就绪就获取客户端连接
SocketChannel aChannel = serverChannel.accept();

//切换非阻塞模式
aChannel.configureBlocking(false);

//将通道注册到选择器上
aChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
//若读就绪就获取客户端连接
SocketChannel sChannel = (SocketChannel) key.channel();

//读取数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = 0;
while ((len = sChannel.read(buffer)) > 0) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, len));
buffer.clear();
}
}
//取消选择键
iterator.remove();
}
}
}

非阻塞式NIO(UDP)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Test
public void send() throws IOException {
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(false);

ByteBuffer buffer = ByteBuffer.allocate(1024);

Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String str = scanner.next();
buffer.put((new Date().toString() + "\n" + str).getBytes());
buffer.flip();
channel.send(buffer, new InetSocketAddress("127.0.0.1", 9999));
buffer.clear();
}

channel.close();
}

@Test
public void receive() throws IOException {
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(false);

channel.bind(new InetSocketAddress(9999));

Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_READ);

while (selector.select() > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey sk = iterator.next();

if (sk.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.receive(buffer);
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.limit()));
}
}

iterator.remove();
}
}

管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Test
public void test1() throws IOException {
//获取管道
Pipe pipe = Pipe.open();

ByteBuffer buffer = ByteBuffer.allocate(1024);

//将缓冲区数据写入管道
Pipe.SinkChannel sinkChannel = pipe.sink();

buffer.put("数据".getBytes());
buffer.flip();
sinkChannel.write(buffer);

//读取
Pipe.SourceChannel sourceChannel = pipe.source();
buffer.flip();
int len = sourceChannel.read(buffer);
System.out.println(new String(buffer.array(), 0, len));

sourceChannel.close();
sinkChannel.close();
}