背景

做个记录：在学习了 Python 自带的 multiprocessing.pool 模块中 ThreadPool 和 Pool 的实现原理之后，自己动手简单实现了一个线程池。

from  multiprocessing.pool  import ThreadPool,Pool

线程池核心

# encoding: utf-8
import threading
import contextlib
import time,random

# Module-level result queue.  `queue` is the Python 3 module name; Python 2
# ships the same module as `Queue`.
try:
    import queue  # Python 3
    rest = queue.Queue()
except ImportError:
    import Queue  # Python 2
    rest = Queue.Queue()

class ThreadPoolManager:
    """Small thread pool that runs tagged tasks and collects their results.

    A task is submitted as ``put(func, tag, *arguments)``.  A worker thread
    calls ``func(arguments)`` -- ``func`` receives the remaining arguments
    packed into ONE tuple -- and records the return value under ``tag``
    (``None`` if ``func`` raised).  Results are fetched with ``callback``.
    """

    # Seconds callback() blocks on the result queue before re-checking
    # whether any worker is still alive.
    _POLL_INTERVAL = 0.1

    def __init__(self, max_num, max_task_num=None):
        """
        :param max_num: maximum number of worker threads the pool may create
        :param max_task_num: maximum number of queued tasks; ``None``/``0``
            means unbounded (``put`` blocks while a bounded queue is full)
        """
        queue_module = self._queue_module()
        # Results pulled off the result queue but not yet claimed, by tag.
        self.result = {}
        # Sentinel placed on the task queue to tell one worker to exit.
        self.StopEvent = object()
        # Task queue of (func, tag, arguments) tuples; maxsize 0 = unbounded.
        self.q = queue_module.Queue(max_task_num or 0)
        # Per-pool queue of {tag: result} dicts written by the workers.  A
        # single module-level queue would let one pool consume another
        # pool's results.
        self.result_queue = queue_module.Queue()
        self.max_num = max_num
        # Set by close()/terminate(): later put() calls are ignored.
        self.cancel = False
        # Set by terminate(): workers exit after their current task.
        self.terminal = False
        # Names of worker threads created and not yet exited.
        self.generate_list = []
        # Names of worker threads currently idle, waiting for a task.
        self.free_list = []
        # Completed task ids (not written by this class).
        self.done = []

    @staticmethod
    def _queue_module():
        """Return the stdlib FIFO queue module (``Queue`` on Python 2)."""
        try:
            import queue as queue_module
        except ImportError:  # Python 2
            import Queue as queue_module
        return queue_module

    def put(self, func, *args):
        """Submit one task to the pool.

        :param func: callable, invoked by a worker as ``func(arguments)``
        :param args: ``(tag, *arguments)``; ``tag`` is the key the result is
            stored under (``None`` if omitted), and the remaining values are
            passed to ``func`` as a single tuple
        :return: None; ignored silently once the pool was closed/terminated
        """
        if self.cancel:
            return
        # Spawn a worker only if none is idle and the pool is below capacity.
        if not self.free_list and len(self.generate_list) < self.max_num:
            self.generate_thread()
        tag = args[0] if args else None
        self.q.put((func, tag, args[1:]))

    def generate_thread(self):
        """Create and start one worker thread running ``call``.

        The thread is registered in ``generate_list`` before it starts, so
        ``put``/``close``/``callback`` see an accurate worker count even when
        the new thread has not been scheduled yet.
        """
        t = threading.Thread(target=self.call)
        self.generate_list.append(t.name)
        t.start()

    def call(self):
        """Worker loop: run queued tasks until a stop signal is received.

        A worker exits when it dequeues ``StopEvent`` or, after finishing a
        task, finds ``terminal`` set.  It always removes itself from
        ``generate_list`` on exit.
        """
        current_thread = threading.current_thread().name
        if current_thread not in self.generate_list:
            # Started some other way than generate_thread(); register now.
            self.generate_list.append(current_thread)
        try:
            event = self.q.get()
            while event is not self.StopEvent:
                func, tags, arguments = event
                # Format with a 1-tuple so tuple-valued tags cannot break "%".
                print("run taskid: %s" % (tags,))
                try:
                    result = func(arguments)
                except Exception as e:
                    print("thread run error:task_id:%s,error message: %s" % (tags, e))
                    result = None
                # Publish the result before this worker can leave
                # generate_list, so callback() never misses it.
                self.result_queue.put({tags: result})
                # Listed as idle (free_list) while waiting for the next task.
                with self.worker_state(self.free_list, current_thread):
                    if self.terminal:
                        event = self.StopEvent
                    else:
                        event = self.q.get()
        finally:
            self.generate_list.remove(current_thread)

    def close(self):
        """Stop accepting tasks; workers exit after all queued tasks finish.

        One ``StopEvent`` is queued per live worker; the queue is FIFO, so
        the sentinels sit behind every task already submitted.
        """
        self.cancel = True
        for _ in range(len(self.generate_list)):
            self.q.put(self.StopEvent)

    def terminate(self):
        """Stop the pool early and wait until every worker has exited.

        Each worker finishes the task it is running (or the one it is about
        to dequeue) and then exits.  Further ``put`` calls are ignored.
        """
        self.terminal = True
        self.cancel = True
        full = self._queue_module().Full
        # Wake workers that are blocked waiting on an empty queue.
        for _ in range(len(self.generate_list)):
            try:
                self.q.put_nowait(self.StopEvent)
            except full:
                # A full queue has no blocked readers; every worker will see
                # self.terminal once its current task completes.
                break
        while self.generate_list:
            time.sleep(0.01)

    @contextlib.contextmanager
    def worker_state(self, start_list, worker_thread):
        """Keep ``worker_thread`` in ``start_list`` for the ``with`` body.

        :param start_list: list to register in (``call`` passes free_list)
        :param worker_thread: name of the worker thread
        """
        start_list.append(worker_thread)
        try:
            yield
        finally:
            start_list.remove(worker_thread)

    def callback(self, tags=None):
        """Wait for and return task results.

        :param tags: tag of the wanted task.  When given, block until that
            task's result is available and return it.
        :return: the result for ``tags``; if ``tags`` is omitted (or never
            produced), the dict ``{tag: result}`` of all unclaimed results,
            returned once every worker has exited
        """
        if tags in self.result:
            return self.result.pop(tags)
        empty = self._queue_module().Empty
        while True:
            # With no live worker no new result can appear: drain without
            # blocking.  Otherwise wait briefly and re-check, so a worker
            # exiting between checks cannot leave us blocked forever.
            finished = not self.generate_list
            try:
                if finished:
                    df = self.result_queue.get_nowait()
                else:
                    df = self.result_queue.get(timeout=self._POLL_INTERVAL)
            except empty:
                if finished:
                    return self.result
                continue
            if tags in df:
                return df.pop(tags)
            self.result.update(df)

测试类

class Reat:
    """Sample object whose method is submitted to the pool in the demo."""

    def __init__(self, a, b, c):
        self.a = a
        self.b = b
        self.c = c

    def test_ceng(self, args, max_sleep=10):
        """Simulate work by sleeping, then return a fixed (dict, list) pair.

        :param args: task argument tuple passed by the pool (ignored)
        :param max_sleep: the sleep is a random whole number of seconds in
            ``[0, max_sleep)``; the default 10 gives 0-9 seconds
        """
        timesleep = int(random.random() * max_sleep)
        time.sleep(timesleep)
        return {"fdsa": "fdsaf"}, ["fdsa", "fdsaf"]

测试函数

def test_ceng2(id,d=[]):
timesleep = int(random.random()*10)
time.sleep(timesleep)
return {"fdsa2":"fdsaf"}

测试

def Test_run():
    """Demo: submit three tagged tasks, close the pool and print the results."""
    pool = ThreadPoolManager(10)
    try:
        t1 = time.time()
        t = Reat(1, 3, 4)
        pool.put(t.test_ceng, "test_ceng996", ['1', '23', '132', '3213'], ['1', '23', '132', '3213'])
        pool.put(t.test_ceng, "test_ceng997", {"df": "hh"})
        pool.put(t.test_ceng, "test_ceng998", {"df": "hh"})
        # Time spent submitting only; the tasks run asynchronously.
        print(time.time() - t1)
    finally:
        # Close in `finally`: even if a put() failed, the workers then get
        # stop signals, so callback() cannot wait forever for a result that
        # was never submitted.
        pool.close()
        print(pool.callback("test_ceng998"))
        print(pool.callback("test_ceng997"))
        print(pool.callback("test_ceng996"))

# Run the demo only when executed as a script, not on import.
if __name__ == "__main__":
    Test_run()