C++實現服務器壓力測試框架
flyfish 2015-3-9
#pragma once #include#include #include #include #include class CSession: public boost::enable_shared_from_this { public: CSession(const std::string IP, unsigned short port,int m_heartbeat_timer_minutes,boost::asio::io_service& io_service_); ~CSession(void); private: boost::asio::ip::tcp::endpoint m_ep; boost::asio::ip::tcp::socket m_sock; boost::asio::ip::tcp::resolver m_resolver; boost::array read_buffer; boost::array write_buffer; boost::asio::deadline_timer heartbeat_timer; int m_heartbeat_timer_minutes;//心跳間隔 以分為單位 void receive_handler(const boost::system::error_code &ec, std::size_t bytes_transferred);//接收數據處理結果 void connect_handler(const boost::system::error_code &ec); //連接處理結果 void heartbeat_handler(const boost::system::error_code &ec); //心跳處理結果 void login_handler(const boost::system::error_code &ec,std::size_t bytes_transferred);//登錄結果的處理 void start_receive();//開始接收數據 void send_handler(const boost::system::error_code &ec); std::size_t check_frame(const boost::system::error_code &ec, std::size_t bytes_transferred);//校驗數據 void parse_frame(const boost::system::error_code &ec, std::size_t bytes_transferred);//解析數據 public: void start_send();//其他地方調用時 只需要把任務加到任務隊列 執行 io_service_.post(boost::bind(&CSession::start_send, this)); void start();//啟動停止 void stop(); void login();//登錄 void heartbeat();//心跳 task m_task_queue;//線程安全的任務隊列 };
實現文件
#include "Session.h"
CSession::CSession(
const std::string IP,
unsigned short port,
int heartbeat_timer_minutes,
boost::asio::io_service& io_service_)
: m_resolver(io_service_),
m_sock(io_service_),
heartbeat_timer(io_service_),
m_nConnectState(0),
m_ep(boost::asio::ip::address::from_string(IP),port),
m_pObj(nullptr),
m_heartbeat_timer_minutes(heartbeat_timer_minutes),
m_nHeartbeatCount(0)
{
read_buffer.fill(0);
}
void CSession::start()
{
m_sock.async_connect(m_ep,
boost::bind(&CSession::connect_handler,
shared_from_this(),
boost::asio::placeholders::error));
}
CSession::~CSession(void)
{
stop();
}
void CSession::stop()
{
heartbeat_timer.cancel();
m_sock.close();
}
std::size_t CSession::check_frame(const boost::system::error_code &ec, std::size_t bytes_transferred)
{
//bytes_transferred 已經接收的字節數
//返回0 表示 驗證通過
//返回1 表示 需要繼續驗證
}
void CSession::parse_frame(const boost::system::error_code &ec, std::size_t bytes_transferred)
{
if (!ec)
{
}
}
void CSession::start_receive()
{
m_sock.async_receive(boost::asio::buffer(read_buffer),
boost::bind(&CSession::receive_handler,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void CSession::receive_handler(const boost::system::error_code &ec, std::size_t bytes_transferred)
{
if (!ec)
{
//check_frame 驗證接收的數據,當驗證成功時,開始
boost::asio::async_read(m_sock,boost::asio::buffer(read_buffer),
boost::bind(&CSession::check_frame,
shared_from_this(), boost::asio::placeholders::error,
bytes_transferred),
boost::bind(&CSession::parse_frame,
shared_from_this(),
boost::asio::placeholders::error,
bytes_transferred));
m_sock.async_receive(boost::asio::buffer(read_buffer),
boost::bind(&CSession::receive_handler,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
else
{
stop();
}
}
void CSession::heartbeat()
{
unsigned char c[256];
UINT nLen = process_hartbeat(c);//生成心跳數據
m_sock.async_write_some(boost::asio::buffer(c,nLen),boost::bind(&CSession::heartbeat_handler,shared_from_this(),boost::asio::placeholders::error));
}
void CSession::heartbeat_handler(const boost::system::error_code &ec)
{
if (!ec)
{
heartbeat_timer.expires_from_now(boost::posix_time::minutes(m_heartbeat_timer_minutes));
heartbeat_timer.async_wait(boost::bind(&CSession::heartbeat,shared_from_this()));
}
else
{
stop();
}
}
void CSession::login()//登錄
{
//連接完成 發送登錄幀
//*******************************************************************************************
unsigned char c[256];
UINT nLen = process_login(c);//生成登錄數據 例如 c中存儲了用戶名稱和密碼等
m_sock.async_write_some(boost::asio::buffer(c,nLen),boost::bind(&CSession::login_handler,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void CSession::login_handler(const boost::system::error_code &ec,std::size_t bytes_transferred)
{
if (!ec)
{
start_receive();//啟動接收
heartbeat();
}
else
{
stop();
}
}
void CSession::connect_handler(const boost::system::error_code &ec)
{
if (!ec)
{
boost::this_thread::sleep(boost::posix_time::microseconds(500));
login();//連接成功之後開始登錄
}
else
{
boost::this_thread::sleep(boost::posix_time::microseconds(60000));
start();//連接失敗 需要再次連接
}
}
void CSession::start_send()
{
//從線程安全的任務隊列中獲取一個任務開始發送
std::tuple ret=m_task_queue.get_nonblock();
if (std::get<0>(ret))
{
unsigned char* c=std::get<1>(ret).data;
int nLen=std::get<1>(ret).len;
/**************************************************************************************************/
m_sock.async_write_some(boost::asio::buffer(c,nLen),boost::bind(&CSession::send_handler,
shared_from_this(),
boost::asio::placeholders::error));
}
}
void CSession::send_handler(const boost::system::error_code &ec)
{
if (!ec)
{
start_send();//任務隊列不為空時,開始發送下一個任務
}
} class client
{
public:
boost::shared_ptr m_pSocket;//創建繼承於enable_shared_from_this類的對象時必須使用智能指針
};
std::vector client_queue;//存儲生成的客戶端實例
boost::asio::io_service m_io_service;
boost::asio::io_service::work m_work(m_io_service);//即使io任務完成,也不退出
std::string IP="192.168.1.1";
unsigned short port=8000;
int heartbeat=10;
for (int i=0;ip (new CSession(IP,port,heartbeat,(m_io_service)));
client_queue.at(i)->m_pSocket->start();
}
boost::thread t(boost::bind(&boost::asio::io_service::run,boost::ref(m_io_service)));
多線程調用run方式
int thread_count=(std::max)(static_cast(boost::thread::hardware_concurrency()),1);//至少有一個線程運行 boost::thread_group tg; for (int i=0;i
只要將任務加入到任務隊列,執行 m_pSocket->start_send();