千家信息网

Java并发之Semaphore源码的示例分析

发表于:2025-11-12 作者:千家信息网编辑
千家信息网最后更新 2025年11月12日,这篇文章主要介绍了Java并发之Semaphore源码的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。Semaphore(信号
千家信息网最后更新 2025年11月12日Java并发之Semaphore源码的示例分析

这篇文章主要介绍了Java并发之Semaphore源码的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

Semaphore(信号量)是JUC包中比较常用到的一个类,它是AQS共享模式的一个应用,可以允许多个线程同时对共享资源进行操作,并且可以有效的控制并发数,利用它可以很好的实现流量控制。Semaphore提供了一个许可证的概念,可以把这个许可证看作公共汽车车票,只有成功获取车票的人才能够上车,并且车票是有一定数量的,不可能毫无限制的发下去,这样就会导致公交车超载。所以当车票发完的时候(公交车以满载),其他人就只能等下一趟车了。如果中途有人下车,那么他的位置将会空闲出来,因此如果这时其他人想要上车的话就又可以获得车票了。利用Semaphore可以实现各种池,我们在本篇末尾将会动手写一个简易的数据库连接池。首先我们来看一下Semaphore的构造器。

//构造器1public Semaphore(int permits) {  sync = new NonfairSync(permits);}//构造器2public Semaphore(int permits, boolean fair) {  sync = fair ? new FairSync(permits) : new NonfairSync(permits);}

Semaphore提供了两个带参构造器,没有提供无参构造器。这两个构造器都必须传入一个初始的许可证数量,使用构造器1构造出来的信号量在获取许可证时会采用非公平方式获取,使用构造器2可以通过参数指定获取许可证的方式(公平or非公平)。Semaphore主要对外提供了两类API,获取许可证和释放许可证,默认的是获取和释放一个许可证,也可以传入参数来同时获取和释放多个许可证。在本篇中我们只讲每次获取和释放一个许可证的情况。

1.获取许可证

//获取一个许可证(响应中断)public void acquire() throws InterruptedException {  sync.acquireSharedInterruptibly(1);}//获取一个许可证(不响应中断)public void acquireUninterruptibly() {  sync.acquireShared(1);}//尝试获取许可证(非公平获取)public boolean tryAcquire() {  return sync.nonfairTryAcquireShared(1) >= 0;}//尝试获取许可证(定时获取)public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {  return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}

上面的API是Semaphore提供的默认获取许可证操作。每次只获取一个许可证,这也是现实生活中较常遇到的情况。除了直接获取还提供了尝试获取,直接获取操作在失败之后可能会阻塞线程,而尝试获取则不会。另外还需注意的是tryAcquire方法是使用非公平方式尝试获取的。在平时我们比较常用到的是acquire方法去获取许可证。下面我们就来看看它是怎样获取的。可以看到acquire方法里面直接就是调用sync.acquireSharedInterruptibly(1),这个方法是AQS里面的方法,我们在讲AQS源码系列文章的时候曾经讲过,现在我们再来回顾一下。

//以可中断模式获取锁(共享模式)public final void acquireSharedInterruptibly(int arg) throws InterruptedException {  //首先判断线程是否中断, 如果是则抛出异常  if (Thread.interrupted()) {    throw new InterruptedException();  }  //1.尝试去获取锁  if (tryAcquireShared(arg) < 0) {    //2. 如果获取失败则进人该方法    doAcquireSharedInterruptibly(arg);  }}

acquireSharedInterruptibly方法首先就是去调用tryAcquireShared方法去尝试获取,tryAcquireShared在AQS里面是抽象方法,FairSync和NonfairSync这两个派生类实现了该方法的逻辑。FairSync实现的是公平获取的逻辑,而NonfairSync实现的非公平获取的逻辑。

abstract static class Sync extends AbstractQueuedSynchronizer {  //非公平方式尝试获取  final int nonfairTryAcquireShared(int acquires) {    for (;;) {      //获取可用许可证      int available = getState();      //获取剩余许可证      int remaining = available - acquires;      //1.如果remaining小于0则直接返回remaining      //2.如果remaining大于0则先更新同步状态再返回remaining      if (remaining < 0 || compareAndSetState(available, remaining)) {        return remaining;      }    }  }}//非公平同步器static final class NonfairSync extends Sync {  private static final long serialVersionUID = -2694183684443567898L;  NonfairSync(int permits) {    super(permits);  }  //尝试获取许可证  protected int tryAcquireShared(int acquires) {    return nonfairTryAcquireShared(acquires);  }}//公平同步器static final class FairSync extends Sync {  private static final long serialVersionUID = 2014338818796000944L;  FairSync(int permits) {    super(permits);  }  //尝试获取许可证  protected int tryAcquireShared(int acquires) {    for (;;) {      //判断同步队列前面有没有人排队      if (hasQueuedPredecessors()) {        //如果有的话就直接返回-1,表示尝试获取失败        return -1;      }      //获取可用许可证      int available = getState();      //获取剩余许可证      int remaining = available - acquires;      //1.如果remaining小于0则直接返回remaining      //2.如果remaining大于0则先更新同步状态再返回remaining      if (remaining < 0 || compareAndSetState(available, remaining)) {        return remaining;      }    }  }}

这里需要注意的是NonfairSync的tryAcquireShared方法直接调用的是nonfairTryAcquireShared方法,这个方法是在父类Sync里面的。非公平获取锁的逻辑是先取出当前同步状态(同步状态表示许可证个数),将当前同步状态减去参入的参数,如果结果不小于0的话证明还有可用的许可证,那么就直接使用CAS操作更新同步状态的值,最后不管结果是否小于0都会返回该结果值。这里我们要了解tryAcquireShared方法返回值的含义,返回负数表示获取失败,零表示当前线程获取成功但后续线程不能再获取,正数表示当前线程获取成功并且后续线程也能够获取。我们再来看acquireSharedInterruptibly方法的代码。

//以可中断模式获取锁(共享模式)public final void acquireSharedInterruptibly(int arg) throws InterruptedException {  //首先判断线程是否中断, 如果是则抛出异常  if (Thread.interrupted()) {    throw new InterruptedException();  }  //1.尝试去获取锁  //负数:表示获取失败  //零值:表示当前线程获取成功, 但是后继线程不能再获取了  //正数:表示当前线程获取成功, 并且后继线程同样可以获取成功  if (tryAcquireShared(arg) < 0) {    //2. 如果获取失败则进人该方法    doAcquireSharedInterruptibly(arg);  }}

如果返回的remaining小于0的话就代表获取失败,因此tryAcquireShared(arg) < 0就为true,所以接下来就会调用doAcquireSharedInterruptibly方法,这个方法我们在讲AQS的时候讲过,它会将当前线程包装成结点放入同步队列尾部,并且有可能挂起线程。这也是当remaining小于0时线程会排队阻塞的原因。而如果返回的remaining>=0的话就代表当前线程获取成功,因此tryAcquireShared(arg) < 0就为flase,所以就不会再去调用doAcquireSharedInterruptibly方法阻塞当前线程了。以上是非公平获取的整个逻辑,而公平获取时仅仅是在此之前先去调用hasQueuedPredecessors方法判断同步队列是否有人在排队,如果有的话就直接return -1表示获取失败,否则才继续执行下面和非公平获取一样的步骤。

2.释放许可证

//释放一个许可证public void release() {  sync.releaseShared(1);}

调用release方法是释放一个许可证,它的操作很简单,就调用了AQS的releaseShared方法,我们来看看这个方法。

//释放锁的操作(共享模式)public final boolean releaseShared(int arg) {  //1.尝试去释放锁  if (tryReleaseShared(arg)) {    //2.如果释放成功就唤醒其他线程    doReleaseShared();    return true;  }  return false;}

AQS的releaseShared方法首先调用tryReleaseShared方法尝试释放锁,这个方法的实现逻辑在子类Sync里面。

abstract static class Sync extends AbstractQueuedSynchronizer {  ...  //尝试释放操作  protected final boolean tryReleaseShared(int releases) {    for (;;) {      //获取当前同步状态      int current = getState();      //将当前同步状态加上传入的参数      int next = current + releases;      //如果相加结果小于当前同步状态的话就报错      if (next < current) {        throw new Error("Maximum permit count exceeded");      }      //以CAS方式更新同步状态的值, 更新成功则返回true, 否则继续循环      if (compareAndSetState(current, next)) {        return true;      }    }  }  ...}

可以看到tryReleaseShared方法里面采用for循环进行自旋,首先获取同步状态,将同步状态加上传入的参数,然后以CAS方式更新同步状态,更新成功就返回true并跳出方法,否则就继续循环直到成功为止,这就是Semaphore释放许可证的流程。

3.动手写个连接池

Semaphore代码并没有很复杂,常用的操作就是获取和释放一个许可证,这些操作的实现逻辑也都比较简单,但这并不妨碍Semaphore的广泛应用。下面我们就来利用Semaphore实现一个简单的数据库连接池,通过这个例子希望读者们能更加深入的掌握Semaphore的运用。

public class ConnectPool {    //连接池大小  private int size;  //数据库连接集合  private Connect[] connects;  //连接状态标志  private boolean[] connectFlag;  //剩余可用连接数  private volatile int available;  //信号量  private Semaphore semaphore;    //构造器  public ConnectPool(int size) {     this.size = size;    this.available = size;    semaphore = new Semaphore(size, true);    connects = new Connect[size];    connectFlag = new boolean[size];    initConnects();  }    //初始化连接  private void initConnects() {    //生成指定数量的数据库连接    for(int i = 0; i < this.size; i++) {      connects[i] = new Connect();    }  }    //获取数据库连接  private synchronized Connect getConnect(){     for(int i = 0; i < connectFlag.length; i++) {      //遍历集合找到未使用的连接      if(!connectFlag[i]) {        //将连接设置为使用中        connectFlag[i] = true;        //可用连接数减1        available--;        System.out.println("【"+Thread.currentThread().getName()+"】以获取连接   剩余连接数:" + available);        //返回连接引用        return connects[i];      }    }    return null;  }    //获取一个连接  public Connect openConnect() throws InterruptedException {    //获取许可证    semaphore.acquire();    //获取数据库连接    return getConnect();  }    //释放一个连接  public synchronized void release(Connect connect) {     for(int i = 0; i < this.size; i++) {      if(connect == connects[i]){        //将连接设置为未使用        connectFlag[i] = false;        //可用连接数加1        available++;        System.out.println("【"+Thread.currentThread().getName()+"】以释放连接   剩余连接数:" + available);        //释放许可证        semaphore.release();      }    }  }    //剩余可用连接数  public int available() {    return available;  }  }

测试代码:

public class TestThread extends Thread {    private static ConnectPool pool = new ConnectPool(3);    @Override  public void run() {    try {      Connect connect = pool.openConnect();      Thread.sleep(100); //休息一下      pool.release(connect);    } catch (InterruptedException e) {      e.printStackTrace();    }  }    public static void main(String[] args) {    for(int i = 0; i < 10; i++) {      new TestThread().start();    }  }}

测试结果:

我们使用一个数组来存放数据库连接的引用,在初始化连接池的时候会调用initConnects方法创建指定数量的数据库连接,并将它们的引用存放到数组中,此外还有一个相同大小的数组来记录连接是否可用。每当外部线程请求获取一个连接时,首先调用semaphore.acquire()方法获取一个许可证,然后将连接状态设置为使用中,最后返回该连接的引用。许可证的数量由构造时传入的参数决定,每调用一次semaphore.acquire()方法许可证数量减1,当数量减为0时说明已经没有连接可以使用了,这时如果其他线程再来获取就会被阻塞。每当线程释放一个连接的时候会调用semaphore.release()将许可证释放,此时许可证的总量又会增加,代表可用的连接数增加了,那么之前被阻塞的线程将会醒来继续获取连接,这时再次获取就能够成功获取连接了。测试示例中初始化了一个3个连接的连接池,我们从测试结果中可以看到,每当线程获取一个连接剩余的连接数将会减1,等到减为0时其他线程就不能再获取了,此时必须等待一个线程将连接释放之后才能继续获取。可以看到剩余连接数总是在0到3之间变动,说明我们这次的测试是成功的。

感谢你能够认真阅读完这篇文章,希望小编分享的"Java并发之Semaphore源码的示例分析"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!

许可证 方法 线程 同步 状态 尝试 成功 构造器 数据 数据库 剩余 数量 逻辑 更新 参数 方式 模式 结果 车票 测试 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 金蝶 数据库 删除某单据 聊城管理系统软件开发哪家靠谱 如何建立用户名和密码的数据库 玉溪瀚海软件开发有限公司 财务软件能不能连接空白数据库 宁夏卓远信科网络技术 上海奇迹无限网络技术有限公司 智能停车场数据库整合 软件开发支出计入成本 嘉兴智能软件开发创新服务 历年变更数据库中字段含义 智慧电力软件开发公司 搞网络技术类的工作是什么 惠普服务器设置管理地址 网络安全技术绪论 服务器安全规范文档 数据库服务器连接映射 山东交友软件开发价钱 mysql数据库语句 苏宁易购软件开发工程师 网络安全主题团课感想 聊城管理系统软件开发哪家靠谱 数据源和数据库连接池 苹果软件开发视频 绝地求生提示连接服务器失败 中国最厉害的软件开发公司 深信服数据库安全 苏州不动产服务器异常 高斯数据库查看表结构语句 数据库进入目录命令
0