Java 基础
  • Introduction
  • 工具类
    • 日期
  • 枚举
  • 多线程
    • java中多线程实现的3种方式
    • ThreadPoolExecutor线程池参数设置技巧
  • Java Stream API 入门篇
  • gradle项目构建工具
    • gradle使用总结
      • eclipse中指定gradle的java home
      • gradle 跨工程引用
      • Gradle 引入本地定制 jar 包
      • gradle构建脚本基础
      • maven-publish plugin
      • 灵铱公司-会员订单项目配置
      • ssh plugin
  • json
    • gson
      • gson使用指南
  • 并发编程
    • Lock
    • Condition
    • Semaphore对象池-令牌桶
    • 读多写少·读写互斥·本地缓存
    • 线程池线程任务执行完成计数器
    • 线程池使用
  • Google Guava官方教程
    • Guave Cache
Powered by GitBook
On this page

Was this helpful?

  1. 并发编程

Semaphore对象池-令牌桶

PreviousConditionNext读多写少·读写互斥·本地缓存

Last updated 6 years ago

Was this helpful?

上面的例子,我们用信号量实现了一个最简单的互斥锁功能。估计你会觉得奇怪,既然有 Java SDK 里面提供了 Lock,为啥还要提供一个 Semaphore ?其实实现一个互斥锁,仅仅是 Semaphore 的部分功能,Semaphore 还有一个功能是 Lock 不容易实现的,那就是:Semaphore 可以允许多个线程访问一个临界区。

现实中还有这种需求?有的。比较常见的需求就是我们工作中遇到的各种池化资源,例如连接池、对象池、线程池等等。其中,你可能最熟悉数据库连接池,在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的。

其实前不久,我在工作中也遇到了一个对象池的需求。所谓对象池呢,指的是一次性创建出 N 个对象,之后所有的线程重复利用这 N 个对象,当然对象在被释放前,也是不允许其他线程使用的。对象池,可以用 List 保存实例对象,这个很简单。但关键是限流器的设计,这里的限流,指的是不允许多于 N 个线程同时进入临界区。那如何快速实现一个这样的限流器呢?这种场景,我立刻就想到了信号量的解决方案。

信号量的计数器,在上面的例子中,我们设置成了 1,这个 1 表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数 N,就能完美解决对象池的限流问题了。下面就是对象池的示例代码。

class ObjPool<T, R> {

 final List<T> pool;

 // 用信号量实现限流器

 final Semaphore sem;

 // 构造函数

 ObjPool(int size, T t){

 pool = new Vector<T>(){};

 for(int i=0; i<size; i++){

 pool.add(t);

 }

 sem = new Semaphore(size);

 }

 // 利用对象池的对象,调用 func

 R exec(Function<T,R> func) {

 T t = null;

 sem.acquire();

 try {

 t = pool.remove(0);

 return func.apply(t);

 } finally {

 pool.add(t);

 sem.release();

 }

 }

}

// 创建对象池

ObjPool<Long, String> pool =

 new ObjPool<Long, String>(10, 2);

// 通过对象池获取 t,之后执行

pool.exec(t -> {

 System.out.println(t);

 return t.toString();

});

假如我有一个user对象,有一个user对象池,里面有3个user对象,并发获取user对象,执行user的toString方法

为什么看到效果,假如我们的toString方法耗时5s

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

@Setter
@Getter
@AllArgsConstructor
public class User {

    private String username;

    private Integer age;

    @Override
    public String toString() {
        try {
            Thread.currentThread().sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return "User [username=" + username + ", age=" + age + "]";
    }




}
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Semaphore;

public class ObjPool {

    final List<User> pool;

    // 用信号量实现限流器 final Semaphore
    final Semaphore sem;

    public ObjPool(User... users) {

        pool = new Vector<User>();
        for (User user : users) {
            pool.add(user);
        }
        System.out.println(users.length);
        sem = new Semaphore(users.length);
    }

    public User remove()  {
        try {
            sem.acquire();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        User user = pool.remove(0);
        return user;

    }

    public void add(User user) {
        pool.add(user);
        sem.release();
    }


}

测试

package com.example.lua.demo;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Test {

    public static void main(String[] args) throws Exception {
        User user1 = new User("u1", 1);
        User user2 = new User("u2", 2);
        User user3 = new User("u3", 3);

        ObjPool objPool = new ObjPool(user1, user2, user3);


        for(int i = 0; i < 100; i++) {

            new Thread(()->{

                User user = objPool.remove();
                String string = user.toString();
                log.info(string);
                objPool.add(user);
            }).start(); 


        }





    }

}