程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

tcp客戶端重連例子 python

編輯:Python
#!/usr/bin/env python2.7
# coding:utf-8
# -*- coding: utf-8 -*-
""" 2021.12.31 底盤控制客戶端 2022.1.6v 2022.1.7 增加手動轉彎 """
from socket import *
import json
import struct
import threading
import time
from serial_control_class import ser_c, con_s, ana
from correct_speed import Correct_val
import platform
import ctypes
import inspect
from status_total_data import s_data
from _chassis_logging_init import c_l,logger_motor
python_version = platform.python_version()[0:1]
"""2022 . 1.7 修改為主動檢查掉線模式"""
"""底盤的客戶端參數設置"""
class Client_link():
"""用於初始化。tcp客戶端"""
def __init__(self,HOST = '127.0.0.1',PORT = 8081):
# self.HOST = HOST
# self.PORT = PORT
#self.HOST = "10.168.1.161" # 控制服務器地址
self.HOST = "10.168.1.147" # 控制服務器地址
# self.HOST = "127.0.0.1" # 控制服務器地址
self.PORT = 8888
self.repeat_num = 10 #設置當沒有接收到完整數據幀的時候,重復連接的次數
self.init_motor()
#重連故障參數准備
self.no_action_bool = False # 用於控制重連不上的時候切換為遙控器模式
self.fail_link_num = 0 # 用於在主動線程中統計,設定的單位時間內沒有發送成功的次數,越大說明當前越沒有發出去
self.send_bool = False # 發送前置為假,發送後置為真
self.recycle_bool = False # 在成功拉起發送與接收線程後置為真
self.time_num = 5 # 5*0.2 即為設定為 1秒 堵塞等待時間
def init_motor(self):
"""用於初始化,關節電機控制參數"""
self.m1_angle_speed = 0
self.m1_cnt_speed = 0
self.m1_angle_dis = 0
self.m1_cnt_dis= 0
self.m2_angle_speed = 0
self.m2_cnt_speed = 0
self.m2_angle_dis = 0
self.m2_cnt_dis = 0
def infor_analysis(self,json_buf):
"""通過分析接收到的,下位機發送的json數據,對現有控制數據進行更新"""
if json_buf["status"] == "floor":
self.move_floor(json_buf)
elif json_buf["status"] == "Manu_m":
self.manual_control(json_buf)
else:
c_l.warning("!!!接收的不是底盤的控制命令(檢測各客戶端的地址分配是否正確!),當前控制命令為: " + str(json_buf))
#self.save_motor_c(json_buf["dict"]) 預留接口,方便後續該巡檢項目的二次開發
# elif json_buf["Dete_m"] == "Odom_m":
def move_floor(self, json_buf={
"status": "floor", "dict": {
"routeinfo": [(1, 1), (4, 1), (4, 2)]}}):
"""解析路徑規劃的地面數據進行運動"""
location_list = json_buf["dict"]["routeinfo"]
#
# location_list = [(9,1),(4,1),(4,2)]
# location_list = [(1, 1), (4, 1), (4, 2),(7, 2)]
c_l.warning("________________低盤模式:" +str(location_list))
max_num = len(location_list)
self.time_num = 0.2/0.015
for num, val in enumerate(location_list):
if (num + 2) <= max_num:
if val[0] == location_list[num + 1][0]:
self.move_f_b(location_list[num + 1][1] - val[1])
else:
self.move_l_r(location_list[num + 1][0] - val[0])
def move_f_b(self, dis_num):
if dis_num > 0:
con_s.x = 0.015
con_s.y = 0
time.sleep(dis_num * self.time_num)
# 說明向前
else:
con_s.x = -0.015
con_s.y = 0
time.sleep(-dis_num * self.time_num)
con_s.x = 0
con_s.y = 0
def move_l_r(self, dis_num):
if dis_num > 0:
con_s.x = 0
con_s.y = 0.015
time.sleep(dis_num * self.time_num)
# 說明向前
else:
con_s.x = 0
con_s.y = -0.015
time.sleep(-dis_num * self.time_num)
con_s.x = 0
con_s.y = 0
def chesis_move_stop(self):
con_s.x = 0
con_s.y = 0
con_s.p = 0
def manual_control(self, json_buf):
"""用於手動控制(通過對接的客戶端的數據更新),小車底盤的行動,目前只提供,簡單的單一邊行走"""
json_buf_dict = json_buf["dict"]
if json_buf_dict['Car_forward'] or json_buf_dict['Car_back'] or json_buf_dict['Car_left'] or json_buf_dict['Car_right']:
c_l.warning(str(json_buf_dict))
print("666")
if json_buf_dict["Car_forward"] == True:
c_l.warning("發送向前")
con_s.x = 0.015 # min:0.015 nor: 0.03 max:1.26 m/s
con_s.y = 0 # -0.1
#
elif json_buf_dict["Car_back"] == True:
c_l.warning("發送向後")
con_s.x = - 0.015 # min:0.015 nor: 0.03 max:1.26 m/s
con_s.y = 0 # -0.1
elif json_buf_dict["Car_left"] == True:
c_l.warning("發送向左")
con_s.x = 0 # min:0.015 nor: 0.03 max:1.26 m/s
con_s.y = 0.015 # -0.1
elif json_buf_dict["Car_right"] == True:
c_l.warning("發送向右")
con_s.x = 0 # min:0.015 nor: 0.03 max:1.26 m/s
con_s.y = -0.015 # -0.1
elif json_buf_dict["Car_turnleft"] == True:
c_l.warning("發送向左轉")
con_s.x = 0 # min:0.015 nor: 0.03 max:1.26 m/s
con_s.y = 0 # -0.1
con_s.p = 0.028
elif json_buf_dict["Car_turnright"] == True:
c_l.warning("發送向右轉")
con_s.x = 0 # min:0.015 nor: 0.03 max:1.26 m/s
con_s.y = 0 # -0.1
con_s.p = -0.028
elif json_buf_dict["Car_stop"] == True:
c_l.warning("發送-停止")
con_s.x = 0 # min:0.015 nor: 0.03 max:1.26 m/s
con_s.y = 0 # -0.1
con_s.p = 0
else:
c_l.warning("底盤停止!")
con_s.x = 0 # min:0.015 nor: 0.03 max:1.26 m/s
con_s.y = 0 # -0.1
con_s.p = 0
def save_motor_c(self, data_dict):
"""預留接口用於顯示等"""
s_data.m1_angle_speed = data_dict["m1_a_speed"]
s_data.m1_cnt_speed = data_dict["m1_cnt_speed"]
s_data.m1_angle_dis = data_dict["m1_a_dis"]
s_data.m1_cnt_dis = data_dict["m1_cnt_dis"]
s_data.m2_angle_speed = data_dict["m2_a_speed"]
s_data.m2_cnt_speed = data_dict["m2_cnt_speed"]
s_data.m2_angle_dis = data_dict["m2_a_dis"]
s_data.m2_cnt_dis = data_dict["m2_cnt_dis"]
def start_client(self):
"""開啟客戶端"""
# 先拉起一個測量通訊時間阻塞的線程用於幫助重連
self.t_recycle = threading.Thread(target=self.recycle)
self.t_recycle.setDaemon(True) # 當發送模塊長時間無法發送對接收與發送線程主動拋出異常,並調整底盤到遙控器模式
self.t_recycle.start()
self.link()
def pull_send_and_recv(self):
self.t_client_recv = threading.Thread(target=self.run_recv)
self.t_client_recv.setDaemon(True) # 當斷開後,通通過主動拋出異常自動回收
self.t_client_recv.start()
self.t_client_send = threading.Thread(target=self.listen_send)
self.t_client_send.setDaemon(True) # 當斷開後,通過主動拋出異常自動回收
self.t_client_send.start()
def recycle(self):
"""通過 送入異常引發線程奔潰控制 進行控制"""
while True:
if self.recycle_bool:
if self.send_bool:
self.fail_link_num = 0
else:
self.fail_link_num = self.fail_link_num + 1
time.sleep(0.2)
if self.fail_link_num > self.time_num: # 即2s內沒有
try:
logger_motor.warning("堵塞超過"+str(self.time_num*0.2)+"秒,回收當前兩個連接線程,重新連接服務端")
self._async_raise(self.t_client_recv.ident,SystemExit)
print("回收完成接收線程")
self._async_raise(self.t_client_send.ident, SystemExit)
print("回收完成發送線程")
except:
print("上次重連失敗-----")
self.no_action_bool = True
self.recycle_bool = False
self.client.close()
print("開始重連......")
self.link()
else:
time.sleep(0.5)
def _async_raise(self,tid, exctype):
# todo 強制退出線程 忽略掉報錯
"""raises the exception, performs cleanup if needed"""
tid = ctypes.c_long(tid)
if not inspect.isclass(exctype):
print(".." * 20)
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
print("0" * 10)
raise ValueError("通過檢測延時函數主動拋入 變量異常 進入,用於終止當前線程")
elif res != 1:
print("1" * 10)
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("通過檢測延時函數主動拋入 系統異常 進入,用於終止當前線程")
def link(self):
"""為防止連接服務端失敗,這裡拉起一個連接線程持續守護,當客戶端收發線程,由於無法連接服務端失效的時候執行重新連接"""
if self.no_action_bool:
logger_motor.warning("准備重連為避免服務器損壞無法遙控控制小車底盤,現調整為 可被遙控模式")
ser_c.movement_moudle = "無動作模式"
try:
self.client = socket(AF_INET, SOCK_STREAM)
self.client.bind(('', 4449))
self.client.connect((self.HOST, self.PORT))
c_l.warning("成功連接服務端,准備拉起發送與監聽線程...................")
# self.feed_back_control_bool = True # 用於控制,接收與發送線程能否反饋解鎖當前的守護拉起線程
self.pull_send_and_recv()
except OSError as e:
if e.errno == 10048:
c_l.warning("已成功連接底盤端口:4441,自動跳過該重復連接")
else:
self.recycle_bool = True # 為真開啟統計連接阻塞,如果發送線程的阻塞過長則回收掉兩個線程
self.send_bool = False
print("重連失敗......")
print(ser_c.movement_moudle)
print("*"*50)
def run_recv(self):
"""客戶端監聽程序"""
print('開始連接下位機服務端......') # 等待cilent的連接
while True:
try:
length = self.receive_real(4)
len_data = struct.unpack('i', length)
len_data = len_data[0]
buf = self.receive_real(len_data)
json_buf = json.loads(buf)
self.infor_analysis(json_buf)
except:
self.chesis_move_stop()
self._async_raise(self.t_client_send.ident, SystemExit)
print("回收完成發送線程313")
self.no_action_bool = True
self.recycle_bool = True
self.send_bool = False # 在上面回收了發送線程後,確保控制判斷的重連的變量為假
logger_motor.error('主動拋出接收模塊異常,終止接收線程,並通知守護線程重新拉起接收線程模塊,'
'此時切換底盤模式為無動作模式,可以被遙控器控制')
# 主動拋出接收模塊異常,引發接收異常,使得系統回收該監聽線程
raise RuntimeError('主動拋出接收模塊異常,終止接收線程,並通知守護線程重新拉起接收線程模塊,'
'此時切換底盤模式為無動作模式,可以被遙控器控制')
def receive_real(self,len_data):
"""接收真實數據"""
""" len_data 為預先發過來的真實數據長度 """
buf = b''
recv_size = 0
recv_num = 0 # 接收次數
buf = self.client.recv(len_data)
while (len(buf) <len_data and recv_num < self.repeat_num ):
buftmp = self.client.recv(len_data - len(buf))
buf = buf + buftmp
recv_num = recv_num + 1
if len(buf) != len_data:
logger_motor("服務端應發送長度為" + str(len_data) +", 實際接收長度為: " + str(len(buf)) )
con_s.x = 0
con_s.y = 0
logger_motor.error("!!!!!!!!!!!返回的接收數據為錯誤數據,可能會引起錯誤!!!!!!!!!!!!!!")
return buf
def listen_send(self):
"""用於發送底盤裡程計數值,注意這裡為兩組數據,一組為累加完成後的裡程計數值,另一組為規定的清空緩沖區的數值。"""
data_dict = {
"status": "chessis_s", "dict": {
"point_x": 0, "point_y": 0, "point_p": 0, "point_x": 0, "point_y": 0, "point_p": 0}}
print("准備開啟發送數據線程------")
while True:
data_dict["dict"]["point_x"] = ana.point_x
data_dict["dict"]["point_y"] = ana.point_y
data_dict["dict"]["point_p"] = ana.point_p
data_dict["dict"]["point_x1"] = ana.point_x1
data_dict["dict"]["point_y1"] = ana.point_y1
data_dict["dict"]["point_p1"] = ana.point_p1
self.send_data(data_dict)
ser_c.movement_moudle = "差速模式" # 數據發送到服務端成功切換為差速模式
self.send_bool = True #注意這裡 self.send_bool 變量的位置,需要將為的部分處於延時發送時間占用最多的部分
time.sleep(0.3)
self.send_bool = False
def send_data(self,data):
"""用於發送數據"""
try:
data = json.dumps(data)
length = len(data)
len_data = struct.pack('i', length)
# 固定字節長度,告知數據長度為多少
if (len_data != b'') and (data.encode('utf-8') != b''):
self.client.sendall(len_data)
self.client.sendall(data.encode('utf-8'))
#2022 修改為 sendall 用於避免數據完整性,如需要交互高效選擇send
# self.client.send(len_data)
# self.client.send(data.encode('utf-8'))
except:
self.chesis_move_stop()
self._async_raise(self.t_client_recv.ident, SystemExit)
print("回收完成接收線程420")
self.no_action_bool = True
self.recycle_bool = True
self.send_bool = False # 在上面回收了發送線程後,確保控制判斷的重連的變量為假
logger_motor.error('主動拋出發送模塊異常,終止發送線程,並通知守護線程重新拉起發送線程模塊,'
'此時切換底盤模式為無動作模式,可以被遙控器控制')
# 主動拋出發送模塊異常,引發接收異常,使得系統回收該監聽線程
raise RuntimeError('主動拋出發送模塊異常,終止發送線程,並通知守護線程重新拉起發送線程模塊,'
'此時切換底盤模式為無動作模式,可以被遙控器控制')
def init_chessis():
"""用於初始化,底盤與裡程計的,初始化參數"""
correct_v = Correct_val(ana=ana, con_s=con_s)
correct_v.time_interval = 0.2 # 設置瞬時速度的間隔時間,程序通過解析間隔時間內碼盤移動的平均度數給出速度值
correct_v.ignore_num = 5 # 由於加速度的存在,在啟動階段的實時速度測量由於硬件原因會出現誤差,設置忽略前多少次的速度計算值
con_s.x = 0
con_s.y = 0
con_s.p = 0
ser_c.interval_dis = 200000 # 裡程計清零距離 單位 (毫米)
ser_c.set_safe_area(200000, 200000) # 設置一個矩形安全范圍
ser_c.x_deviation_param = 5 # 當平移時x軸方向偏差超過該值開始進行糾偏
ser_c.y_deviation_param = 5 # 當平移時y軸方向偏差超過該值開始進行糾偏
ser_c.p_deviation_param = 5 # 當平移時角度方向偏差超過該值開始進行糾偏
ser_c.set_port('COM4') # 對應是拿鐵熊貓左上角的那個usb口
ser_c.movement_moudle = "無動作模式" # 在連接服務端成功後會切換為差數模式,差速模式下無法被遙控器控制,連接斷線自動切換為無動作模式
ser_c.open_serial_control()
# 設置差速模式下的角度糾偏,最大的角度轉動速度
# ser_c.max_correct_p = 0.05 # rad/s #左右
# ser_c.turn_hint_rectifying_bool = True #開啟運動糾偏模型
ser_c.error_x_y_min_num = 5 # mm
ser_c.error_p_min_num = 10
ser_c.max_correct_p = 0.3 # rad/s
ser_c.max_correct_x = 0.03 # m/s
ser_c.max_x_y_rang_change_num = 50 # mm(分母x y
ser_c.max_p_rang_change_num = 10 # mm(分母x y
print("..........")
print("打印當前時刻碼盤的返回值")
print(ana.point_x)
print(ana.point_y)
print(ana.point_p)
print("..........")
if __name__ == '__main__':
# #初始化底盤驅動程序
# init_chessis()
#准備拉起 客戶端
c = Client_link()
c.start_client()
c_l.warning("開啟tcp客戶端線程拉起完成")
while True:
time.sleep(20)

  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved