背景
做一个记录：在学习了 Python 自带的 multiprocessing.pool 下 ThreadPool 及 Pool 的实现原理之后，自己动手简单实现一个线程池。
1
| from multiprocessing.pool import ThreadPool,Pool
|
线程池核心
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
| import threading import contextlib import time,random
try:
    import queue  # Python 3
except ImportError:  # Python 2 ships the same module as ``Queue``
    import Queue
    queue = Queue

# Finished tasks are published here as one-item dicts ``{tag: result}``.
# NOTE: this queue is module-level, so every ThreadPoolManager instance
# created in the process shares it.
rest = queue.Queue()


class ThreadPoolManager:
    """Minimal thread pool that runs tagged tasks and collects their results.

    Tasks are submitted with :meth:`put` as ``(func, tag, *extra)``.  A worker
    calls ``func(extra)`` (the extra arguments are passed as ONE tuple) and
    publishes ``{tag: result}`` on the module-level ``rest`` queue, from which
    :meth:`callback` retrieves it.
    """

    # Seconds between liveness re-checks while waiting on a queue, so a wait
    # can notice that the workers have gone away instead of blocking forever.
    _POLL_INTERVAL = 0.05

    def __init__(self, max_num, max_task_num=None):
        """
        :param max_num: maximum number of worker threads the pool may start
        :param max_task_num: maximum number of queued tasks; a falsy value
            (the default) means the task queue is unbounded
        """
        self.result = {}            # results fetched from ``rest`` but not yet claimed
        self.StopEvent = object()   # sentinel telling a worker to exit
        self.q = queue.Queue(max_task_num or 0)  # maxsize 0 == unbounded
        self.max_num = max_num
        self.cancel = False         # set by close(): refuse new tasks
        self.terminal = False       # set by terminate(): exit after current task
        self.generate_list = []     # names of live worker threads
        self.free_list = []         # names of workers waiting for the next task
        self.done = []              # unused; kept for backward compatibility

    def put(self, func, *args):
        """Queue one task.

        :param func: callable invoked by a worker as ``func(args[1:])``
        :param args: ``args[0]`` is the tag under which the result is stored;
            the remaining items are handed to ``func`` as a single tuple
        :return: None (the task is silently dropped once close() was called)
        :raises IndexError: if no tag is supplied
        """
        if self.cancel:
            return
        # Validate before spawning, so a bad call cannot leave behind a worker
        # that waits on the queue forever.
        if not args:
            raise IndexError("put() needs a tag as the first argument after func")
        task = (func, args[0], args[1:])
        if not self.free_list and len(self.generate_list) < self.max_num:
            self.generate_thread()
        self.q.put(task)

    def generate_thread(self):
        """Start one worker thread running :meth:`call`.

        :return: None
        """
        worker = threading.Thread(target=self.call)
        # Register the worker here rather than inside the thread: close() and
        # put() count generate_list right away, and a worker that had not yet
        # registered itself would never be sent a stop sentinel.
        self.generate_list.append(worker.name)
        try:
            worker.start()
        except Exception:
            self.generate_list.remove(worker.name)
            raise

    def call(self):
        """Worker loop: fetch tasks and run them until a stop sentinel arrives.

        A task that raises is logged and recorded with result ``None``.

        :return: None
        """
        current_thread = threading.current_thread().name
        if current_thread not in self.generate_list:
            # Only reached when call() is invoked without generate_thread().
            self.generate_list.append(current_thread)
        try:
            event = self.q.get()
            while event is not self.StopEvent:
                func, tags, arguments = event
                # The log shows the first extra argument as the task id; fall
                # back to the tag when the task was queued without extras.
                task_id = arguments[0] if arguments else tags
                try:
                    print("run taskid: %s" % (task_id,))
                    result = func(arguments)
                except Exception as e:
                    print("thread run error:task_id:%s,error message: %s" % (task_id, e))
                    result = None
                try:
                    rest.put({tags: result})
                except Exception as e:
                    # e.g. an unhashable tag; report it and keep the worker alive
                    print(e)
                with self.worker_state(self.free_list, current_thread):
                    if self.terminal:
                        event = self.StopEvent
                    else:
                        event = self.q.get()
        finally:
            # Always deregister, otherwise callback()/terminate() would wait
            # for a worker that no longer exists.
            self.generate_list.remove(current_thread)

    def close(self):
        """Stop accepting tasks and let workers exit once the queue is drained.

        :return: None
        """
        self.cancel = True
        # One sentinel per live worker, queued behind the pending tasks.
        for _ in range(len(self.generate_list)):
            self.q.put(self.StopEvent)

    def terminate(self):
        """Ask workers to exit after their current task and wait until they do.

        :return: None
        """
        self.terminal = True
        # Each worker performs at most one more get() after ``terminal`` is
        # set, so one sentinel per live worker is enough.
        remaining = len(self.generate_list)
        while remaining and self.generate_list:
            try:
                self.q.put(self.StopEvent, timeout=self._POLL_INTERVAL)
            except queue.Full:
                # Bounded queue is full; retry unless every worker has exited.
                continue
            remaining -= 1
        while self.generate_list:
            time.sleep(self._POLL_INTERVAL)

    @contextlib.contextmanager
    def worker_state(self, start_list, worker_thread):
        """Keep ``worker_thread`` listed in ``start_list`` for the ``with`` body.

        :param start_list: list to add the name to (the pool passes free_list)
        :param worker_thread: name of the worker thread
        :return: context manager yielding None
        """
        start_list.append(worker_thread)
        try:
            yield
        finally:
            start_list.remove(worker_thread)

    def callback(self, tags=None):
        """Collect finished results.

        :param tags: tag of the wanted task.  When its result is available it
            is returned (and forgotten) as soon as it shows up.
        :return: the result stored under ``tags``; otherwise, once no worker
            is alive and ``rest`` is empty, the dict of all results collected
            so far.  Workers only exit after close() or terminate().
        """
        if tags in self.result:
            return self.result.pop(tags)
        while True:
            # Sample liveness BEFORE reading ``rest``: a worker publishes its
            # result before it deregisters, so "no workers" means every result
            # is already queued and a non-blocking drain cannot miss any.
            workers_alive = bool(self.generate_list)
            try:
                if workers_alive:
                    df = rest.get(timeout=self._POLL_INTERVAL)
                else:
                    df = rest.get_nowait()
            except queue.Empty:
                if workers_alive:
                    continue
                break
            if tags in df:
                return df.pop(tags)
            self.result.update(df)
        return self.result
|
测试类
1 2 3 4 5 6 7 8 9 10
class Reat:
    """Demo object whose bound method is submitted to the pool as a task."""

    def __init__(self, a, b, c):
        """Store the three constructor values as attributes ``a``, ``b``, ``c``."""
        self.a, self.b, self.c = a, b, c

    def test_ceng(self, args):
        """Sleep a random whole number of seconds (0-9), then return sample data.

        :param args: accepted but not used
        :return: tuple ``(dict, list)`` with fixed sample contents
        """
        delay = int(random.random() * 10)
        time.sleep(delay)
        payload = {"fdsa": "fdsaf"}
        labels = ["fdsa", "fdsaf"]
        return payload, labels
|
测试函数
1 2 3 4 5
def test_ceng2(id, d=None):
    """Sleep a random whole number of seconds (0-9), then return sample data.

    :param id: accepted but not used (the name shadows the builtin; it is kept
        because it is part of the existing signature)
    :param d: accepted but not used; defaults to ``None`` rather than a shared
        mutable list
    :return: the dict ``{"fdsa2": "fdsaf"}``
    """
    delay = int(random.random() * 10)
    time.sleep(delay)
    return {"fdsa2": "fdsaf"}
|
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
def Test_run():
    """Demo: submit three tagged tasks to a pool and print their results.

    Prints the time spent submitting, then the result of each task, fetched
    by tag in the order 998, 997, 996.
    """
    pool = ThreadPoolManager(10)
    try:
        t1 = time.time()
        t = Reat(1, 3, 4)
        pool.put(t.test_ceng, "test_ceng996", ['1', '23', '132', '3213'], ['1', '23', '132', '3213'])
        pool.put(t.test_ceng, "test_ceng997", {"df": "hh"})
        pool.put(t.test_ceng, "test_ceng998", {"df": "hh"})
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(time.time() - t1)
    finally:
        # Close even if submission failed, so the workers receive their stop
        # sentinels before the results are collected below.
        pool.close()
        print(pool.callback("test_ceng998"))
        print(pool.callback("test_ceng997"))
        print(pool.callback("test_ceng996"))


if __name__ == "__main__":
    Test_run()
|