谈一谈Java的网络编程

昨天在坐车回家,打卡了一道力扣,完事之后闲的无聊,看到自己之前收藏了Akka,依稀记得这是一个比线程小的执行单元(当时是这么理解的),加上之前学Golang,看到了那种无脑开Goroutine处理请求的方式,自己又刚好苦于WebFlux+Reactor写出来的那一堆狗屎代码。于是想:为什么Java没有这种东西,可以无脑开轻量级线程处理?

后来看到Java在推进一个项目,就是为了实现这个功能,名字我忘了。但既然是实现中,那就是目前不可用的,但是看到了Golang处理方式和Java的NIO的对比,发现二者本质都是让原本一个线程处理一件事,变成一个线程处理多个事。Golang的协程我们撇开不说,实现在于Golang自定义的调度器;但是Java的NIO我们可以谈一谈。

我对于NIO似乎也就仅限于使用了,写一写NIO的单线程,多线程,主从多线程的Echo Server。用用Netty写写HelloWorld之类的。一直没有深究一些实现,想起来挂在宿舍床上的“治学严谨”的牌子(这牌子也有来历的),未免心生惭愧,于是利用坐车时间好好Google了一番,加上自己的理解,决定写下这篇文章。

何为I/O?

既然是NIO/BIO/AIO,我们就必须先要搞懂I/O是什么。我之前以为是单纯的读写操作,后来发现没有这么简单。

I/O单按照字面翻译来说的话,是输入/输出。比如从磁盘读/向磁盘写,从网卡接收数据/向网卡写入数据,数据库查询/增删改数据库。但凡涉及到磁盘和网络的操作,我们统称为I/O操作。我先给出一个这么笼统的定义。

何为阻塞?

在了解阻塞之前,我们先了解各个设备的速度

CPU:1ns,寄存器:1ns,高速缓存 10ns,内存:10us,磁盘10ms,网络:100ms

其中:1s=1000ms,1ms=1000us,1us=1000ns,1ns=1000ps。

I/O是操作磁盘或网络的,而I/O操作与内存操作的速度差了1000倍不止,与CPU差了10^6不止,所以在CPU眼中,任何I/O操作都是很漫长的。

而所谓的阻塞指的是CPU等待I/O操作完成的过程。当某个线程想要读取来自网络的数据,需要先等待网卡准备就绪,然后数据传输,然后由内核把数据拷贝到用户内存,这一系列。写入需要等到网卡准备就绪,且前面排队的写请求全部完成,然后数据传输完成,这是一个很漫长的过程。

网络编程时说的阻塞指的就是这个漫长的网络过程,CPU啥也不干,干等数据到来,干等数据被写出。程序就被停在这里了,也不会往下面执行。

我们来看一张图好了。

image.png

蓝色部分即为阻塞,网络操作时的阻塞过程。

BIO/NIO/AIO

写过Socket通信,不论你是什么语言,基本都是这样的流程:

新建一个ServerSocket=>设置监听地址=>accept一个连接并返回一个Socket=>继续监听,新的线程处理刚刚返回的Socket。

这没什么不好的,一个很普通的Socket/ServerSocket服务器是吧!早期的Tomcat就是这样的。

现在我们来看看这整个过程中涉及到线程的部分,就是新建线程处理Socket的部分,为什么要这么做呢?

因为每一个Socket的网络的读/写都是一个耗时的过程,如果我们不开辟新的线程,就会阻塞后面的连接,这样整个系统的连接数,吞吐量就会大幅下降。

每个连接一个线程,似乎是一种不错的方案,也很好地解决了无法多连接的问题。但是每个连接一个线程,会不会在系统连接数很高的情况下崩溃呢?毕竟Java线程就是操作系统线程,而且Linux下一个线程接近于一个进程,创建销毁调度的开销一点都不小。即使我们有线程池这种东西,那也毕竟是有限的,况且线程池维护也是一种负担,有没有一种解决方案呢?

到目前为止BIO结束了,接下来就是引入NIO/AIO的时候了,这里需要说明一下,NIO指的是非阻塞IO,即每次读操作时不像BIO那样等待有数据可读,而是直接返回,返回值代表可读数据大小,为-1表示不可读,所以NIO的非阻塞是在这里实现的,NIO仅仅是read操作立刻返回,而不会等待读到了数据再返回。写操作同理

聪明的读者肯定立马意识到,那你这也没啥改变啊,毕竟你想读数据就得不停轮询返回值,看是否可读,这样还会造成CPU空转,还不如BIO呢!事实确实如此。所以NIO仅仅不阻塞程序,但是并不能加快总的时间。

所以现在说NIO一般是NIO+I/O多路复用产生的NIO,AIO同理。请读者知悉,下文的NIO都是NIO+I/O多路复用技术的处理方式。

好了,来看看I/O多路复用。

到目前为止我们知道Socket处理之所以这么慢,是因为网络读/写太慢了(这里我们暂时忽略业务逻辑中的耗时操作,比如查询数据库等),线程在干等,所以产生了阻塞。此外,我们知道,在程序被阻塞时,CPU是不干活的,这个期间我们为什么不能让另一个线程去执行呢?但这又引入一个新的问题,如果我调度了另一个线程,那当数据准备好时,或者可以写时,我该怎么得知呢?

嗯…嗅到了一丝异步+回调的味道。既然程序么得办法,那我们去看看操作系统能给我们提供什么解决方案吗?

Linux提供了Epoll(select和poll现在基本不用,我就不说了),macOS提供了Kqueue,Windows提供了IOCP。它们是I/O多路复用技术

什么是I/O多路复用呢?首先需要知道在Linux中,万物皆文件,所以引入了文件描述符的概念,即FD。对于某一个远程进程的读写就是对于一个抽象文件的读写,所以它也有一个FD,而每一个Socket对应一个远程进程,所以每个Socket变成了一个FD。

说了这么多干嘛呢?引入FD就是为了说明每一个Socket都有唯一的代表它的FD。Epoll就是把这些FD注册到Linux的文件系统下,通过红黑树来管理,这个红黑树保存了(FD, Socket)这样的结构,通过FD可以快速找到对应的Socket(只能说大概是这样的对应关系),当这个红黑树下的某个FD可读(网卡中断实现)或可写(网卡中断实现)时,添加到就绪列表中。

通过对中断程序注册一个处理函数,可以实现每次网卡中断时,把这个中断对应的FD添加到就绪列表中,Epoll的select操作就是遍历就绪列表,找到每个FD对应的Socket,返回。如果为空就阻塞。当有新的中断产生时被唤醒。

现在我们把远程进程可读/可写这件事丢给了操作系统,操作系统使用网卡中断实现异步通知。就可以解放我们的程序了,而不必让它干等了。

所以就有了Selector+SelectionKey+Channel+Buffer那一套。来看一个典型的Echo服务器,基于NIO+I/O多路复用实现的。

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * @author CodeWithBuff
 */
public class NioTcpSingleThread {

    public static void main(String[] args) {
        NioTcpSingleThread.Server.builder().build().run();
    }

    private static final HashMap<SocketChannel, List<DataLoad>> dataLoads = new LinkedHashMap<>();

    private static final ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 1024);

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    private static class DataLoad {
        private int intValue;

        private long longValue;

        private double doubleValue;

        private String stringValue;

        private int[] intArray;

        private long[] longArray;

        private double[] doubleArray;

        private String[] stringArray;
    }

    /**
     * Java NIO处理网络的核心组件只有四个:{@link Channel},{@link Selector},{@link SelectionKey}和{@link java.nio.Buffer}
     * <br/>
     * 说一下{@link ServerSocketChannel},{@link SocketChannel},{@link Selector}和{@link SelectionKey}之间的关系。
     * <br/>
     * {@link ServerSocketChannel}和{@link SocketChannel}不说了,无非就是一个用来在服务端建立连接,一个处理连接(实际I/O交互)的区别,在这里统称为{@link AbstractSelectableChannel},也就是它俩都继承的类。
     * <br/>
     * {@link Selector#select()}调用系统调用,轮询端口,记录已注册的{@link AbstractSelectableChannel}感兴趣的事件,如果发生了所有已注册的{@link AbstractSelectableChannel}感兴趣的事件之一的话,就返回。否则阻塞。
     * <br/>
     * 对于{@link AbstractSelectableChannel}来说,怎么让{@link Selector}帮自己记录并轮询自己感兴趣的事件呢?答案是:注册到{@link Selector}上即可,同时设置感兴趣的事件类型。
     * <br/>
     * 在注册成功后,会返回一个{@link SelectionKey}类型的变量,通过它,可以操作{@link AbstractSelectableChannel}和{@link Selector}。{@link SelectionKey}本身就是{@link AbstractSelectableChannel}和它注册到的{@link Selector}的凭证。
     * 就像是订单一样,记录着它们俩的关系,所以在注册成功的后续操作里,一般都是用{@link SelectionKey}来实现的。同时,{@link SelectionKey}还有一个attachment()方法,可以获取附加到它上面的对象。
     * 一般我们用这个附属对象来处理当前{@link SelectionKey}所包含的{@link AbstractSelectableChannel}和{@link Selector}的实际业务。
     * <br/>
     * 刚才说到了{@link Selector#select()},它会一直阻塞直到发生了感兴趣的事件,但是有时候我们这边可以确定某一事件马上或已经发生,就可以调用{@link Selector#wakeup()}方法,让{@link Selector#select()}立即返回,然后获取
     * {@link SelectionKey}集合也好,重新{@link Selector#select()}(这已经是下一次循环了)也罢。
     * <br/>
     * <br/>
     * 注意!!!如果某一个{@link AbstractSelectableChannel}在同一个{@link Selector}上注册了两个不同的感兴趣的事件类型,那么返回的两个{@link SelectionKey}是没有任何关系的。虽然可以通过{@link SelectionKey}再次修改
     * {@link AbstractSelectableChannel}感兴趣的事件类型。{@link SelectionKey}只在注册时生成返回,所以有(Channel + Selector) = SelectionKey。但是吧,啧,注册多个时会卡死,所以千万不要同一个Channel和同一个Selector注册多个!!!
     */
    @Builder
    private static class Server implements Runnable {

        @Override
        public void run() {
            System.out.println("Server开始运行...");
            Selector globalSelector;
            ServerSocketChannel serverSocketChannel;
            SelectionKey serverSelectionKey;
            try {
                globalSelector = Selector.open();
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.bind(new InetSocketAddress(8190));
                serverSocketChannel.configureBlocking(false);
                serverSelectionKey = serverSocketChannel.register(globalSelector, SelectionKey.OP_ACCEPT);
                serverSelectionKey.attach(Acceptor.builder()
                        .globalSelector(globalSelector)
                        .serverSocketChannel(serverSocketChannel)
                        .build()
                );
                while (true) {
                    // select()是正儿八经的阻塞方法,它会一直阻塞直到发生了任何注册过的(Server)SocketChannel感兴趣的事件之一。比如有新的连接建立,Channel可以读了,或者Channel可以写了
                    // 它的返回值指出了有几个感兴趣事件,实际没啥用,所以在此直接忽略
                    globalSelector.select();
                    Set<SelectionKey> selectionKeySet = globalSelector.selectedKeys();
                    for (SelectionKey selectionKey : selectionKeySet) {
                        dispatch(selectionKey);
                        selectionKeySet.remove(selectionKey);
                    }
                }
            } catch (IOException ignored) {
            }
        }

        private void dispatch(SelectionKey selectionKey) {
            Runnable runnable = (Runnable) selectionKey.attachment();
            runnable.run();
        }
    }

    @Data
    @Builder
    private static class Acceptor implements Runnable {

        private final Selector globalSelector;

        private final ServerSocketChannel serverSocketChannel;

        @Override
        public void run() {
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                System.out.println("已建立连接...");
                socketChannel.configureBlocking(false);
                SelectionKey socketSelectionKey = socketChannel.register(globalSelector, SelectionKey.OP_READ);
                socketSelectionKey.attach(Handler.builder()
                        .socketSelectionKey(socketSelectionKey)
                        .build()
                );
                // 此时注册了读感兴趣Channel,所以为了快速开启读,直接唤醒selector。其实就是让它别等了,我这边准备好了,你那边应该已经有数据了,直接返回吧。
                globalSelector.wakeup();
            } catch (IOException ignored) {
            }
        }
    }

    /**
     * "写"操作依赖于"读"操作读取到的数据,所以"写"之后不能再次"写",必须"读"或"关闭"。
     * <br/>
     * "读"操作之后可以继续"读"而无需等待"写完成",所以"写完"可以把感兴趣类型设置为"读"|"写"而不是单单的"写"。
     */
    @Data
    @Builder
    private static class Handler implements Runnable {

        private final SelectionKey socketSelectionKey;

        @Override
        public void run() {
            SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel();
            if (!socketChannel.isOpen()) {
                System.out.println("连接已关闭");
                try {
                    socketChannel.shutdownInput();
                    socketChannel.shutdownOutput();
                    socketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return ;
            }
            if (socketSelectionKey.isReadable()) {
                System.out.println("读事件发生,准备读...");
                Reader.builder()
                        .socketChannel(socketChannel)
                        .build()
                        .run();
                // 说明即对读感兴趣,也对写感兴趣(因为客户端可能是长连接,还要再次发送消息),但是同一个SelectionKey只能是读或写之一
                socketSelectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                // 读完了,就要准备写
                socketSelectionKey.selector().wakeup();
            }
            if (socketSelectionKey.isWritable()) {
                System.out.println("写事件发生,准备写...");
                Writer.builder()
                        .socketChannel(socketChannel)
                        .build()
                        .run();
                socketSelectionKey.interestOps(SelectionKey.OP_READ);
                // 写完了,立即返回就免了
                // socketSelectionKey.selector().wakeup();
            }
        }
    }

    @Data
    @Builder
    private static class Reader implements Runnable {

        private final SocketChannel socketChannel;

        @Override
        public void run() {
            try {
                byteBuffer.clear();
                int readable = socketChannel.read(byteBuffer);
                byte[] bytes = byteBuffer.array();
                String value = new String(bytes, 0, readable);
                System.out.println("读到了: " + value);
                DataLoad dataLoad = DataLoad.builder()
                        .stringValue(value)
                        .build();
                List<DataLoad> tmp = dataLoads.computeIfAbsent(socketChannel, k -> new LinkedList<>());
                tmp.add(dataLoad);
            } catch (IOException ignored) {
            }
        }
    }

    @Data
    @Builder
    private static class Writer implements Runnable {

        private final SocketChannel socketChannel;

        @Override
        public void run() {
            try {
                String value = "Server get: " + dataLoads.get(socketChannel).get(0).getStringValue();
                dataLoads.get(socketChannel).remove(0);
                socketChannel.write(ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8)));
            } catch (IOException ignored) {
            }
        }
    }
}
复制代码

NIO做到了数据可读/可写时的通知机制,然后去读,去写,而AIO则是直接做到了数据传输到指定区域,说白了就是不需要自己去读,自己去写,当它被调用时数据已经全部准备好了,更加地异步。但是Linux和实际生产使用不多,我们就不提了,只给出一个示例代码:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author CodeWithBuff
 */
public class AioTcpSingleThread {

    public static void main(String[] args) {
        Server.builder().build().run();
        // 防止主线程退出
        LockSupport.park(Long.MAX_VALUE);
    }

    private static final ConcurrentHashMap<AsynchronousSocketChannel, LinkedBlockingQueue<DataLoad>> dataLoads = new ConcurrentHashMap<>();

    private static final ReentrantLock READ_LOCK = new ReentrantLock();

    private static final ReentrantLock WRITE_LOCK = new ReentrantLock();

    private static final ByteBuffer READ_BUFFER = ByteBuffer.allocate(1024 * 4);

    private static final ByteBuffer WRITE_BUFFER = ByteBuffer.allocate(1024 * 4);

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    private static class DataLoad {
        private int intValue;

        private long longValue;

        private double doubleValue;

        private String stringValue;

        private int[] intArray;

        private long[] longArray;

        private double[] doubleArray;

        private String[] stringArray;
    }

    @Builder
    private static class Server implements Runnable {

        @Override
        public void run() {
            try {
                System.out.println("服务器启动...");
                asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
                asynchronousServerSocketChannel.bind(new InetSocketAddress(8190));
                asynchronousServerSocketChannel.accept(null, ACCEPTOR);
            } catch (IOException ignored) {
            }
        }
    }

    private static AsynchronousServerSocketChannel asynchronousServerSocketChannel = null;

    private static final Acceptor ACCEPTOR = new Acceptor();

    private static class Acceptor implements CompletionHandler<AsynchronousSocketChannel, Object> {
        // 这个方法是异步调用的,所以不用担心阻塞会阻塞到主线程
        @Override
        public void completed(AsynchronousSocketChannel result, Object attachment) {
            System.out.println("连接建立: " + Thread.currentThread().getName());
            System.out.println("连接建立");
            dataLoads.computeIfAbsent(result, k -> new LinkedBlockingQueue<>());
            // 使用循环来进行多次读取,写入
            while (result.isOpen()) {
                READ_LOCK.lock();
                // 这个方法也是异步的
                result.read(READ_BUFFER, attachment, new Reader(result, READ_BUFFER.array()));
                READ_BUFFER.clear();
                READ_LOCK.unlock();
                WRITE_LOCK.lock();
                String ans = "";
                try {
                    ans = "Server get: " + dataLoads.get(result).take().getStringValue();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 异步的
                result.write(ByteBuffer.wrap(ans.getBytes(StandardCharsets.UTF_8)), attachment, new Writer(result));
                WRITE_LOCK.unlock();
            }
            System.out.println("结束通信一次");
            // 尝试建立第二波通信
            asynchronousServerSocketChannel.accept(attachment, ACCEPTOR);
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("建立连接失败");
        }
    }

    private static class Reader implements CompletionHandler<Integer, Object> {

        private final AsynchronousSocketChannel asynchronousSocketChannel;

        private final byte[] bytes;

        public Reader(AsynchronousSocketChannel asynchronousSocketChannel, byte[] bytes) {
            this.asynchronousSocketChannel = asynchronousSocketChannel;
            this.bytes = bytes;
        }

        @Override
        public void completed(Integer result, Object attachment) {
            System.out.println("读取数据: " + Thread.currentThread().getName());
            if (result == 0 || !asynchronousSocketChannel.isOpen()) {
                return ;
            } else if (result < 0) {
                shutdown(asynchronousSocketChannel);
                return ;
            }
            System.out.println("读取数据: " + result);
            String value = new String(bytes, 0, result);
            System.out.println("读到了: " + value);
            LinkedBlockingQueue<DataLoad> tmp = dataLoads.get(asynchronousSocketChannel);
            DataLoad dataLoad = DataLoad.builder()
                    .stringValue(value)
                    .build();
            tmp.add(dataLoad);
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("读取失败");
        }
    }

    private static class Writer implements CompletionHandler<Integer, Object> {

        private final AsynchronousSocketChannel asynchronousSocketChannel;

        public Writer(AsynchronousSocketChannel asynchronousSocketChannel) {
            this.asynchronousSocketChannel = asynchronousSocketChannel;
        }

        @Override
        public void completed(Integer result, Object attachment) {
            System.out.println("写入数据: " + Thread.currentThread().getName());
            if (!asynchronousSocketChannel.isOpen()) {
                return ;
            }
            System.out.println("写入数据: " + result);
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("写入失败");
        }
    }

    private static void shutdown(AsynchronousSocketChannel asynchronousSocketChannel) {
        try {
            asynchronousSocketChannel.shutdownInput();
            asynchronousSocketChannel.shutdownOutput();
            asynchronousSocketChannel.close();
        } catch (IOException ignore) {
        }
    }
}
复制代码

NIO解决了什么?未解决什么?

NIO实现了一个线程管理多个连接的I/O操作,而不用像BIO每个连接一个线程,本质是通过中断机制+系统调用实现的。这样就可以在一个线程处理所有可用的I/O事件。

注意,如果一个Selector(一般来说一个Selector对应一个线程,一个线程对应多个Selector会降低Selector效率)仅注册一个连接,那NIO和BIO就没什么区别了。

还记得我们为什么从BIO走到了NIO吗?是因为BIO无法抗住大量的连接,所以NIO解决了大连接的问题

但是NIO并不会带来每个请求的速度的提升,这点请记住,甚至在连接数不多时,处理速度还不如BIO。

NIO使用注意事项

前面我们假设NIO中不要出现耗时业务,但是如果必须有,比如数据库操作,那怎么办呢?在这里我们参考NIO框架Netty的实现。

Netty建议对于耗时操作,应该通过传入的自定义线程池处理,即把这个任务提交到线程池,然后添加异步调用,当任务处理完毕,继续下一步处理。总之耗时任务线程池处理,普通任务直接在NIO线程处理就好。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享