欢迎大家来到IT世界,在知识的湖畔探索吧!
1、什么是线程池
线程池的基本思想是一种对象池,在程序启动时就开辟一块内存空间,里面存放了众多(未死亡)的线程,池中线程执行调度由池管理器来处理。当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。
2、使用线程池的好处
减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
运用线程池能有效的控制线程最大并发数,可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
对线程进行一些简单的管理,比如:延时执行、定时循环执行的策略等,运用线程池都能进行很好的实现
3、线程池的主要组件
一个线程池包括以下四个基本组成部分:
1. 线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
2. 工作线程(WorkThread):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
3. 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
4. 任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
4. Python多线程(进程)管理模块之ThreadPoolExecutor,ProcessPoolExecutor
在concurrent.future模块中有ThreadPoolExecutor和ProcessPoolExecutor两个类,这两个类内部维护着线程/进程池,以及要执行的任务队列,使得操作变得非常简单,不需要关心任何实现细节
来看一个简单的例子
“`python
#!/usr/bin/env python3.6
from concurrent.futures import ThreadPoolExecutor
import requests
import os
DEST_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), “download”)
BASE_URL = “http://flupy.org/data/flags”
CC_LIST = (“CN”, “US”, “JP”, “EG”)
if not os.path.exists(DEST_DIR):
os.mkdir(DEST_DIR)
def get_img(cc):
url = “{}/{cc}/{cc}.gif”.format(BASE_URL, cc=cc.lower())
response = requests.get(url)
return response.content
def save_img(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, ‘wb’) as f:
f.write(img)
def download_one(cc):
img = get_img(cc)
save_img(img, cc.lower() + “.gif”)
return cc
def download_many(cc_list):
works = len(cc_list)
with ThreadPoolExecutor(works) as exector: # 使用with来管理ThreadPoolExecutor
# map方法和内置的map方法类似,不过exector的map方法会并发调用,返回一个由返回的值构成的生成器
response = exector.map(download_one, cc_list)
return len(list(response))
if __name__ == “__main__”:
download_many(CC_LIST)
“`
Future
concurrent.futures和asyncio中的Future类的作用相同,****都表示可能己经完成或尚未完成的延迟计算****
Future封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果后可以获取结果
使用exector.submit()方法提交执行的函数并获取一个Future,而不是直接创建,传入的参数是一个可调用的对象;获取的Future对象有一个done()方法,判断该Future是否己完成, add_one_callback()设置回调函数, result()来获取Future的结果。as_completed()传一个Future列表,在Future都完成之后返回一个迭代器
使用submit()方法试试看
“`python
def download_many(cc_list):
with ThreadPoolExecutor(max_workers=5) as exector:
future_list = []
for cc in cc_list:
# 使用submit提交执行的函数到线程池中,并返回futer对象(非阻塞)
future = exector.submit(download_one, cc)
future_list.append(future)
print(cc, future)
result = []
# as_completed方法传入一个Future迭代器,然后在Future对象运行结束之后yield Future
for future in futures.as_completed(future_list):
# 通过result()方法获取结果
res = future.result()
print(res, future)
result.append(res)
return len(result)
>>>
CN <Future at 0x7f80d32f5400 state=running>
US <Future at 0x7f80d330c320 state=running>
JP <Future at 0x7f80d330c8d0 state=running>
EG <Future at 0x7f80d330ce10 state=running>
JP <Future at 0x7f80d330c8d0 state=finished returned str>
CN <Future at 0x7f80d32f5400 state=finished returned str>
EG <Future at 0x7f80d330ce10 state=finished returned str>
US <Future at 0x7f80d330c320 state=finished returned str>
“`
ProcessPoolExecutor的使用方法是一样的,唯一需要注意的区别是传入的max_workers这个参数对于ProcessPoolExecutor是可选的,在不使用的情况下默认值是os.cpu_count()的返回值(cpu的数量)
exector.submit()和futures.as_completed()这个组合比exector.map()更灵活,submit()可以处理不同的调用函数和参数,而map只能处理同一个可调用对象。
wait()阻塞主线程,直到所有task都完成。
源码分析:
“`python
class ThreadPoolExecutor(_base.Executor):
# Used to assign unique thread names when thread_name_prefix is not supplied.
_counter = itertools.count().__next__
def __init__(self, max_workers=None, thread_name_prefix=”,
initializer=None, initargs=()):
“””Initializes a new ThreadPoolExecutor instance.
Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
initializer: An callable used to initialize worker threads.
initargs: A tuple of arguments to pass to the initializer.
“””
if max_workers is None:
# Use this number because ThreadPoolExecutor is often
# used to overlap I/O instead of CPU work.
max_workers = (os.cpu_count() or 1) * 5
if max_workers <= 0:
raise ValueError(“max_workers must be greater than 0”)
if initializer is not None and not callable(initializer):
raise TypeError(“initializer must be a callable”)
self._max_workers = max_workers
self._work_queue = queue.SimpleQueue()
self._threads = set()
self._broken = False
self._shutdown = False
self._shutdown_lock = threading.Lock()
self._thread_name_prefix = (thread_name_prefix or
(“ThreadPoolExecutor-%d” % self._counter()))
self._initializer = initializer
self._initargs = initargs
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
raise BrokenThreadPool(self._broken)
if self._shutdown:
raise RuntimeError(‘cannot schedule new futures after shutdown’)
if _shutdown:
raise RuntimeError(‘cannot schedule new futures after ‘
‘interpreter shutdown’)
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = ‘%s_%d’ % (self._thread_name_prefix or self,
num_threads)
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue,
self._initializer,
self._initargs))
t.daemon = True
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
def _initializer_failed(self):
with self._shutdown_lock:
self._broken = (‘A thread initializer failed, the thread pool ‘
‘is not usable anymore’)
# Drain work queue and mark pending futures failed
while True:
try:
work_item = self._work_queue.get_nowait()
except queue.Empty:
break
if work_item is not None:
work_item.future.set_exception(BrokenThreadPool(self._broken))
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True
self._work_queue.put(None)
if wait:
for t in self._threads:
t.join()
shutdown.__doc__ = _base.Executor.shutdown.__doc__
“`
这是ThreadPoolExecutor的全部代码,还是比较简单的,我们上面提到的线程池的具备的四个基本功能来分析:线程池管理器,工作线程,任务队列,任务接口来分析
线程池管理器: _threads_queues
工作线程:_worker
任务队列:_work_queue
任务接口:submit
submit(func) 干了两件事:
把任务(func)放入queue中
开启一个新线程不断从queue中取出任务,执行woker.run(),即执行func()
_adjust_thread_count()干了两件事:
开启一个新线程执行_worker函数,这个函数的作用就是不断去queue中取出worker, 执行woker.run(),即执行func()
把新线程跟队列queue绑定,防止线程被join(0)强制中断。
来看一下_worker函数源码:
“`python
def _worker(executor_reference, work_queue):
try:
while True:
# 不断从queue中取出worker对象
work_item = work_queue.get(block=True)
if work_item is not None:
# 执行func()
work_item.run()
# Delete references to object. See issue16284
del work_item
continue
# 从弱引用对象中返回executor
executor = executor_reference()
# Exit if:
# – The interpreter is shutting down OR
# – The executor that owns the worker has been collected OR
# – The executor that owns the worker has been shutdown.
# 当executor执行shutdown()方法时executor._shutdown为True,同时会放入None到队列,
# 当work_item.run()执行完毕时,又会进入到下一轮循环从queue中获取worker对象,但是
# 由于shutdown()放入了None到queue,因此取出的对象是None,从而判断这里的if条件分支,
# 发现executor._shutdown是True,又放入一个None到queue中,是来通知其他线程跳出while循环的
# shutdown()中的添加None到队列是用来结束线程池中的某一个线程的,这个if分支中的添加None
# 队列是用来通知其他线程中的某一个线程结束的,这样连锁反应使得所有线程执行完func中的逻辑后都会结束
if _shutdown or executor is None or executor._shutdown:
# Notice other workers
work_queue.put(None)
return
del executor
except BaseException:
_base.LOGGER.critical(‘Exception in worker’, exc_info=True)
“`
可以看出,这个 _worker方法的作用就是在新新线程中不断获得queue中的worker对象,执行worker.run()方法,执行完毕后通过放入None到queue队列的方式来通知其他线程结束。
再来看看_adjust_thread_count()方法中的_threads_queues[t] = self._work_queue这个操作是如何实现防止join(0)的操作强制停止正在执行的线程的。
需要注意的是,ThreadPoolExecutor创建的线程daemon=True,而ProcessPoolExecutor创建的进程daemon使用默认值。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/14339.html