博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[python] 线程池
阅读量:7174 次
发布时间:2019-06-29

本文共 9694 字,大约阅读时间需要 32 分钟。

特别感谢simomo

什么是线程池?

诸如web服务器、服务器、文件服务器和邮件服务器等许多服务器应用都面向处理来自某些远程来源的大量短小的任务。构建服务器应用程序的一个过于简单的模型是:每当一个请求到达就创建一个新的服务对象,然后在新的服务对象中为请求服务。但当有大量请求并发访问时,服务器不断的创建和销毁对象的开销很大。所以提高服务器效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这样就引入了“池”的概念,“池”的概念使得人们可以定制一定量的资源,然后对这些资源进行复用,而不是频繁的创建和销毁。

线程池是预先创建线程的一种技术。线程池在还没有任务到来之前,创建一定数量的线程,放入空闲队列中。这些线程都是处于睡眠状态,即均为启动,不消耗CPU,而只是占用较小的内存空间。当请求到来之后,缓冲池给这次请求分配一个空闲线程,把请求传入此线程中运行,进行处理。当预先创建的线程都处于运行状态,即预制线程不够,线程池可以自由创建一定数量的新线程,用于处理更多的请求。当系统比较闲的时候,也可以通过移除一部分一直处于停用状态的线程。

线程池的注意事项

虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。在使用线程池时需注意线程池大小与性能的关系,注意并发风险、死锁、资源不足和线程泄漏等问题。

(1)线程池大小。多线程应用并非线程越多越好,需要根据系统运行的软硬件环境以及应用本身的特点决定线程池的大小。一般来说,如果代码结构合理的话,线程数目与CPU 数量相适合即可。如果线程运行时可能出现阻塞现象,可相应增加池的大小;如有必要可采用自适应来动态调整线程池的大小,以提高CPU 的有效利用率和系统的整体性能。

(2)并发错误。多线程应用要特别注意并发错误,要从逻辑上保证程序的正确性,注意避免死锁现象的发生。

(3)线程泄漏。这是线程池应用中一个严重的问题,当任务执行完毕而线程没能返回池中就会发生线程泄漏现象。

简单线程池的设计

一个典型的线程池,应该包括如下几个部分:

1、线程池管理器(ThreadPool),用于启动、停用,管理线程池
2、工作线程(WorkThread),线程池中的线程
3、请求接口(WorkRequest),创建请求对象,以供工作线程调度任务的执行
4、请求队列(RequestQueue),用于存放和提取请求
5、结果队列(ResultQueue),用于存储请求执行后返回的结果

线程池管理器,通过添加请求的方法(putRequest)向请求队列(RequestQueue)添加请求,这些请求事先需要实现请求接口,即传递工作函数、参数、结果处理函数、以及异常处理函数。之后初始化一定数量的工作线程,这些线程通过轮询的方式不断查看请求队列(RequestQueue),只要有请求存在,则会提取出请求,进行执行。然后,线程池管理器调用方法(poll)查看结果队列(resultQueue)是否有值,如果有值,则取出,调用结果处理函数执行。通过以上讲述,不难发现,这个系统的核心资源在于请求队列和结果队列,工作线程通过轮询requestQueue获得人物,主线程通过查看结果队列,获得执行结果。因此,对这个队列的设计,要实现线程同步,以及一定阻塞和超时机制的设计,以防止因为不断轮询而导致的过多cpu开销。在本文中,将会用python语言实现,python的Queue,就是很好的实现了对线程同步机制。

使用Python实现:

1 #-*-encoding:utf-8-*-  2 '''  3 Created on 2012-3-9  4 @summary: 线程池  5 @contact: mailto:zhanglixinseu@gmail.com  6 @author: zhanglixin  7 '''  8 import sys  9 import threading 10 import Queue 11 import traceback 12  13 # 定义一些Exception,用于自定义异常处理 14  15 class NoResultsPending(Exception): 16     """All works requests have been processed""" 17     pass 18  19 class NoWorkersAvailable(Exception): 20     """No worket threads available to process remaining requests.""" 21     pass 22  23 def _handle_thread_exception(request, exc_info): 24     """默认的异常处理函数,只是简单的打印""" 25     traceback.print_exception(*exc_info) 26  27 #classes  28  29 class WorkerThread(threading.Thread): 30     """后台线程,真正的工作线程,从请求队列(requestQueue)中获取work, 31     并将执行后的结果添加到结果队列(resultQueue)""" 32     def __init__(self,requestQueue,resultQueue,poll_timeout=5,**kwds): 33         threading.Thread.__init__(self,**kwds) 34         '''设置为守护进行''' 35         self.setDaemon(True) 36         self._requestQueue = requestQueue 37         self._resultQueue = resultQueue 38         self._poll_timeout = poll_timeout 39         '''设置一个flag信号,用来表示该线程是否还被dismiss,默认为false''' 40         self._dismissed = threading.Event() 41         self.start() 42          43     def run(self): 44         '''每个线程尽可能多的执行work,所以采用loop, 45         只要线程可用,并且requestQueue有work未完成,则一直loop''' 46         while True: 47             if self._dismissed.is_set(): 48                 break 49             try: 50                 ''' 51                 Queue.Queue队列设置了线程同步策略,并且可以设置timeout。 52                 一直block,直到requestQueue有值,或者超时 53                 ''' 54                 request = self._requestQueue.get(True,self._poll_timeout) 55             except Queue.Empty: 56                 continue 57             else: 58                 '''之所以在这里再次判断dimissed,是因为之前的timeout时间里,很有可能,该线程被dismiss掉了''' 59                 if self._dismissed.is_set(): 60                     self._requestQueue.put(request) 61                     break 62                 try: 63                     '''执行callable,讲请求和结果以tuple的方式放入requestQueue''' 64                     result = request.callable(*request.args,**request.kwds) 65                     print self.getName() 66                     self._resultQueue.put((request,result)) 67                 except: 68                     '''异常处理''' 69                     request.exception = True 70                     self._resultQueue.put((request,sys.exc_info())) 71      72     def dismiss(self): 73         '''设置一个标志,表示完成当前work之后,退出''' 74         self._dismissed.set() 75  76  77 class WorkRequest: 78     ''' 79     @param callable_:,可定制的,执行work的函数 80     @param args: 列表参数 81     @param kwds: 字典参数 82     @param requestID: id 83     @param callback: 可定制的,处理resultQueue队列元素的函数 84     @param exc_callback:可定制的,处理异常的函数  85     ''' 86     def __init__(self,callable_,args=None,kwds=None,requestID=None, 87                  callback=None,exc_callback=_handle_thread_exception): 88         if requestID == None: 89             self.requestID = id(self) 90         else: 91             try: 92                 self.requestID = hash(requestID) 93             except TypeError: 94                 raise TypeError("requestId must be hashable")     95         self.exception = False 96         self.callback = callback 97         self.exc_callback = exc_callback 98         self.callable = callable_ 99         self.args = args or []100         self.kwds = kwds or {}101         102     def __str__(self):103         return "WorkRequest id=%s args=%r kwargs=%r exception=%s" % \104             (self.requestID,self.args,self.kwds,self.exception)105             106 class ThreadPool:107     '''108     @param num_workers:初始化的线程数量109     @param q_size,resq_size: requestQueue和result队列的初始大小110     @param poll_timeout: 设置工作线程WorkerThread的timeout,也就是等待requestQueue的timeout111     '''112     def __init__(self,num_workers,q_size=0,resq_size=0,poll_timeout=5):113         self._requestQueue = Queue.Queue(q_size)114         self._resultQueue = Queue.Queue(resq_size)115         self.workers = []116         self.dismissedWorkers = []117         self.workRequests = {} #设置个字典,方便使用118         self.createWorkers(num_workers,poll_timeout)119 120     def createWorkers(self,num_workers,poll_timeout=5):121         '''创建num_workers个WorkThread,默认timeout为5'''122         for i in range(num_workers):123             self.workers.append(WorkerThread(self._requestQueue,self._resultQueue,poll_timeout=poll_timeout))                           124     125     def dismissWorkers(self,num_workers,do_join=False):126         '''停用num_workers数量的线程,并加入dismiss_list'''127         dismiss_list = []128         for i in range(min(num_workers,len(self.workers))):129             worker = self.workers.pop()130             worker.dismiss()131             dismiss_list.append(worker)132         if do_join :133             for worker in dismiss_list:134                 worker.join()135         else:136             self.dismissedWorkers.extend(dismiss_list)137     138     def joinAllDismissedWorkers(self):139         '''join 所有停用的thread'''140         #print len(self.dismissedWorkers)141         for worker in self.dismissedWorkers:142             worker.join()143         self.dismissedWorkers = []144     145     def putRequest(self,request ,block=True,timeout=None):146         assert isinstance(request,WorkRequest)147         assert not getattr(request,'exception',None)148         '''当queue满了,也就是容量达到了前面设定的q_size,它将一直阻塞,直到有空余位置,或是timeout'''149         self._requestQueue.put(request, block, timeout)150         self.workRequests[request.requestID] = request151         152     def poll(self,block = False):153         while True:154             if not self.workRequests:155                 raise NoResultsPending156             elif block and not self.workers:157                 raise NoWorkersAvailable158             try:159                 '''默认只要resultQueue有值,则取出,否则一直block'''160                 request , result = self._resultQueue.get(block=block)161                 if request.exception and request.exc_callback:162                     request.exc_callback(request,result)163                 if request.callback and not (request.exception and request.exc_callback):164                     request.callback(request,result)165                 del self.workRequests[request.requestID]166             except Queue.Empty:167                 break168     169     def wait(self):170         while True:171             try:172                 self.poll(True)173             except NoResultsPending:174                 break175     176     def workersize(self):177         return len(self.workers)178     179     def stop(self):180         '''join 所有的thread,确保所有的线程都执行完毕'''181         self.dismissWorkers(self.workersize(),True)182         self.joinAllDismissedWorkers()183 184 if __name__=='__main__':185     import random186     import time187     import datetime188     def do_work(data):189         time.sleep(random.randint(1,3))190         res = str(datetime.datetime.now()) + "" +str(data)191         return res192     193     def print_result(request,result):194         print "---Result from request %s : %r" % (request.requestID,result)195     196     main = ThreadPool(3)197     for i in range(40):198         req = WorkRequest(do_work,args=[i],kwds={},callback=print_result)199         main.putRequest(req)200         print "work request #%s added." % req.requestID201     202     print '-'*20, main.workersize(),'-'*20203     204     counter = 0205     while True:206         try:207             time.sleep(0.5)208             main.poll()209             if(counter==5):210                 print "Add 3 more workers threads"211                 main.createWorkers(3)212                 print '-'*20, main.workersize(),'-'*20213             if(counter==10):214                 print "dismiss 2 workers threads"215                 main.dismissWorkers(2)216                 print '-'*20, main.workersize(),'-'*20217             counter+=1218         except NoResultsPending:219             print "no pending results"220             break221     222     main.stop()223     print "Stop"

参考:http://blog.csdn.net/pi9nc/article/details/17056961

转载于:https://www.cnblogs.com/xiaofeiIDO/p/6187709.html

你可能感兴趣的文章
熟悉dstat命令用法
查看>>
kali linux字体渲染和infinality安装配置
查看>>
高效出去List集合和数组中的重复元素
查看>>
我的友情链接
查看>>
grep常见选项及命令
查看>>
nodejs终端的坑--新手向
查看>>
pycharm修改默认__author__ = '$USER'
查看>>
DNS服务的原理与配置(详细图文教程)
查看>>
threejs 之box贴图--6个 面集中在一个图片上
查看>>
Exchange Server 2010安装测试
查看>>
设计模式第九课 策略模式
查看>>
给定两个排序后的数组A和B,其中A的末端有足够的缓冲空间容纳B。将B合并入A并排序...
查看>>
FFmpeg avcodec_version函数使用
查看>>
一步步手动构建小于10M的类嵌入式Linux系统
查看>>
saltstack细节要点
查看>>
端口回流与dns-map与域内NAT
查看>>
linux 中特殊符号用法详解
查看>>
转载自马哥视频学习笔记---awk的基本用法说明
查看>>
Unity3D加密流程文档
查看>>
Java多线程同步的五种方法
查看>>