python-flask + mysql实现带登录功能的简易文件服务器
需求
登录功能,文件上传功能,上传文件管理(删除)功能,免登录文件下载功能
实现
sql部分
需要两个sql表,分别完成用户信息存放、文件信息存放。因为做演示使用,密码就使用明文存放了。
CREATE TABLE users ( id INT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(255) NOT NULL, password VARCHAR(255) NOT NULL); CREATE TABLE files ( id INT AUTO_INCREMENT PRIMARY KEY, filename VARCHAR(255) NOT NULL, filepath VARCHAR(255) NOT NULL);
user表存放用户名、密码,files表存放文件名与存储路径。
python部分
mysql连接与cursor获取
# MySQL连接配置 db = mysql.connector.connect( host='127.0.0.1', user='filemanager', password='password', database='filemanager' ) # 创建一个游标对象 cursor = db.cursor()
登录功能
@app.route('/', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
# 在数据库中验证用户名和密码
cursor.execute("SELECT * FROM users WHERE username=%s AND password=%s", (username, password))
user = cursor.fetchone()
if user:
# 如果用户验证成功,将其存储在会话中
session['username'] = user[1]
flash('登录成功!')
return redirect(url_for('dashboard'))
else:
flash('用户名或密码错误!')
db.close()
return render_template('login.html')面板与上传文件功能
@app.route('/dashboard', methods=['GET', 'POST'])
def dashboard():
if 'username' in session:
if request.method == 'POST':
if 'file' not in request.files:
flash('未选择文件!')
return redirect(request.url)
file = request.files['file']
if file.filename == '':
flash('未选择文件!')
return redirect(request.url)
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
# 将文件链接存储到数据库中
cursor.execute("INSERT INTO files (filename, filepath) VALUES (%s, %s)",
(filename, os.path.join(app.config['UPLOAD_FOLDER'], filename)))
db.commit()
flash('文件上传成功!')
# 从数据库中获取上传的文件列表
cursor.execute("SELECT * FROM files")
files = cursor.fetchall()
db.close()
return render_template('dashboard.html', username=session['username'], files=files)
else:
db.close()
return redirect(url_for('login'))删除文件功能
@app.route('/delete_file/<int:file_id>', methods=['GET'])
def delete_file(file_id):
if 'username' in session:
# 从数据库中删除文件记录
cursor.execute("SELECT * FROM files WHERE id=%s", (file_id,))
file = cursor.fetchone()
if file:
cursor.execute("DELETE FROM files WHERE id=%s", (file_id,))
db.commit()
# 删除文件
file_path = file[2]
os.remove(file_path)
flash('文件删除成功!')
else:
flash('文件不存在!')
db.close()
return redirect(url_for('dashboard'))文件下载功能
@app.route('/uploads/<filename>', methods=['GET'])
def download(filename):
if 1:
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
if os.path.exists(file_path):
return send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=True)
else:
flash('文件不存在!')
return redirect(url_for('dashboard'))完整代码
run.py
from flask import Flask, render_template, request, redirect, url_for, flash,send_from_directory
from flask import session
from werkzeug.utils import secure_filename
import os
import mysql.connector
app = Flask(__name__)
app.secret_key = 'secret_key' # 替换为一个随机的密钥
# MySQL连接配置
db = mysql.connector.connect(
host='127.0.0.1',
user='filemanager',
password='l2',
database='filemanager'
)
# 创建一个游标对象
cursor = db.cursor()
# 设置允许上传的文件类型
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif','zip','rar'}
# 配置文件上传路径
UPLOAD_FOLDER = 'uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['SECRET_KEY'] = 'secret_key'
if not os.path.exists(app.config['UPLOAD_FOLDER']):
os.makedirs(app.config['UPLOAD_FOLDER'])
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route('/', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
# 在数据库中验证用户名和密码
cursor.execute("SELECT * FROM users WHERE username=%s AND password=%s", (username, password))
user = cursor.fetchone()
if user:
# 如果用户验证成功,将其存储在会话中
session['username'] = user[1]
flash('登录成功!')
return redirect(url_for('dashboard'))
else:
flash('用户名或密码错误!')
db.close()
return render_template('login.html')
@app.route('/dashboard', methods=['GET', 'POST'])
def dashboard():
if 'username' in session:
if request.method == 'POST':
if 'file' not in request.files:
flash('未选择文件!')
return redirect(request.url)
file = request.files['file']
if file.filename == '':
flash('未选择文件!')
return redirect(request.url)
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
# 将文件链接存储到数据库中
cursor.execute("INSERT INTO files (filename, filepath) VALUES (%s, %s)",
(filename, os.path.join(app.config['UPLOAD_FOLDER'], filename)))
db.commit()
flash('文件上传成功!')
# 从数据库中获取上传的文件列表
cursor.execute("SELECT * FROM files")
files = cursor.fetchall()
db.close()
return render_template('dashboard.html', username=session['username'], files=files)
else:
db.close()
return redirect(url_for('login'))
@app.route('/delete_file/<int:file_id>', methods=['GET'])
def delete_file(file_id):
if 'username' in session:
# 从数据库中删除文件记录
cursor.execute("SELECT * FROM files WHERE id=%s", (file_id,))
file = cursor.fetchone()
if file:
cursor.execute("DELETE FROM files WHERE id=%s", (file_id,))
db.commit()
# 删除文件
file_path = file[2]
os.remove(file_path)
flash('文件删除成功!')
else:
flash('文件不存在!')
db.close()
return redirect(url_for('dashboard'))
@app.route('/uploads/<filename>', methods=['GET'])
def download(filename):
if 1:
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
if os.path.exists(file_path):
return send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=True)
else:
flash('文件不存在!')
return redirect(url_for('dashboard'))
if __name__ == '__main__':
app.run(host='0.0.0.0',port=55555,debug=False)dashboard.html
<!DOCTYPE html>
<html>
<head>
<title>Dashboard</title>
</head>
<body>
<h2>Welcome, {{ username }}!</h2>
<h3>允许上传的文件类型:txt,pdf,png,jpg,jpeg,gif,zip,rar</h3>
<form method="POST" action="{{ url_for('dashboard') }}" enctype="multipart/form-data">
<div>
<input type="file" name="file">
<input type="submit" value="Upload">
</div>
</form>
<hr>
{% with messages = get_flashed_messages() %}
{% if messages %}
<ul class="messages">
{% for message in messages %}
<li>{{ message }}</li>
{% endfor %}
</ul>
{% endif %}
{% endwith %}
<h3>Uploaded Files:</h3>
<ul>
{% for file in files %}
<li>
<a href="{{ file[2] }}">{{ file[1] }}</a>
<a href="{{ url_for('delete_file', file_id=file[0]) }}">Delete</a>
</li>
{% endfor %}
</ul>
<a href="{{ url_for('login') }}">Logout</a>
</body>
</html>login.html
<!DOCTYPE html>
<html>
<head>
<title>Login</title>
</head>
<body>
{% with messages = get_flashed_messages() %}
{% if messages %}
<ul class="messages">
{% for message in messages %}
<li>{{ message }}</li>
{% endfor %}
</ul>
{% endif %}
{% endwith %}
<h2>Login</h2>
<form method="POST" action="{{ url_for('login') }}">
<div>
<label>Username:</label>
<input type="text" name="username">
</div>
<div>
<label>Password:</label>
<input type="password" name="password">
</div>
<div>
<input type="submit" value="Login">
</div>
</form>
</body>
</html>运行截图


