程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

python3 simpleHttpServerWithUpload

編輯:Python

python3 simpleHttpServerWithUpload


I stumbled upon the simpleHttpServer module by accident and found it very convenient for sharing files. However, the native module can only list and download files; it has no upload function.

Many experts on the Internet have modified it to add this function, but most of those versions are written for Python 2, and many modules have changed in Python 3. There are also some improved Python 3 examples; I adapted one of them, and it is just about usable. Here is the source code:

import html
import http.server
import mimetypes
import os
import platform
import posixpath
import re
import shutil
import socket
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from socketserver import ThreadingMixIn
from io import StringIO
class GetWanIp:
    """Helpers for discovering an IP address of this machine."""

    def getip(self):
        """Return the local address of the interface used to reach 8.8.8.8.

        A UDP socket is connected to 8.8.8.8:80 and the address the socket
        was bound to is read back with ``getsockname()``.

        Raises:
            OSError: if the socket cannot be created or connected.
        """
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(('8.8.8.8', 80))
            return probe.getsockname()[0]

    def visit(self, url):
        """Fetch *url* and return the first dotted-quad address in its body.

        The body is only inspected when the request was not redirected
        (the final URL equals *url*). The request times out after 3 seconds.

        Returns:
            The matched address as ``str``, or ``None`` when the response
            contains no such address (or the request was redirected).
        """
        with urllib.request.urlopen(url, None, 3) as response:
            body = response.read() if url == response.geturl() else b''
        # read() yields bytes; decode before applying the str pattern.
        text = body.decode('utf-8', errors='replace')
        match = re.search(r'(\d+\.){3}\d+', text)
        return match.group(0) if match else None
def showTips():
    """Print usage hints and return the address the server should bind to.

    The listening port is taken from the first command-line argument. When
    it is missing, not an integer, or outside 1024..65535 (inclusive), the
    default port 8080 is used and a hint is printed.

    Returns:
        A ``(host, port)`` tuple with an empty host (all interfaces).
    """
    print('----------------------------------------------------------------------- ')
    try:
        port = int(sys.argv[1])
    except (IndexError, ValueError):
        # No argument given, or it is not a number.
        print(' No listening port specified , The default port... Will be used : 8080 ')
        print(' You can execute this module with the port number to change the listening port , example : ')
        print('python HTTPServerWithUpload.py port ')
        port = 8080
    # Both ends of the advertised range are valid ports.
    if not 1024 <= port <= 65535:
        print(f'{port} Not in valid port range , Effective range 1024~65535, The default port number... Will be used 8080...')
        port = 8080
    try:
        host_ip = GetWanIp().getip()
    except OSError:
        # No usable network route; the server is still reachable locally.
        host_ip = '127.0.0.1'
    ip_address = f'http://{host_ip}:{port}'
    print(f' Now you can access the address URL: {ip_address} To access the shared directory .')
    return ('', port)
def sizeof_fmt(num):
    """Format a byte count as a short human-readable string.

    The value is divided by 1024 until it drops below 1024 or the units
    bytes/KB/MB/GB are exhausted, in which case it is reported in TB.
    """
    units = ('bytes', 'KB', 'MB', 'GB')
    position = 0
    while position < len(units):
        if num < 1024.0:
            return "%3.1f%s" % (num, units[position])
        num = num / 1024.0
        position += 1
    return "%3.1f%s" % (num, 'TB')
def modification_date(filename):
    """Return the last-modified time of *filename* as a local-time string.

    The result is formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    modified_at = os.path.getmtime(filename)
    local_time = time.localtime(modified_at)
    return time.strftime("%Y-%m-%d %H:%M:%S", local_time)
class SimpleHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
    """Serve files below the current working directory and accept uploads.

    GET/HEAD requests return a file or a generated directory listing that
    contains an upload form. POST requests store a single uploaded file
    (form field ``file``) in the directory addressed by the request path.
    """

    # Size of the pieces a response body is read and written in.
    _CHUNK_SIZE = 64 * 1024

    @staticmethod
    def _encode(text):
        """Encode generated page text as UTF-8 bytes.

        File names may carry characters that cannot be encoded (for example
        surrogates); those are replaced so a page can always be sent. The
        same function is used for Content-Length and for the body, so the
        two always agree.
        """
        return text.encode("utf-8", errors="replace")

    def do_GET(self):
        """Serve a GET request: send the headers, then stream the body."""
        f = self.send_head()
        if f:
            try:
                # Fixed-size reads keep memory bounded for large files.
                while True:
                    chunk = f.read(self._CHUNK_SIZE)
                    if not chunk:
                        break
                    if isinstance(chunk, str):
                        # Directory listings arrive as text.
                        chunk = self._encode(chunk)
                    self.wfile.write(chunk)
            finally:
                f.close()

    def do_HEAD(self):
        """Serve a HEAD request: send the headers only."""
        f = self.send_head()
        if f:
            f.close()

    # Upload result processing
    def do_POST(self):
        """Store the uploaded file and answer with an HTML result page."""
        r, info = self.deal_post_data()
        print(r, info, "by: ", self.client_address)
        # Without a Referer header, link back to the directory posted to.
        back_url = self.headers.get('referer') or self.path
        parts = [
            '<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">',
            '<meta name="viewport" content="width=device-width" charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1.0, minimum-scale=1.0, maximum-scale=1.0, user-scalable=no">',
            "<html>\n<title> Upload results </title>\n",
            "<body>\n<h2> Upload files </h2>",
            '<strong > success </strong>\n' if r else '<strong > Failure </strong>\n',
            "<hr>\n",
            # info contains the client-chosen file name: escape it.
            html.escape(info),
            "</br><a href=\"%s\"> Click on the return </a>" % html.escape(back_url, quote=True),
            "<hr><small>Powered By: gaowanliang ",
            "</small></body>\n</html>\n",
        ]
        body = self._encode("".join(parts))
        self.send_response(200)
        self.send_header("Content-type", "text/html; charset=utf-8")
        # Content-Length counts bytes, not characters.
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def deal_post_data(self):
        """Read one multipart/form-data file part from the request body.

        The file is written into the directory that the request path maps
        to. Only the final component of the submitted file name is used, so
        an upload cannot be placed outside that directory. If the name is
        already taken, underscores are appended until it is free.

        Returns:
            ``(True, message)`` on success, ``(False, reason)`` otherwise.
        """
        content_type = self.headers.get("Content-Type") or ""
        if "boundary=" not in content_type:
            return (False, "Content NOT begin with boundary")
        boundary = content_type.split("boundary=", 1)[1].split(";", 1)[0]
        boundary = boundary.strip().strip('"')
        if not boundary:
            return (False, "Content NOT begin with boundary")
        # In the body every boundary line is prefixed with two dashes.
        delimiter = b"--" + boundary.encode("utf-8")
        try:
            remainbytes = int(self.headers.get("Content-Length"))
        except (TypeError, ValueError):
            return (False, "Missing or invalid Content-Length header")

        line = self.rfile.readline()
        remainbytes -= len(line)
        if delimiter not in line:
            return (False, "Content NOT begin with boundary")
        line = self.rfile.readline()
        remainbytes -= len(line)
        found = re.findall(
            rb'Content-Disposition.*name="file"; filename="(.*)"', line)
        if not found:
            return (False, "Can't find out file name...")
        try:
            # The form is served as UTF-8, so names normally arrive as
            # UTF-8; gbk is kept as a fallback for legacy clients.
            try:
                name = found[0].decode('utf-8')
            except UnicodeDecodeError:
                name = found[0].decode('gbk')
        except UnicodeDecodeError as e:
            return (False, " Please do not use Chinese for the file name , Or use IE Upload a file with Chinese name .{}" .format(e))
        # Keep only the last path component (either separator style).
        name = name.replace("\\", "/").rsplit("/", 1)[-1]
        if name in ("", ".", ".."):
            return (False, "Can't find out file name...")
        fn = os.path.join(self.translate_path(self.path), name)
        while os.path.exists(fn):
            fn += "_"

        # Skip the part's Content-Type header and the blank separator line.
        for _ in range(2):
            line = self.rfile.readline()
            remainbytes -= len(line)
        try:
            out = open(fn, 'wb')
        except (OSError, ValueError):
            return (False, "Can't create file to write, do you have permission to write?")
        with out:
            # Writing lags one line behind reading so that the line break
            # preceding the closing boundary can be dropped from the data.
            preline = self.rfile.readline()
            remainbytes -= len(preline)
            while remainbytes > 0:
                line = self.rfile.readline()
                if not line:
                    # Client closed the connection before sending it all.
                    break
                remainbytes -= len(line)
                if delimiter in line:
                    if preline.endswith(b"\n"):
                        preline = preline[:-1]
                    if preline.endswith(b"\r"):
                        preline = preline[:-1]
                    out.write(preline)
                    return (True, " file '%s' Upload successful " % fn)
                out.write(preline)
                preline = line
        return (False, "Unexpect Ends of data.")

    def send_head(self):
        """Send the response status and headers for GET and HEAD.

        Returns:
            An open file-like object holding the body (the caller must
            close it), or ``None`` when the response is already complete
            (redirect or error).
        """
        path = self.translate_path(self.path)
        if os.path.isdir(path):
            url = urllib.parse.urlsplit(self.path)
            if not url.path.endswith('/'):
                # Redirect so relative links resolve inside the directory;
                # the slash goes on the path, in front of any query string.
                target = urllib.parse.urlunsplit(
                    (url[0], url[1], url[2] + '/', url[3], url[4]))
                self.send_response(301)
                self.send_header("Location", target)
                self.send_header("Content-Length", "0")
                self.end_headers()
                return None
            for index in "index.html", "index.htm":
                index = os.path.join(path, index)
                if os.path.exists(index):
                    path = index
                    break
            else:
                return self.list_directory(path)
        ctype = self.guess_type(path)
        try:
            f = open(path, 'rb')
        except IOError:
            self.send_error(404, "File not found")
            return None
        try:
            fs = os.fstat(f.fileno())
            self.send_response(200)
            self.send_header("Content-type", ctype)
            self.send_header("Content-Length", str(fs.st_size))
            self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
            self.end_headers()
        except BaseException:
            # Do not leak the file handle when sending the headers fails.
            f.close()
            raise
        return f

    def list_directory(self, path):
        """Build the HTML listing (with upload form) for directory *path*.

        The headers are sent here; the page itself is returned as a
        ``StringIO`` positioned at the start, or ``None`` after an error
        response when the directory cannot be read.
        """
        try:
            names = os.listdir(path)
        except OSError:
            self.send_error(404, "No permission to list directory")
            return None
        names.sort(key=str.lower)
        displaypath = html.escape(urllib.parse.unquote(self.path))
        rows = []
        for name in names:
            fullname = os.path.join(path, name)
            safe_name = html.escape(name)
            colorName = safe_name
            linkname = name
            if os.path.isdir(fullname):
                colorName = '<span >' + safe_name + '/</span>'
                linkname = name + "/"
            if os.path.islink(fullname):
                colorName = '<span >' + safe_name + '@</span>'
            try:
                # Stat the real filesystem path, not one rebuilt from the
                # (escaped) URL.
                size = sizeof_fmt(os.path.getsize(fullname))
                modified = modification_date(fullname)
            except OSError:
                # E.g. a dangling symlink: still list the entry.
                size = modified = '-'
            href = urllib.parse.quote(linkname, errors='surrogatepass')
            rows.append(f""" <tr> <td width='60%'><a href='{href}'>{colorName}</a> </td><td width='20%'>{size}</td> <td width='20%'>{modified}</td> </tr> """)
        dirs = ''.join(rows)
        script_code = """ """
        content = f""" <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h2> List of contents be located {displaypath}</h2> <hr> <form ENCTYPE="multipart/form-data" method="post"> <input name="file" type="file"/> <input type="submit" value=" Upload "/> <input type="button" value=" Home directory " onClick="location='/'"> </form> <h2 > Please select the file before clicking upload , If you don't do this, something strange may happen </h2> <hr> <table id='wrap'> {dirs} </table> <hr> </body> {script_code} </html> """
        f = StringIO()
        f.write(content)
        f.seek(0)
        self.send_response(200)
        self.send_header("Content-type", "text/html; charset=utf-8")
        # Length of the encoded page; do_GET encodes it the same way.
        self.send_header("Content-Length", str(len(self._encode(content))))
        self.end_headers()
        return f

    def translate_path(self, path):
        """Map a URL path to a filesystem path below the working directory.

        The query string and fragment are dropped, the path is unquoted and
        normalised, and drive letters, directory parts and ``.``/``..``
        components are ignored, so the result stays below ``os.getcwd()``.
        """
        path = path.split('?', 1)[0]
        path = path.split('#', 1)[0]
        path = posixpath.normpath(urllib.parse.unquote(path))
        words = [word for word in path.split('/') if word]
        path = os.getcwd()
        for word in words:
            drive, word = os.path.splitdrive(word)
            head, word = os.path.split(word)
            if word in (os.curdir, os.pardir):
                continue
            path = os.path.join(path, word)
        return path

    def copyfile(self, source, outputfile):
        """Copy all data from file object *source* to *outputfile*."""
        shutil.copyfileobj(source, outputfile)

    def guess_type(self, path):
        """Return the MIME type for *path* based on its extension.

        The extension is looked up as written and then lower-cased; unknown
        extensions map to the default entry (key ``''``).
        """
        base, ext = posixpath.splitext(path)
        if ext in self.extensions_map:
            return self.extensions_map[ext]
        ext = ext.lower()
        if ext in self.extensions_map:
            return self.extensions_map[ext]
        return self.extensions_map['']

    if not mimetypes.inited:
        mimetypes.init()  # try to read system mime.types
    extensions_map = mimetypes.types_map.copy()
    extensions_map.update({
        '': 'application/octet-stream',  # Default
        '.py': 'text/plain',
        '.c': 'text/plain',
        '.h': 'text/plain',
    })
class ThreadingServer(ThreadingMixIn, http.server.HTTPServer):
    """HTTP server combined with ThreadingMixIn; adds no behaviour of its own."""
# Script entry point: print the usage hints, then serve until interrupted.
if __name__ == '__main__':
    serveraddr = showTips()
    srvr = ThreadingServer(serveraddr, SimpleHTTPRequestHandler)
    try:
        srvr.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit without a traceback.
        pass
    finally:
        # Release the listening socket.
        srvr.server_close()

The result in action



I originally wanted to change the file upload to drag-and-drop, but my skills are not up to it yet. I think this module can still be improved and turned into a general-purpose file-sharing module. You are welcome to leave a comment to study and discuss!


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved