我平時比較喜歡從網上聽歌,有些鏈接下載速度太慢了。如果用HttpURLConnection類的方法打開連接,然後用InputStream類獲得輸入流,再用BufferedInputStream構造出帶緩衝區的輸入流,如果網速太慢的話,無論緩衝區設置多大,聽起來都是斷斷續續的,達不到真正緩衝的目的。於是嘗試編寫代碼實現用緩衝方式讀取遠程文件,以下貼出的代碼是我寫的MP3解碼器的一部分。我是不怎麼贊同使用多線程下載的,加之有的鏈接下載速度本身就比較快,所以在下載速度足夠的情況下,就讓下載線程退出,直到只剩下一個下載線程。當然,多線程中令人頭痛的死鎖問題、HttpURLConnection的超時阻塞問題都會使代碼看起來異常複雜。
簡要介紹一下實現多線程環形緩衝的方法。將緩衝區buf[]分為16塊,每塊32K,下載線程負責向緩衝區寫數據,每次寫一塊;讀線程(BuffRandAcceURL類)每次讀不超過32K的任意字節。同步描述:寫/寫互斥等待空閒塊;寫/寫並發填寫buf[];讀/寫並發使用buf[]。
經過我很長一段時間使用,我認為比較滿意地實現了我的目標,同其它MP3播放器對比,我的這種方法能夠比較流暢、穩定地下載並播放。我把實現多線程下載緩衝的方法寫出來,不足之處懇請批評指正。
一、HttpReader類功能:HTTP協議從指定URL讀取數據
/** *//**
* author by http://www.bt285.cn http://www.5a520.cn
*/
package instream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
/**
 * Reads a remote resource over HTTP starting from an arbitrary byte offset.
 *
 * <p>Every repositioning opens a new {@link HttpURLConnection} carrying a
 * {@code Range: bytes=<pos>-} request header. The content length reported by the
 * first successful connection is cached in a static field, so it is shared by all
 * instances in the JVM.
 *
 * <p>This class performs no synchronization of its own.
 */
public final class HttpReader {
    /** Number of connection attempts made before giving up. */
    public static final int MAX_RETRY = 10;

    /** Content length cached from the first successful connection; 0 means "not yet known". */
    private static long content_length;

    private URL url;
    private HttpURLConnection httpConnection;
    private InputStream in_stream;
    /** Offset of the next byte the stream will deliver; lets seek() skip a needless reconnect. */
    private long cur_pos;
    private int connect_timeout;
    private int read_timeout;

    /**
     * Creates a reader with 5000 ms connect and read timeouts.
     *
     * @param u the resource to read
     */
    public HttpReader(URL u) {
        this(u, 5000, 5000);
    }

    /**
     * Creates a reader. If no content length has been cached yet, connects at offset 0
     * (up to {@link #MAX_RETRY} attempts) and caches the reported length.
     *
     * @param u               the resource to read
     * @param connect_timeout connect timeout in milliseconds
     * @param read_timeout    read timeout in milliseconds
     */
    public HttpReader(URL u, int connect_timeout, int read_timeout) {
        this.connect_timeout = connect_timeout;
        this.read_timeout = read_timeout;
        url = u;
        if (content_length == 0) {
            int retry = 0;
            while (retry < HttpReader.MAX_RETRY) {
                try {
                    this.seek(0);
                    content_length = httpConnection.getContentLength();
                    break;
                } catch (Exception e) {
                    // Any failure simply counts as one attempt; callers detect overall
                    // failure through getContentLength() still being 0.
                    retry++;
                }
            }
        }
    }

    /**
     * @return the cached content length, or 0 if no connection has succeeded yet
     */
    public static long getContentLength() {
        return content_length;
    }

    /**
     * Reads up to {@code len} bytes from the current position.
     *
     * @return the number of bytes read, or -1 at end of stream
     * @throws IOException if the underlying stream fails
     */
    public int read(byte[] b, int off, int len) throws IOException {
        int r = in_stream.read(b, off, len);
        // Only advance on real data: adding the -1 EOF marker would move the tracked
        // position backwards and force seek() to reconnect for no reason.
        if (r > 0) {
            cur_pos += r;
        }
        return r;
    }

    /**
     * Reads exactly {@code len} bytes unless the stream ends first.
     *
     * @return {@code len}, or -1 if end of stream was reached before {@code len}
     *         bytes were read (bytes read so far remain in {@code b})
     * @throws IOException if the underlying stream fails
     */
    public int getData(byte[] b, int off, int len) throws IOException {
        int r, rema = len;
        while (rema > 0) {
            if ((r = in_stream.read(b, off, rema)) == -1) {
                return -1;
            }
            rema -= r;
            off += r;
            cur_pos += r;
        }
        return len;
    }

    /** Releases the connection and the stream; the reader must not be used afterwards. */
    public void close() {
        releaseConnection();
        url = null;
    }

    /**
     * Positions the reader at {@code start_pos}, reconnecting unless the stream is
     * already there.
     *
     * <p>A non-2xx response is reported by throwing, after a 500 ms pause, so that the
     * caller can retry. Response code 503 may have a temporary cause, for example the
     * server refusing frequent connection requests from the same IP.
     *
     * <p>NOTE(review): after a failed read() the tracked position still equals the
     * position a retrying caller asks for, so this method returns without reconnecting
     * and the same stream is reused - confirm that this is intended.
     *
     * @param start_pos byte offset from the start of the resource
     * @throws IOException if the connection fails or the response code is not 2xx
     */
    public void seek(long start_pos) throws IOException {
        if (start_pos == cur_pos && in_stream != null) {
            return;
        }
        releaseConnection();

        httpConnection = (HttpURLConnection) url.openConnection();
        httpConnection.setConnectTimeout(connect_timeout);
        httpConnection.setReadTimeout(read_timeout);
        String sProperty = "bytes=" + start_pos + "-";
        httpConnection.setRequestProperty("Range", sProperty);

        int responseCode = httpConnection.getResponseCode();
        if (responseCode < 200 || responseCode >= 300) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            throw new IOException("HTTP responseCode=" + responseCode);
        }
        in_stream = httpConnection.getInputStream();
        cur_pos = start_pos;
    }

    /**
     * Drops the current connection and stream, if any. The order (disconnect first,
     * then close the stream) is kept from the original code. A failure while closing
     * the stale stream is ignored because the stream is being discarded anyway.
     */
    private void releaseConnection() {
        if (httpConnection != null) {
            httpConnection.disconnect();
            httpConnection = null;
        }
        if (in_stream != null) {
            try {
                in_stream.close();
            } catch (IOException ignored) {
                // best effort: nothing useful can be done with a failed close
            }
            in_stream = null;
        }
    }
}
二、IWriterCallBack接口功能:實現讀/寫通信。
package instream;
/**
 * Callbacks a download thread ({@code Writer}) uses to coordinate with the owner of
 * the shared buffer.
 */
public interface IWriterCallBack {

    /**
     * Asks for the next buffer block to fill. Called by a writer before it downloads
     * a block; may block the calling thread.
     *
     * @param w the requesting writer
     * @return {@code false} if the writer should stop instead of downloading
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    boolean tryWriting(Writer w) throws InterruptedException;

    /**
     * Reports that a block has been written.
     *
     * @param i   index of the block
     * @param len number of bytes written into it
     */
    void updateBuffer(int i, int len);

    /** Reports that the calling writer thread is about to end. */
    void updateWriterCount();

    /** Requests that all writers stop; called by a writer on interruption or after exhausting its retries. */
    void terminateWriters();
}
三、Writer類:下載線程,負責向buf[]寫數據。
/** *//**
* http://www.bt285.cn http://www.5a520.cn
*/
package instream;
import java.io.IOException;
import java.net.URL;
public final class Writer implements Runnable {
private static boolean isalive = true;
private byte[] buf;
private IWriterCallBack icb;
protected int index; //buf[]內"塊"索引號
protected long start_pos; //index對應的文件位置(相
對於文件首的偏移量)
protected int await_count; //用於判斷:下載速度足夠就
退出一個"寫"線程
private HttpReader hr;
public Writer(IWriterCallBack call_back, URL u, byte[] b, int
i) {
hr = new HttpReader(u);
if(HttpReader.getContentLength() == 0) //實例化
HttpReader對象都不成功
return;
icb = call_back;
buf = b;
Thread t = new Thread(this,"dt_"+i);
t.setPriority(Thread.NORM_PRIORITY + 1);
t.start();
}
public void run() {
int write_bytes=0, write_pos=0, rema = 0, retry = 0;
boolean cont = true;
while (cont) {
try {
// 1.等待空閒塊
if(retry == 0) {
if (icb.tryWriting(this) ==
false)
break;
write_bytes = 0;
rema =
BuffRandAcceURL.UNIT_LENGTH;
write_pos = index <<
BuffRandAcceURL.UNIT_LENGTH_BITS;
}
// 2.定位
hr.seek(start_pos);
// 3.下載"一塊"
int w;
while (rema > 0 && isalive)
{
w = (rema < 2048) ? rema :
2048; //每次讀幾K合適?
if ((w = hr.read(buf,
write_pos, w)) == -1) {
cont = false;
break;
}
rema -= w;
write_pos += w;
start_pos += w;
write_bytes += w;
}
//4.通知"讀"線程
retry = 0;
icb.updateBuffer(index, write_bytes);
} catch (InterruptedException e) {
isalive = false;
icb.terminateWriters();
break;
} catch (IOException e) {
if(++retry == HttpReader.MAX_RETRY) {
isalive = false;
icb.terminateWriters();
break;
}
}
}
icb.updateWriterCount();
try {
hr.close();
} catch (Exception e) {}
hr = null;
buf = null;
icb = null;
}
}
四、IRandomAccess接口:隨機讀取文件接口,BuffRandAcceURL類和BuffRandAcceFile類實現接口方法。BuffRandAcceFile類實現讀取本地磁盤文件,這兒就不給出其源碼了。
package instream;
/**
 * Random-access read interface over a file-like source.
 */
public interface IRandomAccess {

    /** Reads a single byte. */
    int read() throws Exception;

    /** Reads into {@code b}. */
    int read(byte[] b) throws Exception;

    /** Reads up to {@code len} bytes into {@code b}, starting at index {@code off}. */
    int read(byte[] b, int off, int len) throws Exception;

    /**
     * Copies {@code len} bytes found {@code src_off} bytes ahead of the current
     * position into {@code b} at {@code dst_off}, without moving the file pointer.
     */
    int dump(int src_off, byte[] b, int dst_off, int len) throws Exception;

    /** Moves the file pointer to {@code pos}. */
    void seek(long pos) throws Exception;

    /** @return the length of the source */
    long length();

    /** @return the current file pointer */
    long getFilePointer();

    /** Releases the source. */
    void close();
}
五、BuffRandAcceURL類功能:創建下載線程;read方法從buf[]讀數據。
關鍵是如何簡單有效防止死鎖?以下只是我的一次嘗試,請指正。
/** *//**
* http://www.5a520.cn http://www.bt285.cn
*/
package instream;
import java.net.URL;
import java.net.URLDecoder;
import decode.Header;
import tag.MP3Tag;
import tag.TagThread;
public final class BuffRandAcceURL implements IRandomAccess,
IWriterCallBack {
public static final int UNIT_LENGTH_BITS = 15;
//32K
public static final int UNIT_LENGTH = 1 <<
UNIT_LENGTH_BITS;
public static final int BUF_LENGTH = UNIT_LENGTH << 4;
//16塊
public static final int UNIT_COUNT = BUF_LENGTH >>
UNIT_LENGTH_BITS;
public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);
private static final int MAX_WRITER = 8;
private static long file_pointer;
private static int read_pos;
private static int fill_bytes;
private static byte[] buf; //同時也作讀寫同步
鎖:buf.wait()/buf.notify()
private static int[] buf_bytes;
private static int buf_index;
private static int alloc_pos;
private static URL url = null;
private static boolean isalive = true;
private static int writer_count;
private static int await_count;
private long file_length;
private long frame_bytes;
public BuffRandAcceURL(String sURL) throws Exception {
this(sURL,MAX_WRITER);
}
public BuffRandAcceURL(String sURL, int download_threads)
throws Exception {
buf = new byte[BUF_LENGTH];
buf_bytes = new int[UNIT_COUNT];
url = new URL(sURL);
//創建線程以異步方式解析ID3
new TagThread(url);
//打印當前文件名
try {
String s = URLDecoder.decode(sURL, "GBK");
System.out.println("start>> " +
s.substring(s.lastIndexOf("/") + 1));
s = null;
} catch (Exception e) {
System.out.println("start>> " + sURL);
}
//創建"寫"線程
for(int i = 0; i < download_threads; i++)
new Writer(this, url, buf, i+1);
frame_bytes = file_length =
HttpReader.getContentLength();
if(file_length == 0) {
Header.strLastErr = "連接URL出錯,重試 " +
HttpReader.MAX_RETRY + " 次後放棄。";
throw new Exception("retry " +
HttpReader.MAX_RETRY);
}
writer_count = download_threads;
//緩沖
try_cache();
//跳過ID3 v2
MP3Tag mP3Tag = new MP3Tag();
int v2_size = mP3Tag.checkID3V2(buf,0);
if (v2_size > 0) {
frame_bytes -= v2_size;
//seek(v2_size):
fill_bytes -= v2_size;
file_pointer = v2_size;
read_pos = v2_size;
read_pos &= BUF_LENGTH_MASK;
int units = v2_size >> UNIT_LENGTH_BITS;
for(int i = 0; i < units; i++) {
buf_bytes[i] = 0;
this.notifyWriter();
}
buf_bytes[units] -= v2_size;
this.notifyWriter();
}
mP3Tag = null;
}
private void try_cache() throws InterruptedException {
int cache_size = BUF_LENGTH;
if(cache_size > (int)file_length - alloc_pos)
cache_size = (int)file_length - alloc_pos;
cache_size -= UNIT_LENGTH;
//等待填寫當前正在讀的那"一塊"緩沖區
/**//*if(fill_bytes >= cache_size &&
writer_count > 0) {
synchronized (buf) {
buf.wait();
}
return;
}*/
//等待填滿緩沖區
while (fill_bytes < cache_size) {
if (writer_count == 0 || isalive == false)
return;
if(BUF_LENGTH > (int)file_length -
alloc_pos)
cache_size = (int)file_length -
alloc_pos - UNIT_LENGTH;
System.out.printf("\r[緩沖%1$6.2f%%] ",(float)
fill_bytes / cache_size * 100);
synchronized (buf) {
buf.wait();
}
}
System.out.printf("\r");
}
private int try_reading(int i, int len) throws Exception {
int n = (i == UNIT_COUNT - 1) ? 0 : (i + 1);
int r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] +
buf_bytes[n]);
while (r < len) {
if (writer_count == 0 || isalive == false)
return r;
try_cache();
r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] +
buf_bytes[n]);
}
return len;
}
/**//*
* 各個"寫"線程互斥等待空閒塊
*/
public synchronized boolean tryWriting(Writer w) throws
InterruptedException {
await_count++;
while (buf_bytes[buf_index] != 0 && isalive) {
this.wait();
}
//下載速度足夠就結束一個"寫"線程
if(writer_count > 1 && w.await_count >=
await_count &&
w.await_count >= writer_count)
return false;
if(alloc_pos >= file_length)
return false;
w.await_count = await_count;
await_count--;
w.start_pos = alloc_pos;
w.index = buf_index;
alloc_pos += UNIT_LENGTH;
buf_index = (buf_index == UNIT_COUNT - 1) ? 0 :
buf_index + 1;
return isalive;
}
public void updateBuffer(int i, int len) {
synchronized (buf) {
buf_bytes[i] = len;
fill_bytes += len;
buf.notify();
}
}
public void updateWriterCount() {
synchronized (buf) {
writer_count--;
buf.notify();
}
}
public synchronized void notifyWriter() {
this.notifyAll();
}
public void terminateWriters() {
synchronized (buf) {
if (isalive) {
isalive = false;
Header.strLastErr = "讀取文件超時。重試
" + HttpReader.MAX_RETRY
+ " 次後放棄,請您稍後
再試。";
}
buf.notify();
}
notifyWriter();
}
public int read() throws Exception {
int iret = -1;
int i = read_pos >> UNIT_LENGTH_BITS;
// 1."等待"有1字節可讀
while (buf_bytes[i] < 1) {
try_cache();
if (writer_count == 0)
return -1;
}
if(isalive == false)
return -1;
// 2.讀取
iret = buf[read_pos] & 0xff;
fill_bytes--;
file_pointer++;
read_pos++;
read_pos &= BUF_LENGTH_MASK;
if (--buf_bytes[i] == 0)
notifyWriter(); // 3.通知
return iret;
}
public int read(byte b[]) throws Exception {
return read(b, 0, b.length);
}
public int read(byte[] b, int off, int len) throws Exception {
if(len > UNIT_LENGTH)
len = UNIT_LENGTH;
int i = read_pos >> UNIT_LENGTH_BITS;
// 1."等待"有足夠內容可讀
if(try_reading(i, len) < len || isalive == false)
return -1;
// 2.讀取
int tail_len = BUF_LENGTH - read_pos; // write_pos !=
BUF_LENGTH
if (tail_len < len) {
System.arraycopy(buf, read_pos, b, off,
tail_len);
System.arraycopy(buf, 0, b, off + tail_len, len
- tail_len);
} else
System.arraycopy(buf, read_pos, b, off, len);
fill_bytes -= len;
file_pointer += len;
read_pos += len;
read_pos &= BUF_LENGTH_MASK;
buf_bytes[i] -= len;
if (buf_bytes[i] < 0) {
int ni = read_pos >> UNIT_LENGTH_BITS;
buf_bytes[ni] += buf_bytes[i];
buf_bytes[i] = 0;
notifyWriter();
} else if (buf_bytes[i] == 0)
notifyWriter();
return len;
}
/**//*
* 從src_off位置復制,不移動文件"指針"
*/
public int dump(int src_off, byte b[], int dst_off, int len)
throws Exception {
int rpos = read_pos + src_off;
if(try_reading(rpos >> UNIT_LENGTH_BITS, len)
< len || isalive == false)
return -1;
int tail_len = BUF_LENGTH - rpos;
if (tail_len < len) {
System.arraycopy(buf, rpos, b, dst_off,
tail_len);
System.arraycopy(buf, 0, b, dst_off + tail_len,
len - tail_len);
} else
System.arraycopy(buf, rpos, b, dst_off, len);
// 不發信號
return len;
}
public long length() {
return file_length;
}
public long getFilePointer() {
return file_pointer;
}
public void close() {
//...
}
//
public void seek(long pos) throws Exception {
//...
}
}