Java NIO 通信基础
简介
-
JAVA NIO 三个核心组件:
- channel
- buffer
- selector
-
NIO 和 OIO 的区别:
- OIO 面向流,NIO 面向缓冲区,面向缓冲区后,读取和写入,只需要从通道中读取数据到缓冲区中,可以任意读取buffer中任意位置的数据。
- OIO的操作是阻塞的,而NIO 的操作是非阻塞的。
- NIO 有选择器的概念
-
Channel
- 一个通道类似于OIO 中Input Stream 和 OutputStream 的结合体,既可以从通道中读取,也可以向通道中写入。
-
Selector 选择器
- 与OIO 相比,使用选择器的最大的优势:系统开销小,系统不必为每个网络连接创建进程/线程,从而大大减小了系统的开销。
-
Buffer
- 通道的读取就是将数据从通道中读取到缓冲区中;通道的写入,就是讲数据从缓冲区写入到同道中。
Buffer(缓冲)类
buffer 四个重要属性说明
属性 | 说明 |
---|---|
capacity | 容量,即可以容纳的最大数据量;在缓冲区创建并且不能改变 |
limit | 上限,缓冲区中当前的数量量 |
position | 位置,缓冲区中下一个要被读取或写的索引 |
mark | 暂存属性,暂时报存position 的值,方便后面重复使用position 的值 |
allocate() 创建缓冲区
public static void main(String[] args) {
intBuffer = IntBuffer.allocate(20);
System.out.println("---after allocate ----");
System.out.println("position =" + intBuffer.position());
// 写模型下,limit 表示可以写入的数据上限
System.out.println("limit =" + intBuffer.limit());
System.out.println("capacity =" + intBuffer.capacity());
}
---after allocate ----
position =0
limit =20
capacity =20
put() 写入到缓冲区
for (int i = 0; i < 5; i++) {
intBuffer.put(i);
}
System.out.println("---after allocate ----");
System.out.println("position =" + intBuffer.position());
// 写模型下,limit 表示可以写入的数据上限
System.out.println("limit =" + intBuffer.limit());
System.out.println("capacity =" + intBuffer.capacity());
---after allocate ----
position =5
limit =20
capacity =20
- 索引值变成了5,即第六个位置
flip() 翻转
intBuffer = IntBuffer.allocate(20);
for (int i = 0; i < 5; i++) {
intBuffer.put(i);
}
// 切换到读模式
// public final Buffer flip() {
// limit = position;
// position = 0;
// mark = -1;
// return this;
// }
intBuffer.flip();
System.out.println("---after allocate ----");
// 读的位置变成了0
System.out.println("position =" + intBuffer.position());
// 读模型下,limit 可读上限值
System.out.println("limit =" + intBuffer.limit());
System.out.println("capacity =" + intBuffer.capacity());
---after allocate ----
position =0
limit =5
capacity =20
get() 从缓冲区读取
intBuffer = IntBuffer.allocate(20);
for (int i = 0; i < 5; i++) {
intBuffer.put(i);
}
// 切换到读模式
intBuffer.flip();
for(int i=0; i<2; i++) {
int j = intBuffer.get();
System.out.println("j =" +j);
}
System.out.println("---after allocate ----");
// 读的位置变成了0
System.out.println("position =" + intBuffer.position());
// 读模型下,limit 可读上限值
System.out.println("limit =" + intBuffer.limit());
System.out.println("capacity =" + intBuffer.capacity());
for(int i=0; i<4; i++) {
int j = intBuffer.get();
System.out.println("j =" +j);
}
System.out.println("---after allocate ----");
// 读的位置变成了0
System.out.println("position =" + intBuffer.position());
// 读模型下,limit 可读上限值
System.out.println("limit =" + intBuffer.limit());
System.out.println("capacity =" + intBuffer.capacity());
j =0
j =1
---after allocate ----
position =2
limit =5
capacity =20
j =2
j =3
j =4
// position 超过limit的值会报错
Exception in thread "main" java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:500)
at java.nio.HeapIntBuffer.get(HeapIntBuffer.java:135)
at UseBuffer.main(UseBuffer.java:33)
- 如果想从读模式下,切换为写模式,必须调用buffer.clear() 和 buffer.compact(),清空或压缩缓冲区
rewind() 倒带
intBuffer.rewind();
System.out.println("---after allocate ---- read two");
// 读的位置变成了0
System.out.println("position =" + intBuffer.position());
// 读模型下,limit 可读上限值
System.out.println("limit =" + intBuffer.limit());
System.out.println("capacity =" + intBuffer.capacity());
for(int i=0; i<3; i++) {
int j = intBuffer.get();
System.out.println("j =" +j);
}
---after allocate ---- read two
position =0
limit =5
capacity =20
- position 重置为0
- limit 属性保持不变,数据量还是保持一致
- mark 被清理
Channel(通道) 类
- FIleChannel 文件通道,用户文件的读写
- 慢方法
fis = new FileInputStream(srcFile);
fos = new FileOutputStream(destFile);
inChannel = fis.getChannel();
outchannel = fos.getChannel();
int length = -1;
ByteBuffer buf = ByteBuffer.allocate(1024);
//从输入通道读取到buf,buffer 默认是写模式
while ((length = inChannel.read(buf)) != -1) {
//翻转buf,变成成读模式
buf.flip();
int outlength = 0;
//将buf写入到输出的通道
while ((outlength = outchannel.write(buf)) != 0) {
System.out.println("写入字节数:" + outlength);
}
//清除buf,变成写入模式
buf.clear();
}
//强制刷新磁盘
outchannel.force(true);
- 快方法
fis = new FileInputStream(srcFile);
fos = new FileOutputStream(destFile);
inChannel = fis.getChannel();
outChannel = fos.getChannel();
long size = inChannel.size();
long pos = 0;
long count = 0;
while (pos < size) {
//每次复制最多1024个字节,没有就复制剩余的
count = size - pos > 1024 ? 1024 : size - pos;
//复制内存,偏移量pos + count长度
pos += outChannel.transferFrom(inChannel, pos, count);
}
//强制刷新磁盘
outChannel.force(true);
- SocketChannel 用于socket套接字TCP 连接的数据读写
FileChannel fileChannel = new FileInputStream(file).getChannel();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.socket().connect(
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP
, NioDemoConfig.SOCKET_SERVER_PORT));
socketChannel.configureBlocking(false);
Logger.debug("Cliect 成功连接服务端");
while (!socketChannel.finishConnect()) {
//因为非阻塞,所以客户端需要不断的自旋、等待,或者做一些其他的事情
}
//发送文件名称
ByteBuffer fileNameByteBuffer = charset.encode(destFile);
socketChannel.write(fileNameByteBuffer);
//发送文件长度
ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
buffer.putLong(file.length());
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
//发送文件内容
Logger.debug("开始传输文件");
int length = 0;
long progress = 0;
while ((length = fileChannel.read(buffer)) > 0) {
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
progress += length;
Logger.debug("| " + (100 * progress / file.length()) + "% |");
}
if (length == -1) {
IOUtil.closeQuietly(fileChannel);
socketChannel.shutdownOutput();
IOUtil.closeQuietly(socketChannel);
}
Logger.debug("======== 文件传输成功 ========");
- ServerSocketChannel 允许我们监听TCP 连接请求,为每一个监听到请求,创建一个SocketChannel 套接字通道
- DatagreamChannel 数据报通道,用于UDP 协议的数据读写
DatagramChannel dChannel = DatagramChannel.open();
// 设置为非阻塞的
dChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
Scanner scanner = new Scanner(System.in);
Print.tcfo("UDP 客户端启动成功!");
Print.tcfo("请输入发送内容:");
while (scanner.hasNext()) {
String next = scanner.next();
buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
buffer.flip();
// 操作三:通过DatagramChannel数据报通道发送数据
dChannel.send(buffer,
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP
, NioDemoConfig.SOCKET_SERVER_PORT));
buffer.clear();
}
//操作四:关闭DatagramChannel数据报通道
dChannel.close();
NIO Selector
选择器的使用流程:
- 获取选择器实例
// 1、获取Selector选择器
Selector selector = Selector.open();
- 将通道注册到选择器实例
// 2、获取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 3.设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 4、绑定连接
serverSocketChannel.bind(new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_PORT));
Logger.info("服务器启动成功");
// 5、将通道注册到选择器上,并注册的IO事件为:“接收新连接”
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- 选出感兴趣的IO 就绪事件
// 6、轮询感兴趣的I/O就绪事件(选择键集合)
while (selector.select() > 0) {
// 7、获取选择键集合
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
// 8、获取单个的选择键,并处理
SelectionKey selectedKey = selectedKeys.next();
// 9、判断key是具体的什么事件
if (selectedKey.isAcceptable()) {
// 10、若选择键的IO事件是“连接就绪”事件,就获取客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
// 11、切换为非阻塞模式
socketChannel.configureBlocking(false);
// 12、将该通道注册到selector选择器上
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (selectedKey.isReadable()) {
// 13、若选择键的IO事件是“可读”事件,读取数据
SocketChannel socketChannel = (SocketChannel) selectedKey.channel();
// 14、读取数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0;
while ((length = socketChannel.read(byteBuffer)) >0) {
byteBuffer.flip();
Logger.info(new String(byteBuffer.array(), 0, length));
byteBuffer.clear();
}
socketChannel.close();
}
// 15、移除选择键
selectedKeys.remove();
}
}
// 7、关闭连接
serverSocketChannel.close();