`
XinTeng2012
  • 浏览: 94717 次
社区版块
存档分类
最新评论

JAVA线程池例子

 
阅读更多
  • 用途及用法

网络请求通常有两种形式:第一种,请求不是很频繁,而且每次连接后会保持相当一段时间来读数据或者写数据,最后断开,如文件下载,网络流媒体等。另一种形式是请求频繁,但是连接上以后读/写很少量的数据就断开连接。考虑到服务的并发问题,如果每个请求来到以后服务都为它启动一个线程,那么这对服务的资源可能会造成很大的浪费,特别是第二种情况。因为通常情况下,创建线程是需要一定的耗时的,设这个时间为T1,而连接后读/写服务的时间为T2,当T1>>T2时,我们就应当考虑一种策略或者机制来控制,使得服务对于第二种请求方式也能在较低的功耗下完成。

通常,我们可以用线程池来解决这个问题,首先,在服务启动的时候,我们可以启动好几个线程,并用一个容器(如线程池)来管理这些线程。当请求到来时,可以从池中去一个线程出来,执行任务(通常是对请求的响应),当任务结束后,再将这个线程放入池中备用;如果请求到来而池中没有空闲的线程,该请求需要排队等候。最后,当服务关闭时销毁该池即可。

  • 结构

线程池中通常由这样几个概念(接口)组成:

  1. 线程池(Thread pool),池是一个容器,容器中有很多个执行器,每一个执行器是一个线程。当然,这个容器的实现,可以是链表,可以是数组等等,不需要关心,需要关心的是,池必须提供一个可以从中取出执行器的方法,可能还需要一个池中现有活动线程数方法,销毁池的方法等。
  2. 执行器(Executor),每个执行器是一个线程,每个执行器可以执行一个任务,任务是做什么,此时还不很明确,它需要提供任务的setter/getter方法,并且作为一个线程,他可以独立运行,执行器执行完自身后,需要将自身放入池中。
  3. 任务(Task),任务是每个线程具体要做的事,如资源下载,播放flash片段,打印一段文字到控制台等等,它本身不能执行,而需要将自身交给执行器。

整个池的机制和结构就是这样,当然,需要一个调度者(scheduler)来协调主线程和池的关系。结构,或者接口的目的是为了让我们从细节中解脱出来,从一个比较抽象的层次来描述系统,这样的好处是简单,而且设计出来的框架比较通用,可以适应很多相近相似的情况。由于Task具体干什么我们不知道,所以它几乎可以干任何适应于上边总结的网络连接的第二种情况(T1>>T2)。

  • 类的结构图

虽然为一个简单的实现设计一个标准的UML视图是不太现实的,但是这是一种受鼓励的做法,至少应该用铅笔在草纸上画出相关的视图,这样可以帮助以后的维护和更高级的扩展。

frame

  • 线程池的简单实现

实现可以是通过多种语言的,我们在此选择面向对象的JAVA,而如果你使用C的话,也没有问题,问题在上一小节已经描述清楚,语言是不重要的。

池是一个容器,我们考虑使用java.util.LinkedList类(可能由于它的长度是可变的,而且不需要我们使用者来考虑),也就是说,池需要维护一个链表。

Java代码复制代码
  1. publicinterfacePool{//池接口
  2. ExecutorgetExecutor();
  3. voiddestroy();
  4. }
  1. publicinterfacePool{//池接口
  2. ExecutorgetExecutor();
  3. voiddestroy();
  4. }

Java代码复制代码
  1. publicinterfaceExecutor{//执行器接口
  2. voidsetTask(Tasktask);
  3. TaskgetTask();
  4. voidstartTask();
  5. }
  1. publicinterfaceExecutor{//执行器接口
  2. voidsetTask(Tasktask);
  3. TaskgetTask();
  4. voidstartTask();
  5. }

鉴于执行器是池中的对象,而且外部没有必要知道其细节,我们考虑将Executor接口的实现做为Pool接口的实现的内部类。这样做的另一个好处是,更便于池的管理。

Java代码复制代码
  1. importjava.util.LinkedList;
  2. importjava.util.Properties;
  3. importredesigned.utils.PropReader;
  4. publicclassThreadPoolimplementsPool{
  5. privatebooleanisShut;
  6. privateLinkedListpool;
  7. privatestaticPropertiesprop=PropReader.getProperties("webconfig.properties");
  8. privateintsize=Integer.parseInt(prop.getProperty("threadsperpage","3"));
  9. publicThreadPool(){
  10. //readconfigurationandsetthe
  11. //contentofpoolbyobjectsofExecutor
  12. isShut=false;//setthestatusofpooltoactive
  13. pool=newLinkedList();
  14. for(inti=0;i<size;i++){
  15. Executorexecutor=newExecutorImpl();//newaexecutorthread
  16. pool.add(executor);//addittopool
  17. ((ExecutorImpl)executor).start();//startit
  18. }
  19. }
  20. publicvoiddestroy(){//Destroy
  21. synchronized(pool){
  22. isShut=true;//setthestatusofpooltoinactive
  23. pool.notifyAll();//notifyalllistener.
  24. pool.clear();//clearthelistofthreads
  25. }
  26. }
  27. publicExecutorgetExecutor(){
  28. Executorret=null;
  29. synchronized(pool){//returnifany.
  30. if(pool.size()>0){
  31. ret=(Executor)pool.removeFirst();
  32. }else{
  33. try{
  34. pool.wait();
  35. }catch(InterruptedExceptione){
  36. e.printStackTrace();
  37. }
  38. ret=(Executor)pool.removeFirst();
  39. }
  40. }
  41. returnret;
  42. }
  43. Executor接口的实现作为ThreadPool的内部类
  44. privateclassExecutorImplextendsThreadimplementsExecutor{
  45. privateTasktask;
  46. privateObjectlock=newObject();
  47. //privatebooleanloop=true;
  48. publicExecutorImpl(){}
  49. publicTaskgetTask(){
  50. returnthis.task;
  51. }
  52. publicvoidsetTask(Tasktask){
  53. this.task=task;
  54. }
  55. publicvoidstartTask(){
  56. //System.out.println("starthere");
  57. synchronized(lock){
  58. lock.notify();
  59. }
  60. }
  61. publicvoidrun(){
  62. //getataskifany
  63. //thenrunit
  64. //thenputselftopool
  65. while(!isShut){
  66. synchronized(lock){
  67. try{
  68. lock.wait();//waitforresource
  69. }catch(InterruptedExceptione){
  70. e.printStackTrace();
  71. }
  72. }
  73. getTask().execute();//executethetask
  74. synchronized(pool){//putitselftothepoolwhenfinishthetask
  75. pool.addFirst(ExecutorImpl.this);
  76. pool.notifyAll();
  77. }
  78. }
  79. }
  80. }
  81. }
  1. importjava.util.LinkedList;
  2. importjava.util.Properties;
  3. importredesigned.utils.PropReader;
  4. publicclassThreadPoolimplementsPool{
  5. privatebooleanisShut;
  6. privateLinkedListpool;
  7. privatestaticPropertiesprop=PropReader.getProperties("webconfig.properties");
  8. privateintsize=Integer.parseInt(prop.getProperty("threadsperpage","3"));
  9. publicThreadPool(){
  10. //readconfigurationandsetthe
  11. //contentofpoolbyobjectsofExecutor
  12. isShut=false;//setthestatusofpooltoactive
  13. pool=newLinkedList();
  14. for(inti=0;i<size;i++){
  15. Executorexecutor=newExecutorImpl();//newaexecutorthread
  16. pool.add(executor);//addittopool
  17. ((ExecutorImpl)executor).start();//startit
  18. }
  19. }
  20. publicvoiddestroy(){//Destroy
  21. synchronized(pool){
  22. isShut=true;//setthestatusofpooltoinactive
  23. pool.notifyAll();//notifyalllistener.
  24. pool.clear();//clearthelistofthreads
  25. }
  26. }
  27. publicExecutorgetExecutor(){
  28. Executorret=null;
  29. synchronized(pool){//returnifany.
  30. if(pool.size()>0){
  31. ret=(Executor)pool.removeFirst();
  32. }else{
  33. try{
  34. pool.wait();
  35. }catch(InterruptedExceptione){
  36. e.printStackTrace();
  37. }
  38. ret=(Executor)pool.removeFirst();
  39. }
  40. }
  41. returnret;
  42. }
  43. Executor接口的实现作为ThreadPool的内部类
  44. privateclassExecutorImplextendsThreadimplementsExecutor{
  45. privateTasktask;
  46. privateObjectlock=newObject();
  47. //privatebooleanloop=true;
  48. publicExecutorImpl(){}
  49. publicTaskgetTask(){
  50. returnthis.task;
  51. }
  52. publicvoidsetTask(Tasktask){
  53. this.task=task;
  54. }
  55. publicvoidstartTask(){
  56. //System.out.println("starthere");
  57. synchronized(lock){
  58. lock.notify();
  59. }
  60. }
  61. publicvoidrun(){
  62. //getataskifany
  63. //thenrunit
  64. //thenputselftopool
  65. while(!isShut){
  66. synchronized(lock){
  67. try{
  68. lock.wait();//waitforresource
  69. }catch(InterruptedExceptione){
  70. e.printStackTrace();
  71. }
  72. }
  73. getTask().execute();//executethetask
  74. synchronized(pool){//putitselftothepoolwhenfinishthetask
  75. pool.addFirst(ExecutorImpl.this);
  76. pool.notifyAll();
  77. }
  78. }
  79. }
  80. }
  81. }

好了,池设计好了,再来看看任务(Task)的接口和实现

Java代码复制代码
  1. publicinterfaceTask{//这个接口也比较简单,可以执行,可以取到执行结果
  2. voidexecute();
  3. byte[]getResult();
  4. }
  1. publicinterfaceTask{//这个接口也比较简单,可以执行,可以取到执行结果
  2. voidexecute();
  3. byte[]getResult();
  4. }

Task的实现可以是多种多样的,下边的例子是一个加载资源的Task.使用方式

Java代码复制代码
  1. Poolpool=newThreadPool();//newaThreadPool
  2. //loadresourcesoneachpage,andstart#softhread.
  3. for(inti=0;i<resourceList.size();i++){
  4. Executorexecutor=pool.getExecutor();//getExecutorformpool
  5. TaskresourceLoader=newResourceLoader((String)resourceList.get(i));
  6. executor.setTask(resourceLoader);//setthetasktoexecutor
  7. executor.startTask();//trytostarttheexecutor.
  8. }
  9. //waitwhilealltaskaredone,thedestroythepool.
  10. pool.destroy();
  1. Poolpool=newThreadPool();//newaThreadPool
  2. //loadresourcesoneachpage,andstart#softhread.
  3. for(inti=0;i<resourceList.size();i++){
  4. Executorexecutor=pool.getExecutor();//getExecutorformpool
  5. TaskresourceLoader=newResourceLoader((String)resourceList.get(i));
  6. executor.setTask(resourceLoader);//setthetasktoexecutor
  7. executor.startTask();//trytostarttheexecutor.
  8. }
  9. //waitwhilealltaskaredone,thedestroythepool.
  10. pool.destroy();

  • 优势,或者适用范围
  1. 在并发时,需要被并发的线程不需要知道自己什么时候需要被启动,它子需要考虑这样一种情况:它自己从一个地方取出来一个执行器,然后把任务交给执行器,然后等待执行器结束即可,他关心的是自己所需要干的事,或者自己负责的事。这样,大家都简单,因为只需要做好自己的事情就好了。面向对象的一个秘诀为:永远相信合作者,使用别人的接口而不是自己去实现所有的接口。
  2. 这种T1>>T2的请求方式在网络中固然是常见的,在实际问题中同样是常见的。因此,掌握这种模式可能会对我们以后的程序设计提供方便和好处。
  • 小结

同步问题:同步在线程的并发中意义非常之大,对临界资源的控制是并发时最关键的地方。如在线程池中,当池中没有空闲的线程时,新来的请求就必须等待,而一旦一个Task运行结束后,一方面将自己放入池中,一方面需要通知等待在pool中的其他线程。每一个执行器线程,一开始启动,则进入等待状态,此时不会消耗CPU资源。而当在外部调用执行器的startTask()方法,即可通知线程从等待状态中醒来,去出Task,执行之,将执行器本身放入池中,然后继续等待。

当然,实现的策略是可以多种多样的,但是问题的本质已经在第二小节结构很明确的被定义了。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics