Django 使用 Channels 搭建 WebSocket 聊天程序

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

 Channels 官方文档：Django Channels — Channels 4.0.0 documentation

 效果如下

 主要实现功能

基于Django的认证的群聊

具体实现

建立 WebSocket 连接之前，客户端发送的仍是 HTTP 消息（握手请求），因此我们可以拿到 HTTP 消息里面的 cookie 等信息进行认证。本次使用的是 JWT 认证，所以需要在建立连接的时候将 JWT 认证信息通过 cookie 发送给后端进行认证，具体方法如下：

from rest_framework_simplejwt.tokens import AccessToken
from django.http.cookie import parse_cookie
from rest_framework_simplejwt.authentication import JWTAuthentication

class ServerAccessToken(AccessToken):
    """Access token that can be disabled before it expires.

    The custom ``verify`` exists so that an access token can be disabled at
    logout: the token is looked up in ``BlackAccessTokenCache`` before the
    parent class verification runs.
    """

    def verify(self):
        """Reject the token if it is black-listed, then run the parent checks.

        Raises:
            TokenError: if ``BlackAccessTokenCache`` holds an entry for this
                user id / token digest pair.
        """
        user_id = self.payload.get('user_id')
        # hashlib.md5() only accepts bytes-like input, while the raw token may
        # have been supplied as str; normalise it so both forms hash equally.
        raw_token = self.token
        if isinstance(raw_token, str):
            raw_token = raw_token.encode('utf-8')
        token_digest = hashlib.md5(raw_token).hexdigest()
        if BlackAccessTokenCache(user_id, token_digest).get_storage_cache():
            raise TokenError('Token is invalid or expired')
        super().verify()


class CookieJWTAuthentication(JWTAuthentication):
    """JWT authentication that falls back to the ``X-Token`` cookie.

    Cookie support exists so that pages served through django-proxy
    (for example flower) can be accessed.
    """

    def get_header(self, request):
        """Return the authorization header, or one built from the cookie.

        The value found by the parent class takes precedence. Only when it is
        empty is the ``Cookie`` request header parsed and its ``X-Token``
        entry turned into a ``Bearer`` header (UTF-8 encoded bytes).

        Returns:
            The parent's header value, the cookie-derived header, or the
            parent's empty result when neither source carries a token.
        """
        header = super().get_header(request)
        if header:
            return header

        cookies = request.META.get('HTTP_COOKIE')
        if not cookies:
            return header

        token = parse_cookie(cookies).get('X-Token')
        if not token:
            # Without this guard a missing cookie would produce the literal
            # header b"Bearer None", i.e. a bogus token instead of
            # "no credentials supplied".
            return header
        return f"Bearer {token}".encode('utf-8')



async def token_auth(scope):
    """Authenticate a connection from the ``X-Token`` cookie found in *scope*.

    Args:
        scope: connection scope mapping; only its ``cookies`` entry is read.

    Returns:
        A ``(status, detail)`` tuple:

        * ``(True, user)`` when the token validates and the user is resolved;
        * ``(False, message)`` when token validation raises ``TokenError``,
          where ``message`` is the first argument of that exception;
        * ``(False, False)`` when there are no cookies or no ``X-Token`` value.
    """
    cookies = scope.get('cookies')
    raw_token = cookies.get('X-Token') if cookies else None
    if not raw_token:
        # Check the raw value *before* formatting it: formatting None would
        # yield the truthy bytes b"None" and send a bogus token to validation.
        return False, False

    token = f"{raw_token}".encode('utf-8')
    try:
        auth_class = CookieJWTAuthentication()
        validated_token = ServerAccessToken(token)
        # get_user is synchronous, so it is wrapped to be awaitable here.
        return True, await sync_to_async(auth_class.get_user)(validated_token)
    except TokenError as e:
        return False, e.args[0]

然后在建立连接的时候进行认证：

class MessageNotify(AsyncJsonWebsocketConsumer):
    """WebSocket consumer that authenticates the client while connecting."""

    def __init__(self, *args, **kwargs):
        # Unpack so the parent receives the original positional and keyword
        # arguments rather than one tuple and one dict as two positionals.
        super().__init__(*args, **kwargs)
        self.room_group_name = None
        self.disconnected = True
        self.username = ""

    async def connect(self):
        """Close the connection with code 4401 when authentication fails.

        Handling of a successful authentication is not part of this excerpt.
        """
        status, user_obj = await token_auth(self.scope)
        if not status:
            # On failure the second element is the failure detail, not a user.
            logger.error(f"auth failed {user_obj}")
            # https://developer.mozilla.org/zh-CN/docs/Web/API/CloseEvent#status_codes
            await self.close(4401)

具体群聊核心代码如下

class MessageNotify(AsyncJsonWebsocketConsumer):
    """Group-chat WebSocket consumer.

    Incoming frames are JSON objects of the form
    ``{"action": <str>, "data": <object>}``:

    * ``"message"`` broadcasts ``data`` (tagged with the sender's ``uid`` and
      ``username``) to every member of the room group;
    * any action listed in ``client_actions`` is dispatched to the handler
      method of the same name on this consumer.

    Outgoing frames are built by ``send_data`` and always carry ``time`` and
    ``action`` keys.
    """

    # Actions, other than "message", that a client may trigger by name. The
    # action becomes a channel-layer message type, so it must be restricted to
    # handlers that are safe to invoke on request. Subclasses may extend it.
    client_actions = frozenset({"userinfo"})

    def __init__(self, *args, **kwargs):
        # Unpack so the parent receives the original positional and keyword
        # arguments rather than one tuple and one dict as two positionals.
        super().__init__(*args, **kwargs)
        self.room_group_name = None
        self.disconnected = True
        self.username = ""

    async def connect(self):
        """Authenticate the client, join the room group and accept.

        The connection is closed with code 4401 when authentication fails and
        with the default close code when the URL route lacks ``room_name`` or
        ``username``.
        """
        status, user_obj = await token_auth(self.scope)
        if not status:
            # On failure the second element is the failure detail, not a user.
            logger.error(f"auth failed {user_obj}")
            # https://developer.mozilla.org/zh-CN/docs/Web/API/CloseEvent#status_codes
            await self.close(4401)
            return

        logger.info(f"{user_obj} connect success")
        route_kwargs = self.scope["url_route"]["kwargs"]
        room_name = route_kwargs.get('room_name')
        username = route_kwargs.get('username')
        if not (username and room_name):
            logger.error(f"room_name:{room_name} username:{username} auth failed")
            await self.close()
            return

        self.disconnected = False
        self.username = username
        self.room_group_name = f"message_{room_name}"
        # Join room group
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        """Mark the consumer as disconnected and leave the room group, if any."""
        self.disconnected = True
        if self.room_group_name:
            await self.channel_layer.group_discard(self.room_group_name, self.channel_name)

    # Receive message from WebSocket
    async def receive_json(self, content, **kwargs):
        """Handle one decoded JSON frame from the client.

        Frames without a usable string ``action`` close the connection.
        Unsupported actions and chat messages whose ``data`` is not an object
        are logged and ignored.
        """
        action = content.get('action') if isinstance(content, dict) else None
        if not (isinstance(action, str) and action):
            await self.close()
            # Stop here: nothing may be dispatched for a frame without action.
            return

        data = content.get('data', {})
        if action == "message":
            if not isinstance(data, dict):
                logger.error(f"invalid message data from {self.username}")
                return
            data['uid'] = self.channel_name
            data['username'] = self.username
            # Send message to room group
            await self.channel_layer.group_send(
                self.room_group_name, {"type": "chat_message", "data": data}
            )
        elif action in self.client_actions:
            # Route the request to the handler named after the action.
            await self.channel_layer.send(self.channel_name, {"type": action, "data": data})
        else:
            logger.error(f"unsupported action {action!r} from {self.username}")

    async def userinfo(self, event):
        """Reply with this connection's username and uid (its channel name)."""
        data = {
            'username': self.username,
            'uid': self.channel_name
        }
        await self.send_data('userinfo', {'data': data})

    async def chat_message(self, event):
        """Forward a group chat message to this client, stamped with local time."""
        # Build a new dict instead of mutating the received event in place.
        data = {
            **event["data"],
            'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
        }
        # Send message to WebSocket
        await self.send_data('message', {'data': data})

    async def send_data(self, action, content, close=False):
        """Send *content* to the client wrapped in the common envelope.

        Args:
            action: value of the ``action`` key of the outgoing frame.
            content: mapping merged into the frame; its keys override the
                envelope's ``time`` and ``action`` keys on collision.
            close: forwarded to ``send_json``.
        """
        payload = {'time': time.time(), 'action': action, **content}
        return await super().send_json(payload, close)

中间还涉及消息队列。本次使用的是基于 Redis 的消息队列，需要在 settings.py 中进行配置：

# Redis connection URL for the channel layer:
# redis://:<password>@<host>:<port>/<database id>
_channel_layers_redis_url = (
    f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/{CHANNEL_LAYERS_CACHE_ID}"
)

# Channel layer configuration; the pub/sub Redis backend is selected here,
# "channels_redis.core.RedisChannelLayer" being the alternative backend.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.pubsub.RedisPubSubChannelLayer",
        "CONFIG": {"hosts": [_channel_layers_redis_url]},
    },
}

代码已经开源，GitHub 地址：GitHub - nineaiyu/xadmin-server: xadmin-基于Django+vue3的rbac权限管理系统

 

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: go

“django 使用channels 搭建websocket聊天程序” 的相关文章