程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> 網頁編程 >> PHP編程 >> 關於PHP編程 >> Python 深入剖析SocketServer模塊(一)(V2.7.11),python2.7socket

Python 深入剖析SocketServer模塊(一)(V2.7.11),python2.7socket

編輯:關於PHP編程

Python 深入剖析SocketServer模塊(一)(V2.7.11),python2.7socket


一、簡介(翻譯)

 

 通用socket server 類
 該模塊盡力從各種不同的方面定義server:
 對於socket-based servers:
 -- address family:
    - AF_INET{,6}: IP socket (default)
    - AF_UNIX: Unix domain sockets
    - others, 如 AF_DECNET (見<socket.h>) (不常用)
 -- socket type:
    - SOCK_STREAM (可靠連接 TCP)
    - SOCK_DGRAM (UDP)
 
 對於request-based servers:
-- client address在發出進一步的請求之前需要認證(這實際上把所有需要發出請求的進程在通過認證之前給阻塞住了)
-- 如何處理多請求:
   - 同步 (一次只能處理一個請求)
   - forking (fork一個新的進程來處理一個請求)
   - threading (創建一個新的線程來處理一個請求)
 
   在這個模塊的各種類中,最簡單的服務器類型就是synchronous TCP/IP server。這是一個糟糕的類設計,但是也保存了一些設計的類型理念。

下面是五個類的繼承關系圖表,其中的四個代表四種類型的同步服務器:
        +--------------+
        | BaseServer |
        +--------------+
              |
              v
        +------------+            +----------------------+
        | TCPServer |------->| UnixStreamServer |
        +------------+            +----------------------+
              |
              v
        +-------------+           +--------------------------+
        | UDPServer |------->| UnixDatagramServer |
        +-------------+           +--------------------------+
       
注意:UnixDatagramServer繼承於UDPServer,而不是UnixStreamServer,IP和Unix stream server之間僅有的差異就是address family,兩個服務器類的內容多數是簡單的重復。

  forking和threading 可以被創建用於ForkingMixIn和TreadingMixIn mix-in類。例如: threading UDP server類會被如下方式創建:
 class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
 (詳細可見後文示例)

  Mix-in 這個類必須首先實現,因為它重寫了定義UDPServer的方法。設置不同的成員變量也改變了基本的服務器構造方法。
  為了實現一個服務,你必須從基類BaseRequestHandler中重新定義它的handle方法。然後通過把服務類與你重寫的Handle方法類結合,以此運行新的服務類。
  請求處理類的TCP和UDP的方式是不同的,這個可以通過使用請求處理的子類StreamRequestHandler或者DatagramRequestHandler來隱藏。
當然,你還可以思考其他的方法。
    例如,如果服務中包含請求修改的內存的狀態,那麼使用forking server沒有任何意義(因為在子進程中修改將不對父進程的初始化狀態有影響,父進程也不會把這個修改的參數傳遞給其他子進程)。這種情況下,你可以使用threading server,而且你更有可能需要用到“鎖”,以此來避免兩個請求同時到達而使服務器狀態產生沖突。
    此外,如果你在搭建如HTTP服務器等,所有的數據都會存儲在外部(如文件系統中),當客戶端的一項請求被處理時,並且客戶端的讀取數據的速度很慢,synchronous class將會使服務不做出響應,這可能需要維持很長時間。
    在一些情況下,請求同步可能需要恰當的方法,但是為了在子進程中完成請求要受到請求數據的影響。這可以通過使用同步服務器來實現,並且在請求處理類中的Handle方法中明確指定fork的進程。
    另一種處理多個同時發生的請求的方法是維系一張明確的完成請求的表單,使用select()方法來判定哪個請求應該在接下來做出響應(或者判斷是否要處理新到來的請求),當每一個客戶端需要建立很長時間的連接時,這對於stream services來說非常重要。(前提是不使用線程和子進程的方法)

二、用到的所有的類方法
import socket
import select
import sys
import os
import errno
try:
    import threading
except ImportError:
    import dummy_threading as threading

__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
           "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
           "StreamRequestHandler","DatagramRequestHandler",
           "ThreadingMixIn", "ForkingMixIn"]
if hasattr(socket, "AF_UNIX"):
    __all__.extend(["UnixStreamServer","UnixDatagramServer",
                    "ThreadingUnixStreamServer",
                    "ThreadingUnixDatagramServer"])

 


三、BaseServer和BaseRequestHandler


        Python把網絡服務抽象成兩個主要的類,一個是Server類,用於處理連接相關的網絡操作,另外一個則是RequestHandler類,用於處理數據相關的操作。並且提供兩個MixIn 類,用於擴展 Server,實現多進程或多線程。在構建網絡服務的時候,Server 和 RequestHandler 並不是分開的,RequestHandler的實例對象在Server 內配合 Server工作。

       3.1 BaseServer分析   BaseServer可供外部調用的方法: - __init__(server_address, RequestHandlerClass)
- serve_forever(poll_interval=0.5)
- shutdown()
- handle_request()  # if you do not use serve_forever()
- fileno() -> int   # for select()
即我們可以通過init初始化,對外提供serve_forever()和handle_request()方法
3.1.1 init初始化
timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

 

__init__的主要作用就是創建server對象,並初始化server_address和RequestHandlerClass。
server_address就是包含主機地址和端口的元組。  

3.1.2 serve_forever  
    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

 

   這裡用到了select()函數,即server_forever接受了一個poll_interval=0.5的參數傳入,這表示用於select輪詢的時間,然後進入一個無限循環中,在這個循環中,select每隔poll_interval秒輪詢一次(阻塞於此),以此來進行網絡IO的監聽。一旦有新的網絡連接請求到來,則會調用_handle_request_noblock()方法處理新的連接。

3.1.3 _handle_request_noblock()
    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

 

英文說明已經說的很明確,該方法處理的是一個非阻塞請求,首先通過get_request()方法獲取連接,具體實現在其子類,一旦獲取了連接,立即調用verify_request認證連接信息,通過認證,則調用process_request()方法處理請求,如果中途出現錯誤,則調用handle_error處理錯誤,同時,調用shutdown_request()方法結束這個連接。 那下面我們就先看上面提到的其他幾個函數調用:
    def verify_request(self, request, client_address):
        """Verify the request.  May be overridden.

        Return True if we should proceed with this request.

        """
        return True


    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def handle_error(self, request, client_address):
        """Handle an error gracefully.  May be overridden.

        The default is to print a traceback and continue.

        """
        print '-'*40
        print 'Exception happened during processing of request from',
        print client_address
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print '-'*40


    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)<pre name="code" class="python">    

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

 



verify_request()方法對request進行驗證,通常會被子類重寫。
process_request需要注意一下,它被ForkingMixIn 和 ThreadingMixIn重寫,因此是mixin的入口,ForkingMixIn和ThreadingMixIn分別進行多進程和多線程的配置,並且調用finish_request()完成請求,調用shutdown_request()結束請求。
handle_error也可以被子類重寫,打印錯誤的信息和客戶端地址。
finish_request()處理完畢請求,在__init__中創建requestHandler對象,並通過requestHandler做出具體的處理。

3.1.4 handle_request()
    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        self._handle_request_noblock()

 

上面已經提到,如果你沒有用到server_forever()方法,說明你希望使用的是阻塞請求來處理連接,如英文描述所說,該方法只是處理一個阻塞的請求,仍然使用select()方法輪詢來監聽網絡連接,但是需要考慮時間超時影響,一旦超時,調用handle_timeout()方法處理超時,一般在子類重寫該方法;如果在超時之前監聽到了網絡的連接請求,則同server_forever一樣,調用_handle_request_noblock()方法,完成對新的連接的請求處理。

3.2 BaseRequestHandler分析
class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define arbitrary other instance variariables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

 

      以上描述說明,所有requestHandler都繼承BaseRequestHandler基類,該類會處理每一個請求。在__init__中初始化實例變量request、client_address、server,然後調用handle()方法完成請求處理。那麼,我們唯一需要做的就是重寫好Handle()方法,處理所有的請求。   總結:構建一個網絡服務,需要一個BaseServer用於處理網絡IO,同時在內部創建requestHandler對象,對所有具體的請求做處理。

四、各種子類
4.1 由BaseServer衍生的子類
4.1.1 TCPServer
class TCPServer(BaseServer):
    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

    def fileno(self):
        """Return socket file number.

        Interface required by select().

        """
        return self.socket.fileno()

    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept()

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        try:
            #explicitly shutdown.  socket.close() merely releases
            #the socket and waits for GC to perform the actual close.
            request.shutdown(socket.SHUT_WR)
        except socket.error:
            pass #some platforms may raise ENOTCONN here
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()

 


 在BaseServer基礎上增加了一個TCP的socket連接,使用server_bind、server_activate、server_close處理TCP啟停等操作,同時增加了get_request、shutdown_request、close_request處理客戶端請求。
4.1.2 UDPServer
class UDPServer(TCPServer):

    """UDP server class."""

    allow_reuse_address = False

    socket_type = socket.SOCK_DGRAM

    max_packet_size = 8192

    def get_request(self):
        data, client_addr = self.socket.recvfrom(self.max_packet_size)
        return (data, self.socket), client_addr

    def server_activate(self):
        # No need to call listen() for UDP.
        pass

    def shutdown_request(self, request):
        # No need to shutdown anything.
        self.close_request(request)

    def close_request(self, request):
        # No need to close anything.
        pass

 

繼承自TCPServer,將socket改為了SOCK_DGRAM型,並修改了get_request,用於從SOCK_DGRAM中獲取request。同時server_activate、shutdown_request、close_request都改成了空(UDP不需要),比TCP簡單一些。

4.2 由BaseRequestHandler衍生的子類
4.2.1 StreamRequestHandler
class StreamRequestHandler(BaseRequestHandler):
    rbufsize = -1
    wbufsize = 0
    timeout = None
    disable_nagle_algorithm = False
    def setup(self):
        self.connection = self.request
        if self.timeout is not None:
            self.connection.settimeout(self.timeout)
        if self.disable_nagle_algorithm:
            self.connection.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, True)
        self.rfile = self.connection.makefile('rb', self.rbufsize)
        self.wfile = self.connection.makefile('wb', self.wbufsize)

    def finish(self):
        if not self.wfile.closed:
            try:
                self.wfile.flush()
            except socket.error:
                # An final socket error may have occurred here, such as
                # the local error ECONNABORTED.
                pass
        self.wfile.close()
        self.rfile.close()

 

    最主要的功能是根據socket生成了讀寫socket用的兩個文件對象(可以理解為句柄)rfile和wfile
4.2.2 DatagramRequestHandler
class DatagramRequestHandler(BaseRequestHandler):

    # XXX Regrettably, I cannot get this working on Linux;
    # s.recvfrom() doesn't return a meaningful client address.

    """Define self.rfile and self.wfile for datagram sockets."""

    def setup(self):
        try:
            from cStringIO import StringIO
        except ImportError:
            from StringIO import StringIO
        self.packet, self.socket = self.request
        self.rfile = StringIO(self.packet)
        self.wfile = StringIO()

    def finish(self):
        self.socket.sendto(self.wfile.getvalue(), self.client_address)

 

同樣是生成rfile和wfile,但UDP不直接關聯socket。這裡的rfile是直接由從UDP中讀取的數據生成的,wfile則是新建了一個StringIO,用於寫數據。





(題目起的有點大,部分剖析的不好,等之後再往祖墳上刨。。。。^-^)

參考博客:http://www.cnblogs.com/tuzkee/p/3573210.html              http://www.jianshu.com/p/357e436936bf


 






     

 


 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved