使用 VueJS、Flask 和 RethinkDB 实现文件控制器和简单的文件存储服务
入门
有关如何设置工作区的更多信息,请查看本系列的第一个指南:使用 VueJS、Flask 和 RethinkDB 构建简单文件存储服务的简介和设置。
文件控制器
文件控制器将用于处理文件和文件夹,因此,这里的逻辑将比我们之前的控制器略多。我们首先在/api/controllers/files.py模块中为我们的控制器创建一个样板。
import os
from flask import request, g
from flask_restful import reqparse, abort, Resource
from werkzeug import secure_filename
from api.models import File
BASE_DIR = os.path.abspath(
os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
)
class CreateList(Resource):
def get(self, user_id):
pass
def post(self, user_id):
pass
class ViewEditDelete(Resource):
def get(self, user_id, file_id):
pass
def put(self, user_id, file_id):
pass
def delete(self, user_id, file_id):
pass
CreateList类,顾名思义,将用于为登录用户创建和列出文件。ViewEditDelete 类,同样,顾名思义,将用于查看、编辑和删除文件。我们在类中使用的方法与相应的 HTTP 操作相对应。
装饰器
我们将通过创建一组装饰器来开始实现,这些装饰器将在 Resource 类中的方法上使用。您需要将其分离到/api/utils/decorators.py模块中。
from jose import jwt
from jose.exceptions import JWTError
from functools import wraps
from flask import current_app, request, g
from flask_restful import abort
from api.models import User, File
def login_required(f):
'''
This decorator checks the header to ensure a valid token is set
'''
@wraps(f)
def func(*args, **kwargs):
try:
if 'authorization' not in request.headers:
abort(404, message="You need to be logged in to access this resource")
token = request.headers.get('authorization')
payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
user_id = payload['id']
g.user = User.find(user_id)
if g.user is None:
abort(404, message="The user id is invalid")
return f(*args, **kwargs)
except JWTError as e:
abort(400, message="There was a problem while trying to parse your token -> {}".format(e.message))
return func
def validate_user(f):
'''
This decorate ensures that the user logged in is the actually the same user we're operating on
'''
@wraps(f)
def func(*args, **kwargs):
user_id = kwargs.get('user_id')
if user_id != g.user['id']:
abort(404, message="You do not have permission to the resource you are trying to access")
return f(*args, **kwargs)
return func
def belongs_to_user(f):
'''
This decorator ensures that the file we're trying to access actually belongs to us
'''
@wraps(f)
def func(*args, **kwargs):
file_id = kwargs.get('file_id')
user_id = kwargs.get('user_id')
file = File.find(file_id, True)
if not file or file['creator'] != user_id:
abort(404, message="The file you are trying to access was not found")
g.file = file
return f(*args, **kwargs)
return func
login_required装饰器用于验证用户在访问方法的功能之前是否确实登录。我们使用此装饰器通过解码令牌来确保其有效性,从而保护某些端点。我们获取存储在令牌中的id字段并尝试检索相应的用户对象。然后,还将此对象存储在g.user中,以便在方法定义中访问。
类似地,我们创建了validate_user装饰器,以确保其他登录用户无法访问标有其他用户ID的URL模式。此验证完全基于URL中的信息。
最后,belongs_to_user装饰器确保只有创建文件的用户才能访问它。该装饰器实际上会根据提供的user_id检查文件文档中的创建者字段。
以下是创建新文件和列出文件的视图:
class CreateList(Resource):
@login_required
@validate_user
@marshal_with(file_array_serializer)
def get(self, user_id):
try:
return File.filter({'creator': user_id, 'parent_id': '0'})
except Exception as e:
abort(500, message="There was an error while trying to get your files --> {}".format(e.message))
@login_required
@validate_user
@marshal_with(file_serializer)
def post(self, user_id):
try:
parser = reqparse.RequestParser()
parser.add_argument('name', type=str, help="This should be the folder name if creating a folder")
parser.add_argument('parent_id', type=str, help='This should be the parent folder id')
parser.add_argument('is_folder', type=bool, help="This indicates whether you are trying to create a folder or not")
args = parser.parse_args()
name = args.get('name', None)
parent_id = args.get('parent_id', None)
is_folder = args.get('is_folder', False)
parent = None
# Are we adding this to a parent folder?
if parent_id is not None:
parent = File.find(parent_id)
if parent is None:
raise Exception("This folder does not exist")
if not parent['is_folder']:
raise Exception("Select a valid folder to upload to")
# Are we creating a folder?
if is_folder:
if name is None:
raise Exception("You need to specify a name for this folder")
return Folder.create(
name=name,
parent=parent,
is_folder=is_folder,
creator=user_id
)
else:
files = request.files['file']
if files and is_allowed(files.filename):
_dir = os.path.join(BASE_DIR, 'upload/{}/'.format(user_id))
if not os.path.isdir(_dir):
os.mkdir(_dir)
filename = secure_filename(files.filename)
to_path = os.path.join(_dir, filename)
files.save(to_path)
fileuri = os.path.join('upload/{}/'.format(user_id), filename)
filesize = os.path.getsize(to_path)
return File.create(
name=filename,
uri=fileuri,
size=filesize,
parent=parent,
creator=user_id
)
raise Exception("You did not supply a valid file in your request")
except Exception as e:
abort(500, message="There was an error while processing your request --> {}".format(e.message))
列表方法非常简单。我们过滤表中由特定用户创建并存储在根目录中的所有文件。我们将这些数据返回给此端点,如果有任何错误,则抛出异常。
创造
对于创建操作,它有点复杂。在本指南中,我们假设文件和文件夹将使用相同的端点创建。对于文件,如果我们要将其上传到文件夹中,则需要提供文件以及 parent_id 。对于文件夹,如果我们在另一个文件夹中创建它,则需要名称和parent_id值。对于文件夹,我们还需要随请求发送is_folder字段以指定我们正在创建文件夹。
如果我们要将其存储在文件夹中,我们必须确保该文件夹存在并且是有效的文件夹。如果要创建文件夹,我们还必须确保提供名称字段。
对于文件创建,我们将文件上传到专门为不同用户命名的文件夹中,如前所述。在我们的例子中,我们对不同的用户文件目录使用模式/upload/<user_id>。我们还使用文件信息来填充我们将要存储在表中的文档。
最后,我们分别调用文件和文件夹创建的方法 - File.create()和Folder.create()。
序列化器
请注意,我们使用了Flask-RESTful 提供的marshal_with装饰器。此装饰器用于格式化响应对象并指示我们将返回的不同字段名称和类型。请参阅下面的file_array_serializer和file_serializer的定义:
file_array_serializer = {
'id': fields.String,
'name': fields.String,
'size': fields.Integer,
'uri': fields.String,
'is_folder': fields.Boolean,
'parent_id': fields.String,
'creator': fields.String,
'date_created': fields.DateTime(dt_format= 'rfc822'),
'date_modified': fields.DateTime(dt_format='rfc822'),
}
file_serializer = {
'id': fields.String,
'name': fields.String,
'size': fields.Integer,
'uri': fields.String,
'is_folder': fields.Boolean,
'objects': fields.Nested(file_array_serializer, default=[]),
'parent_id': fields.String,
'creator': fields.String,
'date_created': fields.DateTime(dt_format='rfc822'),
'date_modified': fields.DateTime(dt_format='rfc822'),
}
可以将其添加到/api/controllers/files.py模块的顶部或单独的/api/utils/serializers.py模块中。
这两个序列化器之间的区别在于,文件序列化器在响应中包含对象数组。我们对列表响应使用file_array_serializer ,而对对象响应使用file_serializer 。
我们还使用了一个名为is_allowed()的函数来帮助确保我们支持所有正在上传的文件。我们创建了一个名为ALLOWED_EXTENSIONS的列表,其中包含所有允许的扩展名的列表。
ALLOWED_EXTENSIONS = set(['txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'])
def is_allowed(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS
最后,我们通过在/api/controllers/files.py模块中添加ViewEditDelete的资源类来结束。
class ViewEditDelete(Resource):
@login_required
@validate_user
@belongs_to_user
@marshal_with(file_serializer)
def get(self, user_id, file_id):
try:
should_download = request.args.get('download', False)
if should_download == 'true':
parts = os.path.split(g.file['uri'])
return send_from_directory(directory=parts[0], filename=parts[1])
return g.file
except Exception as e:
abort(500, message="There was an while processing your request --> {}".format(e.message))
@login_required
@validate_user
@belongs_to_user
@marshal_with(file_serializer)
def put(self, user_id, file_id):
try:
update_fields = {}
parser = reqparse.RequestParser()
parser.add_argument('name', type=str, help="New name for the file/folder")
parser.add_argument('parent_id', type=str, help="New parent folder for the file/folder")
args = parser.parse_args()
name = args.get('name', None)
parent_id = args.get('parent_id', None)
if name is not None:
update_fields['name'] = name
if parent_id is not None and g.file['parent_id'] != parent_id:
if parent_id != '0'
folder_access = Folder.filter({'id': parent_id, 'creator': user_id})
if not folder_access:
abort(404, message="You don't have access to the folder you're trying to move this object to")
if g.file['is_folder']:
update_fields['tag'] = g.file['id'] if parent_id == '0' else '{}#{}'.format(folder_access['tag'], folder['last_index'])
Folder.move(g.file, folder_access)
else:
File.move(g.file, folder_access)
update_fields['parent_id'] = parent_id
if g.file['is_folder']:
Folder.update(file_id, update_fields)
else:
File.update(file_id, update_fields)
return File.find(file_id)
except Exception as e:
abort(500, message="There was an while processing your request --> {}".format(e.message))
@login_required
@validate_user
@belongs_to_user
def delete(self, user_id, file_id):
try:
hard_delete = request.args.get('hard_delete', False)
if not g.file['is_folder']:
if hard_delete == 'true':
os.remove(g.file['uri'])
File.delete(file_id)
else:
File.update(file_id, {'status': False})
else:
if hard_delete == 'true':
folders = Folder.filter(lambda folder: folder['tag'].startswith(g.file['tag']))
for folder in folders:
files = File.filter({'parent_id': folder['id'], 'is_folder': False })
File.delete_where({'parent_id': folder['id'], 'is_folder': False })
for f in files:
os.remove(f['uri'])
else:
File.update(file_id, {'status': False})
File.update_where({'parent_id': file_id}, {'status': False})
return "File has been deleted successfully", 204
except:
abort(500, message="There was an error while processing your request --> {}".format(e.message))
我们创建了一个get()方法,该方法根据 ID 返回单个文件或文件夹对象。对于文件夹,它包括列表信息。如果您查看belong_to_user装饰器,您可以看到这是如何完成的。对于文件,我们包含了一个查询参数should_download,如果我们想要下载文件,则将其设置为true 。
put ()方法负责更新文件和文件夹信息。这还包括移动文件和文件夹。移动文件是通过更新文件/文件夹的parent_id字段来触发的。两者的逻辑已在文件和文件夹模型的move()方法中介绍。
delete ()方法还带有一个查询参数,用于指定我们是否要执行硬删除。对于硬删除,记录将从数据库中删除,文件将从文件系统中删除。对于软删除,我们仅将文件状态字段更新为 false。
我们在RethinkDBModel类中创建了名为update_where()和delete_where()的新方法,用于从表中删除和更新过滤集:
@classmethod
def update_where(cls, predicate, fields):
status = r.table(cls._table).filter(predicate).update(fields).run(conn)
if status['errors']:
raise DatabaseProcessError("Could not complete the update action")
retu
免责声明:本内容来源于第三方作者授权、网友推荐或互联网整理,旨在为广大用户提供学习与参考之用。所有文本和图片版权归原创网站或作者本人所有,其观点并不代表本站立场。如有任何版权侵犯或转载不当之情况,请与我们取得联系,我们将尽快进行相关处理与修改。感谢您的理解与支持!
请先 登录后发表评论 ~