实现文件管理器并上传下载文件
code 1
python
from pathlib import Path
from pprint import pprint
from datetime import datetime
import os
import http.server
import socketserver
import shutil
from urllib.parse import unquote
from string import Template
from base64 import b64decode, b64encode
def human_readable_size(size_bytes):
    """Format a byte count as a string with two decimals and a unit.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (TB) is reached, e.g. ``1536`` -> ``'1.50 KB'``.
    """
    *smaller_units, largest_unit = ('B', 'KB', 'MB', 'GB', 'TB')
    for unit in smaller_units:
        if size_bytes < 1024:
            return f'{size_bytes:.2f} {unit}'
        size_bytes = size_bytes / 1024
    # Anything that is still >= 1024 here is expressed in the largest unit.
    return f'{size_bytes:.2f} {largest_unit}'
def get_file_info(path_):
    """Describe a path and, when it is a directory, its direct children.

    Args:
        path_: Path (str or path-like) to inspect.

    Returns:
        A dict with the keys:
            ``path``      -- final component of ``path_``.
            ``parent``    -- parent directory of ``path_`` as a string.
            ``is_exists`` -- whether the path exists.
            ``is_file``   -- whether the path is a regular file.
            ``total``     -- number of entries in ``dir``.
            ``dir``       -- one dict per child (``name``, ``is_file``,
                             ``relat``, ``abs``, ``suffix``, ``stem``,
                             ``created_at``, ``size``); empty unless the
                             path is a directory.
    """
    p = Path(path_)
    is_exists = os.path.exists(path_)
    # Report regular files truthfully instead of resetting the flag to False.
    is_file = is_exists and os.path.isfile(path_)
    p_dir = []
    # Only directories can be iterated; other non-file paths yield no entries.
    if is_exists and os.path.isdir(path_):
        for item in p.iterdir():
            try:
                st = item.stat()
            except OSError:
                # Dangling symlink or entry that vanished: describe the link
                # itself rather than aborting the whole listing.
                try:
                    st = item.lstat()
                except OSError:
                    continue
            p_dir.append({
                'name': item.name,
                'is_file': os.path.isfile(item),
                'relat': str(item),
                'abs': str(item.absolute()),
                'suffix': item.suffix,
                'stem': item.stem,
                # st_ctime is the same value os.path.getctime() reports.
                'created_at': datetime.fromtimestamp(st.st_ctime).isoformat(sep=' ', timespec='seconds'),
                'size': human_readable_size(st.st_size),
            })
    return {
        'path': p.name,
        'parent': str(p.parent),
        'is_exists': is_exists,
        'is_file': is_file,
        'total': len(p_dir),
        'dir': p_dir,
    }
# Page template for the directory listing; Handler.do_GET renders it with
# string.Template. Placeholders:
#   $parent_path  -- href of the "up one level" link
#   $current_path -- directory shown after "./"
#   $file_rows    -- pre-rendered <tr> markup, one row per entry
# The inline script defines upload_file(), which POSTs the selected file's raw
# bytes to /upload with its name in a "filename" request header, and
# delete_file(), which sends a DELETE request for "/<filename>".
# NOTE(review): delete_file() is not referenced anywhere in this template and no
# DELETE handler is visible in this file -- confirm whether it is still needed.
# NOTE(review): upload_file() reads file_input.files[0] without checking that a
# file was selected -- confirm the intended behaviour for an empty selection.
# NOTE(review): the <style> element is placed after </html>.
html = """
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>File Share</title>
<link rel="stylesheet" href="https://cdn.bootcss.com/bootstrap/4.3.1/css/bootstrap.min.css">
<link rel="stylesheet" href="https://cdn.bootcss.com/font-awesome/4.7.0/css/font-awesome.min.css">
<script src="https://cdn.bootcss.com/bootstrap/4.3.1/js/bootstrap.min.js"></script>
<script >
function delete_file(filename) {
if (confirm('Are you sure you want to delete ' + filename + ' ? ')) {
fetch('/' + filename, {
method: 'DELETE'
}).then(response => {
if (response.ok) {
location.reload();
} else {
alert('Error deleting file');
}
}).catch(error => alert('Error deleting file'));
}
}
function upload_file() {
let file_input = document.getElementById('file-input');
let file = file_input.files[0];
let filename = file.name;
let reader = new FileReader();
reader.onload = function() {
let file_data = reader.result;
fetch('/upload', {
method: 'POST',
headers: {
'Content-Type': 'application/octet-stream',
'filename': filename
},
body: file_data
}).then(response => {
if (response.ok) {
location.reload();
} else {
alert('Error uploading file');
}
});
};
reader.readAsArrayBuffer(file);
}
</script>
</head>
<body>
<div class="container">
<h1>File Manager</h1>
<button class="btn btn-primary" onclick="location.reload()">刷新 </button>
<hr>
<span><a href="$parent_path">返回上一级</a></span>
<span>当前目录:<strong>./$current_path</strong></span>
<table class="table">
<thead>
<tr>
<th>文件名</th>
<th>类型</th>
<th>大小</th>
<th>创建时间</th>
<th>下载</th>
<th>执行</th>
</tr>
</thead>
<tbody> $file_rows </tbody>
</table>
<hr>
<form enctype="multipart/form-data" method="POST" action="/upload">
<div class="form-group">
<label for="file-input">上传文件:</label>
<input type="file" class="form-control-file" id="file-input" name="file">
</div>
<button type="button" class="btn btn-primary" onclick="upload_file()">上传</button>
</form>
<hr>
</div>
</body>
</html>
<style>
.fa-folder {
color: #0069d9;
}
.fa-file-text-o {
color: #007bff;
}
.fa-file-pdf-o {
color: #dc3545;
}
.fa-file-image-o {
color: #28a745;
}
.fa-file-archive-o {
color: #6c757d;
}
</style>
"""
# Maps a file type (lower-case suffix, or '目录' for directories) to the
# Font Awesome class used for its icon.
file_type_icons = {
    '目录': 'fa-folder',
    '.txt': 'fa-file-text-o',
    '.pdf': 'fa-file-pdf-o',
    '.jpg': 'fa-file-image-o',
    '.jpeg': 'fa-file-image-o',
    '.png': 'fa-file-image-o',
    '.gif': 'fa-file-image-o',
    '.zip': 'fa-file-archive-o',
    '.rar': 'fa-file-archive-o',
    '.tar': 'fa-file-archive-o',
    '.gz': 'fa-file-archive-o',
}


def get_icon_tag(file_type):
    """Return the ``<i>`` icon markup for a file type.

    Args:
        file_type: A file suffix such as ``'.pdf'`` (any letter case) or the
            directory marker ``'目录'``.

    Returns:
        An ``<i class="fa ...">`` tag; unknown types get the generic
        ``fa-file`` icon.
    """
    # Suffixes keep the case of the file name ('.JPG'), so normalise before lookup.
    icon_class = file_type_icons.get(file_type.lower(), 'fa-file')
    return f'<i class="fa {icon_class}" aria-hidden="true"></i>'
def list_files(path_cleartext):
    """Render the table rows for one directory of the file manager.

    Args:
        path_cleartext: Directory to list; an empty value means ``'.'``.

    Returns:
        A ``(parent_path, rows_html)`` tuple: the base64-encoded parent
        directory (target of the "up one level" link) and the concatenated
        ``<tr>`` markup for every entry. An entry whose name equals this
        script's file name is hidden when such a file exists in the current
        working directory.
    """
    # Imported locally: at module level the name ``html`` is the page template.
    from html import escape

    rows = []
    path_cleartext = path_cleartext if path_cleartext else '.'
    p_info = get_file_info(path_cleartext)

    # Loop-invariant: decide once whether the server script must be hidden.
    mainfile = os.path.basename(__file__)
    hide_mainfile = os.path.exists(mainfile)

    for p_item in p_info['dir']:
        if hide_mainfile and mainfile == p_item["name"]:
            continue
        # File names are untrusted text; escape them before building markup.
        name_html = escape(p_item["name"])
        relat_attr = escape(p_item["relat"], quote=True)
        filename_b64e = b64encode(p_item["relat"].encode()).decode()
        if p_item['is_file']:
            file_type = p_item["suffix"]
            display_down = 'block'
            icon_tag = get_icon_tag(file_type)
            td_type_1 = f'{icon_tag} {name_html}'
        else:
            file_type = '目录'
            display_down = 'none'
            icon_tag = get_icon_tag(file_type)
            td_type_1 = f'<a href="/{filename_b64e}">{icon_tag} {name_html}</a>'
        rows.append(f'''
<tr>
<td>{td_type_1}</td>
<td>{escape(file_type)}</td>
<td style="width: 120px">{p_item['size']}</td>
<td>{p_item['created_at']}</td>
<td style="width: 60px">
<a href="/down/{relat_attr}" download style="display:{display_down};">
<i class="fa fa-download" aria-hidden="true"></i>
</a>
</td>
<td style="width: 60px">
<a href="/exec/{relat_attr}" style="display:{display_down};">
<i class="fa fa-play" aria-hidden="true"></i>
</a>
</td>
</tr>''')
    parent_path = b64encode(p_info['parent'].encode()).decode()
    return parent_path, ''.join(rows)
class Handler(http.server.SimpleHTTPRequestHandler):
    """Request handler for the file manager.

    Routes:
        GET  /down/<path>  -- send the file as a binary download.
        GET  /exec/<path>  -- run the file through ``os.system``.
        GET  /<base64>     -- render the listing of the base64-encoded directory.
        POST /upload       -- store the request body in the working directory
                              under the name given in the ``filename`` header.

    Every client-supplied path is resolved against the current working
    directory; paths that resolve outside it are rejected.
    """

    def make_response(self, status_code, content_type, content):
        """Send a complete response.

        Args:
            status_code: HTTP status code.
            content_type: Value for the ``Content-type`` header.
            content: Response body as ``bytes``; ``str`` is UTF-8 encoded.
        """
        if isinstance(content, str):
            content = content.encode()
        self.send_response(status_code)
        self.send_header('Content-type', content_type)
        self.send_header('Content-Length', str(len(content)))
        self.end_headers()
        self.wfile.write(content)

    @staticmethod
    def _resolve_inside_root(location):
        """Return ``location`` as an absolute Path under the cwd, else None."""
        root = Path.cwd().resolve()
        try:
            target = (root / location).resolve()
        except (OSError, ValueError, RuntimeError):
            return None
        if target != root and root not in target.parents:
            return None
        return target

    def _send_file(self, location):
        """Send the file at ``location`` as a download, or 404."""
        target = self._resolve_inside_root(location)
        if target is None or not target.is_file():
            self.send_error(404, 'File not found')
            return
        try:
            content = target.read_bytes()
        except OSError:
            self.send_error(404, 'File not found')
            return
        # Octet-stream keeps browsers from rendering the payload as a page.
        self.make_response(200, 'application/octet-stream', content)

    def _execute_file(self, location):
        """Run the file at ``location`` and answer ``ok``, or 404."""
        import shlex

        target = self._resolve_inside_root(location)
        if target is None or not target.is_file():
            self.send_error(404, 'File not found')
            return
        # SECURITY NOTE: this endpoint lets any client that can reach the
        # server run files from the served directory. It is limited to
        # existing files under the working directory and the path is quoted
        # so it cannot smuggle extra shell syntax, but it should not be
        # exposed on an untrusted network.
        if os.name == 'nt':
            command = f'"{target}"'
        else:
            command = shlex.quote(str(target))
        os.system(command)
        self.make_response(200, 'text/html', b'ok')

    def _send_listing(self, encoded):
        """Render the listing page for a base64-encoded directory path."""
        # Imported locally: the module-level name ``html`` is the page template.
        from html import escape

        try:
            current_path = b64decode(encoded.encode()).decode()
        except ValueError:
            # Not valid base64 / UTF-8 (binascii.Error is a ValueError).
            current_path = './'
        if self._resolve_inside_root(current_path) is None:
            current_path = './'
        parent_path, file_rows = list_files(current_path)
        content = Template(html).substitute(
            current_path=escape(current_path),
            parent_path=parent_path,
            file_rows=file_rows,
        )
        self.make_response(200, 'text/html', content.encode())

    def do_GET(self):
        """Dispatch GET requests to download, execute or listing."""
        # Browsers percent-encode spaces and non-ASCII characters in links.
        path = unquote(self.path)
        try:
            if path.startswith('/down/'):
                # Slice the prefix off; str.strip() would remove characters.
                self._send_file(path[len('/down/'):])
            elif path.startswith('/exec/'):
                self._execute_file(path[len('/exec/'):])
            elif path.startswith('/'):
                self._send_listing(path[1:])
            else:
                return http.server.SimpleHTTPRequestHandler.do_GET(self)
        except Exception as exc:
            # Request boundary: log and report instead of dropping silently.
            self.log_error('Error handling %s: %r', self.path, exc)
            try:
                self.send_error(500, 'Internal server error')
            except OSError:
                pass  # client already went away; nothing left to report

    def do_POST(self):
        """Store an uploaded file sent to ``/upload``."""
        if self.path != '/upload':
            self.send_error(404, 'Unknown endpoint')
            return
        try:
            content_length = int(self.headers.get('Content-Length') or '')
        except ValueError:
            self.send_error(400, 'Missing or invalid Content-Length header')
            return
        if content_length < 0:
            self.send_error(400, 'Missing or invalid Content-Length header')
            return
        # Keep only the final path component so uploads cannot escape the
        # working directory ('../x', absolute paths, Windows separators).
        raw_name = (self.headers.get('filename') or '').replace('\\', '/')
        filename = os.path.basename(raw_name)
        if not filename or filename in ('.', '..'):
            self.send_error(400, 'Missing or invalid filename header')
            return
        uploaded_file = self.rfile.read(content_length)
        if os.path.isfile(filename):
            # Never overwrite: pick the first free "name (i).ext".
            name, ext = os.path.splitext(filename)
            i = 1
            while os.path.isfile(f'{name} ({i}){ext}'):
                i += 1
            filename = f'{name} ({i}){ext}'
        try:
            with open(filename, 'wb') as f:
                f.write(uploaded_file)
        except OSError as exc:
            self.log_error('Upload of %s failed: %r', filename, exc)
            self.send_error(500, 'Could not store uploaded file')
            return
        self.make_response(200, 'text/html', b'File uploaded successfully')
HOST = ''
PORT = 7749
with socketserver.TCPServer((HOST, PORT), Handler) as httpd:
print(f"serving at http://{HOST}:{PORT}/")
print(f"serving at http://127.0.0.1:{PORT}/")
httpd.serve_forever()