前言
项目以短连接的形式对接了一套socket接口。这么考虑主要和业务场景有关,我们对接后,重新封装,并以http的方式再暴露给其它系统调用。因此使用短连接随开随关,是一件特别省心省代码的操作。
原本一直相安无事、各自安好,直到某天对接方表示他们其实一直都不好,默默的重启才维持住现状(T▽T)
虽然我觉得这主要和他们有关系,但为了对(避)方(免)考(撕)虑(逼),还是打定主意换成长连接。
目前我们在Http中调用socket,每当一条http线程运行时,便会创建一次socket连接,线程运行结束后,关闭socket连接。各Http线程各用各的,互不干扰。
所以切换成长连接就意味着,该连接必定在线程中共享。
基于这个思路,有两套方案:
一、有且仅有一条连接,多线程共用;
二、组建socket连接池,多线程共用;
下面啰嗦一点,从头开始讲。
Socket
万物起源socket,通信协议之本,TCP/IP的封装,强如HTTP也只是socket的延伸。
java编写原生socket也不是太复杂,确认好目标服务器的IP、端口,new出一个socket对象,建立连接,并通过连接中的流读写数据,此外再注意一下字符集以及截止符这些细节即可。
这是一个简单的demo:
public static void main(String[] args) throws Exception {
String host = "xxx.xx.xx.xx";
int port = 5050;
String endDelimiter = "\r", charset = "UTF-8";
String message = "hello" + endDelimiter;
Socket socket = new Socket(host, port);
OutputStream outputStream = socket.getOutputStream();
outputStream.write(message.getBytes(charset));
outputStream.flush();
System.out.println("客户端发送:" + message);
InputStream inputStream = socket.getInputStream();
byte[] bytes = new byte[10240];
StringBuilder sb = new StringBuilder();
int len = inputStream.read(bytes);
sb.append(new String(bytes, 0, len, charset));
System.out.println("获取数据,长度:" + len + ",内容为:" + sb);
}
复制代码
字符集
“GBK”、”UTF-8″等等。
是client与server之间定义的通信规则,用于编解码。只有对接双方字符集保持一致时,才能解析数据。
截止符
一个连接建立完毕后,可以一次发送很长的数据,也可以一次发送很短的数据。如果不制定一个读取规则,接收双方没法知道数据是否读取完毕。这其中截止符算比较常见的,在发送的数据末尾加上一个特殊字符,表示本次数据发送完毕,(类比文件尾EOF)。
在截止符里,用换行符又是最常见的,末尾带个换行符,表示数据发送完毕。上面的例子里,就是以“\r”作为截止符。
String endDelimiter = "\r", charset = "UTF-8"; String message = "hello" + endDelimiter;
顺带一提,换行符也有多种,UNIX系统下\n表示换行、MAC系统\r表示换行,Windows系统\r\n表示换行。
http如何判断流中的数据读取完毕
前面讲过,http是socket的延伸,socket会遇到的问题,http也都会遇到,所以http怎么判断流中的数据有没有读取完呢?
1.是不是一条短连接,如果请求结束后,连接就自动关闭,可以直接判断if(read != -1),读取不到返回值,就意味着读取结束了
2.长连接,靠截止符,也就是数据末尾带上换行符
3.长连接,在header里设置content-length,说明数据长度
一般来说,良心对接方都会在文档中写明字符集与截止符是什么,按文档开发就行。有时候也会遇到文档中没写这两个参数的,这种情况下要挨个试,比较头疼。
短连接
写原生socket代码去对接是不现实的,重复造轮子麻烦且累,因此我们必须和框架打交道。
在这一章里,会简单介绍一下mina和netty,异步特性,以及他们在短连接场景下是如何使用的。
稍微解释下短连接。
客户端每请求一次服务器,就建立一条连接,随开随关,就是短连接。
相对的,每次请求都复用一条连接,就叫做长连接。
mina
mina的历史比较早,最早发布于2006年,那时很多同学应该还在上小学→_→。
在流程上,mina和原生socket没有太大区别,都是确认好目标服务器的ip、端口,建立连接,定义好字符集截止符并发送数据,简单讲分为如下3步:
第一,使用一个connector建立连接。
第二,获取一个session,并利用这个session向服务端发送数据。
第三,定义一个handler,意思就是收到对方发来的数据后,要怎么处理。
// 这里的connector、session、handler均为简写
public static void main(String[] args) throws Exception {
String host = "xxx.xxx.xxx.xxx";
int port = 6666;
String message = "hey there!";
//创建连接
IoConnector connector = new NioSocketConnector();
connector.setConnectTimeoutMillis(15 * 1000);
// 设置编解码器
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(
new TextLineCodecFactory(Charsets.UTF_8, LineDelimiter.MAC, LineDelimiter.MAC)));
// 设置请求以及出入参处理器
connector.setHandler(new SimpleHandler());
// 连接
ConnectFuture connectFuture = connector.connect(new InetSocketAddress(host, port));
connectFuture.awaitUninterruptibly();// 等待连接成功
// 获取session
IoSession session = connectFuture.getSession();
// 发送消息
System.out.println(Thread.currentThread().getName());
session.write(message);
// 连接关闭
Thread.sleep(1 * 1000);
connector.dispose();
}
static class SimpleHandler extends IoHandlerAdapter {
@Override
public void sessionCreated(IoSession session) throws Exception {
System.out.println("创建");
}
@Override
public void sessionOpened(IoSession session) throws Exception {
System.out.println("打开");
}
@Override
public void sessionClosed(IoSession session) throws Exception {
System.out.println("关闭");
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
System.out.println(Thread.currentThread().getName() + "收到消息===" + message);
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
System.out.println(Thread.currentThread().getName() + "发送消息===" + message);
}
}
复制代码
异步
mina是异步的,一般提到mina或是netty都会提到一个词“异步非阻塞”。浅层表现上,它发送是一条线程,接收是另一条线程。
这个设计肯定是为了提升效率,异步释放了同步需要等待的时间,极大的解放了生产力。
算上异步非阻塞,在IO这一块,一共有4个概念
- 同步阻塞
- 同步非阻塞
- 异步阻塞
- 异步非阻塞
同步阻塞指BIO,最上面Socket那一章节的demo,就是同步阻塞的例子。在代码层面上的体现就是InputStream与OutputStream,inputStream.read()与outputStream.write()的过程,都会阻塞,线程只能干等。
同步非阻塞指NIO,本篇文章没有给出demo。NIO的代码体现是Selector、Buffer与Channel,通过selector可以在一条线程内处理多个事件,除了第一次调用selector.select()方法会阻塞,其余时间线程都在干活,不会干等。
因为底层原理和BIO不同,所以这是一套全新的接口设计。IO的原理篇幅过长,这里不再展开。
异步阻塞略过,关于异步非阻塞,杰出代表就是mina与netty这样的框架,发送消息与接收消息独立开来,互不干扰,各自工作,都不干等。
因为异步的关系,上面的demo还不适用于我们的场景,我们是封装socket接口,并重新暴露给别的调用方,因此别人调用我们接口时,整个处理流程都会在一条http线程内。
这个demo还得做些调整:
public class MinaClient {
private static final Logger logger = LoggerFactory.getLogger(MinaClient.class);
private final static long CMD_WAIT_MAX_TIMEMILLIS = 15 * 1000;
private String host = "";
private int port = 0;
private IoSession ioSession;
private BaseResponse receivedResponse;
private Object waitLock = new Object();
private IoSession getSession() {
//创建连接
IoConnector connector = new NioSocketConnector();
// 设置编解码器
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(
Charsets.UTF_8, LineDelimiter.MAC, LineDelimiter.MAC)));
// 设置请求以及出入参处理器
connector.setHandler(new SimpleMinaHandler());
// 连接
ConnectFuture connectFuture = connector.connect(new InetSocketAddress(host, port));
connectFuture.awaitUninterruptibly();// 等待连接成功
// 获取session
IoSession session = connectFuture.getSession();
ioSession = session;
return ioSession;
}
public void close() {
if (ioSession != null) {
IoService ioService = ioSession.getService();
ioSession.closeNow();// 关闭session
ioService.dispose(false);
}
}
public BaseResponse writeAndWaitResponse(BaseRequest request) {
try {
receivedResponse = null;// 清空接收器
synchronized (waitLock) {
IoSession session = getSession();
session.write(request.pack());
waitLock.wait(CMD_WAIT_MAX_TIMEMILLIS);
if (receivedResponse == null) {
return BaseResponse.fail("请求失败,未收到返回值");
}
return receivedResponse;
}
} catch (Exception e) {
logger.error("发送消息异常", e);
return BaseResponse.fail("请求异常,未收到返回值" + e.getMessage());
}
}
protected BaseResponse unpack(Object message) {
BaseResponse response = new BaseResponse();
response.setMessage(String.valueOf(message));
return response;
}
class SimpleMinaHandler extends IoHandlerAdapter {
@Override
public void sessionCreated(IoSession session) throws Exception {
logger.info("创建");
}
@Override
public void sessionOpened(IoSession session) throws Exception {
logger.info("打开");
}
@Override
public void sessionClosed(IoSession session) throws Exception {
logger.info("关闭");
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
synchronized (waitLock){
logger.info("收到消息===" + message);
receivedResponse = unpack(message);
waitLock.notifyAll();
}
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
logger.info("发送消息===" + message);
}
}
}
复制代码
这段代码扩充了不少内容,可以作为一个基础的客户端类去使用了。
和上一个demo比,最主要的是新增了private Object waitLock = new Object();
一把对象锁。以及private BaseResponse receivedResponse;
一个消息接收器。
通过对象锁以及消息接收器,解决了异步不可控的问题,耗费些许时间等待,使得整个流程依旧可以在一条http线程内处理,并返回最终结果给其它调用方。
netty
其实单单从使用角度看,netty和mina没什么区别,流程是一样的,只是换了一下调用的类名和方法。netty和mina出自一个作者,都很出色,但是netty更年轻一些,社区也更活跃。
本篇文章主要以mina举例,这里就简单放个demo了。
public class NettyClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
static class SimpleNettyInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// in
pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Unpooled.wrappedBuffer(new byte[]{'\r'})));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new SimpleNettyHandler());
// out
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new MessageToMessageEncoder<String>() {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
logger.error("粘包器===msg==={}", msg);
if (msg.length() == 0) {
return;
}
out.add(msg + "\r");
}
});
}
}
static class SimpleNettyHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
logger.info("注册");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("激活");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
logger.info("读取完毕");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("收到消息==={}", msg);
}
}
}
复制代码
长连接
单条长连接
这里响应开头,从短连接切换为长连接。单条长连接的做法其实相当粗暴,改法也很简单:
- 在原来的基础上,再加一把锁
- 把Client对象设置为单例的
- 每次获取连接时,都返回当前这一条
如此,每次请求时,都获取同一个Client对象,在进行socket请求时,复用一条连接。坏处是并发能力很弱,请求数稍微多一点,就会造成长时间的等待。
//这里省去一些重复代码
public class MinaClient {
private BaseResponse receivedResponse;
private static ReentrantLock sendLock = new ReentrantLock();
private Object waitLock = new Object();
private IoSession getSession() {
if (this.isActive()) {
return ioSession;
}
// 先做关闭
this.close();
//创建连接
IoConnector connector = new NioSocketConnector();
// 设置编解码器
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(
Charsets.UTF_8, LineDelimiter.MAC, LineDelimiter.MAC)));
// 设置请求以及出入参处理器
connector.setHandler(new SimpleMinaHandler());
// 连接
ConnectFuture connectFuture = connector.connect(new InetSocketAddress(host, port));
connectFuture.awaitUninterruptibly();// 等待连接成功
// 获取session
IoSession session = connectFuture.getSession();
ioSession = session;
return ioSession;
}
public boolean isActive() {
return ioSession != null && ioSession.isConnected() && ioSession.isActive();
}
public BaseResponse writeAndWaitResponse(BaseRequest request) {
try {
sendLock.lock();
receivedResponse = null;// 清空接收器
synchronized (waitLock) {
IoSession session = getSession();
session.write(request.pack());
waitLock.wait(CMD_WAIT_MAX_TIMEMILLIS);
if (receivedResponse == null) {
return BaseResponse.fail("请求失败,未收到返回值");
}
return receivedResponse;
}
} catch (Exception e) {
logger.error("发送消息异常", e);
return BaseResponse.fail("请求异常,未收到返回值" + e.getMessage());
} finally {
sendLock.unlock();
}
}
}
复制代码
多条长连接
连接池算得上是一个比较妥善的方案,这里借助commons-pool实现。
commons-pool是一个对象池,点开他的源码会发现一件很有意思的事,commons-pool的一些配置与druid这种数据库连接池,以及其它的一些线程池,几乎差不多。所以,数据库连接池,线程池,归根结底都是对象池,只是他们的对象是连接、是线程。
我们以之前的MinaClient类,作为池的单位,按commons-pool的规范,实现PooledObjectFactory,然后就可以开始愉快的玩耍了。
public class MinaClientPoolObejctFactory implements PooledObjectFactory<MinaClient> {
@Override
public PooledObject<MinaClient> makeObject() throws Exception {
MinaClient minaClient = new MinaClient();
return new DefaultPooledObject<>(minaClient);
}
@Override
public void destroyObject(PooledObject<MinaClient> p) throws Exception {
MinaClient minaClient = p.getObject();
minaClient.close();
}
@Override
public boolean validateObject(PooledObject<MinaClient> p) {
MinaClient minaClient = p.getObject();
return minaClient.isActive();
}
@Override
public void activateObject(PooledObject<MinaClient> p) throws Exception {
}
@Override
public void passivateObject(PooledObject<MinaClient> p) throws Exception {
}
public static void main(String[] args) throws Exception {
MinaClientPoolObejctFactory minaClientPoolObejctFactory = new MinaClientPoolObejctFactory();
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(10);
poolConfig.setMaxIdle(4);
// 对象池
ObjectPool<MinaClient> objectPool = new GenericObjectPool<>(minaClientPoolObejctFactory, poolConfig);
// 拿出一个对象
MinaClient minaClient = objectPool.borrowObject();
BaseRequest request = new BaseRequest();
request.setMessage("hey there");
minaClient.writeAndWaitResponse(request);
// 归还对象
objectPool.returnObject(minaClient);
// 对象池销毁
objectPool.close();
}
}
复制代码