程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> 更多編程語言 >> Python >> Python 異步收發數據socket實戰

Python 異步收發數據socket實戰

編輯:Python

Python 寫了一個收發數據用的socket,自己測並發可以達到10K+,用了20分鐘撸了一個,不多講直接曬代碼

#!/usr/bin/python
#coding:utf-8
# author:51reboot.com
import socket,select
import os,sys,copy

HOST = '0.0.0.0'
PORT = 8089
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 釋放端口
s.setblocking(0) # 設置為非阻塞
s.bind((HOST,PORT))
s.listen(10) # 等待對列

# listen: accept -> read -> process -> write -> closing
# accpet: read -> process -> wriet -> closing

# 狀態機 模板
STATE_M = {
-1:{ # sock
's':'read', # sock 狀態
'r':{ # read 的信息
'have':0, # 已讀的字節數
'need':10, # 要讀的字節數
},
'w':{ # write 的數據
'have':0, # 已經write 的數據
'need':0, # 需要write 的數據
},
'd':'', # 用於存放數據
},
}

STATE= {}

def state_machine(sock):
if sock == s:
# 監聽socket
conn,addr = s.accept()
conn.setblocking(0) # 設置為非阻塞
STATE[conn] = copy.deepcopy(STATE_M[-1]) # 給這個連接賦初值
R_LIST.append(conn) # 將sock加入到 需要R_LIST信息的列表
else:
# 已經accept的socket
stat = STATE[sock]['s'] # 保存 sock 的狀態信息
if stat == 'read':
#如果狀態為read,則需要進行讀操作
if STATE[sock]['r']['need'] == 0:
# 如果 client 發過來的數據全部讀取完了
R_LIST.remove(sock) # 將sock 需要讀取的列表中remove
STATE[sock]['s'] = 'process' # 修改sock的狀態
state_machine(sock) # 進行一次回調
else:
one_read = sock.recv(STATE[sock]['r']['need']) # 讀取指定字節數
STATE[sock]['d'] += one_read # 更新讀取的字節數
# 需要重新整理已讀取的數據與還需要讀取的數據
STATE[sock]['r']['have'] += len(one_read)
STATE[sock]['r']['need'] -= len(one_read)
if STATE[sock]['r']['have'] == 10:
# 讀完頭信息開始讀取主體內容
STATE[sock]['r']['need'] += int(STATE[sock]['d']) # 修改接下來需要讀取的字節數
STATE[sock]['d'] = '' # 清空已接收到的數據
elif stat == 'process':
# 讀完了client發送的數據後,開始給client返回數據
response = STATE[sock]['d'][::-1] # 反轉字符
STATE[sock]['d'] = "%010d%s" % (len(response),response) # 更新接收到的數據
STATE[sock]['w']['need'] = len(STATE[sock]['d']) # 更新需要發出的字節數
STATE[sock]['s'] = 'write' # 修改sock的狀態為寫
W_LIST.append(sock) # 將sock 加入到 W_LIST
elif stat == 'write':
print "wirite", STATE
# 如果狀態為 write,同向client發送數據
last_have_send = STATE[sock]['w']['have'] # 保存已發送的字節數
have_send = sock.send(STATE[sock]['d'][last_have_send:]) # 向client發送數據,返回返回了多少個字節
STATE[sock]['w']['have'] += have_send # 修改已發出的字節
STATE[sock]['w']['need'] -= have_send # 修改需要發出的字節
if STATE[sock]['w']['need'] == 0 and STATE[sock]['w']['have'] != 0:
# 如果數據已發完,則關閉sock
STATE[sock]['s'] = 'closing'
elif stat == 'closing':
print "closing", STATE

# 關閉連接
STATE.pop(sock) # 將sock從狀態裡刪除
try:
W_LIST.remove(sock) # 將sock從w_list裡刪除
except ValueError:
pass
sock.close() # 關閉連接

R_LIST = [s] # 可讀的sock列隊
W_LIST = [] # 可寫的sock隊列
non_stop = True
while True:
try:
r_socks,w_socks,err_socks = select.select(R_LIST,W_LIST,[])
except socket.error,e:
print e
for sock in r_socks:
state_machine(sock)
for sock in w_socks:
state_machine(sock)

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved