好久没写文章了 先把前段时间的一个需求记录下来,顺着这个线去提升到netty框架。然后最近还在学习RabbitMQ,这两天也把mq的文章记录一下。
开始
需求:PLC控制器会通过TCP协议传输数据,项目需要记录数据到数据库中,同时还需要发送数据到PLC控制器来改变控制器数值。
实体
- PLC控制器
- PLC控制器的点(用来控制对应的设备,比如说灯——开启或者关闭)
- PLC的数据
数据库表
(只写了最核心的字段)
PLC控制器:
字段名 | 类型 | 描述 |
---|---|---|
id | bigint | 主键ID |
cloud_name | varchar | 控制器有唯一的标识符——物云名 例如:C1710479131 |
version | int | 控制器 版本号 |
token | varchar | 控制器的token验证 |
PLC控制器点:
字段名 | 类型 | 描述 |
---|---|---|
id | bigint | 主键ID |
controller_id | bigint | 控制器ID |
point_name | varchar | 点的唯一名称 |
value | decimal | 值 |
state | tinyint | 状态(不知道有啥用) |
PLC控制器数据:
字段名 | 类型 | 描述 |
---|---|---|
id | bigint | 主键ID |
controller_cloud_name | varchar | 控制器物云名 |
point_name | varcahr | 点名 |
value | deciamal | 值 |
state | tinyint | 状态 |
timestamp | timestamp | 时间戳 |
没有用id把plc数据表和其他表关联起来的原因是:需求需要无需记录开关型的只记录数值型的数据
本项目中控制器传输的数据分两种BO(Binary Output数字输出)和AI(模拟输入点)。前者的BO是只会有1或者0的数值用于代表开或者关。后者AI是会有任意的浮点数,例如温度或者湿度。
方案
当初想的是两种方案:
- 初始化项目时分配线程开启TCP客户端获得数据——集成在项目中
- 独立部署用mq来传输数据
最后选择的时第一个方案,因为本身项目时是个单体项目,因为这个功能需求额外需要维护其他连接不划算。
实现
既然选择用第一套方案。先来了解如何在SpringBoot的项目初始化启动。
SpringBoot初始化系统资源
方法一——实现ApplicationRunner接口
创建一个xxxxxRunner类实现ApplicationRunner接口
同时加上@Component注解和@Order注解。Order注解里的数字越小 启动顺序越大
实现接口中的run方法在run方法中,写需要的方法
方法二——实现CommandLineRunner接口
和方法一无差别,都是实现接口配合@Order注解,order的值越小,优先级越高。再实现run方法即可。这里就不多介绍了。
当然初始化加载的方法还有很多有兴趣可以去了解一下。
分配线程池 建立SocketServer连接
首先我们需要一个线程给SocketServer用于监听PLC控制器的连接。那么从哪里拿线程呢?当然是线程池,而且要是自定义的线程池。
详情见阿里巴巴《Java 开发手册》!
获取到线程池之后,我们需要一个用于开启控制器服务端的Task方法。
/**
* 获得开启控制器服务端的方法
* @return Runnable
*/
private Runnable getSocketServerCommand(){
return () -> {
ServerSocket serverSocket = null;
Socket accept = null;
try{
//监听连接 长开启
while(true){
if (serverSocket == null || serverSocket.isClosed()){
logger.info("{},正在开启服务端,端口:{}",Thread.currentThread().getName(),serverPort);
serverSocket = new ServerSocket(serverPort);
}
//建立连接
accept = serverSocket.accept();
synchronized (accept){
//建立循环监听消息
recvMsg(accept);
}
}
} catch (IOException e) {
logger.error("{},控制器服务端出错!{}",Thread.currentThread().getName(),e);
}finally {
logger.info("{},正在关闭控制器服务端!",Thread.currentThread().getName());
try {
if (accept != null){
accept.close();
}
if (serverSocket != null){
serverSocket.close();
}
} catch (IOException ioException) {
logger.error("{},关闭服务端控制器出错!{}", Thread.currentThread().getName(),ioException);
}
}
};
}
复制代码
在循环中判断SocketServer是不是需要重新开启,同时不断监听是否有新连接。这里的上锁是我不太清楚,如果同一时间有大量连接过来(虽然不太可能),这个socket会不会重置。看了一圈的accept方法源码没找到核心的点,或者说没看懂。随后呢用recvMsg方法获得到这个Socket。
分配线程池给Socket连接
为了保证每个控制器都能连接上所以用线程池分配任务给每个socket建立长连接。
同样的我们需要一个task方法处理socket。
首先通过socket获取到inputStream获取PLC控制器的数据。
/**
* 接收消息 命令
* @param socket socket连接
*/
private Runnable getRecvMsgCommand(Socket socket) {
return ()->{
InputStream myInputStream = null;
logger.info("{},正在连接{}",Thread.currentThread().getName(),socket.getInetAddress().getHostAddress());
try {
myInputStream = socket.getInputStream();
//放入循环中重复获取数据
while(true){
if (socket.isClosed()){
break;
}
//获取数据
int size = myInputStream.available();
byte[] buffer = new byte[size];
myInputStream.read(buffer);
String res = new String(buffer,StandardCharsets.UTF_8);
//解析数据
if (!StrUtil.isBlankIfStr(res)){
//排除是第一次启动的心跳
if (!NumberUtil.isNumber(res)){
//进行数据存储
logger.info("{},接收数据:{}",Thread.currentThread().getName(),res);
}
}
Thread.sleep(100);
}
} catch (IOException | InterruptedException e) {
logger.error("{},socket连接出错!{}",Thread.currentThread().getName(),e);
}finally {
logger.info("{},正在关闭Socket连接!",Thread.currentThread().getName());
try {
if (myInputStream != null){
myInputStream.close();
}
if (!socket.isClosed()){
socket.close();
}
} catch (IOException ioException) {
logger.error("{},关闭socket连接出错!{}",Thread.currentThread().getName(),ioException);
}
}
};
}
复制代码
到这里我们已经很好的获取到了数据。但还有其他问题也就是长连接的心跳问题。在PLC控制器中当闲置时间超过配置的时间时,就会断开tcp连接,而且会一直大量的发送一大堆空白数据(非常奇怪),所以我们需要的是PLC的长连接,避免闲置断开。
额外提一嘴try with resource机制 它可以自动的回收可关闭的资源,这样就不需要我们在finally里手动关闭资源,但是他需要资源需要实现Closeable或者AutoCloseable接口。感兴趣的同学可以去了解一下。
发送心跳
发送心跳的方式我最初想到用时间差去计算,像这样
myInputStream = socket.getInputStream();
long timeA = System.currentTimeMillis();
while(true){
if (socket.isClosed()){
break;
}
//当大于60s发送一次心跳维持长连接
if(System.currentTimeMillis() - timeA > 60 * 1000){
//获得输出流
outInputStream = socket.getOutputStream();
//发送心跳 记得关闭输出流
}
int size = myInputStream.available();
byte[] buffer = new byte[size];
myInputStream.read(buffer);
String res = new String(buffer,StandardCharsets.UTF_8);
//解析数据
if (!StrUtil.isBlankIfStr(res)){
//排除是第一次启动的心跳
if (!NumberUtil.isNumber(res)){
//进行数据存储
logger.info("{},接收数据:{}",Thread.currentThread().getName(),res);
}
}
Thread.sleep(100);
}
复制代码
但很快我就发现不可行。我发现在这个循环中并不会一直一直循环,而是有消息才会执行一次。我看了一遍源码发现了问题所在。
答案就在myInputStream.read(buffer)
这个代码中,我们看看read的注释
第一段的最后一句话
This method blocks until input data is available, end of file is detected, or an exception is thrown. 复制代码
所以在这个循环中,整个循环体被read()方法堵塞了,只有有消息的时候才会进行一次。
所以没办法我只能再开启一个线程每60s中发送一次心跳
/**
* 获得 发送心跳包 指令
*/
public static Runnable getHeartbeatCommand(Socket socket){
return ()->{
OutputStream myOutputStream = null;
try {
myOutputStream = socket.getOutputStream();
logger.info("{} ,开始发送心跳!",Thread.currentThread().getName());
while (true){
if (socket.isClosed()){
break;
}
myOutputStream.write("[]\n".getBytes(StandardCharsets.UTF_8));
myOutputStream.flush();
Thread.sleep(60 * 1000);
}
} catch (IOException | InterruptedException e) {
logger.error("{},发送心跳出错!{}",Thread.currentThread().getName(),e);
e.printStackTrace();
}finally {
logger.info("{},正在关闭心跳!",Thread.currentThread().getName());
try {
if (myOutputStream != null){
myOutputStream.close();
}
} catch (IOException e) {
logger.error("{},关闭心跳出错!{}",Thread.currentThread().getName(),e);
}
}
};
}
复制代码
那到这里我们已经成功的接收到了数据,并且完成了PLC的长连接。
接下来我们要做的是 发送指定的数据到控制器来控制PLC对应的设备。
发送数据
小总结:在之前的代码中我们在初始化是给socketServer分配了一个线程,之后我们给每一个连接分配了一个不断接收数据的线程一个发送心跳的线程。
到这里为止,我们的每一个线程都对应着一个控制器的连接。而发送数据是需要获取到对应的连接的,那么如何获取到控制器的连接呢?
PLC控制器在每个次新连接时都会发送一个数据,而这个数据,我们是可以改变的。
我们可以利用这唯一的物云名来存储socket连接,也就是说我们的代码变成了这样:
myInputStream = socket.getInputStream();
while(true){
if (socket.isClosed()){
break;
}
int size = myInputStream.available();
byte[] buffer = new byte[size];
myInputStream.read(buffer);
String res = new String(buffer,StandardCharsets.UTF_8);
Thread.sleep(100);
//解析数据
if (!StrUtil.isBlankIfStr(res)){
//如果是第一次接收到的是一个唯一物云名 用来存储Socket
if (!res.startsWith(IotControllerUtils.BRACKET_START)){
logger.info("{},初次连接 记录socket,controllerName:{}",Thread.currentThread().getName(),res);
//在一个map中存储socket
IotControllerUtils.putSocketMapValue(res, socket);
continue;
}
//进行数据存储
logger.info("{},接收数据:{}",Thread.currentThread().getName(),res);
}
}
复制代码
我们在一个map中存储连接的socket,而唯一的物云名作为我们的key。这样我们就可以获取到这个连接从而实现发送消息。
注意!这个map必须是所有连接线程共享的,也就是说他处在一个线程不安全的环境下,但是,虽然所有线程操作的都是同一个map但他们放入的值都是key唯一的,这样是否会造成线程安全的情况呢?
剩下的发送数据我们只需要获取到socket的outputStream,发送我们需要的数据了(数据从PLC控制器数据库的配置中获取)
总结
- 初始化数据可以用实现ApplicationRunner接口或者CommandLineRunner接口
- socket连接分配线程池,inputStream.read()方法会堵塞
- 需要获取连接发送数据,记得通过唯一的方式保存socket连接
考虑到现在连接服务端用的基本上都是netty,我接下来应该会接着学习netty的知识。下一篇文章应该是rabbitmq。
谢谢有人看到最后,目前公司前辈都是最近才转java的程序员,所以基本上处于没人带的自学状态,看掘金、github比较多,平时会有计划的学习,尽量坚持leetcode每日一题。
所以最近想换工作,21届,base深圳,有没有内推哇!