python dev常用模块-多线程
import threading
class MyThread(threading.Thread):
"""
自定义线程类,用于执行指定的函数并获取返回值。
"""
def __init__(self, func, args=(), kwargs={}):
super(MyThread, self).__init__()
self.func = func
self.args = args
self.kwargs = kwargs
self.result = None
def run(self):
"""
重写run方法,在线程中执行指定的函数。
捕获并打印任何异常,以防止线程因未处理的异常而崩溃。
"""
try:
self.result = self.func(*self.args, **self.kwargs)
except Exception as e:
print(f"Error in thread: {e}")
def get_result(self):
"""
获取线程执行函数的返回值。
"""
return self.result
def multiThreadExecutor(func, **args):
"""
使用多线程并行执行指定的函数。
接受一个函数 func 和一个包含列表参数以及其他参数的字典。
"""
threads = []
results = []
# 查找字典中的列表参数
list_args = {key: value for key, value in args.items() if isinstance(value, list)}
single_args = {key: value for key, value in args.items() if not isinstance(value, list)}
# 如果没有找到列表参数,则抛出错误
if not list_args:
raise ValueError("必须提供一个列表参数")
# 启动线程,每个列表参数的元素都与其他参数结合作为线程的输入
for key in list_args:
for item in list_args[key]:
# 为每个线程创建参数的副本
thread_kwargs = single_args.copy()
thread_kwargs[key] = item # 将当前列表项的值设置为线程的参数
# 创建线程
thread = MyThread(func, (), thread_kwargs)
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
# 收集所有线程的结果
for thread in threads:
result = thread.get_result()
if result is not None:
results.append(result) # 使用 append 而不是 extend
return results
def example_func(*args, **kwargs):
"""
示例函数,打印传入的位置参数和关键字参数,并返回一个字符串。
"""
print("Positional arguments:", args)
print("Keyword arguments:", kwargs)
return f"Result with args {args} and kwargs {kwargs}"
# 示例调用
# if __name__ == "__main__":
# # 定义要传递给example_func的参数
# args = {
# 'arg_list': [1, 2, 3], # 列表参数
# 'common_arg1': 'common_value1', # 常量参数
# 'common_arg2': 'common_value2' # 常量参数
# }
# results = multiThreadExecutor(example_func, **args)
# print("Collected results from threads:", results)
需要理解入参时候**args和args的区别
License:
CC BY 4.0