多线程

为什么要创建线程池

如果系统要运行多个线程,大量反复的启动创建和回收线程会非常占用系统资源,导致性能下降.

创建线程池,可以:
1.降低资源消耗
2.提升响应速度
3.提高

线程池原理

线程池一般由两种角色构成:多个工作线程 和 一个阻塞队列。

  • 工作线程 :
    工作线程是一组已经处在运行中的线程,它们不断地向阻塞队列中领取任务执行。

  • 阻塞队列 :
    阻塞队列用于存储工作线程来不及处理的任务。当工作线程都在执行任务时,到来的新任务就只能暂时在阻塞队列中存储。

提交一个任务到线程池中,线程池的处理流程如下:
1.判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者核心线程没有被创建)则创建一个新的工作线程来执行任务.如果核心线程都在执行任务,则进入下个流程.
2.线程池判断工作队列是否已满,如果工作路径没有满,则新提交的任务储存在这个工作队列里.如果工作队列满了,则进入下个流程.
3.判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务.如果已经满了,则交给饱和策略来处理这个任务.

线程池的分类

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler);

ThreadPoolExecutor是线程池的真正实现,
他通过构造方法的一系列参数,来构成不同配置的线程池。
corePoolSize: 核心池的大小。 当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
maximumPoolSize: 线程池最大线程数,它表示在线程池中最多能创建多少个线程;
keepAliveTime: 表示线程没有任务执行时最多保持多久时间会终止。
unit: 参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性。
workQueue:一个阻塞队列,提交的任务将会被放到这个队列里。
threadFactory:线程工厂,用来创建线程,主要是为了给线程起名字,默认工厂的线程名字:pool-1-thread-3。
handler:拒绝策略,当线程池里线程被耗尽,且队列也满了的时候会调用。

线程池的创建方法

Java通过Executors(jdk1.5并发包)提供四种线程池,分别为:

newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
案例演示:

newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。

newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,
若无可回收,则新建线程。

package cn.qbz.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test111907 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "   i=" + temp);
                }
            });
        }
    }
}

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

package cn.qbz.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test111907 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "   i=" + temp);
                }
            });
        }
    }
}


    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。

package cn.qbz.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Test111907 {
    public static void main(String[] args) {
        final long begin = System.currentTimeMillis();
        ExecutorService executorService = Executors.newScheduledThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            final long time = begin;
            executorService.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "   i=" + temp + "   time=" + (System.currentTimeMillis() - time));
                }
            }, 5, TimeUnit.SECONDS);
        }
    }
}
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }
newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,
保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public class Test111907 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "   i=" + temp);
                }
            });
        }
    }
}

关闭线程池

关闭线程池有两种方式:shutdown和shutdownNow,关闭时,会遍历所有的线程,调用它们的interrupt函数中断线程。但这两种方式对于正在执行的线程处理方式不同。

  1. shutdown()
    仅停止阻塞队列中等待的线程,那些正在执行的线程就会让他们执行结束。

  2. shutdownNow()
    不仅会停止阻塞队列中的线程,而且会停止正在执行的线程。

ThreadPoolExecutor运行机制

当有请求到来时:

  1. 若当前实际线程数量 少于 corePoolSize,即使有空闲线程,也会创建一个新的工作线程;
  2. 若当前实际线程数量处于corePoolSize和maximumPoolSize之间,并且阻塞队列没满,则任务将被放入阻塞队列中等待执行;
  3. 若当前实际线程数量 小于 maximumPoolSize,但阻塞队列已满,则直接创建新线程处理任务;
  4. 若当前实际线程数量已经达到maximumPoolSize,并且阻塞队列已满,则使用饱和策略。

设置合理的线程池大小

任务一般可分为:CPU密集型、IO密集型、混合型,对于不同类型的任务需要分配不同大小的线程池。

CPU密集型任务

  • 尽量使用较小的线程池,一般为CPU核心数+1。
    因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,只能增加上下文切换的次数,因此会带来额外的开销。
  • IO密集型任务
    可以使用稍大的线程池,一般为2*CPU核心数。
    IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候去处理别的任务,充分利用CPU时间。
  • 混合型任务
    可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。
    只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。
    因为如果划分之后两个任务执行时间相差甚远,那么先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。

Executor两级调度模型

在这里插入图片描述
在这里插入图片描述

在HotSpot虚拟机中,Java中的线程将会被一一映射为操作系统的线程。
在Java虚拟机层面,用户将多个任务提交给Executor框架,Executor负责分配线程执行它们;
在操作系统层面,操作系统再将这些线程分配给处理器执行。

Executor结构在这里插入图片描述

Executor框架中的所有类可以分成三类:

  1. 任务
    任务有两种类型:Runnable和Callable。
  2. 任务执行器
    Executor框架最核心的接口是Executor,它表示任务的执行器。
    Executor的子接口为ExecutorService。
    ExecutorService有两大实现类:ThreadPoolExecutor和ScheduledThreadPoolExecutor。
  3. 执行结果
    Future接口表示异步的执行结果,它的实现类为FutureTask。

线程池
Executors工厂类可以创建四种类型的线程池,通过Executors.newXXX即可创建。

1. FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
v2-984ba405f67249ab3cc043c75dbcbedd_hd.jpg
v2-984ba405f67249ab3cc043c75dbcbedd_hd.jpg
  • 它是一种固定大小的线程池;
  • corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
  • keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;
  • 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;
  • 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;
  • 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。

2. CachedThreadPool

    public static ExecutorService newCachedThreadPool(){
        return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>());
    }
123
v2-f9cff0865c6143ace452274046322335_hd.jpg
v2-f9cff0865c6143ace452274046322335_hd.jpg
  • 它是一个可以无限扩大的线程池;
  • 它比较适合处理执行时间比较小的任务;
  • corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;
  • keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;
  • 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

3. SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(){
    return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
123
在这里插入图片描述
在这里插入图片描述
  • 它只会创建一条工作线程处理任务;
  • 采用的阻塞队列为LinkedBlockingQueue;

4. ScheduledThreadPool

它用来处理延时任务或定时任务。

img
img
  • 它接收SchduledFutureTask类型的任务,有两种提交任务的方式:
  • scheduledAtFixedRate
  • scheduledWithFixedDelay
  • SchduledFutureTask接收的参数:
  • time:任务开始的时间
  • sequenceNumber:任务的序号
  • period:任务执行的时间间隔
  • 它采用DelayQueue存储等待的任务
  • DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
  • DelayQueue也是一个无界队列;
  • 工作线程的执行过程:
  • 工作线程会从DelayQueue取已经到期的任务去执行;
  • 执行结束后重新设置任务的到期时间,再次放回DelayQueue

微信公众号:每日学习干货

评论