一个基于thread和queue的线程池,以任务为队列元素,动态创建线程,重复利用线程,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# -*- coding: utf-8 -*-

"""
一个基于thread和queue的线程池,以任务为队列元素,动态创建线程,重复利用线程,
通过close和terminate方法关闭线程池。
"""
try:
import queue
except:
import Queue
import threading
import contextlib
import time


# 创建空对象,用于停止线程
StopEvent = object()
# 存储处理结果
StoreCallBack={}

def callback(status, result):
"""
根据需要进行的回调函数,默认不执行。
:param status: action函数的执行状态
:param result: action函数的返回值
:return:
"""
print result
StoreCallBack[result[u'task']]=result["result"]
return status,result


def action(thread_name, arg):
"""
真实的任务定义在这个函数里
:param thread_name: 执行该方法的线程名
:param arg: 该函数需要的参数
:return:
"""
# 模拟该函数执行了0.1秒
# time.sleep(0.1)
return {u"result":u"dsaf",u"task":arg}


class ThreadPool:

def __init__(self, max_num, max_task_num=None):
"""
初始化线程池
:param max_num: 线程池最大线程数量
:param max_task_num: 任务队列长度
"""
# 如果提供了最大任务数的参数,则将队列的最大元素个数设置为这个值。
if max_task_num:
try:
self.q = queue.Queue(max_task_num)
except:
self.q = Queue.Queue(max_task_num)
# 默认队列可接受无限多个的任务
else:
try:
self.q = queue.Queue()
except:
self.q = Queue.Queue()
# 设置线程池最多可实例化的线程数
self.max_num = max_num
# 任务取消标识
self.cancel = False
# 任务中断标识
self.terminal = False
# 已实例化的线程列表
self.generate_list = []
# 处于空闲状态的线程列表
self.free_list = []

def put(self, func, args, callback=None):
"""
往任务队列里放入一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数
1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""
# 先判断标识,看看任务是否取消了
if self.cancel:
return
# 如果没有空闲的线程,并且已创建的线程的数量小于预定义的最大线程数,则创建新线程。
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
# 构造任务参数元组,分别是调用的函数,该函数的参数,回调函数。
w = (func, args, callback,)
# 将任务放入队列
self.q.put(w)

def generate_thread(self):
"""
创建一个线程
"""
# 每个线程都执行call方法
t = threading.Thread(target=self.call)
t.start()

def call(self):
"""
循环去获取任务函数并执行任务函数。在正常情况下,每个线程都保存生存状态, 直到获取线程终止的flag。
"""
# 获取当前线程的名字
current_thread = threading.currentThread().getName()
# 将当前线程的名字加入已实例化的线程列表中
self.generate_list.append(current_thread)
# 从任务队列中获取一个任务
event = self.q.get()
# 让获取的任务不是终止线程的标识对象时
while event != StopEvent:
# 解析任务中封装的三个参数
func, arguments, callback = event
# 抓取异常,防止线程因为异常退出
try:
# 正常执行任务函数
result = func(current_thread, *arguments)
success = True
except Exception as e:
# 当任务执行过程中弹出异常
result = None
success = False
# 如果有指定的回调函数
if callback is not None:
# 执行回调函数,并抓取异常
try:
callback(success, result)
except Exception as e:
pass
# 当某个线程正常执行完一个任务时,先执行worker_state方法
with self.worker_state(self.free_list, current_thread):
# 如果强制关闭线程的flag开启,则传入一个StopEvent元素
if self.terminal:
event = StopEvent
# 否则获取一个正常的任务,并回调worker_state方法的yield语句
else:
# 从这里开始又是一个正常的任务循环
event = self.q.get()
else:
# 一旦发现任务是个终止线程的标识元素,将线程从已创建线程列表中删除
self.generate_list.remove(current_thread)


def close(self):
"""
执行完所有的任务后,让所有线程都停止的方法
"""
# 设置flag
self.cancel = True
# 计算已创建线程列表中线程的个数,
# 然后往任务队列里推送相同数量的终止线程的标识元素
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= 1


def terminate(self):
"""
在任务执行过程中,终止线程,提前退出。
"""
self.terminal = True
# 强制性的停止线程
while self.generate_list:
self.q.put(StopEvent)

# 该装饰器用于上下文管理
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于记录空闲的线程,或从空闲列表中取出线程处理任务
"""
# 将当前线程,添加到空闲线程列表中
state_list.append(worker_thread)
# 捕获异常
try:
# 在此等待
yield
finally:
# 将线程从空闲列表中移除
state_list.remove(worker_thread)


class Rtet:
def __init__(self,a,b,c):
self.a = a
self.b = b
self.c = c

def ab(self,thread_name, arg):
print(thread_name)
return self.a+self.b

def ac(self,thread_name):
print(thread_name)
return self.c*self.a



def threadPoolCore():
pool = ThreadPool(5)
# 创建100个任务,让线程池进行处理
# for i in range(100):
t = Rtet(23, 12, 34)
pool.put(action, (u"test",), callback)
pool.put(action, (u"test2",), callback)
pool.put(action, (u"test3",), callback)
# pool.put(t.ab, (u"test2",), callback)
# pool.put(t.ac,(),callback)

# 等待一定时间,让线程执行任务
time.sleep(3)
print("-" * 50)
print(u"\033[32;0m任务停止之前线程池中有%s个线程,空闲的线程有%s个!\033[0m"
% (len(pool.generate_list), len(pool.free_list)))
# 正常关闭线程池
pool.close()
print(u"任务执行完毕,正常退出!")
return StoreCallBack

# 调用方式
# if __name__ == '__main__':
# # 创建一个最多包含5个线程的线程池
# pool = ThreadPool(5)
# # 创建100个任务,让线程池进行处理
# for i in range(100):
# pool.put(action, (i,), callback)
# # 等待一定时间,让线程执行任务
# time.sleep(3)
# print("-" * 50)
# print("\033[32;0m任务停止之前线程池中有%s个线程,空闲的线程有%s个!\033[0m"
# % (len(pool.generate_list), len(pool.free_list)))
# # 正常关闭线程池
# pool.close()
# print("任务执行完毕,正常退出!")
# 强制关闭线程池
# pool.terminate()
# print("强制停止任务!")
threadPoolCore()

#http://www.langzi.fun/Python%20threading%20%E5%A4%9A%E7%BA%BF%E7%A8%8B%E6%A8%A1%E5%9D%97.html
#https://zhuanlan.zhihu.com/p/353368824
#https://zhuanlan.zhihu.com/p/352935072
#https://www.zhihu.com/people/lan-yan-xiao-cang-shu/posts
#https://zhuanlan.zhihu.com/p/353370031