最近架構一個項目,實現行情的接入和分發,需要達到極致的低時延特性,這對於證券系統是非常重要的。接入的行情源是可以配置,既可以是Level-1,也可以是Level-2或其他第三方的源。雖然Level-1行情沒有Level-2快,但是作為系統支持的行情源,我們還是需要優化它,使得從文件讀取,到用戶通過socket收到行情,端到端的時延盡可能的低。本文主要介紹對level-1行情dbf文件讀取的極致優化方案。相信對其他的dbf文件讀取應該也有借鑒意義。
Level-1行情是由行情小站,定時每隔幾秒把dbf文件(上海是show2003.dbf,深圳是sjshq.dbf)更新一遍,用新的行情替換掉舊的。我們的目標就是,在新文件完成更新後,在最短時間內將文件讀取到內存,把每一行轉化為對象,把每個列轉化為對應的數據類型。
我們一共采用了6種優化方式。
我們在上文《Java讀取Level-1行情dbf文件極致優化(1)》 《Java讀取Level-1行情dbf文件極致優化(2)》中,已經介紹了4種優化策略:
優化一:采用內存硬盤(RamDisk)
優化二:采用JNotify,用通知替代輪詢
優化三:采用NIO讀取文件
優化四:減少讀取文件時內存反復分配和GC
行情dbf文件很多字段是價格類型的字段,帶2位或者3位小數,從dbf讀取他們的後,我們會把它們保存在Long類型或者Int類型,而不是Float或Double類型,比如1.23,轉換為1230保存。因為Float型或Double型會丟失精度。
如果不優化,讀取步驟為:
1,從byte[]對應的偏移中讀取並保存到String中。
2,對String做trim操作
3,把String轉換為Float類型
4,把Float類型乘以1000並強轉為Long類型。
不用多說,以上的過程一定是低效的,光前兩步就涉及到2次字符串拷貝,2次對象創建。第三步效率也不高。我這裡通過優化,在DBFReader.java中添加一個get_long_efficiently_and_multiply_1000方法,將4個步驟合並為一步,通過一次掃描得到結果。
public long get_long_efficiently_and_multiply_1000(byte[] src, final int index)
{
long multiplicand = 3;
long result =0;
Field field = getFields()[index];
boolean in_decimal_part = false;
boolean negative = false;
int offset = field.getOffset();
int length = field.getLength();
int end = offset+length;
for(int i =field.getOffset(); i< end; i++)
{
byte ch = src[i];
if(ch>=48 && ch<=57) //如果是數字
{
result *= 10;
result += ch-48;
if(in_decimal_part)
multiplicand--;
if(multiplicand==0) break;
continue;
}
if(ch==32) //如果是空格
continue;
if(ch == 46) //如果是小數點
{
in_decimal_part = true;
continue;
}
if(ch == '-') //如果是負號
{
negative = true;
}
throw new NumberFormatException();
}
if(multiplicand == 3)
result *= 1000;
else if (multiplicand == 2)
result *=100;
else if (multiplicand == 1)
result *=10;
if(negative)
{
result= 0 - result;
}
return result;
}
上面的算法負責讀取字段轉換為數字的同時,對它乘以1000。並且代碼中盡量優化了執行步驟。
對於整形的讀取,我們也進行了優化,添加一個get_long_efficiently:
public long get_long_efficiently(byte[] src, final int index)
{
long result =0;
boolean negative = false;
Field field = getFields()[index];
for(int i =field.getOffset(); i< field.getOffset()+ field.getLength(); i++)
{
byte ch = src[i];
if(ch>=48 && ch<=57) //如果是數字
{
result = result*10 + (src[i]-48);
continue;
}
if(src[i]==32) //如果是空格
continue;
if(ch == '-') //如果是負號
{
negative = true;
}
throw new NumberFormatException();
}
if(negative)
{
result= 0 - result;
}
return result;
}
以上的2個算法並不復雜,但卻非常關鍵,一個dbf文件包含大約5000行,每行包括20~30個Float類型或者Int類型的字段,該優化涉及10萬+個字段的讀取。測試下來,這步改進將讀取速度從50ms-70ms提升至15ms至20ms,細節在魔鬼當中,這是速度提升最快的一項優化。
(優化五的代碼在改進的DBFReader中,上午中已經提供下載,這裡再提供下載鏈接:
DBFReader庫 )
對5000多個行進行字段讀取並轉換成對象,采用多線程處理是最自然不過的優化方式。
一般我們采用的方法是把任務分成等份的塊,每個線程處理一大塊。比如,如果采用5個線程處理,那麼把5000行分成1000個行一塊,每個線程處理一塊。這樣看貌似公平,其實不然,因為我們的操作系統是分時操作系統,每個線程開始工作的時間,占用的CPU時間片,和任務的強度都不完全一致。等分的辦法貌似平均,但是很有可能導致有些線程完成工作了,另外一些還有很多沒做完。
這裡介紹一種我喜歡的任務分配方式:每個線程每次從5000個行的任務中申請一小塊,比如16個行,完成後,再申請16個行。這樣快的線程就會多工作些,慢的就少工作些,直到所有的行處理完畢。那麼,這些線程怎麼協調呢,任務分配豈不是要用到鎖?不用鎖,我們采用CAS機制就能做到(實際用的是AtomicInteger,AtomicInteger就是基於CAS實現的),這裡不解釋太多了。看代碼:
class ReaderTask implements Runnable {
Collector collector;
List<byte[]> recordList;
CountDownLatch countDownLatch;
AtomicInteger cursor;
DBFReader reader;
public ReaderTask(Collector collector, DBFReader dbfreader, List<byte[]> recordList, AtomicInteger cursor,
CountDownLatch countDownLatch) {
this.collector = collector;
this.reader = dbfreader;
this.recordList = recordList;
this.cursor = cursor;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
int length = recordList.size();
do {
final int step = 16; //每次分配16行給該線程處理。
int endIndex = cursor.addAndGet(step);
int startIndex = endIndex - step ;
for (int i = startIndex; i < endIndex && i < length; i++) {
byte[] row = recordList.get(i);
MarketRealtimeData SHData = new MarketRealtimeData();
SHData.setMarketType(Constants.MARKET_SH_STOCK);
SHData.setIdNum(reader.get_string_efficiently(row, 0));
SHData.setPrefix(reader.get_string_efficiently(row, 1));
SHData.setPreClosePrice(reader.get_long_efficiently_and_multiply_1000(row, 2));
SHData.setOpenPrice(reader.get_long_efficiently_and_multiply_1000(row, 3));
SHData.setTurnover(reader.get_long_efficiently_and_multiply_1000(row, 4));
SHData.setHighPrice(reader.get_long_efficiently_and_multiply_1000(row, 5));
SHData.setLowPrice(reader.get_long_efficiently_and_multiply_1000(row, 6));
SHData.setMatchPrice(reader.get_long_efficiently_and_multiply_1000(row, 7));
//讀取所有的Field,以下省略若干行
//... ...
//... ...
if (collector != null) {
collector.collect(SHData);
}
}
} while (cursor.get() < length);
} finally {
if (countDownLatch != null)
countDownLatch.countDown();
}
}
}
private void readHangqingFile(String path, String name) throws Exception {
// Long t1 = System.nanoTime();
DBFReader dbfreader_SH = null;
try {
dbfreader_SH = new DBFReader(new File(path+File.separator + name));
List<byte[]> list_sh = dbfreader_SH.recordsWithOutDel_efficiently(cacheManager);
AtomicInteger cursor = new AtomicInteger(0); //原子變量,用於線程間分配任務
CountDownLatch countDownLatch = new CountDownLatch(WORK_THREAD_COUNT);
for (int i = 0; i < WORK_THREAD_COUNT - 1; i++) { //把任務分配給線程池多個線程
ReaderTask task = new ReaderTask(collector, dbfreader_SH, list_sh, cursor, countDownLatch);
globalExecutor.execute(task);
}
new ReaderTask(collector, dbfreader_SH, list_sh, cursor, countDownLatch).run(); //當前線程自己也作為工作線程
countDownLatch.await();
//Long t2 = System.nanoTime();
//System.out.println("speed time on read and object:" + (t2 - t1));
} finally {
if (dbfreader_SH != null)
dbfreader_SH.close();
}
}
測試表明,在使用4個線程並行處理的情況下,處理時間從15ms-20ms縮短至4ms-7ms。
在使用本文章介紹的所有優化方法,整個讀取效率從耗時300ms以上,優化至5ms-10ms之間。我們討論的是從文件更新始,到完成文件讀取,完成5000多個對象,100,000個字段的轉換的總耗時。
如果繼續深入,我們可能還有不少細節可以改進。測試表明,時延的穩定性還不夠好,很可能是由於GC造成的,我們還可以從減少對象的創建,以減少性能損耗,減少GC;並且控制GC執行的時間,讓GC在空閒時執行等方面優化。
Binhua Liu原創文章,轉載請注明原地址http://www.cnblogs.com/Binhua-Liu/p/5616761.html