通用socket server 類
import socket
import select
import sys
import os
import errno
try:
import threading
except ImportError:
import dummy_threading as threading
# Names exported by a star-import of this module.
__all__ = [
    "TCPServer", "UDPServer", "ForkingUDPServer", "ForkingTCPServer",
    "ThreadingUDPServer", "ThreadingTCPServer", "BaseRequestHandler",
    "StreamRequestHandler", "DatagramRequestHandler",
    "ThreadingMixIn", "ForkingMixIn",
]
# The Unix-domain variants are only advertised when the platform's socket
# module exposes AF_UNIX.
if hasattr(socket, "AF_UNIX"):
    __all__.extend([
        "UnixStreamServer", "UnixDatagramServer",
        "ThreadingUnixStreamServer",
        "ThreadingUnixDatagramServer",
    ])
# Default for self.timeout as read by handle_request(); when it is None (and
# the socket has no timeout of its own) the select() wait there gets None.
# NOTE(review): presumably a class attribute of the server base class whose
# `class` line is not part of this excerpt -- confirm.
timeout = None
def __init__(self, server_address, RequestHandlerClass):
    """Constructor.  May be extended, do not override.

    Records the address the server is associated with and the handler
    class that finish_request() instantiates for each request, and sets
    up the state used by serve_forever() to coordinate shutdown.
    """
    self.server_address = server_address
    self.RequestHandlerClass = RequestHandlerClass
    # Cleared when serve_forever() starts and set again when it exits.
    self.__is_shut_down = threading.Event()
    # Polled by the serve_forever() loop; True asks the loop to stop.
    self.__shutdown_request = False
__init__的主要作用就是創建server對象,並初始化server_address和RequestHandlerClass。
def serve_forever(self, poll_interval=0.5):
    """Handle one request at a time until shutdown.

    Polls for shutdown every poll_interval seconds. Ignores
    self.timeout. If you need to do periodic tasks, do them in
    another thread.
    """
    self.__is_shut_down.clear()
    try:
        while not self.__shutdown_request:
            # select() returns after at most poll_interval seconds, so the
            # shutdown flag is re-examined at least that often.
            r, w, e = _eintr_retry(select.select, [self], [], [],
                                   poll_interval)
            # The flag may have been raised while we were waiting in
            # select(); stop right away instead of serving one more
            # request.
            if self.__shutdown_request:
                break
            if self in r:
                self._handle_request_noblock()
    finally:
        # Reset the flag so the loop can be started again, and signal
        # anyone waiting on the event that the loop has finished.
        self.__shutdown_request = False
        self.__is_shut_down.set()
這裡用到了select()函數,即serve_forever接受了一個poll_interval=0.5的參數傳入,這表示用於select輪詢的時間,然後進入一個循環中(直到收到shutdown請求才會退出),在這個循環中,select每隔poll_interval秒輪詢一次(阻塞於此),以此來進行網絡IO的監聽。一旦有新的網絡連接請求到來,則會調用_handle_request_noblock()方法處理新的連接。
def _handle_request_noblock(self):
    """Handle one request, without blocking.

    I assume that select.select has returned that the socket is
    readable before this function was called, so there should be
    no risk of blocking in get_request().

    Every request obtained from get_request() is either handed to
    process_request() or shut down here, so a rejected or failed request
    does not stay open.
    """
    try:
        request, client_address = self.get_request()
    except socket.error:
        # Nothing was obtained, so there is nothing to clean up.
        return
    if not self.verify_request(request, client_address):
        # Rejected requests must still be released.
        self.shutdown_request(request)
        return
    try:
        self.process_request(request, client_address)
    except Exception:
        self.handle_error(request, client_address)
        self.shutdown_request(request)
    except:
        # KeyboardInterrupt, SystemExit and the like: release the request
        # but let the exception reach the caller instead of hiding it.
        self.shutdown_request(request)
        raise
英文說明已經說的很明確,該方法處理的是一個非阻塞請求,首先通過get_request()方法獲取連接,具體實現在其子類,一旦獲取了連接,立即調用verify_request認證連接信息,通過認證,則調用process_request()方法處理請求,如果中途出現錯誤,則調用handle_error處理錯誤,同時,調用shutdown_request()方法結束這個連接。 那下面我們就先看上面提到的其他幾個函數調用:
def verify_request(self, request, client_address):
    """Verify the request.  May be overridden.

    Return True if we should proceed with this request.  This default
    implementation accepts every request.
    """
    return True
def process_request(self, request, client_address):
    """Call finish_request, then shut the request down.

    Overridden by ForkingMixIn and ThreadingMixIn.
    """
    self.finish_request(request, client_address)
    # Only reached when finish_request() returned normally; if it raised,
    # the exception propagates to the caller.
    self.shutdown_request(request)
def handle_error(self, request, client_address):
    """Handle an error gracefully.  May be overridden.

    The default is to print a traceback and continue.  The whole report
    (separator lines, client address and traceback) is written to
    sys.stderr so it stays together in a single stream.  Must be called
    while an exception is being handled, since the traceback printed is
    that of the current exception.
    """
    import traceback
    stream = sys.stderr
    separator = '-' * 40 + '\n'
    stream.write(separator)
    # str() rather than %-formatting: client_address is typically a tuple,
    # which the % operator would try to unpack.
    stream.write('Exception happened during processing of request from '
                 + str(client_address) + '\n')
    traceback.print_exc(file=stream)
    stream.write(separator)
def shutdown_request(self, request):
    """Called to shutdown and close an individual request.

    This version simply delegates to close_request().
    """
    self.close_request(request)
def finish_request(self, request, client_address):
    """Finish one request by instantiating RequestHandlerClass.

    The handler is constructed with the request, the client address and
    this server; the resulting instance is not kept.
    """
    self.RequestHandlerClass(request, client_address, self)
def handle_request(self):
    """Handle one request, possibly blocking.

    Respects self.timeout.  If nothing becomes readable before the
    effective timeout expires, handle_timeout() is called instead of
    processing a request.
    """
    # Support people who used socket.settimeout() to escape
    # handle_request before self.timeout was available.
    timeout = self.socket.gettimeout()
    if timeout is None:
        timeout = self.timeout
    elif self.timeout is not None:
        # Both are set: the shorter one wins.
        timeout = min(timeout, self.timeout)
    fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
    if not fd_sets[0]:
        # Empty readable list: select() gave up without a request.
        self.handle_timeout()
        return
    self._handle_request_noblock()
上面已經提到,如果你沒有用到serve_forever()方法,說明你希望使用的是阻塞請求來處理連接,如英文描述所說,該方法只是處理一個阻塞的請求,仍然使用select()方法輪詢來監聽網絡連接,但是需要考慮時間超時影響,一旦超時,調用handle_timeout()方法處理超時,一般在子類重寫該方法;如果在超時之前監聽到了網絡的連接請求,則同serve_forever一樣,調用_handle_request_noblock()方法,完成對新的連接的請求處理。
class BaseRequestHandler:
    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define arbitrary other instance variables.
    """

    def __init__(self, request, client_address, server):
        """Store the request context and run setup(), handle(), finish()."""
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            # finish() runs even if handle() raises; it is skipped only
            # when setup() itself fails.
            self.finish()

    def setup(self):
        """Hook called before handle(); does nothing by default."""
        pass

    def handle(self):
        """Hook that services the request; does nothing by default."""
        pass

    def finish(self):
        """Hook called after handle(); does nothing by default."""
        pass
以上描述說明,所有requestHandler都繼承BaseRequestHandler基類,該類會處理每一個請求。在__init__中初始化實例變量request、client_address、server,然後依次調用setup()、handle()、finish()方法完成請求處理。那麼,我們唯一需要做的就是重寫好handle()方法,處理所有的請求。 總結:構建一個網絡服務,需要一個BaseServer用於處理網絡IO,同時在內部創建requestHandler對象,對所有具體的請求做處理。
class TCPServer(BaseServer):
    """Server that accepts stream (SOCK_STREAM) connections.

    The class attributes below select the socket family and type, the
    backlog passed to listen(), and whether SO_REUSEADDR is set before
    binding.
    """

    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM
    request_queue_size = 5
    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override.

        Creates the socket and, unless bind_and_activate is false, binds
        and activates it immediately.
        """
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                # Whatever went wrong, release the socket before
                # re-raising the original exception.
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.
        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        # Replace the requested address with the one actually bound.
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.
        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.
        """
        self.socket.close()

    def fileno(self):
        """Return socket file number.

        Interface required by select().
        """
        return self.socket.fileno()

    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.
        """
        return self.socket.accept()

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        try:
            # Explicitly shutdown.  socket.close() merely releases
            # the socket and waits for GC to perform the actual close.
            request.shutdown(socket.SHUT_WR)
        except socket.error:
            pass  # some platforms may raise ENOTCONN here
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()
class UDPServer(TCPServer):
    """UDP server class."""

    allow_reuse_address = False
    socket_type = socket.SOCK_DGRAM
    # Upper bound, in bytes, passed to recvfrom() for each datagram.
    max_packet_size = 8192

    def get_request(self):
        """Receive one datagram.

        Returns ((data, socket), client_address): the request is the
        payload paired with the server socket.
        """
        data, client_addr = self.socket.recvfrom(self.max_packet_size)
        return (data, self.socket), client_addr

    def server_activate(self):
        """Do nothing: no need to call listen() for UDP."""
        pass

    def shutdown_request(self, request):
        """Delegate to close_request(); no need to shutdown anything."""
        self.close_request(request)

    def close_request(self, request):
        """Do nothing: no need to close anything."""
        pass
繼承自TCPServer,將socket改為了SOCK_DGRAM型,並修改了get_request,用於從SOCK_DGRAM中獲取request。同時server_activate、close_request都改成了空操作,shutdown_request則只是調用close_request(UDP不需要listen、shutdown和close),比TCP簡單一些。
class StreamRequestHandler(BaseRequestHandler):
    """Define self.rfile and self.wfile for stream sockets."""

    # Buffer sizes handed to makefile() for the read and write files.
    rbufsize = -1
    wbufsize = 0

    # A timeout to apply to the request socket, if not None.
    timeout = None

    # When true, TCP_NODELAY is set on the connection in setup().
    disable_nagle_algorithm = False

    def setup(self):
        """Configure the connection and wrap it in file objects."""
        self.connection = self.request
        if self.timeout is not None:
            self.connection.settimeout(self.timeout)
        if self.disable_nagle_algorithm:
            self.connection.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, True)
        self.rfile = self.connection.makefile('rb', self.rbufsize)
        self.wfile = self.connection.makefile('wb', self.wbufsize)

    def finish(self):
        """Flush pending output, then close both file objects."""
        if not self.wfile.closed:
            try:
                self.wfile.flush()
            except socket.error:
                # A final socket error may have occurred here, such as
                # the local error ECONNABORTED.
                pass
        self.wfile.close()
        self.rfile.close()
最主要的功能是根據socket生成了讀寫socket用的兩個文件對象(可以理解為句柄)rfile和wfile
class DatagramRequestHandler(BaseRequestHandler):

    # XXX Regrettably, I cannot get this working on Linux;
    # s.recvfrom() doesn't return a meaningful client address.

    """Define self.rfile and self.wfile for datagram sockets."""

    def setup(self):
        """Wrap the received packet and a reply buffer in file objects."""
        try:
            from cStringIO import StringIO
        except ImportError:
            try:
                from StringIO import StringIO
            except ImportError:
                # Neither Python 2 module exists; use the in-memory
                # bytes buffer from io instead.
                from io import BytesIO as StringIO
        # The request is a (packet, socket) pair.
        self.packet, self.socket = self.request
        self.rfile = StringIO(self.packet)
        self.wfile = StringIO()

    def finish(self):
        """Send everything written to wfile back as one datagram."""
        self.socket.sendto(self.wfile.getvalue(), self.client_address)
同樣是生成rfile和wfile,但UDP不直接關聯socket。這裡的rfile是直接由從UDP中讀取的數據生成的,wfile則是新建了一個StringIO,用於寫數據。