从零搭建一个在线聊天室
整体技术栈
flask 框架
flask_login 的使用
WebSocket 简单应用
Redis 应用
flask_socketio 的使用
搭建权限框架
还是使用 Flask 来搭建后台应用
从零搭建一个在线聊天室
整体技术栈
-
flask 框架
-
flask_login 的使用
-
WebSocket 简单应用
-
Redis 应用
-
flask_socketio 的使用
搭建权限框架
还是使用 Flask 来搭建后台应用,使用 flask-login 扩展来处理用户登陆鉴权逻辑。 首先定义登陆表单
class LoginForm(FlaskForm):
username = StringField('Username', validators=[DataRequired(), ])
password = PasswordField('Password', validators=[DataRequired()])
remember_me = BooleanField('Keep me logged in')
submit = SubmitField('Log in')
接下来定义数据库结构
class User(UserMixin, db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True, index=True)
password = db.Column(db.String(64))
当前,我们只需要一个 user 用户表,只包含三个字段的简单表。用户密码也只是简单的保存了明文,后面再处理用户密码的 hash 问题。
下面就可以定义用户登陆表单
```
from flask_login import LoginManager
login_manager = LoginManager()
login_manager.session_protection = 'strong'
login_manager.login_view = 'login'
app = Flask(
name
)
login_manager.init_app(app)
app.config['SECRET_KEY'] = 'hardtohard'
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
@app.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first()
if user:
login_user(user)
return redirect(url_for('chat'))
return render_template('login.html', form=form)
```
这里定义了,只检查用户名是否存在,如果存在,就执行 login_user() 函数,登陆。用户密码的使用,也留到后面再做处理。
其中 load_user,是回调函数,将获取到的 user 对象存储到浏览器的 session 中,然后在调用 login_user 函数时,就会调用 load_user 来把真正需要登陆的用户设置到 session 中。当登陆成功后,就会跳转到 chat 函数所对应的页面。
chat 函数比较简单,只是展示一个网页
@app.route('/chat', methods=['GET', 'POST'])
@login_required
def chat():
return render_template('chat.html')
使用 login_required 装饰器,保证该函数只允许登陆的用户访问。
增加些初始化函数
```
@app.route('/adddb', methods=['GET', 'POST'])
def addbd():
db.create_all()
return "OK"
@app.route('/deldb', methods=['GET', 'POST'])
def delbd():
db.drop_all()
return "OK"
@app.route('/adduser/
', methods=['GET', 'POST'])
def adduser(user):
user = User(username=user, password='admin')
db.session.add(user)
db.session.commit()
return "OK"
```
增加了初始化数据库和新增用户的函数。
构建前端页面
首先处理登陆页面,在 login.html 中添加
```
{% extends "bootstrap/base.html" %}
{% import "bootstrap/wtf.html" as wtf %}
{% block title %}Flasky{% endblock %}
{% block navbar %}
{% if current_user.is_authenticated %}
-
Logout
{% else %}
-
Login
{% endif %}
{% endblock %}
{% block content %}
{{ wtf.quick_form(form) }}
{% endblock %}
```
使用扩展库 flask_bootstrap 来快速构建页面。
下面重点来看看 chat 页面。 首先来看看主体页面,在 chat.html 中填入代码
```
{% extends 'bootstrap/base.html' %}
{% import "bootstrap/wtf.html" as wtf %}
{% block title %}Kung Fu Realm{%endblock %}
{% block head %}
Hi Hi 聊天室
{% endblock %}
{% block content %}
{% if current_user.is_authenticated %}
{% else %}
{% endif %}
小HI
{% if current_user.is_authenticated %}
-
退出登陆
{% else %}
-
登录
{% endif %}
{% if current_user.is_authenticated %}
Welcome to Hihi Chat Room. 欢迎来到 Hihi 聊天室。
小黄鸭
{% else %}
{% endif %}
{% if current_user.is_authenticated %}
{% else %}
{% endif %}
群聊
{% if current_user.is_authenticated %}
{% else %}
{% endif %}
```
整体效果如下,是不是挺少女系的。
![](https://www.writebug.com/myres/static/uploads/2022/1/8/bb2b570e65a687a7c4f07c0047d1afa8.writebug)
当用户在点击“提交”按钮后,调用 JS 函数
```
/*用户登陆的用户点击提交按钮发送消息按钮*/
$('#sub_but_login').click(function(event){
sendMessageLogin(event, fromname, to_uid, to_uname);
});
```
为了后面便于扩展,将未登录的用户特别区分开来,后面也许同样允许未登陆用户访问该页面,但是只能同机器人小黄鸭聊天
```
/*用户未登陆的用户点击提交按钮发送消息按钮*/
$('#sub_but').click(function(event){
sendMessage(event, fromname, to_uid, to_uname);
});
```
在来看函数 sendMessageLogin
```
function sendMessageLogin(event, from_name, to_uid, to_uname){
var msg = $("#message").val();
var myDate = new Date();
var myTime = myDate.toLocaleTimeString();
var itTime = myDate.toLocaleString();
//var iTime = myDate.toDateString();
var htmlData = '
'
+ '
{% if current_user.is_authenticated %}
{% endif %}
'
+ '
'
+ '
' + msg + '
'
+ '
' + from_name + ' · ' + itTime +'
'
+ '
'
+ '
';
$("#message_box").append(htmlData);
$('#message_box').scrollTop($("#message_box")[0].scrollHeight + 20);
$("#message").val('');
setTimeout(function(){sendToServer(from_name, msg)}, 1000); //延时调用
}
```
> 接收几个参数,然后将当前会话消息追加到 HTML 页面中,并且调用真正的后台 API 函数 sendToServer
```
function sendToServer(name, msg){
var xmlhttp = new XMLHttpRequest();
var myDate = new Date();
//var myTime = myDate.toLocaleTimeString();
var myTime = myDate.toLocaleString();
xmlhttp.onreadystatechange=function() {
if (xmlhttp.readyState == 4 && xmlhttp.status == 200) {
myObj = xmlhttp.responseText;
var htmlData2 = '
'
+ '
'
+ '
'
+ '
' + myObj + '
'
+ '
' + '小黄鸭' + ' · ' + myTime +'
'
+ '
'
+ '
';
$("#message_box").append(htmlData2);
$('#message_box').scrollTop($("#message_box")[0].scrollHeight + 20);
}
}
xmlhttp.open("GET", "/api/sendchat/" + msg, true);
xmlhttp.send();
};
```
> sendToServer 函数调用后台 API,并把返回接收到的消息回写到 HTML 页面中。
而目前的后台 API 也比较简单,直接返回用户输入的消息
```
@app.route('/api/sendchat/
', methods=['GET', 'POST'])
@login_required
def send_chat(info):
return info
```
这样,一个整体的聊天室架子就搭建好了,接下来我们再接入 Redis,来实现聊天功能。
### 应用 Redis
我这里使用 Redis 来作为后端数据存储工具。大家如果有自己的 Redis 服务器当然是最好了,如果没有的话,推荐下在线的 Redis 免费应用 redislabs,大家可以自行体验下,[redislabs.com/](https://link.juejin.cn?target=https%3A%2F%2Fredislabs.com%2F)
下面连接到 Redis 服务器并打开连接池
```
pool = redis.ConnectionPool(host='redis-12143.c8.us-east-1-3.ec2.cloud.redislabs.com', port=12143,
decode_responses=True, password='pkAWNdYWfbLLfNOfxTJinm9SO1')
r = redis.Redis(connection_pool=pool)
```
Redis 中数据结构及用法如下: chat-{ChatRoomName},聊天室及加入的用户,zset 类型 msg-{ChatRoomName},每个聊天室对应的消息,zset 类型
当前结构比较简单,暂时只定义了两个域,分别用来存储聊天室和消息。
### 完善 chat 视图功能
在前面的代码中,chat 视图函数仅仅是返回了一个 HTML 页面,并没有任何功能逻辑,现在要完善下。最新的代码如下:
```
@app.route('/chat', methods=['GET', 'POST'])
@login_required
def chat():
rname = request.args.get('rname', "")
ulist = r.zrange("chat-" + rname, 0, -1)
messages = r.zrange("msg-" + rname, 0, -1, withscores=True)
msg_list = []
for i in messages:
msg_list.append([json.loads(i[0]), time.strftime("%Y/%m/%d %p%H:%M:%S", time.localtime(i[1]))])
return render_template('chat.html', rname=rname, user_list=ulist, msg_list=msg_list)
```
> 其中 rname 是其他函数传值过来的,我们后面再说。 r.zrange() 函数就是从 Redis 中取出对应聊天室的用户列表和历史聊天记录,最后就是把相关的信息返回到模板中。
### 创建及加入聊天室
在 chat 视图中,我们传入了一个 rname 字段,这个字段就是当创建或者加入聊天室时,需要传递过来的。
#### 创建聊天室
```
@app.route('/createroom', methods=["GET", 'POST'])
@login_required
def create_room():
rname = request.form.get('chatroomname', '')
if r.exists("chat-" + rname) is False:
r.zadd("chat-" + rname, current_user.username, 1)
return redirect(url_for('chat', rname=rname))
else:
return redirect(url_for('chat_room_list'))
```
> 判断聊天室名称是否存在,如果不存在,则将当前用户在 Redis 中创建并跳转至 chat 函数;否则跳转至聊天室列表页面。
#### 加入聊天室
```
@app.route('/joinroom', methods=["GET", 'POST'])
@login_required
def join_chat_room():
rname = request.args.get('rname', '')
if rname is None:
return redirect(url_for('chat_room_list'))
r.zadd("chat-" + rname, current_user.username, time.time())
return redirect(url_for('chat', rname=rname))
```
> 这里是从前端获取到聊天室名称(rname),并将当前用户名加入到对应的聊天室中。
到这里,Redis 中的聊天室就处理完成了,下面再来看看其他的一些辅助功能。
### 一些辅助功能
#### 一、聊天室列表
既然有加入聊天室的功能,那么就要提供一个列表供用户选择聊天室。
后台逻辑代码:
```
@app.route('/roomlist', methods=["GET", 'POST'])
@login_required
def chat_room_list():
roomlist_tmp = r.keys(pattern='chat-*')
roomlist = []
for i in roomlist_tmp:
i_str = str(i, encoding='utf-8')
istr_list = i_str.split('-', 1)
roomlist.append(istr_list[1])
return render_template('chatroomlist.html', roomlist=roomlist)
```
> 比较简单,到 Redis 中拿到所有以“chat-”开头的 key 值,然后处理成列表返回到前端即可。
前台页面代码:
```
{% extends "bootstrap/base.html" %}
{% import "bootstrap/wtf.html" as wtf %}
{% block title %}Flasky{% endblock %}
{% block navbar %}
{% if current_user.is_authenticated %}
-
Logout
{% else %}
-
Login
{% endif %}
{% endblock %}
{% block content %}
{% endblock %}
```
> 就是循环渲染列表数据,和一个创建聊天室的表单。
#### 二、退出操作
当用户退出登陆时,我们当前也希望该用户同时退出聊天室,所以修改 logout 函数如下:
```
@app.route('/logout')
@login_required
def logout():
rname = request.args.get("rname", "")
r.zrem("chat-" + rname, current_user.username)
logout_user()
return redirect(url_for('login'))
```
> 从前端拿到聊天室的名字,并在 Redis 的对应 zset 中删除当前用户。
#### 三、用户头像
为了聊天室的美观,不同用户需要拥有不同的头像,这里还是使用 gravatar 这个免费的头像服务。 在 User 模型中添加代码:
```
class User(UserMixin, db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True, index=True)
password = db.Column(db.String(64))
avatar_hash = db.Column(db.String(32))
def gravatar(self, name=None, size=100, default='identicon', rating='g'):
if request.is_secure:
url = 'https://secure.gravatar.com/avatar'
else:
url = 'http://www.gravatar.com/avatar'
if name is not None:
email = name + "@hihichat.com"
else:
email = self.username + "@hihichat.com"
myhash = self.avatar_hash or hashlib.md5(email.encode('utf-8')).hexdigest()
return '{url}/{hash}?s={size}&d={default}&r={rating}'.format(url=url, hash=myhash, size=size,
default=default, rating=rating)
```
> gravatar 函数,对于登陆的用户,使用其注册的邮箱来生成头像,对于未登录用户,这里就使用一个固定的邮箱来处理。
### 消息推送逻辑
下面就开始编写最主要的消息推送逻辑。 我采用的技术是 WebSocket,这样节省了使用 AJAX 轮询带来的额外开销。而且 flask 框架也有很好的 WebSocket 相关的扩展库供我们使用,即 flask-sokcetio。
首先安装好 flask_socketio 模块,然后引入并初始化
```
from flask_socketio import SocketIO, emit
socketio = SocketIO()
app = Flask(__name__)
socketio.init_app(app)
```
编写一个 socket 发送消息的函数
```
def socket_send(data, user):
emit("response", {"code": '200', "msg": data, "username": user}, broadcast=True, namespace='/testnamespace')
socketio.on_event('request_for_response', socket_send, namespace='/testnamespace')
```
> 其中 request_for_response,response 和 testnamespace 都需要和前端代码相对应。request_for_response 是用来接收前端传递到后端的消息,response 是后端传递消息到前端时的标识,而 namespace 则类似于作用域的概念,相互传递的消息都仅仅作用在 testnamespace 这个 namespace 中。
前端 JavaScript 代码:
```
//websocket
var websocket_url = 'ws://' + document.domain + ':' + location.port + '/testnamespace';
var socket = io.connect(websocket_url);
//发送消息到后端
socket.emit('request_for_response',{'param':'{{rname}}'});
//监听回复的消息
socket.on('response',function(data){
var myDate = new Date();
var myTime = myDate.toLocaleString();
var msg = data.msg;
var username = data.username;
var currentuser = '{{ current_user.username }}';
console.log(currentuser);
if ( currentuser == username )
{
username = '你';
};
var hash = md5(username + "@hihichat.com");
var htmlData2 =
'
'
+ '
'
+ '
'
+ '
' + msg + '
'
+ '
' + username + ' · ' + myTime +'
'
+ '
'
+ '
';
$("#message_box").append(htmlData2);
$('#message_box').scrollTop($("#message_box")[0].scrollHeight + 20);
});
```
> 关于更多的 WebSocket 用法,大家可以自行查找相关资料,这里就不做过多介绍了。
最后,编写接收聊天内容的 API
```
@app.route('/api/sendchat/
', methods=['GET', 'POST'])
@login_required
def send_chat(info):
rname = request.form.get("rname", "")
body = {"username": current_user.username, "msg": info}
r.zadd("msg-" + rname, json.dumps(body), time.time())
socket_send(info, current_user.username)
return info
```
> 将接收到的聊天内容插入到对应的 Redis 中(msg-*),然后调用 WebSocket 函数,广播刚刚收到的消息到所有已经连接的 socket 客户端。
### 效果图展示
登陆页面:
![](https://www.writebug.com/myres/static/uploads/2022/1/8/1e1967d7bb5dd97b99a7ae543e936538.writebug)
index 页面:
![](https://www.writebug.com/myres/static/uploads/2022/1/8/1e1967d7bb5dd97b99a7ae543e936538.writebug)
聊天室列表页面:
![](https://www.writebug.com/myres/static/uploads/2022/1/8/bb3971e9ef02bd675d8bf68e8044e853.writebug)
聊天室页面:
![](https://www.writebug.com/myres/static/uploads/2022/1/8/100385db6f463066eea764cc509ff666.writebug)
到此为止,其实我们已经完成了一个简单的聊天室功能。但是呢,该程序还有很多功能需要优化,比如程序代码结构(当前所有后台逻辑代码都在一个文件中),聊天室控制(禁言,踢人等),以及非登陆用户的处理,还要比较有意思的聊天机器人等待。
---
这里开始,就是一些进阶的功能,完善我们的聊天室。
### 调整项目结构
随着我们项目功能越来越多,把所有的逻辑代码都写在一个文件里已经不太合适了,下面就通过 flask 的工厂模式,把项目代码拆分开。
首先来看下拆分后的项目结构:
![](https://www.writebug.com/myres/static/uploads/2022/1/8/45a0351212247f57006e364fefc7d586.writebug)
main 中主要存放后台逻辑代码。 static 中存放 js,CSS 以及用到的图片等。 templates 中存放 HTML 模板。 models.py 中是数据库模型。 config.py 中是一些公共的配置信息。 manage.py 中是项目的启动信息。
下面我们分别来看看各个模块对应的代码
### 具体代码拆分
#### 1. 配置信息
在 config.py 中,填入代码:
```
import os
import redis
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
SECRET_KEY = 'hardtohard'
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'chat.sqlite3')
@staticmethod
def init_app(app):
pass
class DevelopmentConfig(Config):
pass
class TestingConfig(Config):
pass
class ProductionConfig(Config):
pass
config = {
'development': DevelopmentConfig,
'testing': TestingConfig,
'production': ProductionConfig,
'default': DevelopmentConfig
}
```
#### 2. 使用工厂函数
在 app/_*init*_.py 中填入代码:
```
from flask import Flask
from flask_login import LoginManager
from flask_sqlalchemy import SQLAlchemy
from flask_bootstrap import Bootstrap
from flask_socketio import SocketIO
from config import config
login_manager = LoginManager()
login_manager.session_protection = 'strong'
login_manager.login_view = 'main.login'
db = SQLAlchemy()
bootstrap = Bootstrap()
socketio = SocketIO()
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
socketio.init_app(app)
login_manager.init_app(app)
db.init_app(app)
bootstrap.init_app(app)
# 注册蓝本
from .main import main as main_blueprint
app.register_blueprint(main_blueprint)
return app
```
> create_app 函数就是程序的工厂函数,它接受一个配置名的参数。
#### 3. 使用蓝本
蓝本和程序类似,也可以定义路由。不同的是,在蓝本中定义的路由处于休眠状态,直到蓝本注册到程序上后,路由才真正成为程序的一部分。
在 main/_*init*_.py 中创建蓝本
```
from flask import Blueprint
main = Blueprint('main', __name__)
from . import views, forms
```
> 通过实例化一个 Blueprint 类对象可以创建蓝本。这个构造函数有两个必须指定的参数: 蓝本的名字和蓝本所在的包或模块。和程序一样,大多数情况下第二个参数使用 Python 的 _*name*_ 变量即可。
#### 4. 修改 view 视图
对于视图函数,需要导入相关的包,同时由于使用了蓝本,原来用来装饰路由的 app.route 都要修改为 main.route,url_for 函数也需要增加 main 作用域,修改后的部分代码如下:
```
from flask import render_template, redirect, url_for, request
from flask_login import login_required, login_user, logout_user, current_user
from . import main
from .. import db
from .forms import LoginForm
from ..models import User
from config import config
import time
import json
from ..socket_conn import socket_send
pool = redis.ConnectionPool(host='redis-12143.c8.us-east-1-3.ec2.cloud.redislabs.com', port=12143,
decode_responses=True, password='pkAWNdYWfbLLfNOfxTJinm9SO16eSJFx')
r = redis.Redis(connection_pool=pool)
@main.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first()
if user:
login_user(user)
return redirect(url_for('main.index'))
return render_template('login.html', form=form)
@main.route('/createroom', methods=["GET", 'POST'])
@login_required
def create_room():
rname = request.form.get('chatroomname', '')
if r.exists("chat-" + rname) is False:
r.zadd("chat-" + rname, current_user.username, 1)
return redirect(url_for('main.chat', rname=rname))
else:
return redirect(url_for('main.chat_room_list'))
```
#### 5. 编写 socket 连接函数
在 models.py 的同级目录下创建 socket_conn.py 文件,添加代码如下:
```
from . import socketio
from flask_socketio import emit
@socketio.on('request_for_response', namespace='/testnamespace')
def socket_send(data, user):
emit("response", {"code": '200', "msg": data, "username": user}, broadcast=True, namespace='/testnamespace')
```
> 该函数供视图函数调用,广播 socket 消息。
#### 6. 完成 forms 和 models
将原来的表单代码和数据库模型代码分别拷贝到这两个文件中 forms.py
```
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired
from flask_wtf import FlaskForm
class LoginForm(FlaskForm):
username = StringField('Username', validators=[DataRequired(), ])
password = PasswordField('Password', validators=[DataRequired()])
remember_me = BooleanField('Keep me logged in')
submit = SubmitField('Log in')
```
models.py
```
from . import db
from flask_login import UserMixin
from flask import request
import hashlib
class User(UserMixin, db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True, index=True)
password = db.Column(db.String(64))
avatar_hash = db.Column(db.String(32))
def gravatar(self, name=None, size=100, default='identicon', rating='g'):
if request.is_secure:
url = 'https://secure.gravatar.com/avatar'
else:
url = 'http://www.gravatar.com/avatar'
if name is not None:
email = name + "@hihichat.com"
else:
email = self.username + "@hihichat.com"
myhash = self.avatar_hash or hashlib.md5(email.encode('utf-8')).hexdigest()
return '{url}/{hash}?s={size}&d={default}&r={rating}'.format(url=url, hash=myhash, size=size,
default=default, rating=rating)
```
#### 7. 修改模板
把 HTML 模板里的 url_for() 函数都增加 main.,再放置到 templates 下面即可。
#### 8. 启动脚本
顶级文件夹中的 manage.py 文件用于启动程序。
```
import os
from app import create_app, socketio
app = create_app(os.getenv('FLASK_CONFIG') or 'default')
if __name__ == '__main__':
socketio.run(app, debug=True)
```
> 还是使用 socketio.run 的方式启动应用。
至此,代码拆分完毕。
### 功能增强
#### 1. 新增用户
以前我们都是使用浏览器 URL 直接新增用户的,即函数 adduser,现在我们做一个简单的页面,来规范这个操作。
定义表单
```
class CreateUserForm(FlaskForm):
username = StringField('Username', validators=[DataRequired()])
password = PasswordField('Password', validators=[DataRequired(), EqualTo('password2',
message='Password must match.')])
password2 = PasswordField('Confirm password', validators=[DataRequired()])
submit = SubmitField('Create User')
def validate_username(self, field):
if User.query.filter_by(username=field.data).first():
raise ValidationError('Username already in use.')
```
> 定义了一个函数,来校验用户名是否重复。
修改原来的视图函数 adduser
```
@main.route('/adduser', methods=['GET', 'POST'])
@login_required
def adduser():
form = CreateUserForm()
if form.validate_on_submit():
user = User(username=form.username.data, password=form.password.data)
db.session.add(user)
db.session.commit()
return redirect(url_for('main.index'))
return render_template('adduser.html', form=form)
```
还要再修改下 User 模型,因为当前保存的是明文密码,修改成使用 hash 存储。
```
@property
def password(self):
raise AttributeError('password is not a readable attribute')
@password.setter
def password(self, password):
self.password_hash = generate_password_hash(password)
def verify_password(self, password):
return check_password_hash(self.password_hash, password)
```
> 分别设置密码的只读权限,以及 hash 计算和验证功能。
接下来编写 HTML 模板
```
{% extends "bootstrap/base.html" %}
{% import "bootstrap/wtf.html" as wtf %}
{% block title %}Flasky{% endblock %}
{% block navbar %}
{% if current_user.is_authenticated %}
-
Logout
{% else %}
-
Login
{% endif %}
{% endblock %}
{% block content %}
{{ wtf.quick_form(form) }}
{% endblock %}
```
至此,一个简单的新增用户功能就好了。当然,我们还可以增加删除用户,重置密码等功能,这些的具体实现,都可以在 GitHub 的代码中看到,就不再赘述了。
#### 2. 权限控制
我们其实并不希望所有人都能够创建聊天室,那么就要做一个简单的控制功能。 首先定义一个 permission 表,用来存储创建聊天室等权限,再定义一个用户和权限的关联关系表
```
class Permission(db.Model):
id = db.Column(db.Integer, primary_key=True)
permission_name = db.Column(db.String(64), unique=True, index=True)
class RelUserPermission(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer)
permission_id = db.Column(db.Integer)
```
然后我们还需要一个增加权限的表,以及一个用户列表页面 在 forms.py 中添加
```
class EditUserForm(FlaskForm):
permission = SelectMultipleField('Permission', coerce=int)
submit = SubmitField('Submit')
def __init__(self, user, *args, **kwargs):
super(EditUserForm, self).__init__(*args, **kwargs)
self.permission.choices = [(per.id, per.permission_name)
for per in Permission.query.order_by(Permission.permission_name).all()]
self.user = user
```
> 定义了一个初始化函数,会获取到 Permission 表中的 name,id 等信息
接下来编写视图函数
```
@main.route('/listuser/', methods=['GET', 'POST'])
@login_required
def listuser():
user_list = User.query.all()
return render_template('listuser.html', user_list=user_list)
@main.route('/addper/', methods=['GET', 'POST'])
@login_required
def addper():
form = CreatePerForm()
if form.validate_on_submit():
per = Permission(permission_name=form.permissionname.data)
db.session.add(per)
db.session.commit()
return redirect(url_for('main.index'))
return render_template('addper.html', form=form)
@main.route('/edituser/
/', methods=['GET', 'POST'])
@login_required
def edituser(id):
user = User.query.filter_by(id=id).first()
form = EditUserForm(user=user)
if form.validate_on_submit():
for p in form.permission.data:
rup = RelUserPermission(user_id=id, permission_id=p)
db.session.add(rup)
db.session.commit()
return redirect(url_for('main.index'))
return render_template('edituser.html', form=form)
```
> 三个函数,分别是展示用户列表,增加权限,以及为用户添加权限。
然后再修改下 chat_room_list 函数,使得没有权限的用户不能展示创建聊天室的表单。
```
@main.route('/roomlist/', methods=["GET", 'POST'])
@login_required
def chat_room_list():
roomlist_tmp = r.keys(pattern='chat-*')
roomlist = []
can_create = False
create_room_id = Permission.query.filter_by(permission_name='createroom').first().id
rel_user_id = RelUserPermission.query.filter_by(user_id=current_user.id).first()
rel_permission = RelUserPermission.query.filter_by(user_id=current_user.id).first()
if rel_permission and rel_user_id and create_room_id:
rel_permission_id = rel_permission.permission_id
if rel_permission_id == create_room_id:
can_create = True
for i in roomlist_tmp:
i_str = str(i)
istr_list = i_str.split('-', 1)
roomlist.append(istr_list[1])
return render_template('chatroomlist.html', roomlist=roomlist, can_create=can_create)
```
> 这里主要是判断用户是否拥有 createroom 权限,其实还有一种更加简便,但是稍微有些绕的鉴权方式,可以在文末的链接中找到,大家也可以尝试下。
最后处理 HTML 表单
对于聊天室列表页面:
```
{% if can_create %}
{% endif %}
```
对于用户列表页面:
```
{% extends "bootstrap/base.html" %}
{% import "bootstrap/wtf.html" as wtf %}
{% block title %}Flasky{% endblock %}
{% block navbar %}
{% if current_user.is_authenticated %}
-
Logout
{% else %}
-
Login
{% endif %}
{% endblock %}
{% block content %}
{% endblock %}
```
> 这里为了方便起见,当点击用户时,就会跳转至编辑用户权限的页面。
现在,没有权限的用户,就不能看到创建聊天室的表单喽!
#### 3. 登陆优化
当前的登陆,只要用户名是正确的,不会验证密码,直接登陆成功,现在来处理下密码校验功能。其实也简单,我们在 User 模型中新增了一个函数 verify_password,只要登陆的时候,调用该函数来验证密码即可。
```
@main.route('/login/', methods=['GET', 'POST'])
def login():
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first()
if user is not None and user.verify_password(form.password.data):
login_user(user)
return redirect(url_for('main.index'))
return render_template('login.html', form=form)
```
ok,密码错误的你,已经没法再登陆了。
#### 4. 放开非登陆也可进入聊天室
1.去掉 chat_room_list,join_chat_room,send_chat 和 chat 视图函数的登陆装饰器 @login_required 2.修改 chat_room_list,判断当前用户是否已经登陆
```
@main.route('/roomlist/', methods=["GET", 'POST'])
def chat_room_list():
roomlist_tmp = r.keys(pattern='chat-*')
roomlist = []
can_create = False
create_room = Permission.query.filter_by(permission_name='createroom').first()
if current_user.is_authenticated: # 判断用户是否登陆
rel_user_id = RelUserPermission.query.filter_by(user_id=current_user.id).first()
rel_permission = RelUserPermission.query.filter_by(user_id=current_user.id).first()
if rel_permission and rel_user_id and create_room:
rel_permission_id = rel_permission.permission_id
create_room_id = create_room.id
if rel_permission_id == create_room_id:
can_create = True
for i in roomlist_tmp:
i_str = str(i)
istr_list = i_str.split('-', 1)
roomlist.append(istr_list[1])
return render_template('chatroomlist.html', roomlist=roomlist, can_create=can_create)
```
3.导航栏增加 room list 入口
```
```
4.chat 视图函数增加判断逻辑
```
@main.route('/chat/', methods=['GET', 'POST'])
def chat():
rname = request.args.get('rname', "")
ulist = r.zrange("chat-" + rname, 0, -1)
messages = r.zrange("msg-" + rname, 0, -1, withscores=True)
msg_list = []
for i in messages:
msg_list.append([json.loads(i[0]), time.strftime("%Y/%m/%d %p%H:%M:%S", time.localtime(i[1]))])
if current_user.is_authenticated:
return render_template('chat.html', rname=rname, user_list=ulist, msg_list=msg_list)
else:
email = "youke" + "@hihichat.com"
hash = hashlib.md5(email.encode('utf-8')).hexdigest()
gravatar_url = 'http://www.gravatar.com/avatar/' + hash + '?s=40&d=identicon&r=g'
return render_template('chat.html', rname=rname, user_list=ulist,
msg_list=msg_list, g=gravatar_url)
```
5.修改 send_chat 视图
```
@main.route('/api/sendchat/
', methods=['GET', 'POST'])
def send_chat(info):
if current_user.is_authenticated:
rname = request.form.get("rname", "")
body = {"username": current_user.username, "msg": info}
r.zadd("msg-" + rname, json.dumps(body), time.time())
socket_send(info, current_user.username)
return info
else:
return info
```
> 当前对于未登陆的用户(游客),直接回复游客发送的消息。
### 清理过期消息
由于我们需要定时清理 Redis 中保存的聊天记录,那么就需要一个定时任务。flask 有一个完善的插件 flask-apscheduler,但是简单实验了下,限制还是挺多的,所以,我这里选择自己实现一个简单的定时器功能。 创建一个 tasks.py 文件 首先定义定时器类
```
from threading import Timer
class Scheduler(object):
def __init__(self, sleep_time, func, mytime=None):
self.sleep_time = sleep_time
self.func = func
self._t = None
self.mytime = mytime
def start(self):
if self._t is None:
self._t = Timer(self.sleep_time, self._run)
self._t.start()
else:
raise Exception("this timer is already running")
def _run(self):
if self.mytime is not None:
self.func(self.mytime)
else:
self.func()
self._t = Timer(self.sleep_time, self._run)
self._t.start()
def stop(self):
if self._t is not None:
self._t.cancel()
self._t = None
@staticmethod
def init_app(app):
pass
```
> 使用线程中的 Timer 来调用真正的函数,通过 sleep time 的方式达到定时调用的效果。
然后编写需要定时调用的函数,即清理数据的函数。
```
def keep_msg(mytime=None):
if mytime is not None:
expare_time = mytime
else:
expare_time = 604800
msg_list = r.keys("msg-*")
for msg in msg_list:
_ = r.zrange(msg, 0, 0)
for i in _:
score = r.zscore(msg, i)
if time.time() - score > expare_time:
r.zrem(msg, i)
```
> 比较简单,判断 Redis 中的 score 是否处于过期时间,是,则删除。
接下来注册函数到我们的 flask 应用当中。 在 _*init*_.py 中填入如下代码:
```
from .tasks import Scheduler, keep_msg
sch = Scheduler(86400, keep_msg) # 每间隔一天执行
def create_app(config_name):
...
sch.init_app(app)
...
return app
```
最后还要注意的是,由于我们前面是使用 socketio 来启动的应用,因为 socketio 是异步 io,而我们的 scheduler 是阻塞运行的,所以需要在 socketio 中创建子线程来启动。 修改 manage.py 如下:
```
import os
from app import create_app, socketio, sch
app = create_app(os.getenv('FLASK_CONFIG') or 'default')
if __name__ == '__main__':
my = sch.start
socketio.start_background_task(target=my) # 启动一个子线程
socketio.run(app, debug=True)
```
这样,一个简单的定时任务就做好了。
### 禁言功能
正所谓“林子大了,什么鸟都有”,当聊天室人数很多的时候,经常会出现一些不和谐的话语和人,那么禁言功能就很有必要了。
首先在 views 中创建一个新的函数
```
@main.route('/chat/block/roomuser/', methods=['GET', 'POST'])
@login_required
def block_roomuser():
rname = request.args.get('rname', "")
new_b_user = request.args.get('b_user', "")
b_time = request.args.get('b_time', "")
if b_time is "":
r.set('b_user-' + new_b_user, new_b_user, ex=None)
else:
r.set('b_user-' + new_b_user, new_b_user, ex=b_time)
return redirect(url_for('main.room_user_list', rname=rname))
```
> 从前端获取到对应的聊天室名字、需要禁言的用户和禁言时间,然后根据禁言时间,把用户添加到 Redis 中。
再来看看禁言功能的入口函数
```
@main.route('/chat/roomuser/list', methods=['GET', 'POST'])
@login_required
def room_user_list():
rname = request.args.get('rname', "")
ulist = r.zrange("chat-" + rname, 0, -1)
b_user = r.keys('b_user-*')
b_user_list = []
for b in b_user:
b_user_list.append(r.get(b))
return render_template('roomuser_list.html', ulist=ulist, rname=rname, b_user=b_user_list)
```
> 从 Redis 对应的有序集合中取出正处于禁言状态的用户,把这些用户传递到模板供渲染使用。
对应的 roomuser_list.html 代码为:
```
{% for user in ulist %}
{% if user in b_user %}
禁言中。。。
{% endif %}
解禁
踢出
{% endfor %}
```
> 方便起见,直接使用 bootstrap 框架渲染页面。同时这里取了个巧,在“解禁”的时候,只是传入 b_time 为 1,这样 1 秒之后,用户就自动从 Redis 中过期了,也就成功解禁了。
最后,再来处理聊天室的消息,禁言的用户,当然不能再发消息啦。
在 chat 函数中,添加代码:
```
@main.route('/chat/', methods=['GET', 'POST'])
def chat():
...
b_user = r.keys('b_user-*')
b_user_list = []
for b in b_user:
b_user_list.append(r.get(b))
...
if current_user.is_authenticated:
return render_template('chat.html', rname=rname, user_list=ulist, msg_list=msg_list,
b_user_list=b_user_list)
```
> 把处于禁言的用户取出,传递给模板。
在 send_chat 函数中添加代码:
```
@main.route('/api/sendchat/
', methods=['GET', 'POST'])
def send_chat(info):
...
b_user = r.exists('b_user-%s' % current_user.username)
if b_user:
data = json.dumps({'code': 201, 'msg': 'Your are under block now!'})
return data
...
```
> 如果用户处于禁言状态,直接返回 JSON 消息。
修改 chat.html 中的 JavaScript 函数 sendToServer,增加代码如下:
```
var jsondata = JSON.parse(myObj);
if ( jsondata.code == 201 || jsondata.code == 403) {
var htmlData3 = '
'
+ '
'
+ '
'
+ '
' + "自动回复: " + jsondata.msg + '
'
+ '
' + '小黄鸭' + ' · ' + myTime +'
'
+ '
'
+ '
';
$("#message_box").append(htmlData3);
$('#message_box').scrollTop($("#message_box")[0].scrollHeight + 20);
}
```
> 判断返回的 JSON 中 code 值如果是 201 或 403,则由小黄鸭自动回复消息。
最后的效果如下:
![](https://www.writebug.com/myres/static/uploads/2022/1/8/a7811015d7ce186df986c61e79461565.writebug)
### 踢人功能
如果在聊天室中,这个人真的让人忍无可忍,那么踢人就是最好的办法了。 其实实现思想和逻辑都和禁言相类似,这里直接给出部分代码
新增函数 kick_roomuser
```
@main.route('/chat/kick/roomuser/', methods=['GET', 'POST'])
@login_required
def kick_roomuser():
rname = request.args.get("rname", "")
del_user = request.args.get("del_user", "")
r.zrem("chat-" + rname, del_user)
return redirect(url_for('main.room_user_list', rname=rname))
```
修改 send_chat 函数
```
@main.route('/api/sendchat/
', methods=['GET', 'POST'])
def send_chat(info):
...
if current_user.is_authenticated:
rname = request.form.get("rname", "")
ulist = r.zrange("chat-" + rname, 0, -1)
if current_user.username in ulist:
body = {"username": current_user.username, "msg": info}
r.zadd("msg-" + rname, json.dumps(body), time.time())
socket_send(info, current_user.username)
data = json.dumps({'code': 200, 'msg': info})
return data
else:
data = json.dumps({'code': 403, 'msg': 'You are not in this room'})
return data
else:
data = json.dumps({'code': 202, 'msg': info})
return data
```
最后效果如下
![](https://www.writebug.com/myres/static/uploads/2022/1/8/1cd0463cbf7b853d826f758e804e399c.writebug)
### 对接聊天机器人
当前,如果用户没有登陆,是无法和其他人聊天的。那么一个友好的聊天机器人就非常有必要了。我们可以使用免费的图灵聊天机器人,当然也可以自己训练一个。以前我也写过一篇关于如何训练聊天机器人,感兴趣的小伙伴儿可以到我的公众号里查看(萝卜大杂烩)。
在这里也直接复用以前部署的 API 了,只需要增加几行代码即可 修改 send_chat 函数
```
@main.route('/api/sendchat/
', methods=['GET', 'POST'])
def send_chat(info):
...
else:
base_url = 'http://luobodazahui.top:8889/api/chat/'
chat_text = requests.get(base_url + info).text
return chat_text
```
> 在函数中调用聊天机器人的 API 地址,将返回的内容传递给前端即可。
最终的效果如下:
![](https://www.writebug.com/myres/static/uploads/2022/1/8/a1180717ab1c766be2c170e5143e2d1c.writebug)
---
最后的最后,我们再来看看如何部署上线呢,毕竟没有部署到公网的 Web 服务,都是啥啥啥
首先,你得有一个公网服务器,云主机之类的。 然后就是一顿折腾,各种安装了。我的云主机是 CentOS 7.5,下面的一切操作都默认是这个操作系统了。
在 CentOS 7.5 上安装 docker
```
curl -fsSL https://get.docker.com
```
安装 Redis
```
docker pull redis
```
启动 Redis
```
docker run --name=myredis -p 6379:6379 -v /home/redis-data:/data -d redis redis-server --appendonly yes
```
在本机连接 Redis,测试一下
```
docker exec -it 491f1051715a redis-cli
```
安装 python3 源码安装 安装编译软件
```
yum -y groupinstall "Development tools"
yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel
yum install libffi-devel -y
```
下载源码包并解压
```
wget https://www.python.org/ftp/python/3.7.1/Python-3.7.1.tar.xz
tar Jxvf Python-3.7.1.tar.xz
```
编译安装
```
cd Python-3.7.1
./configure --prefix=/usr/local/python3
make && make install
```
创建软连接
```
ln -s /usr/local/python3/bin/python3 /usr/local/bin/python3
ln -s /usr/local/python3/bin/pip3 /usr/local/bin/pip3
```
验证
```
python3 -V
pip3 -V
```
安装 chatterbot
> 由于使用 pip 直接安装 chatterbot,一直报错,所以我这里采用 conda 来安装。
首先到 conda 官网下载好安装脚本,然后一键安装即可。
```
sh Miniconda3-latest-Linux-x86_64.sh
```
之后就可以使用 conda 的 pip 命令来安装 chatterbot 了。
```
pip install --upgrade chatterbot
pip install chatterbot_corpus
```
安装 gevent 和 gunicorn
```
pip install gevent
pip install gunicorn
```
以上,所有的安装准备工作基本完成了!
下面就很简单了呀
首先编写 gunicorn 启动脚本
```
debug = True
loglevel = 'debug'
bind = '0.0.0.0:5000'
logfile = '/home/mychat/online_chat/log/debug.log'
workers = 1
worker_class = 'eventlet'
reload = True
```
> 使用 gunicorn 启动 flask_socketio,貌似还不能很好的启动多进程,这部分,留待以后再研究。
然后再写一个程序启动脚本
```
/root/miniconda3/bin/gunicorn -D -c /home/mychat/online_chat/gunicorn manage:app
```
最后,运行 run.sh 脚本,然后使用 ps -ef|grep python 来查看是否有进程存在
![](https://www.writebug.com/myres/static/uploads/2022/1/8/3c62f1b87ae2d5dc1e8f9437c99a8e05.writebug)
如上图所示,说明我们的程序已经启动成功了,现在让我们来访问[ http://www.{hostip}:5000],不出意外的话,已经可以正常访问了。
[原文链接](https://juejin.cn/post/6844903880573059085)
参考文献
- 网上论坛系统设计与实现(西安电子科技大学·胡秉玺)
- 手机软酷网即时通讯软件的设计与实现(电子科技大学·齐迎旭)
- 企业内部即时通讯系统的设计与实现(内蒙古大学·王慧平)
- 基于移动平台的SNS系统的设计与实现(电子科技大学·高原)
- 一个通用论坛系统的设计与实现(山东大学·张正)
- 基于Web的在线交流平台的开发技术研究与应用(燕山大学·卢仕伟)
- 基于Hadoop的分布式数据存储设计与实现(吉林大学·毛剑)
- 基于Web的信息发布与信息交流平台的设计与实现(吉林大学·许昭霞)
- 企业级即时通讯系统设计与实现(华南理工大学·余春贵)
- 一个通用论坛系统的设计与实现(山东大学·张正)
- 基于B/S架构的酷跑社区系统的设计与实现(内蒙古大学·张晓乐)
- 基于Web的企业即时通讯系统的设计与实现(河北科技大学·张艳芳)
- 基于Web的在线交流平台的开发技术研究与应用(燕山大学·卢仕伟)
- 基于SSH的大学生联谊交友管理系统设计与实现(华中科技大学·王海波)
- 基于redis的分布式自动化爬虫的设计与实现(华中科技大学·曾胜)
本文内容包括但不限于文字、数据、图表及超链接等)均来源于该信息及资料的相关主题。发布者:源码港湾
,原文地址:https://bishedaima.com/yuanma/35929.html