concurrency-principle

多线程

为什么要创建线程池

如果系统要运行多个线程,大量反复的启动创建和回收线程会非常占用系统资源,导致性能下降.

创建线程池,可以: 1.降低资源消耗 2.提升响应速度 3.提高

线程池原理

线程池一般由两种角色构成:多个工作线程 和 一个阻塞队列。

  • 工作线程 : 工作线程是一组已经处在运行中的线程,它们不断地向阻塞队列中领取任务执行。

  • 阻塞队列 : 阻塞队列用于存储工作线程来不及处理的任务。当工作线程都在执行任务时,到来的新任务就只能暂时在阻塞队列中存储。

提交一个任务到线程池中,线程池的处理流程如下:
1.判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者核心线程没有被创建)则创建一个新的工作线程来执行任务.如果核心线程都在执行任务,则进入下个流程.
2.线程池判断工作队列是否已满,如果工作路径没有满,则新提交的任务储存在这个工作队列里.如果工作队列满了,则进入下个流程.
3.判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务.如果已经满了,则交给饱和策略来处理这个任务.

线程池的分类

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler);

ThreadPoolExecutor是线程池的真正实现, 他通过构造方法的一系列参数,来构成不同配置的线程池。 corePoolSize: 核心池的大小。 当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中 maximumPoolSize: 线程池最大线程数,它表示在线程池中最多能创建多少个线程; keepAliveTime: 表示线程没有任务执行时最多保持多久时间会终止。 unit: 参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性。 workQueue:一个阻塞队列,提交的任务将会被放到这个队列里。 threadFactory:线程工厂,用来创建线程,主要是为了给线程起名字,默认工厂的线程名字:pool-1-thread-3。 handler:拒绝策略,当线程池里线程被耗尽,且队列也满了的时候会调用。

线程池的创建方法

Java通过Executors(jdk1.5并发包)提供四种线程池,分别为:

newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 案例演示:

newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。

newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

newCachedThreadPool
创建一个可缓存线程池如果线程池长度超过处理需要可灵活回收空闲线程
若无可回收则新建线程

package cn.qbz.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test111907 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "   i=" + temp);
                }
            });
        }
    }
}

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

package cn.qbz.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test111907 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "   i=" + temp);
                }
            });
        }
    }
}


    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。

package cn.qbz.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Test111907 {
    public static void main(String[] args) {
        final long begin = System.currentTimeMillis();
        ExecutorService executorService = Executors.newScheduledThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            final long time = begin;
            executorService.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "   i=" + temp + "   time=" + (System.currentTimeMillis() - time));
                }
            }, 5, TimeUnit.SECONDS);
        }
    }
}
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }
newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,
保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public class Test111907 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "   i=" + temp);
                }
            });
        }
    }
}

关闭线程池

关闭线程池有两种方式:shutdown和shutdownNow,关闭时,会遍历所有的线程,调用它们的interrupt函数中断线程。但这两种方式对于正在执行的线程处理方式不同。

  1. shutdown() 仅停止阻塞队列中等待的线程,那些正在执行的线程就会让他们执行结束。

  2. shutdownNow() 不仅会停止阻塞队列中的线程,而且会停止正在执行的线程。

ThreadPoolExecutor运行机制

当有请求到来时:

  1. 若当前实际线程数量 少于 corePoolSize,即使有空闲线程,也会创建一个新的工作线程;
  2. 若当前实际线程数量处于corePoolSize和maximumPoolSize之间,并且阻塞队列没满,则任务将被放入阻塞队列中等待执行;
  3. 若当前实际线程数量 小于 maximumPoolSize,但阻塞队列已满,则直接创建新线程处理任务;
  4. 若当前实际线程数量已经达到maximumPoolSize,并且阻塞队列已满,则使用饱和策略。

设置合理的线程池大小

任务一般可分为:CPU密集型、IO密集型、混合型,对于不同类型的任务需要分配不同大小的线程池。

CPU密集型任务

  • 尽量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,只能增加上下文切换的次数,因此会带来额外的开销。
  • IO密集型任务 可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候去处理别的任务,充分利用CPU时间。
  • 混合型任务 可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。 因为如果划分之后两个任务执行时间相差甚远,那么先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。

Executor两级调度模型

在这里插入图片描述

在HotSpot虚拟机中,Java中的线程将会被一一映射为操作系统的线程。 在Java虚拟机层面,用户将多个任务提交给Executor框架,Executor负责分配线程执行它们; 在操作系统层面,操作系统再将这些线程分配给处理器执行。

Executor结构在这里插入图片描述

Executor框架中的所有类可以分成三类:

  1. 任务 任务有两种类型:Runnable和Callable。
  2. 任务执行器 Executor框架最核心的接口是Executor,它表示任务的执行器。 Executor的子接口为ExecutorService。 ExecutorService有两大实现类:ThreadPoolExecutor和ScheduledThreadPoolExecutor。
  3. 执行结果 Future接口表示异步的执行结果,它的实现类为FutureTask。

线程池 Executors工厂类可以创建四种类型的线程池,通过Executors.newXXX即可创建。

1. FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

v2-984ba405f67249ab3cc043c75dbcbedd_hd.jpg

  • 它是一种固定大小的线程池;
  • corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
  • keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;
  • 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;
  • 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;
  • 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。

2. CachedThreadPool

	public static ExecutorService newCachedThreadPool(){
	    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>());
	}
123

v2-f9cff0865c6143ace452274046322335_hd.jpg

  • 它是一个可以无限扩大的线程池;
  • 它比较适合处理执行时间比较小的任务;
  • corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;
  • keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;
  • 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

3. SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(){
    return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
123

在这里插入图片描述

  • 它只会创建一条工作线程处理任务;
  • 采用的阻塞队列为LinkedBlockingQueue;

4. ScheduledThreadPool

它用来处理延时任务或定时任务。

img

  • 它接收SchduledFutureTask类型的任务,有两种提交任务的方式:
  • scheduledAtFixedRate
  • scheduledWithFixedDelay
  • SchduledFutureTask接收的参数:
  • time:任务开始的时间
  • sequenceNumber:任务的序号
  • period:任务执行的时间间隔
  • 它采用DelayQueue存储等待的任务
  • DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
  • DelayQueue也是一个无界队列;
  • 工作线程的执行过程:
  • 工作线程会从DelayQueue取已经到期的任务去执行;
  • 执行结束后重新设置任务的到期时间,再次放回DelayQueue

微信公众号:每日学习干货

Licensed under CC BY-NC-SA 4.0
Last updated on Mar 23, 2024 06:11 UTC
让过去的过去,给时间点时间
Built with Hugo
Theme Stack designed by Jimmy