同步类容器和并发类容器

335人浏览 / 0人评论

并发编程(一)同步类容器和并发类容器

一、同步类容器

同步类容器是 线程安全 的,如 Vector、HashTable 等容器的同步功能都是由 Collections.synchronizedMap 等工厂方法去创建实现的,底层使用 synchronized 关键字,每次只有一个线程访问容器。这明显__不满足高并发的需求__。

源代码: 【Vector】 底层使用 synchronized 修辞,显然是线程安全的

public synchronized boolean add(E e) {
    modCount++;
    ensureCapacityHelper(elementCount + 1);
    elementData[elementCount++] = e;
    return true;
}

源代码: 【HashMap】 底层没有用 synchronized 修辞

public V put(K key, V value) {
    return putVal(hash(key), key, value, false, true);
}

由于 java.util.HashMap 底层没有用 synchronized 修辞,显然不是线程安全的,为了实现线程安全可以用 Collections.synchronizedMap 装饰一下,实现线程安全。

//HashMap不是线程安全容器,加工后成功线程安全
Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>());

二、并发类容器

并发类容器

说明

ConcurrentHashMap

替代 HashTable

ConcurrentSkipListMap

排序

CopyOnWriteArrayList

替代 Vector

ConcurrentLinkedQueue

高性能队列,无阻塞

LinkedBlockingQueue

阻塞形式队列,阻塞

2.1 ConcurrentMap 容器

ConcurrentHashMap 容器内部使用(Segment)来表示不同的部分,每个段其实就是一个小的 HashTable ,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。把一个整体分成了16个段(Segment)。也就是最高支持16个线程的并发修改操作。这也是在多线程场景时 减小锁的粒度从而降低锁竞争 一种方案。并且代码中大多共享变量使用 volatile 关键字声明,目的是第一时间获取修改的内容,性能非常好。

ConcurrentMap 接口下两个重要的实现:

  • ConcurrentHashMap
  • ConcurrentSkipListMap(排序)

2.2 Copy-On-Write 容器

CopyOnWrite 容器既写时复制的容器, 用于读多写少的场景 。往一个容器添加元素时,不直接往当前容器添加,而是先将当前容器 Copy ,复制一个新的容器,然后往新的容器添加元素,添加完成之后,再将原容器的引用指向新的容器,这样做的好处是可以对 CopyOnWrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以 CopyOnWrite 容器是一种读写分离的思想,读和写不同的容器。

源代码: 【CopyOnWriteArrayList】

// Copy-On-Write 容器是一种读写分离的思想
public class CopyOnWriteArrayList<E> {
    //Copy-On-Write 容器写操作时加锁,写操作结束后解锁
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();//加锁
        try {
            //1. 获取原容器
            Object[] elements = getArray();
            int len = elements.length;
            //2. 原容器 -> Copy -> 新容器
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            //3. 往新容器写入内容
            newElements[len] = e;
            //4. 指向新容器
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();//解锁
        }
    }

//读操作没有加锁,可以支持并发操作
    @SuppressWarnings("unchecked")
    private E get(Object[] a, int index) {
        return (E) a[index];
    }

public E get(int index) {
        return get(getArray(), index);
    }
}

Copy-On-Write 容器下两个重要的实现:

  • CopyOnWriteArrayList
  • CopyOnWriteArraySet

//CopyOnWriteArrayList <======> List 使用方法与List集合相同
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>();
list.add("a");
for (int i = 0; i < list.size(); i++) {
    System.out.println(list.get(i));
}

2.3 ConcurrentLinkedQueue 无阻塞队列

适合高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常 ConcurrentLinkedQueue 性能好于 LinkedBlockingQueue。它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则,且不允许 null 元素。更多实现原理

ConcurrentLinkedQueue 重要方法:

  1. add()offer():添加元素(ConcurrentLinkedQueue 下两个方法无区别)
  2. poll()peek():取头元素节点,区别在于前者删除元素,后者不会。注意: 没有元素时返回 null,不会阻塞队列。

import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentLinkedQueueTest {

private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
    private static int count = 2; // 线程个数
    //CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
    private static CountDownLatch latch = new CountDownLatch(count);

public static void main(String[] args) throws InterruptedException {
        long timeStart = System.currentTimeMillis();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 1; i <= 100; i++) {
            queue.offer(i);
        }
        for (int i = 0; i < count; i++) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    Random random = new Random();
                    while (!queue.isEmpty()) {
                        try {
                            Thread.sleep(random.nextInt(10) * 50);
                        } catch (InterruptedException e) {
                            ;
                        }
                        // queue.poll() 可能为 null
                        System.out.println(Thread.currentThread().getName() + "" + queue.poll());
                    }
                    latch.countDown();
                }
            });
        }
        latch.await(); //使得主线程(main)阻塞直到latch.countDown()为零才继续执行
        System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
        pool.shutdown();
    }
}

2.4 BlockingQueue 阻塞队列

与 ConcurrentLinkedQueue 相比,BlockingQueue 是阻塞的,即,put 方法在队列满的时候会阻塞直到有队列成员被消费,take 方法在队列空的时候也会阻塞,直到有队列成员被放进来。自定义阻塞队列

BlockingQueu API: BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:

  • 第一种是抛出一个异常
  • 第二种是返回一个特殊值(null false,具体取决于操作)
  • 第三种是在操作可以成功前,无限期地阻塞当前线程
  • 第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:

操作

抛出异常

特殊值

阻塞

超时

插入

add(e)

offer(e)

put(e)

offer(e, time, unit)

移除

remove()

poll()

take()

poll(time, unit)

检查

element()

peek()

不可用

不可用

(1) offer

将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false,不会抛异常:

java源代码:

public boolean offer(E e) { 
   if (e == null) throw new NullPointerException(); 
   final ReentrantLock lock = this.lock; 
   lock.lock(); 
   try
       if (count == items.length) 
           return false
       else
           insert(e); 
           return true
       } 
   } finally
       lock.unlock(); 
   } 

(2) put

将指定元素插入此队列中,将等待可用的空间.通俗点说就是>maxSize 时候,阻塞,直到能够有空间插入元素

java源代码:

public void put(E e) throws InterruptedException { 
    if (e == null) throw new NullPointerException(); 
    final E[] items = this.items; 
    final ReentrantLock lock = this.lock; 
    lock.lockInterruptibly(); 
    try
        try
            while (count == items.length) 
                notFull.await(); 
        } catch (InterruptedException e) { 
            notFull.signal(); // propagate to non-interrupted thread 
            throw e; 
        } 
        insert(e); 
   } finally
        lock.unlock(); 
   } 

(3) take

获取并移除此队列的头部,在元素变得可用之前一直等待 。queue的长度 == 0 的时候,一直阻塞

java 源代码:

public E take() throws InterruptedException { 
    final ReentrantLock lock = this.lock; 
    lock.lockInterruptibly(); 
    try
        try
            while (count == 0) 
                notEmpty.await(); 
        } catch (InterruptedException ie) { 
            notEmpty.signal(); // propagate to non-interrupted thread 
            throw ie; 
        } 
        E x = extract(); 
        return x; 
    } finally
        lock.unlock(); 
    } 

(4) add

和 collection 的 add 一样,没什么可以说的。如果当前没有可用的空间,则抛出 IllegalStateException。

(5) poll/peek

和 collection 的 poll/peek 一样,队列为空是返回 null

BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
queue.offer("1");   // offer方法不是阻塞的
queue.put("2");     // put方法是阻塞的
queue.poll();       // poll方法不是阻塞的,删除第一个元素
queue.peek();       // peek方法不是阻塞的
queue.take();       // take方法是阻塞的

2.4.1 ArrayBlockingQueue

基于数组的阻塞队列实现,其内部维护了一个定长数组,以便缓存队列中的数据对象。没有实现读写分离,也就意味着生产和消费不能完全并行,需要定义长度,可以指定先进先出或者先进后出,也叫__有界队列__。

2.4.2 LinkedBlockingQueue

基于链表的阻塞队列实现,其内部维护了一个数据缓冲队列(链表构成),以便缓存队列中的数据对象。实现了读写分离(读和写两个锁),从而实现生产和消费的完全并行,进而能够高效的处理并发数据,是一个__无界队列__。

2.4.3 SynchronousQueue

一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并消费。

2.4.4 PriorityBlockingQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定,即传入队列的对象必须实现Comparable接口)实现。其内部控制线程同步的锁采用的是公平锁,也是一个无界队列。

import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueTest {

public static void main(String[] args) {
        PriorityBlockingQueue queue = new PriorityBlockingQueue();
        queue.add(new Task(1));
        queue.add(new Task(6));
        queue.add(new Task(5));

while (true) {
            if (queue.size() == 0)
                break;
            try {
                System.out.println(queue.take()); // (1)
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

/*for (Iterator it = queue.iterator(); it.hasNext();) {
            Task task = (Task) it.next();  // (2)
            System.out.println(task);
        }*/

    }
}

class Task implements Comparable {
    private int id;

@Override
    public int compareTo(Object o) {
        Task task = (Task) o;
        return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);
    }

public void setId(int id) {
        this.id = id;
    }
    public Task(int id) {
        this.id = id;
    }
    public String toString() {
        return "Task{" + "id=" + id + '}';
    }
}

  1. take/poll/peek 时,queue 队列按优先级顺序取出元素,程序执行结果如下:Task{id=1}Task{id=5}Task{id=6}
  2. 注意:queue.iterator() 时,queue 队列并__不是__按优先级顺序,结果如下:Task{id=1}Task{id=6}Task{id=5}

2.4.5 DelayQueue

带有延迟时间的队列,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取元素(传入的元素必须实现Delayed接口),也是一个无界队列。应用场景比如缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等等。

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/__
 * 延迟队列:模仿网吧上网场景
 */
public class DelayQueueTest extends Thread {

DelayQueue queue =  new DelayQueue();
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

public void shangji(String name, int money) {
        WangMing wm = new WangMing(name, System.currentTimeMillis() + money * 1000l);
        queue.add(wm);
        System.out.println(name + "开始上网,时间:" + format.format(new Date()) +
                ",预计下机时间为:" + format.format(new Date(wm.getEndTime())));
    }

public void xiaji(WangMing wm) {
        System.out.println(wm.getName() + "下机,时间:" + format.format(new Date(wm.getEndTime())));
    }

public void run() {
        while (true) {
            try {
                WangMing wm = (WangMing) queue.take();
                xiaji(wm);
            } catch (InterruptedException e) {
                ;
            }
        }
    }

public static void main(String[] args) {
        DelayQueueTest wangba = new DelayQueueTest();
        wangba.start();

wangba.shangji("A", 5);
        wangba.shangji("B", 2);
        wangba.shangji("C", 4);
    }
}

/__
 * 网民,必须实现 Delayed 接口
 */
class WangMing implements Delayed {
    private String name;
    private long endTime;
    private TimeUnit timeUnit = TimeUnit.SECONDS;

@Override
    public long getDelay(TimeUnit unit) {
        return endTime - System.currentTimeMillis();
    }

@Override
    public int compareTo(Delayed o) {
        WangMing wm = (WangMing) o;
        return this.getDelay(timeUnit) - wm.getDelay(timeUnit) > 0 ? 1 :
                (this.getDelay(timeUnit) - wm.getDelay(timeUnit) < 0 ? -1 : 0);
    }

public WangMing(String name, long endTime) {
        this.name = name;
        this.endTime = endTime;
    }

public String getName() {
        return name;
    }

public void setName(String name) {
        this.name = name;
    }

public long getEndTime() {
        return endTime;
    }

public void setEndTime(long endTime) {
        this.endTime = endTime;
    }
}

程序执行结果:

A开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:57
B开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:54
C开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:56
B下机,时间:2017-12-07 09:37:54
C下机,时间:2017-12-07 09:37:56
A下机,时间:2017-12-07 09:37:57

来自 <https://www.cnblogs.com/binarylei/p/10024261.html>

支付宝扫码打赏 微信打赏

如果文章对您有帮助,欢迎移至上方按钮打赏,非常感谢你的支持!

全部评论