程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> Visual Basic語言 >> VB.NET >> 在vb.net中運用多線程實現遠程數據收集

在vb.net中運用多線程實現遠程數據收集

編輯:VB.NET

引言

在筆者參與的四川省重點污染源企業環境遠程監控系統中,有一項非常重要的工作:將多達80台的遠程DVS(視頻服務器)的監測數據通過因特網傳輸,由上位機收集上來,寫入SQL Server 2005數據庫中。遠程數據每隔一分鐘發送一次實時數據。如果數據在一分鐘內傳送不成功,那麼DVS將認為網絡已經斷開,又要不斷的發起新的連接。因此,上位機能不能及時的准確的收集、寫入,是系統成敗的關鍵。

項目分析

80多台遠程DVS正在不間斷的采集數據,在網絡正常的情況下,會不間斷的向上位機發送數據。如果采用傳統的單線程結構,上位機接受連接請求,接收處理數據,將數據寫入數據庫,然後再接受新的連接請求,接收處理數據,……,這樣,上位機程序異常繁忙,CPU利用率幾乎將達100%。由於服務器不能迅速處理請求,DVS只好等待。

更為重要的是,為了減少上位機發送響應連接的次數,設備采用的是長連接,即發送一次連接請求並得到響應後,發送數據時不再發送連接請求。因此,要求上位機能夠保存客戶端的Socket。

為了避免這種情形發生。筆者采用了異步、多線程來處理。所謂異步,是程序調用一個方法後立即返回,總體而言,主線程與方法線程並行執行。而同步即程序執行一個方法,等該方法返回之後,繼續往下走,本系統從功能上分成3個模塊,即3個前後關聯的線程:主線程、數據接收線程、存入數據庫線程,它們異步執行。

主線程

主線程工作流程如圖一所示。其主要功能是:初始化參數,如連接端口號、IP地址等,偵聽連接請求,將傳入的連接保留到TcpClient對象數組sockets,而這個數組sockets恰恰是我們後面線程中要用到的全局變量。 為了不使線程間爭用這個數組變量,這裡用到了VB.net提供的Monitor類,它提供同步對象的訪問的機制。

當主線程偵聽到遠程DVS有連接請求時,立即執行AcceptTcpClient方法,創建一個TcpClient實例,並將它放入sockets數組。同時創建線程對象serverthread。

聲明創建線程時,使用 ThreadStart 委托作為其唯一參數的構造函數創建 Thread 類的新實例,創建線程時需要傳遞處理連接的過程或函數的地址以被線程調用。創建線程委托,傳遞需要操作的過程的地址,這部分的代碼如下所示:

Public Sub WaitData()

Try

Dim ipHostInfo As IPHostEntry = Dns.Resolve(Dns.GetHostName())

Dim localAddr As IPAddress = ipHostInfo.AddressList(0)

s = New TcpListener(localAddr, ListenPort)

s.Start()’開始偵聽連接請求

Dim Recdatathread As New Thread(New ThreadStart(AddressOf RecDataProc)) ’創建數據接收線程

Recdatathread.IsBackground = True

Recdatathread.Start()’啟動線程

While True

Dim client As TcpClient = s.AcceptTcpClient()

Monitor.Enter(sockets) '在指定對象上獲取排他鎖

sockets(socketcount) = client

socketcount = socketcount + 1

Monitor.Exit(sockets) '釋放指定對象上的排他鎖

End While

Catch e As SocketException

s.Stop()

saveErrLog(Date.Now, CType(s.AcceptTcpClient.Client.RemoteEndPoint, IPEndPoint).Address.ToString(), e.Message)’寫入錯誤日志

Catch e As ThreadAbortException

t.Abort()

saveErrLog(Date.Now, CType(s.AcceptTcpClient.Client.RemoteEndPoint, IPEndPoint).Address.ToString(), e.Message) ’寫入錯誤日志

Finally

t.Abort()

End

End Try

End Sub

數據接收線程

數據接收線程的工作流程如圖二所示。主要功能是:將掛起連接的DVS上傳數據從流中讀取出來,創建數據寫入線程,並在listbox中顯示。

從保存的socket數組中讀取字節流時,必須考慮以下問題:

一、有些DVS可能會在工作一段時間後發生設備故障或者網絡中斷,但服務器保存的是其歷史socket,因此,必須判斷其connect屬性,即設備是否在線。

二、為了減少服務器的空等時間,必須判斷流對象(stream)的DataAvailable屬性。

三、創建線程saveToDb時,必須考慮傳入參數的問題。通常的線程創建是不可提供參數的。我們將線程saveToDb的執行體封裝到一個類中,通過初始化類的成員變量的方法,來達到傳送參數的目的。

四、由於本線程是長駐內存並循環執行的。因此,應當在適當的地方阻止,否則,CPU的利用率將達幾乎100%。

這部分的代碼如下:

Public Sub RecDataProc()

Dim i As Integer

Dim c As TcpClient

While (True)

Try

For i = 0 To socketList.Count - 1

If socketList.Item(i).client.connected Then '如果該連接在線

Dim dh1 As DelegateHandler = New DelegateHandler(AddressOf displayStatusBarPanel2)

'New 出一個委托並指定委托方法

Me.Invoke(dh1, New Object() {CStr(i)})  '調用invoke方法

c = socketList.Item(i)

Dim stream As NetworkStream = c.GetStream()

If stream.DataAvailable Then

Dim dh As DelegateHandler1 = New DelegateHandler1(AddressOf ShowInBox)

Dim readbuff As New ReadBuffClass(c, stream, Connection, dh) '由構造函數來初始化成員變量

ThreadPool.QueueUserWorkItem(New WaitCallback(AddressOf readbuff.ReadBuff), readbuff)’把具體從流中讀取數據的工作交給線程池的線程來進行

Dim workerThreads, portThreads As Integer

ThreadPool.GetAvailableThreads(workerThreads, portThreads)

Dim dh2 As DelegateHandler = New DelegateHandler(AddressOf displayStatusBarPanel4)

'New 出一個委托並指定委托方法

Me.Invoke(dh2, New Object() {workerThreads.ToString})  '調用invoke方法

End If

Thread.Sleep(20) '如果不阻止,則CPU利用率將為100%

End If

Next

Catch ex As System.ArgumentOutOfRangeException

Catch ex As System.InvalidOperationException

Catch ex As ObjectDisposedException 'TcpClient 已關閉

Catch ex As SocketException

Catch ex As ThreadAbortException

Catch ex As System.IO.IOException

Catch ex As System.AccessViolationException

Finally

End Try

End While

End Sub

數據處理線程

這部份線程每個都由線程池來調度運行。由於要接收線程參數,因此,線程本身被封裝到一個類中,限於篇幅的原因,只描述類的結構。

Public Class ReadBuffClass

Private sck As TcpClient

Private ns As NetworkStream

Private sqlcnn As SqlConnection

Private delg As frmServerMain.DelegateHandler1

Dim sqlcmd As SqlCommand

Dim sqlda As SqlDataAdapter

Public Sub New(ByVal sc As TcpClient, ByVal n As NetworkStream, ByVal cn As SqlConnection, ByVal dh As frmServerMain.DelegateHandler1) '由構造函數來初始化成員變量

Me.sck = sc

Me.ns = n

Me.sqlcnn = cn

Me.delg = dh

End Sub

Public Sub ReadBuff(ByVal state As Object) ' 線程的入口函數

Dim datastring As String = ""

ns.ReadTimeout = 100 '讀取失敗前經歷的毫秒數

Try

While (True)

Dim bytes(2048) As Byte

ns.Read(bytes, 0, 2048)

datastring = datastring + Encoding.ASCII.GetString(bytes)

If datastring.IndexOf(vbCrLf) > 0 Then

Exit While

End If

End While

delg.Invoke(datastring, sck) '通過委托的方式,將參數傳給UI

Dim tmparr() As String = datastring.Split("##")

Dim i As Integer

For i = 0 To tmparr.Length - 1

If tmparr(i) <> "" Then

ProcessInfo(tmparr(i))

End If

Next

Catch ex As System.AccessViolationException

Catch ex As NotSupportedException

Catch ex As ArgumentNullException

Catch ex As ArgumentOutOfRangeException

Catch ex As ObjectDisposedException

Catch ex As IO.IOException '

Catch ex As SocketException

Catch ex As ThreadAbortException

Finally

End Try

End Sub

Private Sub ProcessInfo(ByVal tmpString As String) '對收到的數據進行解析、處理

……

End Sub

……

End Class

結束語

本文著重論述的是在VB2005的環境下,運用多線程異步實現遠程DVS數據收集的原理,重點考慮的是怎樣提高程序的反應速度,特別討論了程序開發中的一些細節問題,對有志於從事遠程臨控系統開發的軟件人員有一定的參考意義。

文中代碼在windows2003+VB2005+SqlServer2005的環境下調試通過,現在正在使用。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved