Socket X 短连接 X 长连接

前言

项目以短连接的形式对接了一套socket接口。这么考虑主要和业务场景有关,我们对接后,重新封装,并以http的方式再暴露给其它系统调用。因此使用短连接随开随关,是一件特别省心省代码的操作。

原本一直相安无事、各自安好,直到某天对接方表示他们其实一直都不好,默默的重启才维持住现状(T▽T)

虽然我觉得这主要和他们有关系,但为了对(避)方(免)考(撕)虑(逼),还是打定主意换成长连接。

目前我们在Http中调用socket,每当一条http线程运行时,便会创建一次socket连接,线程运行结束后,关闭socket连接。各Http线程各用各的,互不干扰。
所以切换成长连接就意味着,该连接必定在线程中共享。

基于这个思路,有两套方案:
一、有且仅有一条连接,多线程共用;
二、组建socket连接池,多线程共用;

下面啰嗦一点,从头开始讲。

Socket

万物起源socket,通信协议之本,TCP/IP的封装,强如HTTP也只是socket的延伸。
java编写原生socket也不是太复杂,确认好目标服务器的IP、端口,new出一个socket对象,建立连接,并通过连接中的流读写数据,此外再注意一下字符集以及截止符这些细节即可。

这是一个简单的demo:

    public static void main(String[] args) throws Exception {
        String host = "xxx.xx.xx.xx";
        int port = 5050;
        String endDelimiter = "\r", charset = "UTF-8";
        
        String message = "hello" + endDelimiter;
        Socket socket = new Socket(host, port);
        OutputStream outputStream = socket.getOutputStream();
        outputStream.write(message.getBytes(charset));
        outputStream.flush();
        System.out.println("客户端发送:" + message);

        InputStream inputStream = socket.getInputStream();
        byte[] bytes = new byte[10240];
        StringBuilder sb = new StringBuilder();
        int len = inputStream.read(bytes);
        sb.append(new String(bytes, 0, len, charset));
        System.out.println("获取数据,长度:" + len + ",内容为:" + sb);
    }
复制代码

字符集

“GBK”、”UTF-8″等等。
是client与server之间定义的通信规则,用于编解码。只有对接双方字符集保持一致时,才能解析数据。

截止符

一个连接建立完毕后,可以一次发送很长的数据,也可以一次发送很短的数据。如果不制定一个读取规则,接收双方没法知道数据是否读取完毕。这其中截止符算比较常见的,在发送的数据末尾加上一个特殊字符,表示本次数据发送完毕,(类比文件尾EOF)。
在截止符里,用换行符又是最常见的,末尾带个换行符,表示数据发送完毕。上面的例子里,就是以“\r”作为截止符。

String endDelimiter = "\r", charset = "UTF-8"; String message = "hello" + endDelimiter;

顺带一提,换行符也有多种,UNIX系统下\n表示换行、MAC系统\r表示换行,Windows系统\r\n表示换行。

http如何判断流中的数据读取完毕
前面讲过,http是socket的延伸,socket会遇到的问题,http也都会遇到,所以http怎么判断流中的数据有没有读取完呢?
1.是不是一条短连接,如果请求结束后,连接就自动关闭,可以直接判断if(read != -1),读取不到返回值,就意味着读取结束了
2.长连接,靠截止符,也就是数据末尾带上换行符
3.长连接,在header里设置content-length,说明数据长度

一般来说,良心对接方都会在文档中写明字符集与截止符是什么,按文档开发就行。有时候也会遇到文档中没写这两个参数的,这种情况下要挨个试,比较头疼。

短连接

写原生socket代码去对接是不现实的,重复造轮子麻烦且累,因此我们必须和框架打交道。
在这一章里,会简单介绍一下mina和netty,异步特性,以及他们在短连接场景下是如何使用的。

稍微解释下短连接。
客户端每请求一次服务器,就建立一条连接,随开随关,就是短连接。
相对的,每次请求都复用一条连接,就叫做长连接。

mina

mina的历史比较早,最早发布于2006年,那时很多同学应该还在上小学→_→。
在流程上,mina和原生socket没有太大区别,都是确认好目标服务器的ip、端口,建立连接,定义好字符集截止符并发送数据,简单讲分为如下3步:
第一,使用一个connector建立连接。
第二,获取一个session,并利用这个session向服务端发送数据。
第三,定义一个handler,意思就是收到对方发来的数据后,要怎么处理。
// 这里的connector、session、handler均为简写

     public static void main(String[] args) throws Exception {
        String host = "xxx.xxx.xxx.xxx";
        int port = 6666;
        String message = "hey there!";

        //创建连接
        IoConnector connector = new NioSocketConnector();
        connector.setConnectTimeoutMillis(15 * 1000);
        
        // 设置编解码器
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(
        	new TextLineCodecFactory(Charsets.UTF_8, LineDelimiter.MAC, LineDelimiter.MAC)));

        // 设置请求以及出入参处理器
        connector.setHandler(new SimpleHandler());

        // 连接
        ConnectFuture connectFuture = connector.connect(new InetSocketAddress(host, port));
        connectFuture.awaitUninterruptibly();// 等待连接成功
        
        // 获取session
        IoSession session = connectFuture.getSession();

        // 发送消息
        System.out.println(Thread.currentThread().getName());
        session.write(message);

        // 连接关闭
        Thread.sleep(1 * 1000);
        connector.dispose();
    }

    static class SimpleHandler extends IoHandlerAdapter {
        @Override
        public void sessionCreated(IoSession session) throws Exception {
            System.out.println("创建");
        }
        @Override
        public void sessionOpened(IoSession session) throws Exception {
            System.out.println("打开");
        }
        @Override
        public void sessionClosed(IoSession session) throws Exception {
            System.out.println("关闭");
        }
        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            System.out.println(Thread.currentThread().getName() + "收到消息===" + message);
        }
        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            System.out.println(Thread.currentThread().getName() + "发送消息===" + message);
        }
    }
复制代码

异步

mina是异步的,一般提到mina或是netty都会提到一个词“异步非阻塞”。浅层表现上,它发送是一条线程,接收是另一条线程。
这个设计肯定是为了提升效率,异步释放了同步需要等待的时间,极大的解放了生产力。
算上异步非阻塞,在IO这一块,一共有4个概念

  • 同步阻塞
  • 同步非阻塞
  • 异步阻塞
  • 异步非阻塞

同步阻塞指BIO,最上面Socket那一章节的demo,就是同步阻塞的例子。在代码层面上的体现就是InputStream与OutputStream,inputStream.read()与outputStream.write()的过程,都会阻塞,线程只能干等。

同步非阻塞指NIO,本篇文章没有给出demo。NIO的代码体现是Selector、Buffer与Channel,通过selector可以在一条线程内处理多个事件,除了第一次调用selector.select()方法会阻塞,其余时间线程都在干活,不会干等。
因为底层原理和BIO不同,所以这是一套全新的接口设计。IO的原理篇幅过长,这里不再展开。

异步阻塞略过,关于异步非阻塞,杰出代表就是mina与netty这样的框架,发送消息与接收消息独立开来,互不干扰,各自工作,都不干等。

因为异步的关系,上面的demo还不适用于我们的场景,我们是封装socket接口,并重新暴露给别的调用方,因此别人调用我们接口时,整个处理流程都会在一条http线程内。

这个demo还得做些调整:

public class MinaClient {

    private static final Logger logger = LoggerFactory.getLogger(MinaClient.class);

    private final static long CMD_WAIT_MAX_TIMEMILLIS = 15 * 1000;


    private String host = "";
    private int port = 0;

    private IoSession ioSession;

    private BaseResponse receivedResponse;

    private Object waitLock = new Object();

    private IoSession getSession() {
        //创建连接
        IoConnector connector = new NioSocketConnector();

        // 设置编解码器
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(
                Charsets.UTF_8, LineDelimiter.MAC, LineDelimiter.MAC)));

        // 设置请求以及出入参处理器
        connector.setHandler(new SimpleMinaHandler());

        // 连接
        ConnectFuture connectFuture = connector.connect(new InetSocketAddress(host, port));
        connectFuture.awaitUninterruptibly();// 等待连接成功

        // 获取session
        IoSession session = connectFuture.getSession();

        ioSession = session;
        return ioSession;
    }

    public void close() {
        if (ioSession != null) {
            IoService ioService = ioSession.getService();

            ioSession.closeNow();// 关闭session
            ioService.dispose(false);
        }
    }

    public BaseResponse writeAndWaitResponse(BaseRequest request) {
        try {
            receivedResponse = null;// 清空接收器

            synchronized (waitLock) {
                IoSession session = getSession();
                session.write(request.pack());

                waitLock.wait(CMD_WAIT_MAX_TIMEMILLIS);

                if (receivedResponse == null) {
                    return BaseResponse.fail("请求失败,未收到返回值");
                }
                return receivedResponse;
            }
        } catch (Exception e) {
            logger.error("发送消息异常", e);
            return BaseResponse.fail("请求异常,未收到返回值" + e.getMessage());
        }
    }

    protected BaseResponse unpack(Object message) {
        BaseResponse response = new BaseResponse();
        response.setMessage(String.valueOf(message));
        return response;
    }

    class SimpleMinaHandler extends IoHandlerAdapter {
        @Override
        public void sessionCreated(IoSession session) throws Exception {
            logger.info("创建");
        }
        @Override
        public void sessionOpened(IoSession session) throws Exception {
            logger.info("打开");
        }
        @Override
        public void sessionClosed(IoSession session) throws Exception {
            logger.info("关闭");
        }
        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            synchronized (waitLock){
                logger.info("收到消息===" + message);

                receivedResponse = unpack(message);

                waitLock.notifyAll();
            }

        }

        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            logger.info("发送消息===" + message);
        }
    }

}
复制代码

这段代码扩充了不少内容,可以作为一个基础的客户端类去使用了。
和上一个demo比,最主要的是新增了private Object waitLock = new Object();一把对象锁。以及private BaseResponse receivedResponse;一个消息接收器。
通过对象锁以及消息接收器,解决了异步不可控的问题,耗费些许时间等待,使得整个流程依旧可以在一条http线程内处理,并返回最终结果给其它调用方。

netty

其实单单从使用角度看,netty和mina没什么区别,流程是一样的,只是换了一下调用的类名和方法。netty和mina出自一个作者,都很出色,但是netty更年轻一些,社区也更活跃。
本篇文章主要以mina举例,这里就简单放个demo了。

public class NettyClient {

    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

    static class SimpleNettyInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // in
            pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Unpooled.wrappedBuffer(new byte[]{'\r'})));
            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
            pipeline.addLast(new SimpleNettyHandler());

            // out
            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
            pipeline.addLast(new MessageToMessageEncoder<String>() {
                @Override
                protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
                    logger.error("粘包器===msg==={}", msg);
                    if (msg.length() == 0) {
                        return;
                    }
                    out.add(msg + "\r");
                }
            });
        }
    }

    static class SimpleNettyHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            logger.info("注册");
        }
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            logger.info("激活");
        }
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            logger.info("读取完毕");
        }
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            logger.info("收到消息==={}", msg);
        }
    }
}
复制代码

长连接

单条长连接

这里响应开头,从短连接切换为长连接。单条长连接的做法其实相当粗暴,改法也很简单:

  • 在原来的基础上,再加一把锁
  • 把Client对象设置为单例的
  • 每次获取连接时,都返回当前这一条

如此,每次请求时,都获取同一个Client对象,在进行socket请求时,复用一条连接。坏处是并发能力很弱,请求数稍微多一点,就会造成长时间的等待。

//这里省去一些重复代码

public class MinaClient {
    private BaseResponse receivedResponse;

    private static ReentrantLock sendLock = new ReentrantLock();

    private Object waitLock = new Object();

    private IoSession getSession() {
        if (this.isActive()) {
            return ioSession;
        }
        // 先做关闭
        this.close();

        //创建连接
        IoConnector connector = new NioSocketConnector();

        // 设置编解码器
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(
                Charsets.UTF_8, LineDelimiter.MAC, LineDelimiter.MAC)));

        // 设置请求以及出入参处理器
        connector.setHandler(new SimpleMinaHandler());

        // 连接
        ConnectFuture connectFuture = connector.connect(new InetSocketAddress(host, port));
        connectFuture.awaitUninterruptibly();// 等待连接成功

        // 获取session
        IoSession session = connectFuture.getSession();

        ioSession = session;
        return ioSession;
    }

    public boolean isActive() {
        return ioSession != null && ioSession.isConnected() && ioSession.isActive();
    }

    public BaseResponse writeAndWaitResponse(BaseRequest request) {
        try {
            sendLock.lock();
            receivedResponse = null;// 清空接收器

            synchronized (waitLock) {
                IoSession session = getSession();
                session.write(request.pack());

                waitLock.wait(CMD_WAIT_MAX_TIMEMILLIS);

                if (receivedResponse == null) {
                    return BaseResponse.fail("请求失败,未收到返回值");
                }
                return receivedResponse;
            }
        } catch (Exception e) {
            logger.error("发送消息异常", e);
            return BaseResponse.fail("请求异常,未收到返回值" + e.getMessage());
        } finally {
            sendLock.unlock();
        }
    }
}
复制代码

多条长连接

连接池算得上是一个比较妥善的方案,这里借助commons-pool实现。
commons-pool是一个对象池,点开他的源码会发现一件很有意思的事,commons-pool的一些配置与druid这种数据库连接池,以及其它的一些线程池,几乎差不多。所以,数据库连接池,线程池,归根结底都是对象池,只是他们的对象是连接、是线程。
我们以之前的MinaClient类,作为池的单位,按commons-pool的规范,实现PooledObjectFactory,然后就可以开始愉快的玩耍了。


public class MinaClientPoolObejctFactory implements PooledObjectFactory<MinaClient> {
    @Override
    public PooledObject<MinaClient> makeObject() throws Exception {
        MinaClient minaClient = new MinaClient();
        return new DefaultPooledObject<>(minaClient);
    }

    @Override
    public void destroyObject(PooledObject<MinaClient> p) throws Exception {
        MinaClient minaClient = p.getObject();
        minaClient.close();
    }

    @Override
    public boolean validateObject(PooledObject<MinaClient> p) {
        MinaClient minaClient = p.getObject();
        return minaClient.isActive();
    }

    @Override
    public void activateObject(PooledObject<MinaClient> p) throws Exception {

    }

    @Override
    public void passivateObject(PooledObject<MinaClient> p) throws Exception {

    }
    
    public static void main(String[] args) throws Exception {
        MinaClientPoolObejctFactory minaClientPoolObejctFactory = new MinaClientPoolObejctFactory();
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxTotal(10);
        poolConfig.setMaxIdle(4);

        // 对象池
        ObjectPool<MinaClient> objectPool = new GenericObjectPool<>(minaClientPoolObejctFactory, poolConfig);
        // 拿出一个对象
        MinaClient minaClient = objectPool.borrowObject();

        BaseRequest request = new BaseRequest();
        request.setMessage("hey there");

        minaClient.writeAndWaitResponse(request);

        // 归还对象
        objectPool.returnObject(minaClient);

        // 对象池销毁
        objectPool.close();
    }
}
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享