集群服务器项目(六)
对于前面单台服务器而言,几千的客户端压力基本没有什么影响,但当客户端达到上万的时候,就可能会遇到一些问题,因此我准备采用nginx+redis
的方式进行负载均衡,主要思路如下:
- 利用
nginx
作为中转服务器,开启8000
端口 - 本地
ip
开启2个端口6000
以及6002
,相当于可以同时开启2台服务器 - 客户端连接
8000
端口,nginx
会按照配置的服务器权重将连接分发到不同服务器上 - 利用
redis
的订阅-发布命令来作为聊天信息中转站
例如用户1向用户2发送消息,当用户1所在服务器查询到用户2在该服务器上未登录的时候,并不是直接将信息存入离线表中,而是先去数据库查询用户2是否在线,若在线则表明用户2在其他服务器上登录,于是利用
redis
作为信息中转站,服务器将信息发给redis
,redis
将信息再发送给用户2
nginx配置
# nginx 负载均衡配置
stream{
# 相当于是有2台服务器,weight=1表示大家都是公平选取,max_fails表示和主服务器进行3次心跳,
# 如果30s内没有响应,则心跳失败一次,连续3次失败,就说明该次服务器断了
upstream MyServer{
server 127.0.0.1:6000 weight=1 max_fails=3 fail_timeout=30s;
server 127.0.0.1:6002 weight=1 max_fails=3 fail_timeout=30s;
}
server {
proxy_connect_timeout 1s;
# 表示所有客户端都去连接8000端口,然后再去分发到其他服务器上
listen 8000;
# 就是一个名字,连到哪个服务器
proxy_pass MyServer;
tcp_nodelay on;
}
}
复制代码
redis
首先每个用户登录的时候,都会向redis
订阅一个消息,命令为SUBSCRIBE runoobChat
,我们将runoobChat
设置为用户的userid
,到时候当有消息到来的时候,redis
就会向userid
对应的TcpConnectionPtr
发送消息。
// id用户登录成功后,向redis订阅channel(id)
_redis.subscribe(id);
// 向redis指定的通道subscribe订阅消息
bool Redis::subscribe(int channel)
{
// SUBSCRIBE命令本身会造成线程阻塞等待通道里面发生消息,这里只做订阅通道,不接收通道消息
// 通道消息的接收专门在observer_channel_message函数中的独立线程中进行
// 只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源
if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel))
{
cerr << "subscribe command failed!" << endl;
return false;
}
// redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
int done = 0;
while (!done)
{
if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done))
{
cerr << "subscribe command failed!" << endl;
return false;
}
}
// redisGetReply
return true;
}
复制代码
当redis
收到消息后,会自动去调用注册的回调函数向该通道发送消息
// 连接redis服务器
if (_redis.connect())
{
// 设置上报消息的回调,redis发现某个通道有消息,就会自动向该通道发送消息
_redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage, this, _1, _2));
}
// 从redis消息队列中获取订阅的消息
void ChatService::handleRedisSubscribeMessage(int userid, string msg)
{
lock_guard<mutex> lock(_connMutex);
auto it = _userConnMap.find(userid);
if (it != _userConnMap.end())
{
it->second->send(msg);
return;
}
// 存储该用户的离线消息
_offlinemodel.insert(userid, msg);
}
// 在独立线程中接收订阅通道中的消息
void Redis::observer_channel_message()
{
redisReply *reply = nullptr;
while (REDIS_OK == redisGetReply(this->_subcribe_context, (void **)&reply))
{
// 订阅收到的消息是一个带三元素的数组 element[1]为通道号(也就是userid),element[2]是接收到的消息,element[0]是一个标志
if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr)
{
// 给业务层上报通道上发生的消息
_notify_message_handler(atoi(reply->element[1]->str) , reply->element[2]->str);
}
freeReplyObject(reply);
}
cerr << ">>>>>>>>>>>>> observer_channel_message quit <<<<<<<<<<<<<" << endl;
}
void Redis::init_notify_handler(function<void(int,string)> fn)
{
this->_notify_message_handler = fn;
}
复制代码
注意
redis
的订阅消息之后,该线程会一直阻塞,但是发布消息不会引起线程阻塞,所以在这里,对于发布和订阅,我使用了2个不同的线程
至此基本的实现思路以及对应的主要代码已经实现,后续我会将全部功能进行测试,然后上传至Github,希望通过这个项目来增进对后台服务器聊天项目的逻辑与业务有一个基本的认识,同时对项目对应的muduo
有了基本认识,后续我会继续做一个muduo
的基本源码解读与实现,也会挨着写成博客。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END