Java NIO同步非阻塞编程原理解析及案例
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
Java NIO同步非阻塞编程原理解析及案例
文章目录
NIO介绍
Java NIO 全称java non-blocking IO 是指 JDK 提供的新 API。从 JDK1.4 开始Java 提供了一系列改进的输入/输出的新特性被统称为 NIO(即 New IO)是同步非阻塞的.
- NIO 有三大核心部分Channel(通道)Buffer(缓冲区), Selector(选择器)
- NIO是 面向缓冲区编程的。数据读取到一个缓冲区中需要时可在缓冲区中前后移动这就增加了处理过程中的灵活性使用它可以提供非阻塞式的高伸缩性网络
- Java NIO 的非阻塞模式使一个线程从某通道发送请求或者读取数据但是它仅能得到目前可用的数据如果目前没有数据可用时就什么都不会获取而不是保持线程阻塞所以直至数据变的可以读取之前该线程可以继续做其他的事情。 非阻塞写也是如此一个线程请求写入一些数据到某通道但不需要等待它完全写入 这个线程同时可以去做别的事情。通俗理解NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况可以分配50 或者 100 个线程来处理。不像之前的阻塞 IO 那样非得分配 10000 个
NIO和 BIO的比较
- BIO 以流的方式处理数据,而 NIO 以缓冲区的方式处理数据,缓冲区 I/O 的效率比流 I/O 高很多
- BIO 是阻塞的NIO则是非阻塞的
- BIO 基于字节流和字符流进行操作而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作数据总是从通道读取到缓冲区中或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件比如连接请求 数据到达等因此使用单个线程就可以监听多个客户端通道
NIO 三大核心原理示意图
一张图描述 NIO 的 Selector 、 Channel 和 Buffer 的关系
- 每个 channel 都会对应一个 Buffer
- Selector 对应一个线程 一个线程对应多个 channel(连接)
- 每个 channel 都注册到 Selector选择器上
- Selector不断轮询查看Channel上的事件, 事件是通道Channel非常重要的概念
- Selector 会根据不同的事件完成不同的处理操作
- Buffer 就是一个内存块 底层是有一个数组
- 数据的读取写入是通过 Buffer, 这个和 BIO不同, BIO 中要么是输入流或者是输出流, 不能双向但是NIO 的 Buffer 是可以读也可以写 , channel 是双向的.
缓冲区(Buffer)
基本介绍
缓冲区Buffer缓冲区本质上是一个可以读写数据的内存块可以理解成是一个数组该对象提供了一组方法可以更轻松地使用内存块缓冲区对象内置了一些机制能够跟踪和记录缓冲区的状态变化情况。Channel 提供从网络读取数据的渠道但是读取或写入的数据都必须经由 Buffer.
Buffer常用API介绍
Buffer 类及其子类
在 NIO 中Buffer是一个顶层父类它是一个抽象类, 类的层级关系图,常用的缓冲区分别对应byte,short, int, long,float,double,char 7种.
缓冲区对象创建
方法名 | 说明 |
---|---|
static ByteBuffer allocate(长度) | 创建byte类型的指定长度的缓冲区 |
static ByteBuffer wrap(byte[] array) | 创建一个有内容的byte类型缓冲区 |
示例代码
import java.nio.ByteBuffer;
public class CreateBufferDemo {
public static void main(String[] args) {
//1.创建一个指定长度的缓冲区, 以ByteBuffer为例
ByteBuffer byteBuffer = ByteBuffer.allocate(5);
for (int i = 0; i < 5; i++) {
System.out.println(byteBuffer.get());
}
//在此调用会报错--后续再读缓冲区时着重讲解
//System.out.println(byteBuffer.get());
//2.创建一个有内容的缓冲区
ByteBuffer wrap = ByteBuffer.wrap("buffer".getBytes());
for (int i = 0; i < 5; i++) {
System.out.println(wrap.get());
}
}
}
缓冲区对象添加数据
方法名 | 说明 |
---|---|
int position()/position(int newPosition) | 获得当前要操作的索引/修改当前要操作的索引位 置 |
int limit()/limit(int newLimit) | 最多能操作到哪个索引/修改最多能操作的索引位 置 |
int capacity() | 返回缓冲区的总长度 |
int remaining()/boolean hasRemaining() | 还有多少能操作索引个数/是否还有能操作 |
put(byte b)/put(byte[] src) | 添加一个字节/添加字节数组 |
图解:
示例代码:
import java.nio.ByteBuffer;
import java.util.Arrays;
public class PutBufferDemo {
public static void main(String[] args) {
//1.创建一个指定长度的缓冲区, 以ByteBuffer为例
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
System.out.println("当前索引所在位置: " + byteBuffer.position());//0 获取当前索引所在位置
System.out.println("最多能操作到哪个索引: " + byteBuffer.limit());//10 最多能操作到哪个索引
System.out.println("返回缓冲区总长度: " + byteBuffer.capacity());//10 返回缓冲区总长度
System.out.println("还有多少个能操作: " + byteBuffer.remaining());//10 还有多少个能操作
//添加一个字节
byteBuffer.put((byte) 97);
System.out.println("添加字节后当前索引所在位置: " + byteBuffer.position());//1 获取当前索引所在位置
System.out.println("添加字节后最多能操作到哪个索引: " + byteBuffer.limit());//10 最多能操作到哪个索引
System.out.println("添加字节后最多能操作到哪个索引: " + byteBuffer.capacity());//10 返回缓冲区总长度
System.out.println("添加字节后最多能操作到哪个索引: " + byteBuffer.remaining());//9 还有多少个能操作
System.out.println(Arrays.toString(byteBuffer.array()));
//添加一个字节数组
byteBuffer.put("abc".getBytes());
System.out.println("添加字节数组后当前索引所在位置: " + byteBuffer.position());//4 获取当前索引所在位置
System.out.println("添加字节数组后最多能操作到哪个索引: " + byteBuffer.limit());//10 最多能操作到哪个索引
System.out.println("添加字节数组后最多能操作到哪个索引: " + byteBuffer.capacity());//10 返回缓冲区总长度
System.out.println("添加字节数组后最多能操作到哪个索引: " + byteBuffer.remaining());//6 还有多少个能操作
System.out.println(Arrays.toString(byteBuffer.array()));
//当添加超过缓冲区的长度时会报错
byteBuffer.put("012345".getBytes());
System.out.println("超过缓存区长度时当前索引所在位置: " + byteBuffer.position());//10 获取当前索引所在位置
System.out.println("超过缓存区长度时最多能操作到哪个索引: " + byteBuffer.limit());//10 最多能操作到哪个索引
System.out.println("超过缓存区长度时最多能操作到哪个索引: " + byteBuffer.capacity());//10 返回缓冲区总长度
System.out.println("超过缓存区长度时最多能操作到哪个索引: " + byteBuffer.remaining());//0 还有多少个能操作
System.out.println("超过缓存区长度时是否还能有操作的数组: " + byteBuffer.hasRemaining());// false 是否还能有操作的数组
System.out.println(Arrays.toString(byteBuffer.array()));
// 如果缓存区存满后, 可以调整position位置可以重复写,这样会覆盖之前存入索引的对应的值
byteBuffer.position(0);
byteBuffer.put("012345".getBytes());
System.out.println(Arrays.toString(byteBuffer.array()));
}
}
输出结果
当前索引所在位置: 0
最多能操作到哪个索引: 10
返回缓冲区总长度: 10
还有多少个能操作: 10
添加字节后当前索引所在位置: 1
添加字节后最多能操作到哪个索引: 10
添加字节后最多能操作到哪个索引: 10
添加字节后最多能操作到哪个索引: 9
[97, 0, 0, 0, 0, 0, 0, 0, 0, 0]
添加字节数组后当前索引所在位置: 4
添加字节数组后最多能操作到哪个索引: 10
添加字节数组后最多能操作到哪个索引: 10
添加字节数组后最多能操作到哪个索引: 6
[97, 97, 98, 99, 0, 0, 0, 0, 0, 0]
超过缓存区长度时当前索引所在位置: 10
超过缓存区长度时最多能操作到哪个索引: 10
超过缓存区长度时最多能操作到哪个索引: 10
超过缓存区长度时最多能操作到哪个索引: 0
超过缓存区长度时是否还能有操作的数组: false
[97, 97, 98, 99, 48, 49, 50, 51, 52, 53]
覆盖后的结果
[48, 49, 50, 51, 52, 53, 50, 51, 52, 53]
首先定义了一个长度为10的字节数组缓存区然后先写入一个字节97输出缓冲区的内容为[97, 0, 0, 0, 0, 0, 0, 0, 0, 0]。接下来写入一个字节数组此时缓冲区的内容为[97, 97, 98, 99, 0, 0, 0, 0, 0, 0]。接下来再写入一个字节数组此时会超出缓冲区的范围如果缓存区存满后, 可以调整position位置可以重复写,这样会覆盖之前存入索引的对应的值。
缓冲区对象读取数据
方法名 | 介绍 |
---|---|
flip() | 写切换读模式 limit设置position位置, position设置0 |
get() | 读一个字节 |
get(byte[] dst) | 读多个字节 |
get(int index) | 读指定索引的字节 |
rewind() | 将position设置为0可以重复读 |
clear() | 切换写模式 position设置为0 , limit 设置为 capacity |
array() | 将缓冲区转换成字节数组返回 |
图解:flip()方法
图解:clear()方法
实例代码:
import java.nio.ByteBuffer;
public class GetBufferDemo {
public static void main(String[] args) {
//1.创建一个指定长度的缓冲区
ByteBuffer allocate = ByteBuffer.allocate(10);
allocate.put("0123".getBytes());
System.out.println("缓冲区内容 " + Arrays.toString(allocate.array()));
System.out.println("position:" + allocate.position());//4
System.out.println("limit:" + allocate.limit());//10
System.out.println("capacity:" + allocate.capacity());//10
System.out.println("remaining:" + allocate.remaining());//6
//切换读模式
System.out.println("读取数据--------------");
allocate.flip();
System.out.println("position:" + allocate.position());//4
System.out.println("limit:" + allocate.limit());//10
System.out.println("capacity:" + allocate.capacity());//10
System.out.println("remaining:" + allocate.remaining());//6
for (int i = 0; i < allocate.limit(); i++) {
System.out.println(allocate.get());
}
//读取完毕后.继续读取会报错,超过limit值
//System.out.println(allocate.get());
//读取指定索引字节
System.out.println("读取指定索引字节--------------");
System.out.println(allocate.get(1));
System.out.println("读取多个字节--------------");
// 重复读取
allocate.rewind();
byte[] bytes = new byte[4];
allocate.get(bytes);
System.out.println(new String(bytes));
// 将缓冲区转化字节数组返回
System.out.println("将缓冲区转化字节数组返回--------------");
byte[] array = allocate.array();
System.out.println(new String(array));
// 切换写模式,覆盖之前索引所在位置的值
System.out.println("写模式--------------");
allocate.clear();
allocate.put("abc".getBytes());
System.out.println(new String(allocate.array()));
}
}
输出结果
缓冲区内容 [48, 49, 50, 51, 0, 0, 0, 0, 0, 0]
position:4
limit:10
capacity:10
remaining:6
读取数据--------------
position:0
limit:4
capacity:10
remaining:4
48
49
50
51
读取指定索引字节--------------
49
读取多个字节--------------
0123
将缓冲区转化字节数组返回--------------
0123
写模式--------------
abc3
注意事项:
- capacity容量长度limit 界限最多能读/写到哪里posotion位置读/写哪个索引
- 获取缓冲区里面数据之前需要调用flip方法
- 再次写数据之前需要调用clear方法但是数据还未消失等再次写入数据被覆盖了才会消失。
通道(Channel)
基本介绍
通常来说NIO中的所有IO都是从 Channel通道 开始的。NIO 的通道类似于流但有些区别如下
- 通道可以读也可以写流一般来说是单向的只能读或者写所以之前我们用流进行IO操作的时候需要分别创建一个输入流和一个输出流
- 通道可以异步读写
- 通道总是基于缓冲区Buffer来读写
Channel常用类介绍
Channel接口
常 用 的Channel实现类类 有 FileChannel , DatagramChannel ,ServerSocketChannel和SocketChannel 。FileChannel 用于文件的数据读写 DatagramChannel 用于 UDP 的数据读写 ServerSocketChannel 和SocketChannel 用于 TCP 的数据读写。【ServerSocketChanne类似 ServerSocket , SocketChannel 类似 Socket】
SocketChannel 与ServerSocketChannel
类似 Socket和ServerSocket,可以完成客户端与服务端数据的通信工作.
ServerSocketChannel
服务端实现步骤
- 打开一个服务端通道
- 绑定对应的端口号
- 通道默认是阻塞的需要设置为非阻塞
- 检查是否有客户端连接 有客户端连接会返回对应的通道
- 获取客户端传递过来的数据,并把数据放在byteBuffer这个缓冲区中
- 给客户端回写数据
- 释放资源
代码实现
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
public class NIOServer {
public static void main(String[] args) throws IOException, InterruptedException {
//1. 打开一个服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//2. 绑定对应的端口号
serverSocketChannel.bind(new InetSocketAddress(9999));
//3. 通道默认是阻塞的需要设置为非阻塞
// true 为通道阻塞 false 为非阻塞
serverSocketChannel.configureBlocking(false);
System.out.println("服务端启动成功..........");
while (true) {
//4. 检查是否有客户端连接 有客户端连接会返回对应的通道 , 否则返回null
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel == null) {
System.out.println("没有客户端连接...我去做别的事情");
Thread.sleep(2000);
continue;
}
//5. 获取客户端传递过来的数据,并把数据放在byteBuffer这个缓冲区中
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//返回值:
//正数: 表示本次读到的有效字节个数.
//0 : 表示本次没有读到有效字节.
//-1 : 表示读到了末尾
int read = socketChannel.read(byteBuffer);
System.out.println("客户端消息:" + new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
//6. 给客户端回写数据
socketChannel.write(ByteBuffer.wrap("服务器响应客户端请求".getBytes(StandardCharsets.UTF_8)));
//7. 释放资源
socketChannel.close();
}
}
}
输出结果由于目前没有客户端连接所以会一直打印“没有客户端连接…我去做别的事情”
SocketChannel
实现步骤
- 打开通道
- 设置连接IP和端口号
- 写出数据
- 读取服务器写回的数据
- 释放资源
代码实现
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
public class NIOClient {
public static void main(String[] args) throws IOException {
//1.打开通道
SocketChannel socketChannel = SocketChannel.open();
//2.设置连接IP和端口号
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
//3.写出数据
socketChannel.write(ByteBuffer.wrap("客户端1请求服务".getBytes(StandardCharsets.UTF_8)));
//4.读取服务器写回的数据
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int read=socketChannel.read(readBuffer);
System.out.println("服务端消息:" + new String(readBuffer.array(), 0, read, StandardCharsets.UTF_8));
//5.释放资源
socketChannel.close();
}
}
输出结果
服务端消息:服务器响应客户端请求
从上面代码可以看出ServerSocketChannel 了一个服务端通道SocketChannel 实现了一个客户端通道类似于ServerSocket和Socket间的通信方式。只是这里加入了Channel通道的概念用于读写数据过程中数据的缓冲。
Selector (选择器)
基本介绍
可以用一个线程处理多个的客户端连接就会使用到NIO的Selector(选择器). Selector 能够检测多个注册的服务端通道上是否有事件发生如果有事件发生便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道也就是管理多个连接和请求。
在这种没有选择器的情况下,对应每个连接对应一个处理线程. 但是连接并不能马上就会发送信息,所以线程往往会同步阻塞在这里等待消息的到来因此会产生资源浪费。
只有在通道真正有读写事件发生时才会进行读写就大大地减少了系统开销并且不必为每个连接都创建一个线程不用去维护多个线程, 避免了多线程之间的上下文切换导致的开销
常用API介绍
Selector 类是一个抽象类
常用方法
- Selector.open() : //得到一个选择器对象
- selector.select() : //阻塞 监控所有注册的通道,当有对应的事件操作时, 会将SelectionKey放入集合内部并返回事件数量
- selector.select(1000): //阻塞 1000 毫秒监控所有注册的通道,当有对应的事件操作时, 会将SelectionKey放入集合内部并返回
- selector.selectedKeys() : // 返回存有SelectionKey的集合
SelectionKey
常用方法
- SelectionKey.isAcceptable(): 是否是连接继续事件
- SelectionKey.isConnectable(): 是否是连接就绪事件
- SelectionKey.isReadable(): 是否是读就绪事件
- SelectionKey.isWritable(): 是否是写就绪事件
SelectionKey中定义的4种事件
- SelectionKey.OP_ACCEPT —— 接收连接继续事件表示服务器监听到了客户连接服务器可以接收这个连接了
- SelectionKey.OP_CONNECT —— 连接就绪事件表示客户端与服务器的连接已经建立成功
- SelectionKey.OP_READ —— 读就绪事件表示通道中已经有了可读的数据可以执行读操作了通道目前有数据可以进行读操作了
- SelectionKey.OP_WRITE —— 写就绪事件表示已经可以向通道写数据了通道目前可以用于写操作
Selector 编码
服务端实现步骤
- 打开一个服务端通道
- 绑定对应的端口号
- 通道默认是阻塞的需要设置为非阻塞
- 创建选择器
- 将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT
- 检查选择器是否有事件
- 获取事件集合
- 判断事件是否是客户端连接事件SelectionKey.isAcceptable()
- 得到客户端通道,并将通道注册到选择器上, 并指定监听事件为OP_READ
- 判断是否是客户端读就绪事件SelectionKey.isReadable()
- 得到客户端通道,读取数据到缓冲区
- 给客户端回写数据
- 从集合中删除对应的事件, 因为防止二次处理
代码实现
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
public class NIOSelectorServer {
public static void main(String[] args) throws IOException,InterruptedException {
//1. 打开一个服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//2. 绑定对应的端口号
serverSocketChannel.bind(new InetSocketAddress(9999));
//3. 通道默认是阻塞的需要设置为非阻塞
serverSocketChannel.configureBlocking(false);
//4. 创建选择器
Selector selector = Selector.open();
//5. 将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务端启动成功...");
while (true) {
//6. 检查选择器是否有事件
int select = selector.select(2000);
if (select == 0) {
continue;
}
//7. 获取事件集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
//8. 判断事件是否是客户端连接事件SelectionKey.isAcceptable()
SelectionKey key = iterator.next();
//9. 得到客户端通道,并将通道注册到选择器上, 并指定监听事件为OP_READ
if (key.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端已连接......" + socketChannel);
//必须设置通道为非阻塞, 因为selector需要轮询监听每个通道的事件
socketChannel.configureBlocking(false);
//并指定监听事件为OP_READ
socketChannel.register(selector, SelectionKey.OP_READ);
}
//10. 判断是否是客户端读就绪事件SelectionKey.isReadable()
if (key.isReadable()) {
//11.得到客户端通道,读取数据到缓冲区
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
if (read > 0) {
System.out.println("客户端消息:" + new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
//12.给客户端回写数据
socketChannel.write(ByteBuffer.wrap("服务器响应".getBytes(StandardCharsets.UTF_8)));
socketChannel.close();
}
}
//13.从集合中删除对应的事件, 因为防止二次处理.
iterator.remove();
}
}
}
}
定义客户端
public class NIOClient {
public static void main(String[] args) throws IOException {
//1.打开通道
SocketChannel socketChannel = SocketChannel.open();
//2.设置连接IP和端口号
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
//3.写出数据
socketChannel.write(ByteBuffer.wrap("客户端请求服务".getBytes(StandardCharsets.UTF_8)));
//4.读取服务器写回的数据
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int read=socketChannel.read(readBuffer);
System.out.println("服务端消息:" + new String(readBuffer.array(), 0, read, StandardCharsets.UTF_8));
//5.释放资源
socketChannel.close();
}
}
输出结果
客户端
服务端消息:服务器响应
服务端
服务端启动成功...
客户端已连接......java.nio.channels.SocketChannel[connected local=/127.0.0.1:9999 remote=/127.0.0.1:56699]
客户端消息:客户端请求服务