項目中需要對兩個不同格式的存儲設備進行數據轉錄,因為數據量非常大,所以時間非常緩慢;解決 方案是使用ReaderWriterSlim類建立一個共享的同步數據,可以支持一個線程讀取外部設備,向同步數據 寫入;多個線程從同步數據中讀取,轉換格式,然後寫入到本地設備。
本例中采用Queue<T>作為存放數據的集合,寫入線程向它的尾部寫入對象,讀取線程從它的頭 部獲取對象。
需要注意的是,由於Queue會拋棄已處理的對象,所以在同步數據隊列中無法驗證數據對象的唯一性, 被寫入的數據庫需要去掉唯一約束,或在寫入時向數據庫請求驗證。
首先定義一個讀寫接口:
namespace Common
{
public interface IReaderWriter<T>
{
T Read(int argument);
void Write(int arugment, T instance);
void Delete(int argument);
void Clear();
}
}
然後實現一個隊列的讀寫器:
namespace Common
{
public class QueueReaderWriter<T> : IReaderWriter<T>
{
private Queue<T> queues;
public QueueReaderWriter()
{
queues = new Queue<T>();
}
public QueueReaderWriter(int capacity)
{
queues = new Queue<T>(capacity);
}
#region IReadWrite<T> 成員
public T Read(int argument)
{
return queues.FirstOrDefault();
}
public void Write(int arugment, T instance)
{
queues.Enqueue(instance);
}
public void Delete(int argument)
{
queues.Dequeue();
}
public void Clear()
{
queues.Clear();
queues.TrimExcess();
}
#endregion
}
}
使用ReaderWriterLockSlim實現同步數據類:
namespace Common
{
public class SynchronizedWriteData<T> : IDisposable
{
private ReaderWriterLockSlim _dataLock = new ReaderWriterLockSlim();
private IReaderWriter<T> _innerData;
private SynchronizedWriteData()
{ }
public SynchronizedWriteData(IReaderWriter<T> innerData)
{
_innerData = innerData;
}
public T Read()
{
_dataLock.EnterReadLock();
try
{
return _innerData.Read(0);
}
finally
{
_dataLock.ExitReadLock();
}
}
public T Read(int argument)
{
_dataLock.EnterReadLock();
try
{
return _innerData.Read(argument);
}
finally
{
_dataLock.ExitReadLock();
}
}
public void Add(T instance)
{
_dataLock.EnterWriteLock();
try
{
_innerData.Write(0, instance);
}
finally
{
_dataLock.ExitWriteLock();
}
}
public void Add(int argument, T instance)
{
_dataLock.EnterWriteLock();
try
{
_innerData.Write(argument, instance);
}
finally
{
_dataLock.ExitWriteLock();
}
}
public bool AddWithTimeout(T instance, int timeout)
{
if (_dataLock.TryEnterWriteLock(timeout))
{
try
{
_innerData.Write(0, instance);
}
finally
{
_dataLock.ExitWriteLock();
}
return true;
}
else
{
return false;
}
}
public bool AddWithTimeout(int argument, T instance, int timeout)
{
if (_dataLock.TryEnterWriteLock(timeout))
{
try
{
_innerData.Write(argument, instance);
}
finally
{
_dataLock.ExitWriteLock();
}
return true;
}
else
{
return false;
}
}
public void Delete()
{
_dataLock.EnterWriteLock();
try
{
_innerData.Delete(0);
}
finally
{
_dataLock.ExitWriteLock();
}
}
public void Delete(int argument)
{
_dataLock.EnterWriteLock();
try
{
_innerData.Delete(argument);
}
finally
{
_dataLock.ExitWriteLock();
}
}
#region IDisposable 成員
public void Dispose()
{
try
{
_dataLock.EnterWriteLock();
{
try
{
_innerData.Clear();
}
finally
{
_dataLock.ExitWriteLock();
}
}
}
finally
{
_dataLock.Dispose();
}
}
#endregion
}
}
namespace ExternalDataHandle
{
/// <summary>
/// 從外部數據源獲取到內部數據源的適配器抽象類
/// </summary>
/// <typeparam name="T">T 數據對象類型</typeparam>
public abstract class ExternalDataAdapter<T> : IDisposable
{
/// <summary>
/// 外部數據源連接字符串
/// </summary>
protected abstract string ConnectString { get; }
/// <summary>
/// 提供初始化數據適配器的方法
/// </summary>
protected abstract void Initialize();
/// <summary>
/// 提供數據傳遞的方法
/// </summary>
public abstract void Transmit();
/// <summary>
/// 提供從外部數據設備讀取數據的方法
/// </summary>
protected abstract void ReadFromExternalDevice();
/// <summary>
/// 提供保存數據到內部設備的方法
/// </summary>
protected abstract void SaveToInternalDevice();
#region IDisposable 成員
public abstract void Dispose();
#endregion
}
}
多線程數據轉錄類,本例只使用了一個讀取線程:
namespace ExternalDataHandle
{
/// <summary>
/// 提供多線程方式從外部數據源獲取到內部數據源的適配器類
/// </summary>
/// <typeparam name="T"></typeparam>
public abstract class MultiThreadAdapter<T> : ExternalDataAdapter<T>
{
protected SynchronizedWriteData<T> _data;
protected Thread _readThread;
protected abstract override string ConnectString { get; }
protected abstract override void Initialize();
public sealed override void Transmit()
{
_readThread = new Thread(new ThreadStart(ReadFromExternalDevice));
_readThread.Start();
Thread.Sleep(10000);
while (_readThread.IsAlive)
{
SaveToInternalDevice();
}
_readThread.Join();
}
protected abstract override void ReadFromExternalDevice();
protected abstract override void SaveToInternalDevice();
public override void Dispose()
{
if (_data != null)
{
_data.Dispose();
}
}
}
}