C++实现集群聊天服务器

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

写在前面

所有代码加起来两千行是我接触到的第一个中型项目就现在的我而言。里面用到了语言、数据结构、计网、操作系统、数据库、redis等知识可谓是麻雀虽小五脏俱全。
项目结束时我并没有完全理解整个流程有些地方仍有些懵所以把开发逻辑理清楚对现在的我而言不是一件容易的事。
加油这篇博客的完笔将对我而言是个突破

在这里插入图片描述

1、项目需求

  1. 客户端新用户注册
  2. 客户端用户登录
  3. 添加好友和添加群组
  4. 好友聊天
  5. 群组聊天
  6. 离线消息
  7. nginx配置tcp负载均衡
  8. 集群聊天系统支持客户端跨服务器通信

2、Json

Json是一种轻量级的数据交换格式。独立于编程语言、宜上手等特点使Json能够有效地提高网路传输效率。

接下来介绍Json的使用

#include "json.hpp"  
using json = nlohmann::json;

#include <iostream>
#include <string>
using namespace std;

int main()
{
    json js;
    js["id"] = 1;
    js["name"] = "zhang san";
    cout << "js: " << js << endl;

    string s = js.dump();           // 将json转为string
    cout << "s: " << s << endl;

    json js2 = json::parse(s);      // 将string转为json
    cout << "js2: " << js2 << endl;

    int id = js["id"].get<int>();   // 处理json里面的int
    cout << id << endl;
    return 0;
}

3、muduo

muduo 是一个网络库给用户提供了两个主要的类

  1. TcpServer: 用于编写服务端程序
  2. TcpClient: 用于编写客户端程序

muduo 的使用可以暂时理解为直接套板子即可。

它的搭板子流程大致如下

3. 组合TcpServer对象
4. 创建eventloop事件循环对象的指针
5. 明确TcpServer构造函数需要什么参数输出ChatServer的构造函数
6. 在当前服务器类的构造函数当中注册处理连接的回调函数和处理读写的回调函数
7. 设置合适的服务端线程数量muduo库会自己划分I/O线程和worker线程

下面提供一个测试代码

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <iostream>
#include <functional>
#include <string>

using namespace std;
using namespace muduo;
using namespace muduo::net;
using namespace placeholders;

class ChatServer
{
public:
    ChatServer(EventLoop* loop,                 //事件循环
            const InetAddress& listenAddr,      //IP + Port
            const string& nameArg)              //服务器名字
            :_server(loop, listenAddr, nameArg)
            ,_loop(loop)
    {
        //给服务器注册用户连接的创建和断开回调
        _server.setConnectionCallback(std::bind(&ChatServer::onConnection, this, _1));

        //给服务器注册用户读写事件回调
        _server.setMessageCallback(std::bind(&ChatServer::onMessage, this, _1, _2, _3));
        
        //设置服务端的线程数量 1个I/O线程 3个worker线程
        _server.setThreadNum(4);
    }

    //开启事件循环
    void start()
    {
        _server.start();
    }
private:
    //专门处理用户的连接创建和断开
    void onConnection(const TcpConnectionPtr&conn)
    {
        if (conn->connected()) {
            cout << conn->peerAddress().toIpPort() << "->" <<
                conn->localAddress().toIpPort() << " state: online" << endl;
        } else {
            cout << conn->peerAddress().toIpPort() << "->" <<
                conn->localAddress().toIpPort() << " state: offline" << endl;
            conn->shutdown();
        }
    }

    //专门处理用户的读写事件
    void onMessage(const TcpConnectionPtr&conn, //连接
                    Buffer *buffer,                //缓冲区
                    Timestamp time)  
    {
        string buf = buffer->retrieveAllAsString();
        cout << "recv data:" << buf << "time: " << time.toString() << endl;
        string sendbuf = "来自服务端的消息: " + buf;
        conn->send(sendbuf);    //发送给客户端
    }   

    TcpServer _server; 
    EventLoop *_loop;
};

int main()
{
    EventLoop loop;     //epoll
    InetAddress addr("127.0.0.1", 6000);
    ChatServer server(&loop, addr, "ChatServer");

    server.start();
    loop.loop();

    return 0;
}

编译方式

g++ -o testmuduo my_muduo_server.cpp -lmuduo_net -lmuduo_base -lpthread

运行结果

在这里插入图片描述


分析上面的测试代码可知有几个是在写死了的板子以外的API

  • conn->connected() 是否连接
  • conn->peerAddress().toIpPort()conn->localAddress().toIpPort() 来源和本地的ip+端口
  • string buf = buffer->retrieveAllAsString() 服务端接收客户端传过来的数据
  • conn->send(sendbuf) 服务端将sendbuf发给客户端

4、CMake

CMake 相比于手写 Makefile 友好太多了。手写 Makefile 是一场噩梦。

CMake 使用起来就是指下编译器去哪个文件夹找文件对于我做的这个项目来说还是挺容易的。

没有太大的难度用的时候查下就行了贴两个具有代表性的上来。

# 主入口
cmake_minimum_required(VERSION 3.0)
project(chat)

# 配置编译选项
set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} -g)

# 配置最终的可执行文件输出的路径
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)

# 配置头文件的搜索路径
include_directories(${PROJECT_SOURCE_DIR}/include)
include_directories(${PROJECT_SOURCE_DIR}/include/server)
include_directories(${PROJECT_SOURCE_DIR}/include/server/db)
include_directories(${PROJECT_SOURCE_DIR}/include/server/model)
include_directories(${PROJECT_SOURCE_DIR}/include/server/redis)
include_directories(${PROJECT_SOURCE_DIR}/thirdparty)

# 加载子目录
add_subdirectory(src)
# src/server
# 定义一个SRC_LIST变量包含了该目录下所有的源文件
aux_source_directory(. SRC_LIST)
aux_source_directory(./db DB_LIST)
aux_source_directory(./model MODEL_LIST)
aux_source_directory(./redis REDIS_LIST)

# 指定生成可执行文件
add_executable(ChatServer ${SRC_LIST} ${DB_LIST} ${MODEL_LIST} ${REDIS_LIST})
# 指定可执行文件链接时需要依赖的库文件
target_link_libraries(ChatServer muduo_net muduo_base mysqlclient hiredis pthread)

5、MySQL

MySQL 模块只用关心调用数据库的 API 实现。

这里我用的是 数据库连接池 实现数据库的连接

可以点击上面的链接查看详细介绍。

6、网络模块

chatserver.hppchatserver.cpp。和testmuduo的代码几乎一样主要处理连接事件和读写事件的成功接收发送。

连接如果客户端断开了连接从map表删除用户的连接信息、将用户更新为下线。
消息接收所有消息后反序列化通过解析js["msgid"]来获得一个业务处理器handler再调用相应的函数。

//chatserver.cpp
#include "chatserver.hpp"
#include "json.hpp"
#include "chatservice.hpp"

#include <iostream>
#include <functional>
#include <string>
using namespace std;
using namespace placeholders;
using json = nlohmann::json;

ChatServer::ChatServer(EventLoop* loop,
            const InetAddress& listenAddr,
            const string& nameArg)
            :_server(loop, listenAddr, nameArg) ,_loop(loop)
{
    //注册连接回调
    _server.setConnectionCallback(std::bind(&ChatServer::onConnection, this, _1));
    
    //注册消息回调
    _server.setMessageCallback(std::bind(&ChatServer::onMessage, this, _1, _2, _3));
    
    //设置线程数量
    _server.setThreadNum(4);
}

//启动服务
void ChatServer::start()
{
    _server.start();
}

// 上报连接相关信息的回调函数
void ChatServer::onConnection(const TcpConnectionPtr& conn)
{
    // 客户端断开连接
    if (!conn->connected()) 
	{
        ChatService::instance()->clientCloseException(conn);
        conn->shutdown();
    }
}

// 上报读写事件相关信息的回调函数
void ChatServer::onMessage(const TcpConnectionPtr& conn,
                        Buffer* buffer,
                        Timestamp time)
{
    string buf = buffer->retrieveAllAsString();

    cout << buf << endl;
    // 数据的反序列化
    json js = json::parse(buf);
    // 目的: 完全解耦网络模块的代码和业务模块的代码
    // 通过js["msgid"] 获取一个业务处理器handler 
    auto msgHandler = ChatService::instance()->getHandler(js["msgid"].get<int>()); //json类型转成int
    // 回调消息绑定好的事件处理器, 来执行相应的业务
    msgHandler(conn, js, time);                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
}

7、业务模块

前面的数据库模块和网络模块解决了我们调用的API接下来就是业务逻辑了直接上代码说明下业务模块需要解决的问题

//chatservice.hpp
#ifndef CHATSERVICE_H
#define CHATSERVICE_H

#include <muduo/net/TcpConnection.h>
#include <unordered_map>
#include <functional>
#include <mutex>
using namespace std;
using namespace muduo;
using namespace muduo::net;

#include "redis.hpp"
#include "groupmodel.hpp"
#include "friendmodel.hpp"
#include "usermodel.hpp"
#include "offlinemessagemodel.hpp"
#include "json.hpp"

using json = nlohmann::json;
//处理消息的事件回调方法类型
using MsgHandler = std::function<void(const TcpConnectionPtr &conn, json &js, Timestamp)>;

//聊天服务器业务类
class ChatService
{
public:
    // 获取单例对象的接口函数
    static ChatService* instance();
    // 处理登录业务
    void login(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 处理注册业务
    void reg(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 一对一聊天业务
    void oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 添加好友业务
    void addFriend(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 创建群组业务
    void createGroup(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 加入群组业务
    void addGroup(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 群组聊天业务
    void groupChat(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 处理客户端异常退出
    void clientCloseException(const TcpConnectionPtr &conn);
    // 服务器异常业务重置方法
    void reset();
	// 获取消息对应的处理器
    MsgHandler getHandler(int msgid);
    // 从redis消息队列中获取订阅的消息
    void handleRedisSubscribeMessage(int, string);

private:
    ChatService()
    // 存储消息id和其对应的业务处理方法
    unordered_map<int, MsgHandler> _msgHandlerMap;
    // 存储在线用户的通信连接 线程安全
    unordered_map<int, TcpConnectionPtr> _userConnMap;
    // 定义互斥锁保证_userConnMap的线程安全
    mutex _connMutex;
    // 数据操作类对象
    UserModel _userModel;
    OfflineMsgModel _offlineMsgModel;
    FriendModel _friendModel;
    GroupModel _groupModel;
    // redis操作对象
    Redis _redis;
};

#endif

再处理单个业务时先解析json字符串得到数据再通过这些数据进行相应的处理。需要注意的是STL本身是线程不安全的所以在处理STL时需要加锁。


我这里具体分析下一对一聊天业务其它的业务处理流程大同小异。

在构造函数里会对业务相关的事件处理注册回调函数

_msgHandlerMap.insert({ONE_CHAT_MSG, std::bind(&ChatService::oneChat, this, _1, _2, _3)});
// 还有很多这里不一一列举

服务器接收到json服务端和客户端会相互约定好发送格式后会解析出信息。比如一对一聊天的json格式为

json js;
js["msgid"] = ONE_CHAT_MSG;
js["id"] = g_currentUser.getId();
js["name"] = g_currentUser.getName();
js["toid"] = friendid;
js["msg"] = message;
js["time"] = getCurrentTime();
string buffer = js.dump();

然后再根据toid是否在线选择及时发送消息还是存储到离线消息里面。

// 一对一聊天业务
void ChatService::oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
    int toid = js["toid"].get<int>();
    {
        lock_guard<mutex> lock(_connMutex);
        auto it = _userConnMap.find(toid);
        if (it != _userConnMap.end()) 
        {
            // toid 在线转发消息  服务器主动推送消息给toid用户
            it->second->send(js.dump());
            return;
        }
    }
    // 查询toid是否在线 
    User user = _userModel.query(toid);
    if (user.getState() == "online")
    {
        _redis.publish(toid, js.dump());
        return;
    }
    // toid 不在线存储离线消息
    _offlineMsgModel.insert(toid, js.dump());
}

8、ngnix

ngnix 是个负载均衡器用于服务器集群。需要自行配置tcp负载均衡。

root 用户下进行如下配置
在这里插入图片描述

# ngnix tcp loadbalance config
stream {
	upstream MyServer {
		server 127.0.0.1:6000 weight=1 max_fails=3 fail_timeout=30s;
		server 127.0.0.1:6002 weight=1 max_fails=3 fail_timeout=30s;
	}

	server {
		proxy_connect_timeout 1s;
		#proxy_timeout 3s;
		listen 8000;
		proxy_pass MyServer;
		tcp_nodelay on;
	}
}

再重启ngnix支持平滑重启可是我没试成功下就ok了。

效果如下
在这里插入图片描述

9、redis

ngnix 实现了集群可是如果有两个用户登录在了不同的服务器他们应该怎样通信呢最好的方式就是引入中间件消息队列解耦各个服务器使整个系统
松耦合提高服务器的响应能力节省服务器的带宽资源。
在这里插入图片描述

redis 采用的 发布-订阅 模式本质上是一个存储 键值对 的缓存数据库。

redis 的简易使用
在这里插入图片描述
在这里插入图片描述


redis 要实现的功能如下

//redis.hpp
#ifndef REDIS_H
#define REDIS_H

#include <hiredis/hiredis.h>
#include <thread>
#include <functional>
using namespace std;

/*
redis作为集群服务器通信的基于发布-订阅消息队列时会遇到两个难搞的bug问题
https://blog.csdn.net/QIANGWEIYUAN/article/details/97895611
*/
class Redis
{
public:
    Redis();
    ~Redis();
    // 连接redis服务器 
    bool connect();
    // 向redis指定的通道channel发布消息
    bool publish(int channel, string message);
    // 向redis指定的通道subscribe订阅消息
    bool subscribe(int channel);
    // 向redis指定的通道unsubscribe取消订阅消息
    bool unsubscribe(int channel);
    // 在独立线程中接收订阅通道中的消息
    void observer_channel_message();
    // 初始化向业务层上报通道消息的回调对象
    void init_notify_handler(function<void(int, string)> fn);
private:
    // hiredis同步上下文对象负责publish消息
    redisContext *_publish_context;
    // hiredis同步上下文对象负责subscribe消息
    redisContext *_subcribe_context;
    // 回调操作收到订阅的消息给service层上报
    function<void(int, string)> _notify_message_handler;
};
#endif

连接redis服务器 发布消息和订阅消息绑定ip+端口开个单独的线程用于监听通道上的事件。
发布消息 直接调用 redisCommand。这个api包含三步

#1 redisAppendCommand 把消息写到本地缓存
#2 redisBufferWrite 发送给服务器
#3 redisGetReply 阻塞等待消息

订阅消息 等待消息是阻塞的所以不要在这个函数里面阻塞等待。只进行前两步。
取消订阅 和订阅的大致步骤一样
接收订阅的消息 独立线程。调用ChatService在构造函数传过来的函数名handleRedisSubscribeMessage

10、表设计

User

字段名称字段类型字段说明约束
idINT用户idPRIMARY KEY、AUTO_INCREMENT
nameVARCHAR(50)用户名NOT NULL, UNIQUE
passwordVARCHAR(50)用户密码NOT NULL
stateENUM(‘online’, ‘offline’)当前登录状态DEFAULT ‘offline’

Friend

字段名称字段类型字段说明约束
useridINT用户idNOT NULL、联合主键
friendidINT好友idNOT NULL、联合主键

AllGroup

字段名称字段类型字段说明约束
idINT组idPRIMARY KEY、AUTO_INCREMENT
groupnameVARCHAR(50)组名称NOT NULL,UNIQUE
groupdescVARCHAR(200)组功能描述DEFAULT ‘’

GroupUser

字段名称字段类型字段说明约束
groupidINT组idNOT NULL、联合主键
useridINT组员idNOT NULL、联合主键
grouproleENUM(‘creator’, ‘normal’)组内角色DEFAULT ‘normal’

OfflineMessage

字段名称字段类型字段说明约束
useridINT用户idNOT NULL
messageVARCHAR(500)离线消息存储Json字符串NOT NULL
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: c++服务器