Java多线程
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
文章目录
一、集合框架结构
Java语言的集合框架父接口是Iterable从这个接口向下一一继承就可以得到完整的Java集合框架结构。
接口Collection下有三个继承分支List、Set、Queue它是集合框架主要功能的抽象。。另一个接口是Iterable。
1.接口 Iterable
接口Iterable的主要作用是迭代循环接口Iterable结构非常简洁其中包含方法iterator()通过此方法返回Iterator对象以进行循环处理。
2. 接口 Collection
接口Collection提供了集合框架最主要、最常用的操作。接口内部提供的方法主要是针对数据的增删改查操作。
3.接口 List
接口 List 对接口 Collection进行了扩展允许根据索引位置操作数据并且允许内容重复。接口List最常用的非并发实现类是ArrayList它是非线程安全的可以对数据以顺序表的形式进行组织使数据呈现有序的效果。
类Vector是线程安全的所以在多线程并发操作数据时可以无误地处理集合中的数据。Vector的iterator()方法返回的Iterator对象不支持并发。
4.接口 Set
接口Set也是对接口Collection进行了扩展特点是不允许内容重复排序方式为自然排序即无序。其防止元素重复的原理原理是元素需要重写hashCode()和equals()方法。接口Set最常用的非并发实现类是HashSet。
HashSet默认以无序的方式组织元素LinkedHashSet类可以有序地组织元素。
接口Set还有另外一个实现类即TreeSet。它不仅实现了接口Set还实现了接口SortedSet和NavigableSet。接口SortedSet和接口NavigableSet的父接口为Set接口SortedSet和接口NavigableSet在功能上得到了扩展比如可以获取接口Set中内容的子集支持获取表头和表尾的数据等。
5.接口 Queue
接口Queue对接口Collection进行了扩展它可以方便地操作队列头。接口Queue的非并发实现类有PriorityQueue它是一个基于优先级的无界优先级队列。
6.接口 Deque
接口Queue支持对表头的操作而接口Deque不仅支持对表头的操作而且支持对表尾的操作所以Deque的全称Double Ended Queue(双端队列)。
接口Queue和Deque之间有继承关系。即Deque继承了Queue。
接口Deque的非并发实现类有ArrayDeque和LinkedList。如果只想从队列两端获取数据则使用ArrayDeque如果想从队列两端获取数据的同时根据索引的位置操作数据则使用LinkedList。
二、非阻塞队列
非阻塞队列的特色是队列里面没有数据时返回异常或null。
在JDK的并发包中常见的非阻塞队列有ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、ConcurrentLinkedQueue、ConcurrentLinkedDeque、CopyOnWriteArrayList、CopyOnWriteArraySet。
1.ConcurrentHashMap的使用
HashMap是线程不安全的。Hashtable是线程安全的在多线程环境下支持put操作但不支持remove操作。
类ConcurrentHashMap是JDK并发包中支持并发操作的Map对象在多线程环境下支持put操作和remove操作。
1.1 验证ConcurrentHashMap的线程安全性
-
创建包concurrent.service在包下创建类MyService
package concurrent.service; import java.util.concurrent.ConcurrentHashMap; public class MyService { public ConcurrentHashMap cmap = new ConcurrentHashMap(); public void testMethod() { for (int i = 0; i < 50000; i++) { cmap.put(Thread.currentThread().getName()+" "+(i+1),Thread.currentThread().getName()+" "+(i+1)); System.out.println(Thread.currentThread().getName()+" "+(i+1)); } } }
-
创建包concurrent.thread在包下创建线程类ThreadA
package concurrent.thread; import concurrent.service.MyService; public class ThreadA extends Thread{ private MyService myService; public ThreadA(MyService myService) { this.myService = myService; } @Override public void run() { myService.testMethod(); } }
-
创建包concurrent.thread在包下创建线程类ThreadB
package concurrent.thread; import concurrent.service.MyService; public class ThreadB extends Thread{ private MyService myService; public ThreadB(MyService myService) { this.myService = myService; } @Override public void run() { myService.testMethod(); } }
-
创建包concurrent.test在包下创建类Run
package concurrent.test; import concurrent.service.MyService; import concurrent.thread.ThreadA; import concurrent.thread.ThreadB; //ConcurrentHashMap public class Run { public static void main(String[] args) { MyService myService = new MyService(); ThreadA threadA = new ThreadA(myService); ThreadB threadB = new ThreadB(myService); threadA.start(); threadB.start(); } }
即在启动两个线程类同时使用同个MyService类中的ConcurrentHashMap实例的put方法。
1.2 验证ConcurrentHashMap支持并发remove删除操作
-
创建包concurrent.service在包下创建类MyService2
package concurrent.service; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; public class MyService2 { public ConcurrentHashMap cmap = new ConcurrentHashMap(); //在构造方法中给cmap赋值 public MyService2() { for (int i = 0; i < 100000; i++) { cmap.put(Thread.currentThread().getName()+" "+(i+1),"abc"); } } //将构造方法中赋的值删除 public void testMethod() { Iterator iterator = cmap.keySet().iterator(); while(iterator.hasNext()) { Object o = iterator.next(); System.out.println(o.toString()); iterator.remove(); System.out.println(cmap.size()+" "+Thread.currentThread().getName()); } } }
-
创建包concurrent.thread在包下创建线程类ThreadA2
package concurrent.thread; import concurrent.service.MyService2; public class ThreadA2 extends Thread{ private MyService2 myService; public ThreadA2(MyService2 myService) { this.myService = myService; } @Override public void run() { myService.testMethod(); } }
-
创建包concurrent.thread在包下创建线程类ThreadB2
package concurrent.thread; import concurrent.service.MyService2; public class ThreadB2 extends Thread{ private MyService2 myService; public ThreadB2(MyService2 myService) { this.myService = myService; } @Override public void run() { myService.testMethod(); } }
-
创建包concurrent.test在包下创建类Run2
package concurrent.test; import concurrent.service.MyService2; import concurrent.thread.ThreadA2; import concurrent.thread.ThreadB2; //ConcurrentHashMap public class Run2 { public static void main(String[] args) { MyService2 myService = new MyService2(); ThreadA2 threadA2 = new ThreadA2(myService); ThreadB2 threadB2 = new ThreadB2(myService); threadA2.start(); threadB2.start(); } }
即在MyService2的构造方法中给ConcurrentHashMap实例赋值然后使用两个线程同时在方法testMethod中将这些值remove删除掉。两个线程在remove操作时有时候没有删除成功直接运行后面的程序。即它是支持并发remove操作的。
2.ConcurrentSkipListMap的使用
-
创建包concurrent.entity在包下创建实体类UserInfo
package concurrent.entity; public class UserInfo implements Comparable<UserInfo>{ private int id; private String username; public UserInfo(int id,String username) { this.id = id; this.username = username; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } @Override public int compareTo(UserInfo o) { if (this.getId()>o.getId()) return 1; else return -1; } }
-
创建包concurrent.service在包下创建类MyService3
package concurrent.service; import concurrent.entity.UserInfo; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; public class MyService3 { //ConcurrentSkipListMap支持线程安全且支持排序 public ConcurrentSkipListMap<UserInfo,String> map = new ConcurrentSkipListMap(); //在构造方法中给cmap赋值 public MyService3() { UserInfo userinfo1 = new UserInfo(1, "userinfo1"); UserInfo userinfo2 = new UserInfo(2, "userinfo2"); UserInfo userinfo3 = new UserInfo(3, "userinfo3"); UserInfo userinfo4 = new UserInfo(4, "userinfo4"); UserInfo userinfo5 = new UserInfo(5, "userinfo5"); map.put(userinfo1,"u1"); map.put(userinfo2,"u2"); map.put(userinfo3,"u3"); map.put(userinfo4,"u4"); map.put(userinfo5,"u5"); } //将构造方法中赋的值删除 public void testMethod() { //弹出Map中的第一个值 Map.Entry<UserInfo, String> entry = map.pollFirstEntry(); System.out.println("map.size()="+map.size()); UserInfo key = entry.getKey(); //输出key并且验证key在map中是否存在 System.out.println(key.getId()+" "+key.getUsername()+" "+map.get(key)+" "+entry.getValue()); } }
-
创建包concurrent.thread在包下创建线程类MyThread
package concurrent.thread; import concurrent.service.MyService3; public class MyThread extends Thread{ private MyService3 myService; public MyThread(MyService3 myService) { this.myService = myService; } @Override public void run() { myService.testMethod(); } }
-
创建包concurrent.test在包下创建类Run3
package concurrent.test; import concurrent.service.MyService3; import concurrent.thread.MyThread; //ConcurrentSkipListMap public class Run3 { public static void main(String[] args) throws InterruptedException { MyService3 myService = new MyService3(); MyThread thread1 = new MyThread(myService); MyThread thread2 = new MyThread(myService); MyThread thread3 = new MyThread(myService); MyThread thread4 = new MyThread(myService); MyThread thread5 = new MyThread(myService); thread1.start(); //此时sleep是为了让其顺序的打印出来 Thread.sleep(1000); thread2.start(); Thread.sleep(1000); thread3.start(); Thread.sleep(1000); thread4.start(); Thread.sleep(1000); thread5.start(); Thread.sleep(1000); } }
即在UserInfo类中实现Comparable接口并实现compareTo方法按属性id的值进行升序。然后在MyService3类中的ConcurrentSkipListMap实例中指定该UserInfo类作为keyString类型作为value。在put时就可以自动排序。
3.ConcurrentSkipListSet的使用
类ConcurrentSkipListSet支持并发安全和排序且不允许元素重复。
-
创建包concurrent.entity在包下创建类UserInfo1
package concurrent.entity; public class UserInfo1 implements Comparable<UserInfo1>{ private int id; private String username; public UserInfo1(int id, String username) { this.id = id; this.username = username; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } @Override public int compareTo(UserInfo1 o) { //升序 if (this.getId()>o.getId()) return 1; //降序 if(this.getId()<o.getId()) return -1; //不处理 return 0; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime*result +id; result = prime*result+((username==null)? 0 : username.hashCode()); return result; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; UserInfo1 userInfo1 = (UserInfo1) o; return id == userInfo1.id && username.equals(userInfo1.username); } }
-
创建包concurrent.service在包下创建类MyService4
package concurrent.service; import concurrent.entity.UserInfo1; import java.util.concurrent.ConcurrentSkipListSet; public class MyService4 { //ConcurrentSkipListSet支持线程安全、支持排序且不允许元素重复 public ConcurrentSkipListSet set = new ConcurrentSkipListSet(); //在构造方法中给set赋值 public MyService4() { UserInfo1 userinfo1 = new UserInfo1(1, "userinfo1"); UserInfo1 userinfo2 = new UserInfo1(2, "userinfo2"); UserInfo1 userinfo3 = new UserInfo1(3, "userinfo3"); UserInfo1 userinfo4 = new UserInfo1(4, "userinfo4"); UserInfo1 userinfo32 = new UserInfo1(3, "userinfo3"); UserInfo1 userinfo5 = new UserInfo1(5, "userinfo5"); set.add(userinfo1); set.add(userinfo3); set.add(userinfo2); set.add(userinfo32); set.add(userinfo5); set.add(userinfo4); } }
-
创建包concurrent.thread在包下创建线程类MyThread2
package concurrent.thread; import concurrent.entity.UserInfo1; import concurrent.service.MyService4; public class MyThread2 extends Thread{ private MyService4 myService; public MyThread2(MyService4 myService) { this.myService = myService; } @Override public void run() { UserInfo1 userInfo1 = (UserInfo1) myService.set.pollFirst(); System.out.println(userInfo1.getId()+" "+userInfo1.getUsername()); } }
-
创建包concurrent.test在包下创建类Run4
package concurrent.test; import concurrent.service.MyService4; import concurrent.thread.MyThread2; //ConcurrentSkipListMap public class Run4 { public static void main(String[] args) throws InterruptedException { MyService4 myService = new MyService4(); MyThread2 thread1 = new MyThread2(myService); MyThread2 thread2 = new MyThread2(myService); MyThread2 thread3 = new MyThread2(myService); MyThread2 thread4 = new MyThread2(myService); MyThread2 thread5 = new MyThread2(myService); thread1.start(); //此时sleep是为了让其顺序的打印出来 Thread.sleep(1000); thread2.start(); Thread.sleep(1000); thread3.start(); Thread.sleep(1000); thread4.start(); Thread.sleep(1000); thread5.start(); Thread.sleep(1000); } }
即在UserInfo1类中实现Comparable接口并实现compareTo方法按属性id的值进行升序。然后在MyService4类中的ConcurrentSkipListSet实例中将该UserInfo1类作为value进行添加操作的同时进行升序排序。此时因为Set中防止元素重复的原理就是防止hashCode值相同即当value值相同时hashCode值相同则无法添加。
4.ConcurrentLinkedQueue的使用
类ConcurrentLinkedQueue提供了并发环境下的队列操作。
4.1 ConcurrentLinkedQueue的线程安全性
-
创建包concurrent.service在包下创建类MyService5
package concurrent.service; import java.util.concurrent.ConcurrentLinkedQueue; public class MyService5 { public ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); }
-
创建包concurrent.thread在包下创建线程类ThreadA3
package concurrent.thread; import concurrent.service.MyService5; public class ThreadA3 extends Thread{ private MyService5 myService; public ThreadA3(MyService5 myService) { this.myService = myService; } @Override public void run() { for (int i = 0; i < 50; i++) { myService.queue.add("ThreadA"+(i+1)); } } }
-
创建包concurrent.thread在包下创建线程类ThreadB3
package concurrent.thread; import concurrent.service.MyService5; public class ThreadB3 extends Thread{ private MyService5 myService; public ThreadB3(MyService5 myService) { this.myService = myService; } @Override public void run() { for (int i = 0; i < 50; i++) { myService.queue.add("ThreadB"+(i+1)); } } }
-
创建包concurrent.test在包下创建类Run5
package concurrent.test; import concurrent.service.MyService5; import concurrent.thread.ThreadA3; import concurrent.thread.ThreadB3; public class Run5 { public static void main(String[] args){ try { MyService5 myService = new MyService5(); ThreadA3 a = new ThreadA3(myService); ThreadB3 b = new ThreadB3(myService); a.start(); b.start(); a.join(); b.join(); System.out.println(myService.queue.size()); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
即创建两个线程类实例传入同一个MyService对象2个线程实例会给一个MyService的ConcurrentLinkedQueue属性添加元素。
然后在打印ConcurrentLinkedQueue对象的元素大小时之前将调用两个线程的join方法让当前线程main休眠等待两个线程执行完再执行后面的代码。此时主要实现并发下该ConcurrentLinkedQueue对象的线程安全性。
4.2 ConcurrentLinkedQueue的方法
- 创建包concurrent.test在包下创建类Run6
poll()方法没有获得数据时返回null获得数据时移除表头并将表头返回。package concurrent.test; import concurrent.service.MyService5; public class Run6 { public static void main(String[] args) { MyService5 myService= new MyService5(); myService.queue.add("a"); myService.queue.add("b"); myService.queue.add("c"); //ConcurrentLinkedQueue.poll(),为空时会返回null返回数据并移除表头 System.out.println(myService.queue.poll()); System.out.println("begin size="+myService.queue.size()); //ConcurrentLinkedQueue.element(),为空时会出现异常,返回数据但不移除表头 System.out.println(myService.queue.element()); //ConcurrentLinkedQueue.peek(),为空时会返回null,返回数据但不移除表头 System.out.println(myService.queue.peek()); System.out.println("end size="+myService.queue.size()); } }
element()方法没有获得数据时出现NoSuchElementException异常获得数据时不移除表头并将表头进行返回。
peek()方法没有获得数据时返回null获得数据时不移除表头并将表头进行返回。
5.ConcurrentLinkedDeque的使用
由于ConcurrentLinkedQueue仅支持对列头进行操作而ConcurrentLinkedDeque支持对列头和列尾双向进行操作。
-
创建包concurrent.service在包下创建类MyService6
package concurrent.service; import java.util.concurrent.ConcurrentLinkedDeque; public class MyService6 { //双向队列 public ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque(); public MyService6() { for (int i = 0; i < 2; i++) { queue.add("string"+(i+1)); //queue.addFirst("string"+(i+1)); //queue.addLast("string"+(i+1)); } } }
-
创建包concurrent.thread在包下创建线程类ThreadA4
package concurrent.thread; import concurrent.service.MyService6; public class ThreadA4 extends Thread{ private MyService6 myService6; public ThreadA4(MyService6 myService6) { this.myService6 = myService6; } @Override public void run() { System.out.println("value="+myService6.queue.pollFirst()+"queue.size()="+myService6.queue.size()); } }
-
创建包concurrent.thread在包下创建线程类ThreadB4
package concurrent.thread; import concurrent.service.MyService6; public class ThreadB4 extends Thread{ private MyService6 myService6; public ThreadB4(MyService6 myService6) { this.myService6 = myService6; } @Override public void run() { System.out.println("value="+myService6.queue.pollLast()+"queue.size()="+myService6.queue.size()); } }
-
创建包concurrent.test在包下创建类Run7
package concurrent.test; import concurrent.service.MyService6; import concurrent.thread.ThreadA4; import concurrent.thread.ThreadB4; public class Run7 { public static void main(String[] args) throws InterruptedException { MyService6 myService6 = new MyService6(); ThreadA4 first1 = new ThreadA4(myService6); ThreadA4 first2 = new ThreadA4(myService6); ThreadB4 last1 = new ThreadB4(myService6); ThreadB4 last2 = new ThreadB4(myService6); new ThreadB4(myService6); first1.start(); Thread.sleep(1000); last1.start(); Thread.sleep(1000); first2.start(); Thread.sleep(1000); last2.start(); Thread.sleep(1000); } }
在MyService6类中创建一个双向队列在构造方法中队列赋值然后线程类在构造方法中给MyService6赋值线程类A在run方法调用该双向队列的首元素弹出方法pollFirst()线程类B在run方法调用该双向队列的尾元素弹出方法pollLast()在启动类Run7中新建该线程类并将MyService6实例传入再该线程类即可Thread.sleep()是为了保证队列按指定顺序弹出。
6.CopyOnWriteArrayList的使用
ArrayList为非线程安全的。在并发环境下可以使用CopyOnWriteArrayList。
-
创建包concurrent.service在包下创建类MyService7
package concurrent.service; import java.util.concurrent.CopyOnWriteArrayList; public class MyService7 { public static CopyOnWriteArrayList list = new CopyOnWriteArrayList(); }
-
创建包concurrent.thread在包下创建线程类MyThread3
package concurrent.thread; import concurrent.service.MyService7; public class MyThread3 extends Thread{ private MyService7 myService7; public MyThread3(MyService7 myService7) { this.myService7 = myService7; } @Override public void run() { for (int i = 0; i < 100; i++) { myService7.list.add("anyString"+(i+1)); } } }
-
创建包concurrent.test在包下创建类Run8
package concurrent.test; import concurrent.service.MyService7; import concurrent.thread.MyThread3; public class Run8 { public static void main(String[] args) throws InterruptedException { MyService7 myService7 = new MyService7(); MyThread3[] array = new MyThread3[100]; for (int i = 0; i < array.length; i++) { array[i] = new MyThread3(myService7); } for (int i = 0; i < array.length; i++) { array[i].start(); } Thread.sleep(3000); System.out.println(myService7.list.size()); System.out.println("可以取得值"+myService7.list.get(5)); } }
在MyService7类中创建一个CopyOnWriteArrayList对象然后线程类MyThread3再构造方法中给内部的MyService7对象赋值在run方法中给该MyService7对象的CopyOnWriteArrayList属性赋值在Run8启动类中将MyService7作为参数传入MyThread3类中然后启动MyThread3休眠3秒后获取该CopyOnWriteArrayList对象的元素数量再取出第五个元素。
7.CopyOnWriteArraySet的使用
CopyOnWriteArraySet解决多线程环境下HashSet不安全问题。
-
创建包concurrent.service在包下创建类MyService8
package concurrent.service; import java.util.concurrent.CopyOnWriteArraySet; public class MyService8 { public static CopyOnWriteArraySet set = new CopyOnWriteArraySet(); }
-
创建包concurrent.thread在包下创建线程类MyThread4
package concurrent.thread; import concurrent.service.MyService8; public class MyThread4 extends Thread{ private MyService8 myService8; public MyThread4(MyService8 myService8) { this.myService8 = myService8; } @Override public void run() { for (int i = 0; i < 100; i++) { myService8.set.add(Thread.currentThread().getName()+"anyString"+(i+1)); } } }
-
创建包concurrent.test在包下创建类Run9
package concurrent.test; import concurrent.service.MyService8; import concurrent.thread.MyThread4; public class Run9 { public static void main(String[] args) throws InterruptedException { MyService8 myService8 = new MyService8(); MyThread4[] array = new MyThread4[100]; for (int i = 0; i < array.length; i++) { array[i] = new MyThread4(myService8); } for (int i = 0; i < array.length; i++) { array[i].start(); } Thread.sleep(3000); System.out.println(myService8.set.size()); } }
在MyService8类中创建一个CopyOnWriteArraySet对象然后线程类MyThread4再构造方法中给内部的MyService8对象赋值在run方法中给该MyService8对象的CopyOnWriteArraySet属性赋值在Run9启动类中将MyService8作为参数传入MyThread4类中然后启动MyThread4休眠3秒后获取该CopyOnWriteArraySet对象的元素数量。
三、阻塞队列
阻塞队列即如果BlockQueue为空时从该队列中取数据的操作将会被阻塞进入阻塞状态直到该BlockQueue不为空时才会被唤醒。同样在如果BlockQueue为满也就是没有空余空间时往队列中存放数据的操作也会被阻塞进入等待状态直到BlockingQueue有剩余空间时才会被唤醒。
简而言之就是阻塞队列为空时取数据的操作等待存放数据的操作后才被唤醒为满时存数据的操作等待取数据的操作后才被唤醒。
1.ArrayBlockingQueue与公平/非公平锁的使用
ArrayBlockingQueue提供一种有界阻塞队列。
-
创建包concurrent.service在包下创建类MyService9
package concurrent.service; import java.util.concurrent.ArrayBlockingQueue; public class MyService9 { public ArrayBlockingQueue queue; public MyService9(boolean fair) { queue = new ArrayBlockingQueue(10,fair); } public void take() { try { System.out.println(Thread.currentThread().getName()+" take"); String takeString = ""+queue.take(); System.out.println(Thread.currentThread().getName()+" take value"+takeString); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
-
创建包concurrent.thread在包下创建线程类TakeThread
package concurrent.thread; import concurrent.service.MyService9; public class TakeThread extends Thread{ private MyService9 myService9; public TakeThread(MyService9 myService9) { this.myService9 = myService9; } @Override public void run() { myService9.take(); } }
-
创建包concurrent.test在包下创建类Run10
package concurrent.test; import concurrent.service.MyService9; import concurrent.thread.TakeThread; public class Run10 { public static void main(String[] args) throws InterruptedException { //false 创建非公平锁 true 创建公平锁 MyService9 myService9 = new MyService9(false); TakeThread[] take1 = new TakeThread[10]; TakeThread[] take2 = new TakeThread[10]; for (int i = 0; i < take1.length; i++) { take1[i] = new TakeThread(myService9); take1[i].setName("+++"); } for (int i = 0; i < take1.length; i++) { take1[i].start(); } for (int i = 0; i < take2.length; i++) { take2[i] = new TakeThread(myService9); take2[i].setName("---"); } Thread.sleep(300); myService9.queue.put("a1"); myService9.queue.put("a2"); myService9.queue.put("a3"); myService9.queue.put("a4"); myService9.queue.put("a5"); myService9.queue.put("a6"); myService9.queue.put("a7"); myService9.queue.put("a8"); myService9.queue.put("a9"); myService9.queue.put("a10"); for (int i = 0; i < take2.length; i++) { take2[i].start(); } } }
此时创建一个非公平锁而公平锁只需要将false改为true即可。
在MyService9类的构造方法中传入fair创建一个公平/非公平锁并创建一个take方法队列的take方法前面的打印信息是肯定能打印输出的然后调用ArrayBlockingQueue的take方法然后该方法后面的打印信息是看该take方法会不会被阻塞来决定打印出来证明没有被阻塞。
TakeThread在构造方法中给内部的MyService9赋值然后再run方法中调用MyService9的take方法。启动类Run10中先创建一个MyService9实例然后给定义的两个线程数组赋值第一个线程数组赋值并启动休眠300毫秒是为了让第一个线程数组中所有线程都启动完毕再给ArrayBlockingQueue添加元素后再启动第二个数组的所有线程。
此时因为是非公平锁所以第二个线程数组也可以抢占锁并调用take方法取值。
若将该锁创建时传入true即改为公平锁此时只有第一个线程数组可以按顺序拿到锁并调用take方法赋值。
2.PriorityBlockingQueue的使用
PriorityBlockingQueue支持在并发情况下使用优先级队列。
-
创建包concurrent.entity在包下创建实体类UserInfo2
package concurrent.entity; public class UserInfo2 implements Comparable<UserInfo2>{ private int id; public UserInfo2() { } public UserInfo2(int id) { this.id = id; } public int getId() { return id; } public void setId(int id) { this.id = id; } @Override public int compareTo(UserInfo2 o) { if(this.id<o.getId()) return -1; if(this.id>o.getId()) return 1; return 0; } }
-
创建包concurrent.test在包下创建类Test1
package concurrent.test; import concurrent.entity.UserInfo2; import java.util.concurrent.PriorityBlockingQueue; public class Test1 { public static void main(String[] args) { PriorityBlockingQueue<UserInfo2> queue = new PriorityBlockingQueue<>(); queue.add(new UserInfo2(222)); queue.add(new UserInfo2(1515)); queue.add(new UserInfo2(01)); queue.add(new UserInfo2(07)); System.out.println(queue.poll().getId()); System.out.println(queue.poll().getId()); System.out.println(queue.poll().getId()); System.out.println(queue.poll().getId()); System.out.println(queue.poll()); } }
-
创建包concurrent.test在包下创建类Test2
package concurrent.test; import concurrent.entity.UserInfo2; import java.util.concurrent.PriorityBlockingQueue; public class Test2 { public static void main(String[] args) throws InterruptedException { PriorityBlockingQueue<UserInfo2> queue = new PriorityBlockingQueue<>(); System.out.println("begin"); System.out.println(queue.take()); System.out.println("end"); } }
创建实体类UserInfo2后实现Comparable接口从而实现根据id排序的比较器。Test1的PriorityBlockingQueue的poll方法在数据为空时返回null值不会阻塞。而Test2的PriorityBlockingQueue的take方法在数据为空时进入阻塞状态等待数据被添加到该队列中。
3.SynchronousQueue的使用
SynchronousQueue经常在多个线程之间传输数据时使用。
-
创建包concurrent.test在包下创建类Test3
package concurrent.test; import java.util.concurrent.SynchronousQueue; public class Test3 { public static void main(String[] args) { try { SynchronousQueue queue = new SynchronousQueue(); System.out.println("step1"); queue.put("anyString"); System.out.println("step2"); System.out.println(queue.take()); System.out.println("step3"); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
此时的线程是主线程mainSynchronousQueue在put后因为没有其他线程将数据取走所以程序不能继续执行。
-
创建包concurrent.service在包下创建类MyService10
package concurrent.service; import java.util.concurrent.SynchronousQueue; public class MyService10 { //同步队列 public static SynchronousQueue queue = new SynchronousQueue(); public void putMethod() throws InterruptedException { String putString = "anyString"+Math.random(); System.out.println("put="+putString); queue.put(putString); } public void takeMethod() throws InterruptedException { System.out.println("take="+queue.take()); } }
-
创建包concurrent.thread在包下创建线程类ThreadPut
package concurrent.thread; import concurrent.service.MyService10; public class ThreadPut extends Thread{ private MyService10 myService10; public ThreadPut(MyService10 myService10) { this.myService10 = myService10; } @Override public void run() { for (int i = 0; i < 10; i++) { try { myService10.putMethod(); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }
-
创建包concurrent.thread在包下创建线程类ThreadTake
package concurrent.thread; import concurrent.service.MyService10; public class ThreadTake extends Thread{ private MyService10 myService10; public ThreadTake(MyService10 myService10) { this.myService10 = myService10; } @Override public void run() { for (int i = 0; i < 10; i++) { try { myService10.takeMethod(); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }
-
创建包concurrent.test在包下创建类Test4
package concurrent.test; import concurrent.service.MyService10; import concurrent.thread.ThreadPut; import concurrent.thread.ThreadTake; public class Test4 { public static void main(String[] args) throws InterruptedException { MyService10 myService10 = new MyService10(); ThreadPut put = new ThreadPut(myService10); ThreadTake take = new ThreadTake(myService10); take.start(); Thread.sleep(2000); put.start(); } }
MyService10在内部创建了一个SynchronousQueue实例并创建putMethod()和takeMethod()用以调用改队列的put和take方法。
ThreadPut的run用以调用MyService10的putMethod()方法ThreadTake的run用以调用MyService10的takeMethod()方法。
启动类Test4给线程类ThreadPut和ThreadTake实例化并传入MyService10实例先启动take线程再休眠2秒是为了体现该队列的阻塞性。
SynchronousQueue只适用于多线程因为put之后必须要有其他线程调用take即一次put操作后等待其他线程的一次take。
4.DelayQueue的使用
-
创建包concurrent.entity在包下创建类UserInfo3
package concurrent.entity; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class UserInfo3 implements Delayed { private String username; private long runNanoTime; public UserInfo3(String username, long secondTime) { this.username = username; long delayNanoTime = TimeUnit.SECONDS.toNanos(secondTime); this.runNanoTime = System.nanoTime()+delayNanoTime; } public String getUsername() { return username; } @Override public long getDelay(TimeUnit unit) { return runNanoTime - System.nanoTime(); } @Override public int compareTo(Delayed o) { UserInfo3 userInfo3 = (UserInfo3) o; if(this.runNanoTime>userInfo3.runNanoTime) return 1; else return -1; } }
-
创建包concurrent.test在包下创建类Test5
package concurrent.test; import concurrent.entity.UserInfo3; import java.util.concurrent.DelayQueue; public class Test5 { public static void main(String[] args) throws InterruptedException { UserInfo3 userInfo5 = new UserInfo3("5",5); UserInfo3 userInfo4 = new UserInfo3("4",4); UserInfo3 userInfo1 = new UserInfo3("1",1); UserInfo3 userInfo2 = new UserInfo3("2",2); UserInfo3 userInfo3 = new UserInfo3("3",3); DelayQueue<UserInfo3> queue = new DelayQueue<>(); queue.add(userInfo1); queue.add(userInfo2); queue.add(userInfo3); queue.add(userInfo4); queue.add(userInfo5); System.out.println(queue.take().getUsername()+" "+System.currentTimeMillis()); System.out.println(queue.take().getUsername()+" "+System.currentTimeMillis()); System.out.println(queue.take().getUsername()+" "+System.currentTimeMillis()); System.out.println(queue.take().getUsername()+" "+System.currentTimeMillis()); System.out.println(queue.take().getUsername()+" "+System.currentTimeMillis()); } }
UserInfo3实现Delayed接口实现该接口的两个方法getDelaycompareTogetDelay是用来获取是否到达延迟后的时间compareTo用来将其根据runNanoTime属性进行排序。
在Test5启动类中创建多个UserInfo实例添加到DelayQueue延迟队列从而实现延迟获取并打印数据同时该take方法是阻塞的方法。
5.LinkedTransferQueue的使用
LinkedTransferQueue和SynchronousQueue类似但它可以尝试性地添加一些数据。
该队列的transfer(e)方法和上面阻塞队列的添加方法一样操作会存在阻塞。
但是tryTransfer(e)方法会立即传输数据如果不存在等待获取数据的消费者线程则返回false数据不放入队列中整个过程是不阻塞的。
5.1 tryTransfer(E e)的使用
-
创建包concurrent.service在包下创建类MyServiceB
package concurrent.service; import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TransferQueue; public class MyServiceB { public TransferQueue queue = new LinkedTransferQueue(); }
-
创建包concurrent.thread在包下创建线程类ThreadB5
package concurrent.thread; import concurrent.service.MyServiceA; import concurrent.service.MyServiceB; public class ThreadB5 extends Thread{ private MyServiceB myServiceB; public ThreadB5(MyServiceB myServiceB) { this.myServiceB = myServiceB; } @Override public void run() { System.out.println(Thread.currentThread().getName()+" beginB"+System.currentTimeMillis()); System.out.println("tryTransfer 返回值"+myServiceB.queue.tryTransfer("ThreadB5")); System.out.println(Thread.currentThread().getName()+" endB"+System.currentTimeMillis()); } }
-
创建包concurrent.test在包下创建类Test7
package concurrent.test; import concurrent.service.MyServiceB; import concurrent.thread.ThreadB5; public class Test7 { public static void main(String[] args) throws InterruptedException { MyServiceB myServiceB = new MyServiceB(); ThreadB5 threadB5 = new ThreadB5(myServiceB); threadB5.setName("b"); threadB5.start(); //休眠一秒是为了等待线程ThreadB5尝试传输元素 Thread.sleep(1000); System.out.println("队列中的元素个数为"+myServiceB.queue.size()); } }
MyServiceB创建一个LinkedTransferQueue队列。
ThreadB5通过构造方法给内部的MyServiceB赋值并在run方法中调用队列的tryTransfer(e)方法。
启动类Test7中实例化MyServiceB并将该实例传入ThreadB5中休眠一秒是为了让线程的run方法执行完毕最后打印队列中的元素个数。此时没有等待获取数据的消费者线程所以该数据会被丢失并返回false但是不会阻塞该线程。
5.2 tryTransfer(E e,long timeout,TimeUnit unit)的使用
该方法相当于tryTransfer(E e)的扩展在指定时间timeout时间单位unit之后还没有获取数据的消费者则返回false。即比tryTransfer(E e)方法多了等待一段指定时间。
-
创建包concurrent.service在包下创建类MyServiceB
package concurrent.service; import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TransferQueue; public class MyServiceB { public TransferQueue queue = new LinkedTransferQueue(); }
-
创建包concurrent.thread在包下创建线程类ThreadB6
package concurrent.thread; import concurrent.service.MyServiceB; import java.util.concurrent.TimeUnit; public class ThreadB6 extends Thread{ private MyServiceB myServiceB; public ThreadB6(MyServiceB myServiceB) { this.myServiceB = myServiceB; } @Override public void run() { try { System.out.println(Thread.currentThread().getName()+" beginB"+System.currentTimeMillis()); System.out.println("tryTransfer 返回值"+myServiceB.queue.tryTransfer("ThreadB6",3, TimeUnit.SECONDS)); System.out.println(Thread.currentThread().getName()+" endB"+System.currentTimeMillis()); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
-
创建包concurrent.test在包下创建类Test8
package concurrent.test; import concurrent.service.MyServiceB; import concurrent.thread.ThreadB6; public class Test8 { public static void main(String[] args) throws InterruptedException { MyServiceB myServiceB = new MyServiceB(); ThreadB6 threadB6 = new ThreadB6(myServiceB); threadB6.setName("b"); threadB6.start(); } }
MyServiceB创建一个LinkedTransferQueue队列。
ThreadB6通过构造方法给内部的MyServiceB赋值并在run方法中调用队列的tryTransfer(E e,long timeout,TimeUnit unit)方法。
启动类Test8中实例化MyServiceB并将该实例传入ThreadB6中。此时没有等待获取数据的消费者线程所以该数据会等待3秒后被丢失并返回false但是不会阻塞该线程。
5.3 boolean hasWaitingConsumer()和int getWaitingConsumerCount()方法的使用
boolean hasWaitingConsumer()方法是判断有没有消费者在等待数据
int getWaitingConsumerCount()方法是等待数据的消费者线程的数量。
-
创建包concurrent.service在包下创建类MyServiceC
package concurrent.service; import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TransferQueue; public class MyServiceC { public TransferQueue queue = new LinkedTransferQueue(); }
-
创建包concurrent.thread在包下创建线程类ThreadC
package concurrent.thread; import concurrent.service.MyServiceC; public class ThreadC extends Thread{ private MyServiceC myServiceC; public ThreadC(MyServiceC myServiceC) { this.myServiceC = myServiceC; } @Override public void run() { try { System.out.println(Thread.currentThread().getName()+" 取得的值"+myServiceC.queue.take()); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
-
创建包concurrent.test在包下创建类Test9
package concurrent.test; import concurrent.service.MyServiceC; import concurrent.thread.ThreadC; public class Test9 { public static void main(String[] args) throws InterruptedException { MyServiceC myServiceC = new MyServiceC(); for (int i = 0; i < 10; i++) { ThreadC threadC = new ThreadC(myServiceC); threadC.setName("c"); threadC.start(); } Thread.sleep(1000); System.out.println("有没有线程正在等待数据"+myServiceC.queue.hasWaitingConsumer()); System.out.println(myServiceC.queue.getWaitingConsumerCount()+"个消费者线程正在等待数据"); } }
MyServiceC创建一个LinkedTransferQueue队列。
ThreadC通过构造方法给内部的MyServiceC赋值并在run方法中调用队列的take方法即启动消费者线程。
启动类Test9中实例化MyServiceC并将该实例传入ThreadC中。此时有10个等待获取数据的消费者线程返回true数量为10。
总结
-
Iterator接口是集合框架的父接口Collection接口继承了Iterator接口而接口List、Set、Queue继承了Collection接口而Deque接口继承了Queue接口。使用接口继承接口可以让继承的接口对被继承的接口进行扩展。
List接口的实现类中的元素是有序可重复的而Set接口的实现类中的元素是无序不可重复的。接口Queue支持对表头的操作而Deque对Queue进行扩展也支持对表尾进行操作。
java.util.Concurrent包下的类都是线程安全的。 -
HashMap是线程不安全的。
Hashtable是线程安全的在多线程环境下支持put操作但不支持remove操作。
类ConcurrentHashMap是JDK并发包中支持并发操作的Map对象在多线程环境下支持put操作和remove操作。
类ConcurrentHashMap不支持排序类LinkedHashMap支持key排序但不支持并发。 -
类ConcurrentSkipListMap并发安全且支持排序。
类ConcurrentSkipListSet支持并发安全和排序且不允许元素重复。
Map是以键值对key-value的形式进行存储而Set则直接存储value。
ConcurrentLinkedQueue的三个主要方法poll()、element()、peek()。
poll()方法没有获得数据时返回null获得数据时移除表头并将表头返回。
element()方法没有获得数据时出现NoSuchElementException异常获得数据时不移除表头并将表头进行返回。
peek()方法没有获得数据时返回null获得数据时不移除表头并将表头进行返回。
双向队列ConcurrentLinkedDeque在插入或者弹出操作时可以双向操作。
ArrayList为非线程安全的。在并发环境下可以使用CopyOnWriteArrayList。
CopyOnWriteArraySet是线程安全的无序集合。相当于线程安全的HashSet。 -
阻塞队列即如果BlockQueue为空时从该队列中取数据的操作将会被阻塞进入阻塞状态直到该BlockQueue不为空时才会被唤醒。同样在如果BlockQueue为满也就是没有空余空间时往队列中存放数据的操作也会被阻塞进入等待状态直到BlockingQueue有剩余空间时才会被唤醒。
简而言之就是阻塞队列为空时取数据的操作等待存放数据的操作后才被唤醒为满时存数据的操作等待取数据的操作后才被唤醒。
ArrayBlockingQueue的公平锁也就是按照排队的顺序来获取锁并调用take方法取值而非公平锁就是不是按照排队的顺序来获取锁并调用take方法取值即哪个线程都可以抢占锁并调用take方法取值。
PriorityBlockingQueue支持在并发情况下使用优先级队列。
LinkedBlockingQueue和ArrayBlockingQueue在功能上大体一样都是有界的都有阻塞特性。两者在使用上最明显的区别就是ArrayBlockingQueue比LinkedBlockingQueue运行效率快得多。
LinkedBlockingDeque提供双端节点的操作与LinkedBlockingQueue一样具有阻塞特性。 -
SynchronousQueue为同步队列它是“一种阻塞队列每个插入操作都必须等待另一个线程的对应移除操作完成。同步队列没有任何内部容量甚至连一个队列的容量都没有。同步队列上不能执行peek操作因为仅在试图要移除元素时该元素才存在除非另一个线程试图移除某个元素否则也不能使用任何方法插入元素。同步队列上也不能执行迭代操作因为其中没有元素可用于迭代。”
SynchronousQueue经常在多个线程之间传输数据时使用。
LinkedTransferQueue和SynchronousQueue类似但它可以尝试性地添加一些数据。
tryTransfer(E e)该队列的transfer(e)方法和上面阻塞队列的添加方法一样操作会存在阻塞。
tryTransfer(E e,long timeout,TimeUnit unit)在指定时间timeout时间单位unit之后还没有获取数据的消费者则返回false。
boolean hasWaitingConsumer()方法是判断有没有消费者在等待数据
int getWaitingConsumerCount()方法是等待数据的消费者线程的数量。