程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA編程入門知識 >> java線程阻塞中斷與LockSupport使用介紹

java線程阻塞中斷與LockSupport使用介紹

編輯:JAVA編程入門知識
上周五和周末,工作忙裡偷閒,在看java cocurrent中也順便再溫故了一下Thread.interrupt和java 5之後的LockSupport的實現。
在介紹之前,先拋幾個問題。
Thread.interrupt()方法和InterruptedException異常的關系?是由interrupt觸發產生了InterruptedException異常?
Thread.interrupt()會中斷線程什麼狀態的工作? RUNNING or BLOCKING?
一般Thread編程需要關注interrupt中斷不?一般怎麼處理?可以用來做什麼?
LockSupport.park()和unpark(),與object.wait()和notify()的區別?
LockSupport.park(Object blocker)傳遞的blocker對象做什麼用?
LockSupport能響應Thread.interrupt()事件不?會拋出InterruptedException異常?
Thread.interrupt()處理是否有對應的回調函數?類似於鉤子調用?
如果你都都能很明確的答上來了,說明你已經完全懂Thread.interrupt,可以不用往下看那了。
那如果不清楚的,帶著這幾個問題,一起來梳理下。
Thread的interrupt處理的幾個方法:
public void interrupt() : 執行線程interrupt事件
public boolean isInterrupted() : 檢查當前線程是否處於interrupt
public static boolean interrupted() : check當前線程是否處於interrupt,並重置interrupt信息。類似於resetAndGet()
理解:
1. 每個線程都有一個interrupt status標志位,用於表明當前線程是否處於中斷狀態
2. 一般調用Thread.interrupt()會有兩種處理方式
遇到一個低優先級的block狀態時,比如object.wait(),object.sleep(),object.join()。它會立馬觸發一個unblock解除阻塞,並throw一個InterruptedException。
其他情況,Thread.interrupt()僅僅只是更新了status標志位。然後你的工作線程通過Thread.isInterrrupted()進行檢查,可以做相應的處理,比如也throw InterruptedException或者是清理狀態,取消task等。
在interrupt javadoc中描述:
 
最佳實踐
IBM上有篇文章寫的挺不錯。Java theory and practice: Dealing with InterruptedException , 裡面提到了Interrupt處理的幾條最佳實踐。
Don't swallow interrupts (別吃掉Interrupt,一般是兩種處理: 繼續throw InterruptedException異常。 另一種就是繼續設置Thread.interupt()異常標志位,讓更上一層去進行相應處理。
代碼如下:

public class TaskRunner implements Runnable {
private BlockingQueue<Task> queue;
public TaskRunner(BlockingQueue<Task> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
Task task = queue.take(10, TimeUnit.SECONDS);
task.execute();
}
}
catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
}

代碼如下:

public class TaskRunner implements Runnable {
private BlockingQueue<Task> queue;
public TaskRunner(BlockingQueue<Task> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
Task task = queue.take(10, TimeUnit.SECONDS);
task.execute();
}
}
catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
}

Implementing cancelable tasks with Interrupt (使用Thread.interrupt()來設計和支持可被cancel的task)
代碼如下:

public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() { interrupt(); } // 發起中斷
}<SPAN > </SPAN>

代碼如下:

public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() { interrupt(); } // 發起中斷
}<SPAN > </SPAN>

注冊Interrupt處理事件(非正常用法)
一般正常的task設計用來處理cancel,都是采用主動輪詢的方式檢查Thread.isInterrupt(),對業務本身存在一定的嵌入性,還有就是存在延遲,你得等到下一個檢查點(誰知道下一個檢查點是在什麼時候,特別是進行一個socket.read時,遇到過一個HttpClient超時的問題)。
來看一下,主動拋出InterruptedException異常的實現,借鑒於InterruptibleChannel的設計,比較取巧。
代碼如下:

interface InterruptAble { // 定義可中斷的接口
public void interrupt() throws InterruptedException;
}
abstract class InterruptSupport implements InterruptAble {
private volatile boolean interrupted = false;
private Interruptible interruptor = new Interruptible() {
public void interrupt() {
interrupted = true;
InterruptSupport.this.interrupt(); // 位置3
}
};
public final boolean execute() throws InterruptedException {
try {
blockedOn(interruptor); // 位置1
if (Thread.currentThread().isInterrupted()) { // 立馬被interrupted
interruptor.interrupt();
}
// 執行業務代碼
bussiness();
} finally {
blockedOn(null); // 位置2
}
return interrupted;
}
public abstract void bussiness() ;
public abstract void interrupt();
// -- sun.misc.SharedSecrets --
static void blockedOn(Interruptible intr) { // package-private
sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
}
}

代碼如下:

interface InterruptAble { // 定義可中斷的接口
public void interrupt() throws InterruptedException;
}
abstract class InterruptSupport implements InterruptAble {
private volatile boolean interrupted = false;
private Interruptible interruptor = new Interruptible() {
public void interrupt() {
interrupted = true;
InterruptSupport.this.interrupt(); // 位置3
}
};
public final boolean execute() throws InterruptedException {
try {
blockedOn(interruptor); // 位置1
if (Thread.currentThread().isInterrupted()) { // 立馬被interrupted
interruptor.interrupt();
}
// 執行業務代碼
bussiness();
} finally {
blockedOn(null); // 位置2
}
return interrupted;
}
public abstract void bussiness() ;
public abstract void interrupt();
// -- sun.misc.SharedSecrets --
static void blockedOn(Interruptible intr) { // package-private
sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
}
}

代碼說明,幾個取巧的點:
位置1:利用sun提供的blockedOn方法,綁定對應的Interruptible事件處理鉤子到指定的Thread上。
位置2:執行完代碼後,清空鉤子。避免使用連接池時,對下一個Thread處理事件的影響。
位置3:定義了Interruptible事件鉤子的處理方法,回調InterruptSupport.this.interrupt()方法,子類可以集成實現自己的業務邏輯,比如sock流關閉等等。
使用:
代碼如下:

class InterruptRead extends InterruptSupport {
private FileInputStream in;
@Override
public void bussiness() {
File file = new File("/dev/urandom"); // 讀取linux黑洞,永遠讀不完
try {
in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
// Thread.sleep(100);
// if (Thread.interrupted()) {// 以前的Interrupt檢查方式
// throw new InterruptedException("");
// }
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public FileInputStream getIn() {
return in;
}
@Override
public void interrupt() {
try {
in.getChannel().close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]) throws Exception {
final InterruptRead test = new InterruptRead();
Thread t = new Thread() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
System.out.println("InterruptRead start!");
test.execute();
} catch (InterruptedException e) {
System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
e.printStackTrace();
}
}
};
t.start();
// 先讓Read執行3秒
Thread.sleep(3000);
// 發出interrupt中斷
t.interrupt();
}

代碼如下:

class InterruptRead extends InterruptSupport {
private FileInputStream in;
@Override
public void bussiness() {
File file = new File("/dev/urandom"); // 讀取linux黑洞,永遠讀不完
try {
in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
// Thread.sleep(100);
// if (Thread.interrupted()) {// 以前的Interrupt檢查方式
// throw new InterruptedException("");
// }
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public FileInputStream getIn() {
return in;
}
@Override
public void interrupt() {
try {
in.getChannel().close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]) throws Exception {
final InterruptRead test = new InterruptRead();
Thread t = new Thread() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
System.out.println("InterruptRead start!");
test.execute();
} catch (InterruptedException e) {
System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
e.printStackTrace();
}
}
};
t.start();
// 先讓Read執行3秒
Thread.sleep(3000);
// 發出interrupt中斷
t.interrupt();
}

jdk源碼介紹:
1. sun提供的鉤子可以查看System的相關代碼, line : 1125
代碼如下:

sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
public sun.reflect.ConstantPool getConstantPool(Class klass) {
return klass.getConstantPool();
}
public void setAnnotationType(Class klass, AnnotationType type) {
klass.setAnnotationType(type);
}
public AnnotationType getAnnotationType(Class klass) {
return klass.getAnnotationType();
}
public <E extends Enum<E>>
E[] getEnumConstantsShared(Class<E> klass) {
return klass.getEnumConstantsShared();
}
public void blockedOn(Thread t, Interruptible b) {
t.blockedOn(b);
}
});

代碼如下:

sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
public sun.reflect.ConstantPool getConstantPool(Class klass) {
return klass.getConstantPool();
}
public void setAnnotationType(Class klass, AnnotationType type) {
klass.setAnnotationType(type);
}
public AnnotationType getAnnotationType(Class klass) {
return klass.getAnnotationType();
}
public <E extends Enum<E>>
E[] getEnumConstantsShared(Class<E> klass) {
return klass.getEnumConstantsShared();
}
public void blockedOn(Thread t, Interruptible b) {
t.blockedOn(b);
}
});

2. Thread.interrupt()
代碼如下:

public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(); //回調鉤子
return;
}
}
interrupt0();
}

代碼如下:

public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(); //回調鉤子
return;
}
}
interrupt0();
}

更多
更多關於Thread.stop,suspend,resume,interrupt的使用注意點,可以看一下sun的文檔,比如http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html
最後來解答一下之前的幾個問題:
問題1: Thread.interrupt()方法和InterruptedException異常的關系?是由interrupt觸發產生了InterruptedException異常?
答: Thread.interrupt()只是在Object.wait() .Object.join(), Object.sleep()幾個方法會主動拋出InterruptedException異常。而在其他的的block常見,只是通過設置了Thread的一個標志位信息,需要程序自我進行處理。
代碼如下:

if (Thread.interrupted()) // Clears interrupted status!
throw new InterruptedException();

代碼如下:

if (Thread.interrupted()) // Clears interrupted status!
throw new InterruptedException();

問題2:Thread.interrupt()會中斷線程什麼狀態的工作? RUNNING or BLOCKING?
答:Thread.interrupt設計的目的主要是用於處理線程處於block狀態,比如wait(),sleep()狀態就是個例子。但可以在程序設計時為支持task cancel,同樣可以支持RUNNING狀態。比如Object.join()和一些支持interrupt的一些nio channel設計。
問題3: 一般Thread編程需要關注interrupt中斷不?一般怎麼處理?可以用來做什麼?
答: interrupt用途: unBlock操作,支持任務cancel, 數據清理等。
問題4: LockSupport.park()和unpark(),與object.wait()和notify()的區別?
答:
1. 面向的主體不一樣。LockSuport主要是針對Thread進進行阻塞處理,可以指定阻塞隊列的目標對象,每次可以指定具體的線程喚醒。Object.wait()是以對象為緯度,阻塞當前的線程和喚醒單個(隨機)或者所有線程。
2. 實現機制不同。雖然LockSuport可以指定monitor的object對象,但和object.wait(),兩者的阻塞隊列並不交叉。可以看下測試例子。object.notifyAll()不能喚醒LockSupport的阻塞Thread.
問題5: LockSupport.park(Object blocker)傳遞的blocker對象做什麼用?
答: 對應的blcoker會記錄在Thread的一個parkBlocker屬性中,通過jstack命令可以非常方便的監控具體的阻塞對象.
代碼如下:

public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker); // 設置Thread.parkBlocker屬性的值
unsafe.park(false, 0L);
setBlocker(t, null); // 清除Thread.parkBlocker屬性的值
}

代碼如下:

public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker); // 設置Thread.parkBlocker屬性的值
unsafe.park(false, 0L);
setBlocker(t, null); // 清除Thread.parkBlocker屬性的值
}

具體LockSupport的javadoc描述也比較清楚,可以看下:
 
問題6: LockSupport能響應Thread.interrupt()事件不?會拋出InterruptedException異常?
答:能響應interrupt事件,但不會拋出InterruptedException異常。針對LockSupport對Thread.interrupte支持,也先看一下javadoc中的描述:
 
相關測試代碼
代碼如下:

package com.agapple.cocurrent;
import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class LockSupportTest {
private static LockSupportTest blocker = new LockSupportTest();
public static void main(String args[]) throws Exception {
lockSupportTest();
parkTest();
interruptParkTest();
interruptSleepTest();
interruptWaitTest();
}
/**
* LockSupport.park對象後,嘗試獲取Thread.blocker對象,調用其single喚醒
*
* @throws Exception
*/
private static void lockSupportTest() throws Exception {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 嘗試sleep 5s
System.out.println("blocker");
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "lockSupportTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(150);
synchronized (blocker) {
Field field = Thread.class.getDeclaredField("parkBlocker");
field.setAccessible(true);
Object fBlocker = field.get(t);
System.out.println(blocker == fBlocker);
Thread.sleep(100);
System.out.println("notifyAll");
blocker.notifyAll();
}
}
/**
* 嘗試去中斷一個object.wait(),會拋出對應的InterruptedException異常
*
* @throws InterruptedException
*/
private static void interruptWaitTest() throws InterruptedException {
final Object obj = new Object();
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 嘗試sleep 5s
obj.wait();
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptWaitTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
t.interrupt(); // 檢查下在park時,是否響應中斷
}
/**
* 嘗試去中斷一個Thread.sleep(),會拋出對應的InterruptedException異常
*
* @throws InterruptedException
*/
private static void interruptSleepTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 嘗試sleep 5s
Thread.sleep(5000);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptSleepTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
t.interrupt(); // 檢查下在park時,是否響應中斷
}
/**
* 嘗試去中斷一個LockSupport.park(),會有響應但不會拋出InterruptedException異常
*
* @throws InterruptedException
*/
private static void interruptParkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
// 嘗試去park 自己線程
LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptParkTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
t.interrupt(); // 檢查下在park時,是否響應中斷
}
/**
* 嘗試去中斷一個LockSupport.unPark(),會有響應
*
* @throws InterruptedException
*/
private static void parkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
// 嘗試去park 自己線程
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "parkTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
LockSupport.unpark(t);
t.interrupt();
}
public static Thread doTest(final TestCallBack call) {
return new Thread() {
@Override
public void run() {
File file = new File("/dev/urandom"); // 讀取linux黑洞
try {
FileInputStream in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
if (Thread.interrupted()) {
throw new InterruptedException("");
}
System.out.println(bytes[0]);
Thread.sleep(100);
long start = System.currentTimeMillis();
call.callback();
System.out.println(call.getName() + " callback finish cost : "
+ (System.currentTimeMillis() - start));
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
}
interface TestCallBack {
public void callback() throws Exception;
public String getName();
}

代碼如下:

package com.agapple.cocurrent;
import java.io.File;
import java.io.FileInputStream;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class LockSupportTest {
private static LockSupportTest blocker = new LockSupportTest();
public static void main(String args[]) throws Exception {
lockSupportTest();
parkTest();
interruptParkTest();
interruptSleepTest();
interruptWaitTest();
}
/**
* LockSupport.park對象後,嘗試獲取Thread.blocker對象,調用其single喚醒
*
* @throws Exception
*/
private static void lockSupportTest() throws Exception {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 嘗試sleep 5s
System.out.println("blocker");
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "lockSupportTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(150);
synchronized (blocker) {
Field field = Thread.class.getDeclaredField("parkBlocker");
field.setAccessible(true);
Object fBlocker = field.get(t);
System.out.println(blocker == fBlocker);
Thread.sleep(100);
System.out.println("notifyAll");
blocker.notifyAll();
}
}
/**
* 嘗試去中斷一個object.wait(),會拋出對應的InterruptedException異常
*
* @throws InterruptedException
*/
private static void interruptWaitTest() throws InterruptedException {
final Object obj = new Object();
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 嘗試sleep 5s
obj.wait();
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptWaitTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
t.interrupt(); // 檢查下在park時,是否響應中斷
}
/**
* 嘗試去中斷一個Thread.sleep(),會拋出對應的InterruptedException異常
*
* @throws InterruptedException
*/
private static void interruptSleepTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() throws Exception {
// 嘗試sleep 5s
Thread.sleep(5000);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptSleepTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
t.interrupt(); // 檢查下在park時,是否響應中斷
}
/**
* 嘗試去中斷一個LockSupport.park(),會有響應但不會拋出InterruptedException異常
*
* @throws InterruptedException
*/
private static void interruptParkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
// 嘗試去park 自己線程
LockSupport.parkNanos(blocker, TimeUnit.SECONDS.toNanos(5));
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "interruptParkTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
t.interrupt(); // 檢查下在park時,是否響應中斷
}
/**
* 嘗試去中斷一個LockSupport.unPark(),會有響應
*
* @throws InterruptedException
*/
private static void parkTest() throws InterruptedException {
Thread t = doTest(new TestCallBack() {
@Override
public void callback() {
// 嘗試去park 自己線程
LockSupport.park(blocker);
System.out.println("wakeup now!");
}
@Override
public String getName() {
return "parkTest";
}
});
t.start(); // 啟動讀取線程
Thread.sleep(2000);
LockSupport.unpark(t);
t.interrupt();
}
public static Thread doTest(final TestCallBack call) {
return new Thread() {
@Override
public void run() {
File file = new File("/dev/urandom"); // 讀取linux黑洞
try {
FileInputStream in = new FileInputStream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
if (Thread.interrupted()) {
throw new InterruptedException("");
}
System.out.println(bytes[0]);
Thread.sleep(100);
long start = System.currentTimeMillis();
call.callback();
System.out.println(call.getName() + " callback finish cost : "
+ (System.currentTimeMillis() - start));
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
}
interface TestCallBack {
public void callback() throws Exception;
public String getName();
}

最後
發覺文章越寫越長,那就索性發到了論壇,大家一起討論下.畢竟文章中描述的都是一些使用層面的東東,並沒有從操作系統或者sun native實現上去介紹Thread的一些機制,熟悉這塊的大牛門也可以出來發表下高見.
  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved