Java多线程详解

多线程之前了解的有点乱,特此整理

锁类型

公平性:

  1. 公平锁:线程按照他们发出请求的顺序来获得锁。如果有另一个线程持有这个锁或者有其他线程在队列中等待这个锁,那么新发出请求的线程将放入队列中。
  2. 非公平锁:非公平锁允许“插队”:当一个线程请求非公平的锁时,如果在发出请求的同时该锁的状态变为可用,那么这个线程将跳过排队队列,直接获取这个锁。

也就是说非公平锁在入队同时可以强锁,如果没抢到就得排队,公平锁只能排队

悲观/乐观:

  1. 悲观锁:把数据库里的某行数据锁住,或者整张表锁住,不让其他事务访问,数据库里一般会使用排它锁来实现悲观锁,但是因为其他事务不能访问,所以效率很低,适合数据竞争激烈的环境。可以防脏读。

  2. 乐观锁:在提交数据更新之前,每个事务会先检查在该事务读取数据后,有没有其他事务又修改了该数据。如果其他事务有更新的话,正在提交的事务会进行回滚。它可以让各事务能够在不产生锁的情况下处理各自影响的那部分数据。

    乐观锁是基于CAS实现的,为了解决ABA问题,可以加上version的方法解决

排他(独占)/共享:

  1. 排他锁:只能有1个线程持有锁,可以有效的防止脏读
  2. 共享锁:共享锁不会阻止其他用户读,但是阻止其他的用户写和修改
锁状态

锁的状态总共有四种:无锁状态、偏向锁、轻量级锁和重量级锁。随着锁的竞争,锁可以从偏向锁升级到轻量级锁,再升级的重量级锁(但是锁的升级是单向的,也就是说只能从低到高升级,不会出现锁的降级)

偏向锁:

适合只有一个线程访问同步代码的场景

只要加了一次锁,之后访问里面的临界区都不用再加锁

自旋锁:

自身空循环来等待,然后再CAS请求锁

轻量级锁:

b线程在锁竞争时,发现锁已经被a线程占用,则b线程不进入内核态,让b线程自旋,执行空循环,等待a线程释放锁。如果,完成自旋策略执行CAS换锁操作,发现a线程还是没有释放锁,或者让c线程占用了。则b线程试图将轻量级锁升级为重量级锁。始终得不到锁竞争的线程使用自旋会消耗CPU。

重量级锁:

线程竞争不使用自旋,不会消耗CPU 线程阻塞,响应时间缓慢,适合追求吞吐量,锁占用时间较长

优点 缺点 适用场景
偏向锁 加锁和解锁不需要额外的消耗,和执行非同步方法相比仅存在纳秒级的差距 如果线程间存在锁竞争,会带来额外的锁撤销的消耗 适用于只有一个线程访问同步块场景
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度 如果始终得不到索竞争的线程,使用自旋会消耗CPU 追求响应速度,同步块执行速度非常快
重量级锁 线程竞争不使用自旋,不会消耗CPU 线程阻塞,响应时间缓慢 追求吞吐量,同步块执行速度较长

另外

可重入锁:当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2。

速学路径

http://kaimingwan.com/post/java/javanei-zhi-suo-kai-xiao-you-hua-pian-xiang-suo-qing-liang-ji-suo

CAS介绍

CAS(Compare and Swap)即比较并替换,设计并发算法时常用到的一种技术。返回boolean值。

CAS是通过unsafe类的compareAndSwap方法实现的,是底层实现的原子指令

CAS用户程序不用直接调用,但是automicInteger内实现了这些方法,比如incrementAndGet()方法实际上就是不断循环CAS操作,直到成功

具体说明

赋值操作即是新值替换旧值,所以先比较现在的值是否为自己所以为的旧值,如果不是,则说明自己发生了脏读。本次操作失败返回false。

举例

CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

缺陷

就是一个变量V,如果变量V初次读取的时候是A,并且在准备赋值的时候检查到它仍然是A,那能说明它的值没有被其他线程修改过了吗?如果在这段期间它的值曾经被改成了B,然后又改回A,那CAS操作就会误认为它从来没有被修改过。

ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1。java并发包中提供了一个带有标记的原子引用类”AtomicStampedReference”,它可以通过控制变量值的版本来保证CAS的正确性。

备注

java.util.concurrent包完全建立在CAS之上的,乐观锁就是基于CAS。

java.util.concurrent.atomic包下的原子操作类都是基于CAS实现的

AQS介绍

1
2
3
4
5
6
7
8
9
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable{
// 等待对列的头节点
private transient volatile Node head;
// 等待队列的尾节点
private transient volatile Node tail;
// 同步状态
private valatile int state;
}
1
2
3
4
5
6
7
static final class Node{
volatile int waitStatus;//该节点的状态
volatile Node prev;//同步等待队列前驱节点
volatile Node next;//同步等待队列后继节点
volatile Thread thread;//持有该节点的线程
Node nextWaiter;//存储condition队列中的后继节点。
}
1
2
3
4
5
//tryAcquire()修改状态,获取锁
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//加入到等待队列,用cas原子替换tail,防止多线程错误
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failur
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//不断循环获取队列前一个node,如果是头结点,则再次尝试修改state,获取锁,不然被阻塞
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

原理

队列同步器AQS是用来构建锁或其他同步组件的基础框架,内部使用了一个int类型的volatile修饰的state来表示同步状态,通过acquire来尝试修改state的值(未实现),修改成功的线程表示获取到该锁,没有修改成功,或者发现状态state已经是加锁状态,则把当前线程包装成一个node节点,并通过CAS指令插入到FIFO等待队列的队尾。node插入到队尾后该线程不会马上挂起,而是进行自旋操作。如果前一个节点是头结点,他会自旋后再次尝试tryAcquire方法获取锁。只有当前一个节点pred的线程状态位SIGNAL时,当前节点的线程才能被挂起,并等待被唤醒。等待队列的头节点head和尾结点tail都是通过volatile修饰,保证了多个线程之间的可见性。

synchronized介绍

synchronized是java语言的关键字,由jvm实现的语法级别的锁机制。是一种可重入/非公平/悲观/独占锁。

原理:

synchronized可以对一个代码块或是对一个方法上锁,方法块的原理是使用monitorenter和monitorexit指令,被“锁住”的地方称为临界区。线程执行到 monitorenter 指令时,将会尝试获取对象所对应的 monitor 的所有权,即尝试获得对象的锁,这样其他尝试进入临界区的线程会因无法获取monitor而被阻塞。由于等待另一个线程释放monitor而被阻塞的线程无法被中断。

同步方法 并不是由 monitorenter 和 monitorexit 指令来实现同步的,而是由方法调用指令读取运行时常量池中方法的 ACC_SYNCHRONIZED 标志来隐式实现的

monitorenter指令是在编译后插入到同步代码块的开始位置,而monitorexit是插入到方法结束处和异常处,每个monitorenter必须有一个monitorexit对应。

Java中的每个对象都可以作为锁。

任何对象都有一个 monitor 与之关联,当且一个monitor 被持有后,它将处于锁定状态

  1. 普通同步方法,锁是当前实例对象。
  2. 静态同步方法,锁是当前类的class对象。
  3. 同步代码块,锁是括号中的对象。

作用于一段代码或方法,synchronized和Lock能保证同一时刻只有一个线程获取锁然后执行同步代码,并且在释放锁之前会将对变量的修改刷新到物理内存中。因此可以保证可见性,也能够保证原子性,synchronized修饰方法是获得对象锁

举例

1
2
3
4
public synchronized void method()
{
// todo
}
1
2
3
4
5
6
public void method()
{
synchronized(this) {
// todo
}
}

写法一修饰的是一个方法,写法二修饰的是一个代码块,但写法一与写法二是等价的,都是锁定了整个方法时的内容。

死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class DeadlockTest {

public static void main(String[] args) {
String str1 = new String("资源1");
String str2 = new String("资源2");

new Thread(new Lock(str1, str2), "线程1").start();
new Thread(new Lock(str2, str1), "线程2").start();
}
}

class Lock implements Runnable {

private String str1;
private String str2;

public Lock(String str1, String str2) {
super();
this.str1 = str1;
this.str2 = str2;
}

@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "运行");
synchronized (str1) {
System.out.println(Thread.currentThread().getName() + "锁住" + str1);
Thread.sleep(1000);
synchronized (str2) {
// 执行不到这里
System.out.println(Thread.currentThread().getName() + "锁住" + str2);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

效率低

监视器锁本质又是依赖于底层的操作系统的Mutex Lock来实现的。而操作系统实现线程之间的切换这就需要从用户态转换到核心态,这个成本非常高,状态之间的转换需要相对比较长的时间,这就是为什么Synchronized效率低的原因

备注

  1. synchronized关键字不能继承。
  2. 在定义接口方法时不能使用synchronized关键字。
  3. 构造方法不能使用synchronized关键字,但可以使用synchronized代码块来进行同步。
  4. 一个线程访问一个对象中的synchronized(this)同步代码块时,其他试图访问该对象的线程将被阻塞。

synchronized实现死锁

Lock介绍

Lock是Java 5以后引入的新的API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Lock {
//获取锁,否则会一直等待
void lock();
//获取锁,否则一直等待,但是等待状态可以被其他线程中断
void lockInterruptibly() throws InterruptedException;
//尝试获得锁,没抢到直接返回失败
boolean tryLock();
//尝试获取锁,没抢到则会等待一段时间,等待状态可以被其他线程中断
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//释放锁
void unlock();
//生成锁的条件变量,线程获取锁后,可以等待或者通知该条件变量
Condition newCondition();
}

举例

1
2
3
4
5
6
7
8
9
10
11
Lock lock= ...;//获取锁

lock.lock();

try{
//处理任务
}catch(Exception e){

}finally{
lock.unlock();//释放锁
}

synchronized和Lock区别

相同点

Lock 能完成synchronized所实现的所有功能

不同点

  • synchronized会自动释放锁,而Lock一定要求程序员手工释放,且最好在finally 块中释放(否则会出现死锁)
  • 通过Lock可以知道有没有成功获取锁,而synchronized却无法办到
  • synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生。Lock在发生异常时,如果没有主动通过unLock()方法去释放锁,则很可能造成死锁的现象,因此使用Lock时需要在finally块中释放锁

ReentrantLock介绍

ReentrantLock实现了 Lock 接口,并提供了与 synchronized 相同的互斥和内存可见性

ReentrantLock 是可重入锁,根据构造器传入的布尔值来决定是否要公平锁

ReentrantLock是基于AQS实现的

可重入锁

可重入锁中可重入表示的意义在于对于同一个线程,可以继续调用加锁的方法,而不会被挂起。可重入锁内部维护一个计数器,对于同一个线程调用lock方法,计数器+1,调用unlock方法,计数器-1

举例:在一个加锁方法execute中调用另外一个加锁方法anotherLock并不会被挂起,可以直接调用

调用execute方法时计数器+1,然后内部又调用了anotherLock方法,计数器+1,变成了2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void execute() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " do something synchronize");
try {
anotherLock();
Thread.sleep(5000l);
} catch (InterruptedException e) {
System.err.println(Thread.currentThread().getName() + " interrupted");
Thread.currentThread().interrupt();
}
} finally {
lock.unlock();
}
}

public void anotherLock() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " invoke anotherLock");
} finally {
lock.unlock();
}
}
1
2
3
4
5
Thread-0 do something synchronize
Thread-0 invoke anotherLock
// 隔了5秒钟 输入下面
Thread-1 do something synchronize
Thread-1 invoke anotherLock

方法

  1. lock(),如果获取了锁立即返回,如果别的线程持有锁,当前线程则一直处于休眠状态,直到获取锁
  2. tryLock(),如果获取了锁立即返回true,如果别的线程正持有锁,立即返回false;
  3. tryLock(long timeout,TimeUnit unit),如果获取了锁定立即返回true,如果别的线程正持有锁,会等待参数给定的时间,在等待的过程中,如果获取了锁定,就返回true,如果等待超时,返回false;
  4. lockInterruptibly,如果获取了锁定立即返回,如果没有获取锁定,当前线程处于休眠状态,这期间当前线程可以被中断。举个粟子,当A,B线程都试图使用lockInterruptibly()获取锁时,如果A获得了锁,B线程正在等待获取锁,则可以调用threadB.interrupt()能够中断线程B的等待。

举例

线程A和B都要获取对象O的锁定,假设A获取了对象O锁,B将等待A释放对O的锁定,如果使用 synchronized ,如果A不释放,B将一直等下去,不能被中断如果 使用ReentrantLock,如果A不释放,可以使B在等待了足够长的时间以后,中断等待,而干别的事情

1
2
3
4
5
6
7
8
9
10
11
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
while(条件判断表达式) {
condition.await();
}
// 处理逻辑
} finally {
lock.unlock();
}

和synchronized区别

synchronized ReentrantLock
获取锁方式 抢锁失败只能无限等待 提供多种等待锁方式,可以中断正在等候获取一个锁的线程
可以限制等待超时
等待时可以被中断
可以无阻塞尝试获取锁
等待线程调度 未知,视 JVM 实现 公平锁:FIFO,按照进入同步队列顺序
非公平锁:第一次获取锁时有机会插队
编程便利性 获取锁/等待锁/释放锁都有内部实现,使用便利 需要显示获取锁,释放锁,切记要捕捉 exception, 在 finally 中释放锁
性能 略低,Java6 后有显著提高 较高,Java6 后差距减小
调试 线程转储中给出哪些调用帧获得哪些锁 Java6 后提供管理和调试接口,锁需要通过该接口注册,相关加锁信息出现在线程转储中

总结

Synchronized 与Lock都是可重入锁,同一个线程再次进入同步代码的时候.可以使用自己已经获取到的锁。

Synchronized是悲观锁机制,独占锁。而Locks.ReentrantLock是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。 ReentrantLock适用场景

  1. 某个线程在等待一个锁的控制权的这段时间需要中断
  2. 需要分开处理一些wait-notify,ReentrantLock里面的Condition应用,能够控制notify哪个线程,锁可以绑定多个条件。
  3. 具有公平锁功能,每个到来的线程都将排队等候。

Condition介绍

Condition是为解决Object.wait/notify/notifyAll难以使用的问题

一个ReentrantLock对象可以有多个Condition对象

Condition的await方法代替Object的wait;
Condition的signal方法代替Object的notify方法;
Condition的signalAll方法代替Object的notifyAll方法;
Condition实例在使用时需要绑定到一个锁上,可以通过newCondition方法获取Condition实例。

举例

一个线程调用condition.await(),此线程就会释放锁,阻塞。当另一个线程调用相同的condition的signal时,它才会重新竞争锁,然后从之前阻塞的地方继续开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ConditionDemo {
@Test
public void test() {
final ReentrantLock reentrantLock = new ReentrantLock();
final Condition condition = reentrantLock.newCondition();

new Thread(new Runnable() {
@Override
public void run() {
try {
reentrantLock.lock();
System.out.println(Thread.currentThread().getName() + "在等待被唤醒");
condition.await();
System.out.println(Thread.currentThread().getName() + "恢复执行了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
}, "thread1").start();

new Thread(new Runnable() {
@Override
public void run() {
try {
reentrantLock.lock();
System.out.println(Thread.currentThread().getName() + "抢到了锁");
condition.signal();
System.out.println(Thread.currentThread().getName() + "唤醒其它等待的线程");
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
}, "thread2").start();
}
}

输出结果如下所示:

1
2
3
4
thread1在等待被唤醒
thread2抢到了锁
thread2唤醒其它等待的线程
thread1恢复执行了

常用方法

  1. await()

调用await方法后,当前线程在接收到唤醒信号之前或被中断之前一直处于等待休眠状态。调用此方法时,当前线程保持了与此Condition有关联的锁,调用此方法后,当前线程释放持有的锁。此方法在返回当前线程之前,都必须重新获取与此条件有关的锁,在线程返回时,可以保证它保持此锁

  1. await(long time,TimeUnit unit)

调用此方法后,会造成当前线程在接收到唤醒信号之前、被中断之前或到达指定等待时间之前一直处于等待状态。调用此方法时,当前线程保持了与此Condition有关联的锁,调用此方法后,当前线程释放持有的锁。time参数为最长等待时间;unit参数为time的时间单位。如果在从此方法返回前检测到等待时间超时,则返回 false,否则返回true。此方法在返回当前线程之前,都必须重新获取与此条件有关的锁,在线程返回时,可以保证它保持此锁。

  1. signal()

唤醒一个等待线程,如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从await返回之前,该线程必须重新获取锁。

  1. signalAll()

唤醒所有等待线程,如果所有的线程都在等待此条件,则唤醒所有线程。 在从await返回之前,每个线程必须重新获取锁。

生产者消费者

生产者消费者的实现

  1. 通过java阻塞队列
  2. wait,notify或await,signal的方式
  3. 通过不断轮询地cas
  4. 通过观察者模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
final Lock lock = new ReentrantLock();//锁对象  
final Condition notFull = lock.newCondition();//写线程条件
final Condition notEmpty = lock.newCondition();//读线程条件

final Object[] items = new Object[100];//缓存队列
int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)//如果队列满了
notFull.await();//阻塞写线程
items[putptr] = x;//赋值
if (++putptr == items.length) putptr = 0;//如果写索引写到队列的最后一个位置了,那么置为0
++count;//个数++
notEmpty.signal();//唤醒读线程
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)//如果队列为空
notEmpty.await();//阻塞读线程
Object x = items[takeptr];//取值
if (++takeptr == items.length) takeptr = 0;//如果读索引读到队列的最后一个位置了,那么置为0
--count;//个数--
notFull.signal();//唤醒写线程
return x;
} finally {
lock.unlock();
}
}

注意

上面代码中使用while来判断队列,而不是用if,是因为多线程情况下,唤醒了wait状态的线程,如果保证不了它是想要的那个线程,那就用while来多判断一次缓冲队列的状态

  • 一种是wait和notify体系,消费者一次消费结束,notify了另一个阻塞的消费者,而那个消费者没有做再一次的判断,所以会造成多消费
  • 一种是普遍模式,如果生产者生产了,然后唤醒了一个消费者,但是不幸又来了一个消费者提前把生产的消费了,所以这个被唤醒的消费者,如果不再一次进行检查,那会造成又一次的消费

所以wait一定要放在while语句里

参考路径:

生产者消费者的五种实现

生产者和消费者的例子,有解释

Java 多线程之并发协作生产者消费者设计模式

volatile介绍

volatile关键字提供了内存可见性和禁止内存重排序

具体说明

  • 某一个线程修改了被volatile修饰的变量之后,会立即把值更新到主内存,其他线程获取这个变量的时候不会在工作内存中拿取,而是直接去主内存中拿取,保证了可见性
  • volatile可以禁止进行指令重排,保证了有序性

原理

volatile是基于内存屏障(Memory Barrier)实现的

内存屏障,又称内存栅栏,是一个CPU指令,基本上它是一条这样的指令:
1、插入一个内存屏障,相当于告诉CPU和编译器先于这个命令的必须先执行,后于这个命令的必须后执行。
2、强制更新一次不同CPU的缓存。例如,一个写屏障会把这个屏障前写入的数据刷新到缓存,这样任何试图读取该数据的线程将得到最新值,而不用考虑到底是被哪个cpu核心或者哪颗CPU执行的。
3、如果你的字段是volatile,Java内存模型将在写操作后插入一个写屏障指令,在读操作前插入一个读屏障指令。这意味着如果你对一个volatile字段进行写操作,你必须知道:1、一旦你完成写入,任何访问这个字段的线程将会得到最新的值。2、在你写入前,会保证所有之前发生的事已经发生,并且任何更新过的数据值也是可见的,因为内存屏障会把之前的写入值都刷新到缓存。

原子性原理

即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。

比如x++包括3个操作:

1)读取volatile变量值到local; 2)增加变量的值;3)把local的值写回,让其它的线程可见。

这3步的jvm指令为:

1
2
3
4
mov    0xc(%r10),%r8d ; Load
inc %r8d ; Increment
mov %r8d,0xc(%r10) ; Store
lock addl $0x0,(%rsp) ; StoreLoad Barrier //注意最后一步是内存屏障

从Load到store到内存屏障,一共4步,其中最后一步jvm让这个最新的变量的值在所有线程可见,也就是最后一步让所有的CPU内核都获得了最新的值,但在执行内存屏障之前,中间的几步(从Load到Store)是不安全的。

内存屏障可以被分为以下几种类型:

LoadLoad屏障:对于这样的语句Load1; LoadLoad; Load2,在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。

StoreStore屏障:对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。

LoadStore屏障:对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。

StoreLoad屏障:对于这样的语句Store1; StoreLoad; Load2,在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见。它的开销是四种屏障中最大的。

可见性原理

为在虚拟机内存中有主内存和工作内存的概念,每个线程都有自己的工作内存,当读取一个普通变量时,优先读取工作内存的变量,如果工作内存中没有对应的变量,则从主内存中加载到工作内存,对工作内存的普通变量进行修改,不会立马同步到主内存。对volatile修饰的变量进行写操作时,直接把最新值写到主内存中,并清空其它cpu工作内存中该变量所在的内存行数据(Write-Barrier(写入屏障)将刷新所有在 Barrier 之前写入 cache 的数据),对volatile修饰的变量进行读操作时,会读取主内存的数据。内存可见性保证了在多线程的场景下,保证了线程A对变量的修改,其它线程可以读到最新值。

有序性原理

插入一条Memory Barrier会告诉编译器和CPU:不管什么指令都不能和这条Memory Barrier指令重排序

缺陷

volatile并不保证原子性,比如i++,因为两个线程可以同时在主存获取变量i的值,然后同时修改后,保存到主存,这个过程一点也不违背可见性(获取最新,修改后立马更新),比如i=0的时候,两个线程同时获取,然后同时修改后,i是等于1,而不是等于2

不要将volatile用在getAndOperate场合(这种场合不原子,需要再加锁),仅仅set或者get的场景是适合volatile的。

参考路径

http://blog.csdn.net/sinat_35512245/article/details/60325685

ThreadLocal介绍

ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。

原理

每当线程去访问TreadLocal时,它都会从线程对应的TreadLocalMap(其实是一个Entry数组,索引是ThreadLocal对象的hash与运算,如果重复会nextIndex,再获取一个索引)中拿出与之对应的Entry,Entry的key是当前ThreadLocal对象,Entry的value就是当前线程下ThreadLocal的值

ThreadLocalMap实现

1
2
3
4
5
6
7
8
9
10
11
void createMap(Thread t, T firstValue) {
//创建一个ThreadLocalMap对象赋值给当前线程的成员变量threadLocals
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}

ThreadLocal.set() 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);

for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
return;
}
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}

tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}

ThreadLocal.get() 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}

举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ThreadLocalTest {

private static ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
};

public static void main(String[] args) {
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
Date date = new Date();
System.out.println(dateFormatThreadLocal.get().format(date));
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
Date date = new Date();
//dateFormatThreadLocal.set(new SimpleDateFormat("yyyy-MM-dd"));
System.out.println(dateFormatThreadLocal.get().format(date));
}
});
thread1.start();
thread2.start();
}
}

线程安全问题

Servlet不是线程安全的

serlvet采用多线程来处理多个请求同时访问,Tomcat容器维护了一个线程池来服务请求,serlvet也只会初始化一次,当被请求访问到的时候初始化。当容器收到一个Servlet请求,Dispatcher线程从线程池中选出一个工作组线程,将请求传递给该线程,然后由该线程来执行Servlet的service方法。当这个线程正在执行的时候,容器收到另一个请求,调度者线程将从线程池中选出另外一个工作组线程来服务这个新的请求,容器并不关心这个请求是否访问的是同一个Servlet还是另一个Servlet。当容器收到对同一个Servlet的多个请求的时候,那这个servlet的service方法将在多线程中并发的执行。

SimpleDateFormat线程不安全

SimpleDateFormat里会保持一个Calendar变量,如果每个线程都操作这个变量,就会发生线程安全问题

http://blog.csdn.net/zq602316498/article/details/40263083

线程

线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。

优先级

当在某个线程创建一个新的线程,这个线程有与创建线程相同的优先级。
setPriority(int newPriority)

守护线程

当一个jvm启动,这里通常有一个非守护线程(运行main函数),但是jvm只剩下守护线程,守护线程不会支持jvm继续运行。

setDaemon(boolean on)

线程API

1
2
3
4
5
6
//当前线程可转让cpu控制权,让优先级更高的线程运行(有可能还是自己)
public static Thread.yield()
//线程暂停一段时间,但在等待的时候仍然会持有monitor或者锁
public static Thread.sleep()
//在一个线程中调用other.join(),将等待other执行完后才继续本线程
public join()

线程中断

1
2
3
4
5
6
public void interrupt()
//判断是否有中断信息,同时也会对中断状态进行复位
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
private native boolean isInterrupted(boolean ClearInterrupted);

interrupt方法通过修改了被调用线程的中断状态来告知那个线程, 说它被中断了. 对于非阻塞中的线程, 只是改变了中断状态, 即Thread.isInterrupted()将返回true; 对于可取消的阻塞状态中的线程, 比如等待在这些函数上的线程, Thread.sleep(), Object.wait(), Thread.join(), 这个线程收到中断信号后, 会抛出InterruptedException,提前结束阻塞状态,但是run方法不会中断,需要自己实现中断方法执行。不是所有的阻塞方法收到中断后都可以取消阻塞状态, 输入和输出流类会阻塞等待 I/O 完成,但是它们不抛出 InterruptedException,而且在被中断的情况下也不会退出阻塞状态. 同时会把中断状态置回为true.但调用Thread.interrupted()会对中断状态进行复位。只有方法上有InterruptedException,才能进行中断,比如使用synchronized时线程获取锁而受阻,就无法中断,但是因为lock的lock方法上有这个异常,所以lock锁的阻塞可以中断。

http://blog.csdn.net/canot/article/details/51087772

线程状态

  • NEW :还未开始的线程
  • RUNNABLE :正在被jvm执行,但可能正在等待CPU调度
  • BLOCKED : 等待获取锁以进入同步方法/代码块
  • WAITING : 线程在这个状态下等待其他线程执行特定操作。通常为当执行以下操作后
    Object.wait,
    Thread.join,
    LockSupport.park
  • TIMED_WAITING :当线程开始等待一段时间。通常为当执行以下操作后
    Thread.sleep(long),
    Object.wait(long),
    Thread.join(long),
    LockSupport.parkNanos,
    LockSupport.parkUntil
  • TERMINATED :线程已结束

操作系统的线程状态

  • 就绪
  • 运行
  • 阻塞

线程阻塞

java线程到阻塞状态,需要从用户态转换到核心态,所以会耗费很多处理器时间,要尽量减少阻塞状态的频繁切换

线程与进程的区别

  • 线程是进程的子集,一个进程可以有很多线程,每条线程并行执行不同的任务
  • 进程是资源分配的最小单位,线程是程序执行的最小单位
  • 一个进程是一个独立的运行环境,它可以被看作一个程序或者一个应用
  • 一个进程下的所有线程共用一片内存,但是它们有各自的栈内存

创建线程

  • 继承Thread,覆盖run()方法,创建线程对象并用start()方法启动线程
  • 实现Runnable接口来创建Thread线程
  • 通过实现Callable接口来创建Thread线程
  • 借助框架程序 可用Executor框架来创建线程池,线程池可以限制线程的数量并且可以回收再利用这些线程

start() 和 run() 方法区别

start()方法被用来启动新创建的线程,使该被创建的线程状态变为可运行状态。
当你调用run()方法的时候,没有新的线程启动,只会是在原来的线程中调用,这个方法同普通类的run方法一样。

等待 / 通知机制

wait():使一个线程处于等待(阻塞)状态,并且释放所持有的对象的锁;

sleep():使一个正在运行的线程处于睡眠状态,是一个静态方法,调用此方法要处理InterruptedException异常;

notify():唤醒一个处于等待状态的线程,当然在调用此方法的时候,并不能确切的唤醒某一个等待状态的线程,而是由JVM确定唤醒哪个线程,而且与优先级无关;notify之后,需要把自身的同步代码块执行完毕后才会让出锁。

notityAll():唤醒所有处于等待状态的线程,该方法并不是将对象的锁给所有线程,而是让它们竞争,只有获得锁的线程才能进入就绪状态;

join():当前线程中调用另一个线程的join()方法,则当前线程转入阻塞状态,直到执行join的线程运行结束,当前线程再由阻塞状态变为就绪状态。

join(long millis):millis的时间过后,当前线程转入阻塞状态,直到执行join的线程运行结束

举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import java.util.concurrent.TimeUnit;

/**
* Created by j_zhan on 2016/7/6.
*/
public class WaitNotify {
static boolean flag = true;
static Object lock = new Object();

public static void main(String[] args) throws InterruptedException {
Thread A = new Thread(new Wait(), "wait thread");
A.start();
TimeUnit.SECONDS.sleep(2);
Thread B = new Thread(new Notify(), "notify thread");
B.start();
}

static class Wait implements Runnable {
@Override
public void run() {
synchronized (lock) {
while (flag) {
try {
System.out.println(Thread.currentThread() + " flag is true");
lock.wait();
} catch (InterruptedException e) {

}
}
System.out.println(Thread.currentThread() + " flag is false");
}
}
}

static class Notify implements Runnable {
@Override
public void run() {
synchronized (lock) {
flag = false;
lock.notifyAll();
try {
TimeUnit.SECONDS.sleep(7);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

其相关方法定义在java.lang.Object上,线程A在获取锁后调用了对象lock的wait方法进入了等待状态,线程B调用对象lock的notifyAll()方法,线程A收到通知后从wait方法处返回继续执行,线程B对共享变量flag的修改对线程A来说是可见的。

Thread类的sleep()方法和对象的wait()方法都可以让线程暂停执行,它们有什么区别?
sleep()方法(休眠)是线程类(Thread)的静态方法,调用此方法会让当前线程暂停执行指定的时间,将执行机会(CPU)让给其他线程,但是对象的锁依然保持,因此休眠时间结束后会自动恢复。wait()是Object类的方法,调用对象的wait()方法导致当前线程放弃对象的锁(线程暂停执行),进入对象的等待池(wait pool),只有调用对象的notify()方法(或notifyAll()方法)时才能唤醒等待池中的线程进入等锁池(lock pool),如果线程重新获得对象的锁就可以进入就绪状态

注意点

  1. 使用wait()、notify()和notifyAll()时需要先对调用对象加锁,调用wait()方法后会释放锁。
  2. 调用wait()方法之后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列中。
  3. notify()或notifyAll()方法调用后,等待线程不会立刻从wait()中返回,需要等该线程释放锁之后,才有机会获取锁之后从wait()返回。
  4. notify()方法将等待队列中的一个等待线程从等待队列中移动到同步队列中;notifyAll()方法则是把等待队列中的所有线程都移动到同步队列中;被移动的线程状态从WAITING变为BLOCKED。
  5. 该线程从wait()方法返回的前提是获得了调用对象的锁。

Runnable介绍

源码分析

1
2
3
public interface Runnable {
public abstract void run();
}

使用方法

1
new Thread(new Runnable()).start();

Callable介绍

源码分析

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

使用方法

1
new Thread(new FutureTask<Integer>(new Callable<Integer>()),"线程名").start();

Callable和Runnable的区别

  1. Callable 的 call() 方法可以返回值和抛出异常,而 Runnable 的 run() 方法没有这些功能。
  2. Callable 可以使用ExecutorService

Future介绍

Future是一个接口,定义了Future对于具体的Runnable或者Callable任务的执行结果进行取消、查询任务是否被取消,查询是否完成、获取结果。

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Future<V> {

//试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel() 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled() 的后续调用将始终返回 true。
boolean cancel(boolean mayInterruptIfRunning);

//如果在任务正常完成前将其取消,则返回 true
boolean isCancelled();

//如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true
boolean isDone();

//如有必要,等待计算完成,然后获取其结果。如果运算尚未完成get方法将会阻塞
V get() throws InterruptedException, ExecutionException;

// 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class MyCallable implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println("做一些耗时的任务...");
Thread.sleep(5000);
return "OK";
}
}

public class FutureSimpleDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {

ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(new MyCallable());

System.out.println("dosomething...");

System.out.println("得到异步任务返回结果:" + future.get());
System.out.println("Completed!");
}
}

FutureTask介绍

FutureTask的父类是RunnableFuture,而RunnableFuture继承了Runnbale和Futrue这两个接口

1
public class FutureTask<V> implements RunnableFuture<V>
1
public interface RunnableFuture<V> extends Runnable, Future<V>

构造方法

1
2
3
4
5
6
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
1
2
3
4
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
  1. FutureTask最终都是执行Callable类型的任务。
  2. 如果构造函数参数是Runnable,会被Executors.callable方法转换为Callable类型。

举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class CallableAndFuture {
public static void main(String[] args) {
Callable<Integer> callable = new Callable<Integer>() {
public Integer call() throws Exception {
return new Random().nextInt(100);
}
};
FutureTask<Integer> future = new FutureTask<Integer>(callable);
new Thread(future).start();
try {
Thread.sleep(3000);// 可能做一些事情
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

线程池

在程序启动的时候就创建若干线程来响应处理,它们被称为线程池,里面的线程叫工作线程

优势

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。

线程池接口图

Executors

普通类 Executors 里面调用的就是 ThreadPoolExecutor。

Executors提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。ScheduleExecutorService代表可在指定延迟后或周期性地执行线程任务的线程池。

ThreadPoolExecutor

构造方法
1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}

corePoolSize:当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
maximumPoolSize:线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
threadFactory:线程工厂,它是一个接口,用来为线程池创建新线程的
handler: 超出线程池容量以及队列长度后拒绝任务的策略

针对handler,线程池提供了4种策略:
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;

corePoolSize 和 maximumPoolSize

具体说明

提交任务,线程池中的线程数可以增长至corePoolSize,之后继续提交任务将暂存至队列中,如果队列满,则看是否能继续增长线程数至maximumPoolSize,超出后将进行拒绝策略处理。如果采用无界队列,那么maximumPoolSize将失效,线程池中的线程最多就是corePoolSize个线程工作

BlockingQueue

ArrayBlockingQueue

一个基于数组实现的有界阻塞队列。ArrayBlockingQueue在生产者放入和消费者获取数据时共用一个锁对象,所以说对一个queue而言并没有做到真正的并行

SynchronousQueue

SynchronousQueue无数据缓冲区,相当于生产者和消费者直接交换数据而不通过任何中介。SynchronousQueue中每一个put操作必须等待一个take操作,否则不能继续添加元素。如果没有空闲线程,就构造一个新的线程加入线程池,如果线程池里没有空闲线程,则不会进行任务,newCachedThreadPool采用的便是这种策略

LinkedBlockingQueue

一个基于链表实现的有界阻塞队列。不同于ArrayBlockingQueue,其对生产者和消费者端分别采用了独立的锁来控制数据同步,因此提高了队列的并发性能。需要注意的是此队列的默认长度是Integer,MAX_VALUE,即无限大小,当生产者速度过快时,系统内存有被耗尽的风险,newFixedThreadPool采用的便是这种策略

PriorityBlockingQueue

元素是按照顺序储存的,一个具有优先级的无限阻塞队列

DelayQueue

队列中的每个元素实现了Delayed接口,只有当指定的延迟时间到了,才能够从队列中获取该元素。DelayQueue可以用于保存将要执行的任务和执行时间,newScheduledThreadPool采用的便是这种策略,也是无界

Executors 提供四种线程池

newCachedThreadPool

一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这个线程池通常可提高程序性能。调用 execute() 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。如果是长期异步任务,就不要用这种,因为每当创建新线程执行任务,会导致一定的系统开销,所以,使用该线程池时,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题。线程池的线程数可达到Integer.MAX_VALUE,即2147483647

newSingleThreadExecutor

一个单线程池,也就是该线程池只有一个线程在工作,所有的任务是串行执行的,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它,此线程池保证所有任务的执行顺序按照任务的提交顺序执行

newFixedThreadPool

创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程

newScheduledThreadPool

初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。

实现方式

1
2
3
4
ExecutorService executorService0 = Executors.newCachedThreadPool();
ExecutorService executorService1 = Executors.newSingleThreadExecutor();
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
ExecutorService executorService3 = Executors.newScheduledThreadPool(10);

ExecutorService方法

  1. execute(Runnable)
1
2
3
4
5
executorService.execute(new Runnable() {  
public void run() {
System.out.println("Asynchronous task");
}
});
  1. submit(Runnable)
1
2
3
4
5
Future future = executorService.submit(new Runnable() {  
public void run() {
System.out.println("Asynchronous task");
}
});
  1. submit(Callable)
1
2
3
4
5
6
Future future = executorService.submit(new Callable(){  
public Object call() throws Exception {
System.out.println("Asynchronous Callable");
return "Callable Result";
}
});
  1. submit(new FutureTask(Callable))
1
2
3
4
5
6
submit(new FutureTask<Integer>(new Callable(){  
public Object call() throws Exception {
System.out.println("Asynchronous Callable");
return "Callable Result";
}
}));
  1. invokeAny(…)

    方法 invokeAny() 接收一个包含 Callable 对象的集合作为参数。调用该方法不会返回 Future 对象,而是返回集合中某一个 Callable 对象的结果,而且无法保证调用之后返回的结果是哪一个 Callable,只知道它是这些 Callable 中一个执行结束的 Callable 对象。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    ExecutorService executorService = Executors.newSingleThreadExecutor();  

    Set<Callable<String>> callables = new HashSet<Callable<String>>();

    callables.add(new Callable<String>() {
    public String call() throws Exception {
    return "Task 1";
    }
    });
    callables.add(new Callable<String>() {
    public String call() throws Exception {
    return "Task 2";
    }
    });
    callables.add(new Callable<String>() {
    public String call() throws Exception {
    return "Task 3";
    }
    });

    String result = executorService.invokeAny(callables);

    System.out.println("result = " + result);

    executorService.shutdown();
  2. invokeAll(…)

    方法 invokeAll() 会调用存在于参数集合中的所有 Callable 对象,并且返回一个包含 Future 对象的集合,你可以通过这个返回的集合来管理每个 Callable 的执行结果。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    //创建一个线程数固定大小为10的线程池
    ExecutorService executorService = Executors.newFixedThreadPool( 10 ) ;
    List<Callable<String>> list = new ArrayList<>() ;
    //创建第一个 Callable
    Callable<String> callable1 = new Callable<String>() {
    @Override
    public String call() throws Exception {
    Log.d( "callable 1 线程是: "+ Thread.currentThread().getName() );
    return "执行完了 callable 1" ;
    }
    };
    //创建第二个 Callable
    Callable<String> callable2 = new Callable<String>() {
    @Override
    public String call() throws Exception {
    Log.d( "callable 2 线程是: "+ Thread.currentThread().getName() );
    return "执行完了 callable 2" ;
    }
    };
    list.add(callable1 ) ;
    list.add(callable2 ) ;
    List<Future<String>> result;
    try {
    result = executorService.invokeAll( list );
    for (Future<String> future : result) {
    Log.d( "结果是: "+ future.get() );
    }
    } catch (Exception e) {
    e.printStackTrace();
    }
    //关闭线程池
    executorService.shutdown();
  3. shutdown()

    不会立即的终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务

  4. shutdownNow()

    立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

ForkJoinPool介绍

ForkJoinPool同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。

ForkJoinPool的另外一个特性是它能够实现工作窃取(Work Stealing),在该线程池的每个线程中会维护一个队列来存放需要被执行的任务。当线程自身队列中的任务都执行完毕后,它会从别的线程中拿到未被执行的任务并帮助它执行。

可以把一个任务拆分成多个小任务,然后再把多个小任务合成总的计算结果

  • 分解(Fork)操作:当需要将一个任务拆分成更小的多个任务时,在框架中执行这些任务
  • 合并(Join)操作:当一个主任务等待其创建的多个子任务的完成执行
  • ForkJoinPool类的execute(ForkJoinTask task)方法是异步调用的
  • ForkJoinPool类的invoke(ForkJoinTask task)方法则是同步调用的

区别
区别 Executor ForkJoinPool
接受的对象 Runnable和Callable的实例 Runnable、Callable和ForkJoinTask的实例
调度模式 处于后面等待中的任务需要等待前面任务执行后才有机会被执行,是否被执行取决于具体的调度规则 采用work-stealing模式帮助其他线程执行任务,即ExcuteService解决的是并发问题,而ForkJoinPool解决的是并行问题。
ForkJoinTask

ForkJoinTask继承与Future接口,代表一个可以并行、合并的任务.

它有两个抽象子类:RecursiveAction和RecursiveTask。

  • RecursiveTask代表有返回值的任务
  • RecursiveAction代表没有返回值的任务