全国免费咨询:

13245491521

VR图标白色 VR图标黑色
X

中高端软件定制开发服务商

与我们取得联系

13245491521     13245491521

2024-02-06_解谜 Dart VM中的线程池:并发编程艺术的详细分析

您的位置:首页 >> 新闻 >> 行业资讯

解谜 Dart VM中的线程池:并发编程艺术的详细分析 点击关注公众号,”技术干货”及时达!准备在这个地址下载源码(https://chromium.googlesource.com/external/github.com/dart-lang/sdk/+/refs/heads/stable/runtime)VSCode 安装好「C/C++ for Visual Studio Code」 插件了解 Dart Isolate 的基础使用打开 vm/thread_pool.h 与 vm/thread_pool.cc 两个文件。 开始众所周知如需要在 Dart 开发中要使用「多线程」的能力那肯定离不开 Isolate ,与传统多线程的概念不同 Dart 语言下的 Isolate 之间进行数据同步与通信时需要借助「消息机制」。Isolate 在设计理念上淡化了线程的概念却更接近进程,在使用上与进程间通信类似但又比进程间通信更简单。实际上 Isolate 在 Runtime 底层实现依然还是使用「多线程」的能力,它通过包装与抽象多线程让 Dart 代码可以直接使用 Isolate 来替代多线程,其中「线程池(以下用 ThreadPool 替代)」是这层抽象的基础。根据官方的定义: 「在使用 Isolate 时能保证任意线程不会同时进入多个 Isolate 且同一个 Isolate 不会被多个线程同时运行」 Dart Runtime 中的 ThreadPool 不仅用来承载线程(Worker)还承载了任务(Task),Worker 代表「消费者」来消费当前的 Task(引用 MessageHandler),这两个类型是 ThreadPool 的内置类型本文后面会详细介绍。 ?如果你看过本专栏第一篇内容应该对此有大致印象,如果没有也没有关系,本文内容相对独不影响对 ThreadPool 的理解。 ?看源码之前切换到上帝视角来看 ThreadPool 的基础作用: C++ Isolate 类型与 Dart 中的 Isolate 类型对应 每个 C++ Isolate 关联了一个 MessageHandler 对象,MessageHandler 保存了当前 Isolate 的所有 Message Message 包含 Dart 代码传进来的基础数据与 port_id,Message 可来自于当前 Isolate 或另一个 Isolate,Message 被消费时会将数据再回调给 Dart 代码(另一个 Dart Isolate 回调) Isolate 负责触发 ThreadPool 对 Task (Task 会引用 MessageHandler)的创建流程,创建好的 Task 会保存到 ThreadPool 自身的队列中等待自身 Worker 来消费 每个 Worker 会保证消费掉一个 Task 内的所有 Message 后再进入另一个 Task 多个 Isolate 共享同一个 ThreadPool 锁?本小节为 C++ 基础,C++ 大佬可略过 ?在源码中总是能看到类似下面的代码,函数作用域内又定义了一个内部作用域,从语法层面上看这个内部作用域似乎没有什么意义,有没有它也不影响代码的执行逻辑。那它有什么作用呢?不卖关子,这里其实是 C++ 多线程下「锁」的巧妙用法。 boolThreadPool::RunImpl(std::unique_ptrTasktask){ Worker*new_worker=nullptr; {//声明内部作用域 MonitorLockerml(&pool_monitor_);//定义变量 if(shutting_down_){ returnfalse; } new_worker=ScheduleTaskLocked(std::move(task)); }//作用域结束 if(new_worker!=nullptr){ new_worker-StartThread(); } returntrue; } 一般情况下我们在多线程下使用锁时都是直接在临界资源访问前后直接调用加解锁的代码,伪代码示例如下: int globalCount = 1; void threadEntry() { // 直接加锁 thread_lock(); globalCount++; // 直接解锁 thread_unlock(); } 这种写法很直观,访问临界资源前加锁占用当前资源,防止其它线程再访问当前资源,访问结束后再解锁释放当前资源。注意这里的加解锁一定得是一个对称操作,有加锁的代码就一定要对称出现解锁代码否则问题很严重。但如果要保护的临界资源较长(加解锁保护的代码较长)或者相同的加解锁代码太多又或者有提前 return 边界条件,如何防止解锁代码漏写就比较麻烦了。 C++ 类定义时有构造函数与析构函数,当 new 出一个对象时构造函数会被调用,delete 释放对象时析构函数会被调用,所以当类的临时变量超过作用域它的析构函数会被调用。正是利用 C++ 的这个特点可以通过添加作用域的方式,来保护临界资源。来一个简单示例: #includeiostream #includemutex //简化版的互斥锁类 classMyMutex{ public: MyMutex(std::mutexmtx):mutexRef(mtx){ mutexRef.lock(); } ~MyMutex(){ mutexRef.unlock(); } private: std::mutexmutexRef; }; intglobalCount=1; intmain(){ //一个互斥锁 std::mutex_mutex; //在作用域中创建MyMutex对象,构造函数加锁,离开作用域时析构函数解锁 { MyMutexmyMutex(_mutex); globalCount++; } //MyMutex对象离开作用域后,互斥锁已被解锁 return0; } 回到 ThreadPool 中的锁类型,ThreadPool 中有涉及到了 Runtime 中两种锁类型 Monitor 与 MonitorLocker。从 Monitor 的构造函数与析构函数可以看出,Monitor 正是利用了析构特性实现了超出作用域自动解锁的能力,是对条件锁(也称条件变量)的一层包装。同时 Monitor 也用来屏蔽不同 OS 的锁实现,下面的代码正是在 MacOS/iOS 实现(os_thread_macos.cc)。 ?条件变量扩展阅读:pthread 条件变量(https://www.cnblogs.com/sinkinben/p/14087320.html) ?//构造函数 Monitor::Monitor(){ pthread_mutexattr_tattr; intresult=pthread_mutexattr_init( result=pthread_mutex_init(data_.mutex(), result=pthread_mutexattr_destroy( result=pthread_cond_init(data_.cond(),nullptr); } //析构函数 Monitor::~Monitor(){ intresult=pthread_mutex_destroy(data_.mutex()); result=pthread_cond_destroy(data_.cond()); } 而 MonitorLocker 则更为直接,从类定义来看它仅仅是对 Monitor 进行了二次包装。知识点:在 Dart Runtime 中并不直接使用 Monitor 进行锁操作,而是使用 Monitor 的封装类 MonitorLocker 进行锁操作。 classMonitorLocker{ public: explicitMonitorLocker(Monitor*monitor):monitor_(monitor){ monitor_-Enter(); } virtual~MonitorLocker(){monitor_-Exit();} Monitor::WaitResultWait(int64_tmillis=Monitor::kNoTimeout){ returnmonitor_-Wait(millis); } voidNotify(){monitor_-Notify();} voidNotifyAll(){monitor_-NotifyAll();} private: //对不同操作系统条件变量的封装 Monitor*constmonitor_; } 上面对 ThreadPool 中涉及到的锁进行了介绍,相信看到类似的代码后不会再感到困惑。 两个类型本文开头提到了线程池模型中的生产者(Task)与消费者(Worker),整个 ThreadPool 都是围绕这两个类型的队列进行逻辑处理,本小节将重点介绍这两个类型。 TaskTask 被定义在 ThreadPool 中,是 ThreadPool 的内置类型,同时通过搜索整个 Runtime 仓库可知 Task 也是一个基类,它派生出了不同的子类,每个子类都代表某种任务。 基类 Task 内只有一个函数 Run, 同时它的构造函数被 protected 修饰,说明它及它的子类都只能在 ThreadPool 及 ThreadPool 子类中实例化。实际上在 Task 类定义的下方就有 Task 的实例化逻辑,只不过它是用模板(C++ 中的模板相当于泛型)实现。 classThreadPool{ public: //基类Task的定义,继承自IntrusiveDListEntry说明它的子类可以进行列队操作 classTask:publicIntrusiveDListEntryTask{ protected: Task(){} public: virtual~Task(){} //虚函数,由子负责类实现 virtualvoidRun()=0; //模板(泛型)函数,负责实例化Task子类 templatetypenameT,typename...Args boolRun(Args&args){ returnRunImpl(std::unique_ptrTask(newT(std::forwardArgs(args)...))); } private: usingTaskList=IntrusiveDListTask TaskListtasks_; } Task 子类实例化后便调用了 ThreadPool::RunImpl 方法,MessageHandler 正是由此触发了 Task 的创建流程。Task 子类众多,这里我们暂时只关注 MessageHandler 相关的部分,也就是 MessageHandlerTask,可以看看它的实现。 WorkerWorker 字面意思有「工具人」的味道,从类定义来看它持有当前线程(os_thread_ 成员变量),且有一个 Main 静态函数,说明这个 Main 是线程的入口函数。 classThreadPool{ public: private: classWorker:publicIntrusiveDListEntryWorker{ public: explicitWorker(ThreadPool*pool); voidStartThread(); private: friendclassThreadPool; //线程创建后的入口函数 staticvoidMain(uwordargs); ThreadPool*pool_; ThreadJoinIdjoin_id_; //持有当前函数 OSThread*os_thread_=nullptr; usingWorkerList=IntrusiveDListWorker //不同状态的Worker队列 WorkerListrunning_workers_; WorkerListidle_workers_; WorkerListdead_workers_; } 通过 Worker 的实现可知, Main 函数在 ThreadPool::Worker::StartThread 方法中被传入操作系统线程开始执行。 voidThreadPool::Worker::StartThread(){ intresult=OSThread::Start("DartWorker",&Worker::Main, reinterpret_castuword(this)); //省略result判断 } 依然以 MacOS/iOS 系统平台代码为例,线程创建的代码如下所示(对源码略有简化)。从源码来看,线程的创建并没有任何特殊处理。 intOSThread::Start(constchar*name, ThreadStartFunctionfunction, uwordparameter){ //...省略其它代码 //ThreadPool::Worker::Main函数与参数保存在data对象中 ThreadStartData*data=newThreadStartData(name,function,parameter); pthread_t //创建线程 result=pthread_create(&tid,&attr,ThreadStart,data); //...省略其它代码 return0; } //Worker优先级全局常量,默认值为:kMinInt intFLAG_worker_thread_priority=Flags::Register_int(&FLAG_worker_thread_priority,"worker_thread_priority",kMinInt,"ThethreadprioritytheVMshouldusefornewworkerthreads."); staticvoid*ThreadStart(void*data_ptr){ //如果优先级不为kMinInt时则设置线程优选级 if(FLAG_worker_thread_priority!=kMinInt){ //这里的FLAG_worker_thread_priority全局变量默认值为kMinInt //所以优选级不会永远不会被设置 constpthread_tthread=pthread_self(); intpolicy=SCHED_FIFO; structsched_paramschedule; pthread_getschedparam(thread,&policy,&schedule); schedule.sched_priority=FLAG_worker_thread_priority; pthread_setschedparam(thread,policy,&schedule); } //取出ThreadPool::Worker内的静态Main函数与参数 OSThread::ThreadStartFunctionfunction=data-function(); uwordparameter=data-parameter(); //调用ThreadPool::Worker::Main函数 function(parameter); returnnullptr; } 所有线程创建后均使用默认优先级(pthread_create 创建的线程默认优先级为 0),说明 Dart 还没有针对 Apple M 系列的芯片做针对性的性能优化,这可能会使 Dart 在计算密集型的场景处于不利位置。因为根据少数派这篇文章所述,Apple M 系列芯片使用大小核架构(大核:性能核心简称 P 核,小核:效能核心简称 E 核),优先级低的线程只会分配到 E 核心上,只有当 E 核心分配满了才会分配 P 核心。 ?扩展阅读:M1 CPU 那么多的核,macOS 是怎样管理的?。虽然这篇文章所述的优先级均是 QoS (NSOperation)优先级,但根据苹果的 Prioritize Work with Quality of Service Classes 文档与 XNU 源码 可知 QoS 与 pthread 优先级存在映身关系。 ?如果你的 Dart 应用(包含 Flutter 桌面 App,甚至 Dart 编译前端)对性能有更高要求,理论上可以尝试更改 FLAG_worker_thread_priority 的默认值(如:63)然后重新编译 Dart SDK,让 MacOS 操作系统强制优先分配 P 核心来提升性能。(由于我这边没有 M 芯片 Mac 无法做验证,如果你做了相关验证请一定要让我知道 ??) 相似思路:如果需要优化 Flutter App 的启动性能也可以更改这个优先级变量,参考依据:不改一行业务代码,飞书 iOS 低端机启动优化实践(https://juejin.cn/post/7135343788653805605) 核心上面介绍完了 ThreadPool 的基础知识,如果有 C/C++ 基础知识其实就能完全看懂这部分代码了,这里只对对一些核心细节进行详细说明。 入口如前所述线程池是生产者与消费者模型,生产者通过 ThreadPool::Run 函数触发 Task 派生子类型的创建,然后会调用到 ThreadPool::RunImpl 函数。这里 ThreadPool::RunImpl 便是线程池的真正「入口」。 //ThreadPool入口 templatetypenameT,typename...Args boolRun(Args&args){ returnRunImpl(std::unique_ptrTask(newT(std::forwardArgs(args)...))); } boolThreadPool::RunImpl(std::unique_ptrTasktask){ Worker*new_worker=nullptr; { MonitorLockerml(&pool_monitor_); if(shutting_down_){ returnfalse; } //Task与(潜在的)Worker创建 new_worker=ScheduleTaskLocked(std::move(task)); } if(new_worker!=nullptr){ //如果创建了空闲Worker就启动它 new_worker-StartThread(); } returntrue; } //ThreadPool入口使用方式以MessageHandler中对线程池的使用做为示例 boolMessageHandler::Run(ThreadPool*pool, StartCallbackstart_callback, EndCallbackend_callback, CallbackDatadata){ //...省略 //message_handle.cc中对线程池入口的调用,this参数是当前MessageHandler对象 //MessageHandlerTask泛型指定生成的Task类型为MessageHandlerTask pool_-RunMessageHandlerTask(this); //...省略 } 启动ThreadPool::RunImpl 方法内部通过调用 ThreadPool::ScheduleTaskLocked 方法来负责 Worker 与 Task 的创建。Task 直接创建且只有一个显式的状态: 「Pending」(通过 pending_tasks_ 变量维护),一个 Task 不是在 Pending 状态就是在运行状态。而 Worker 创建的过程中会判断当前存活的线程数量(也就是 Worker 队列数量)是否超过最大限制(max_pool_size_),如果超过限制则尝试唤醒空闲线程(Worker)。 ?Worker 的唤醒机制是通过向条件变量发送通知来唤醒通过 WaitMicros 方法进入阻塞状态的线程。关于「条件变量」使用可查阅相关文章进行了解,例如这篇。 ?ThreadPool::Worker*ThreadPool::ScheduleTaskLocked(MonitorLocker*ml, std::unique_ptrTasktask){ //将task添加到队列并记录待运行task数量 tasks_.Append(task.release()); pending_tasks_++; //如果空闲线程大于等于pending状态task数量,则优先唤醒空闲线程 if(count_idle_=pending_tasks_){ ml-Notify(); returnnullptr; } //正在运行与空闲的线程数超过最大线程数限制,则优先唤醒空闲线程 if(max_pool_size_0(count_idle_+count_running_)=max_pool_size_){ if(!idle_workers_.IsEmpty()){ ml-Notify(); } returnnullptr; } //否则直接创建空闲的Worker并返回 autonew_worker=newWorker(this); idle_workers_.Append(new_worker); count_idle_++; returnnew_worker; } 正如你所想,Worker 除了有空闲状态(「Idle」)、还有运行状态(「Running」)、死亡状态(「Dead」),三者之间的转换关系如下: ThreadPool 中与之对应的状态变化方法分别是: 方法作用ThreadPool::IdleToRunningLocked空闲转运行ThreadPool::RunningToIdleLocked运行转空闲ThreadPool::IdleToDeadLocked空闲转死亡状态变化操作仅仅只是将 Worker 在不同的队列中移动并改变状态计数,以 ThreadPool::IdleToRunningLocked 方法为例: //Worker从空闲转移到运行状态 voidThreadPool::IdleToRunningLocked(Worker*worker){ //从空闲队列中移除 idle_workers_.Remove(worker); //添加到运行状态队列 running_workers_.Append(worker); //维护状态变量 count_idle_--; count_running_++; } Worker 创建出来后默认是 「Idle」 状态,ThreadPool::ScheduleTaskLocked 方法内被创建出来后立即执行了 ThreadPool::StartThread 方法来启动 Worker(即开启新线程调用 ThreadPool::Worker::Main 方法),启动后其状态会变成 「Running」 状态。启动的详细过程如上节「Worker 类型」介绍所述。 //在新线程内运行Worker::Main方法,this参数为当前ThreadPool对象,最终会传入Main方法内 voidThreadPool::Worker::StartThread(){ intresult=OSThread::Start("DartWorker",&Worker::Main, reinterpret_castuword(this)); } 这里需要关注的是 ThreadPool::Worker::Main 函数内部实现,并且 ThreadPool::Worker::Main 函数的执行是在「新的线程,新的线程,新的线程」内,已与「入口」函数所在线程不同。它的核心是通过 ThreadPool::WorkerLoop 在新线程内消费当前 ThreadPool 内保存的 「Pending」 状态的 Task 队列。 //源码略有简化 voidThreadPool::Worker::Main(uwordargs){ //获取当前OSThread对象(OSThread是Runtime对平台线程的抽象,保存在当前线程的TLS) OSThread*os_thread=OSThread::Current(); //将传过来的参数转换为ThreadPool对象 Worker*worker=reinterpret_castWorker*(args); ThreadPool*pool=worker- //将Worker与OSThread相互关联 os_thread-owning_thread_pool_worker_=worker; worker-os_thread_=os_thread; //保存join_id用于资源清理 worker-join_id_=OSThread::GetCurrentThreadJoinId(os_thread); //开始消费Task循环 pool-WorkerLoop(worker); //退出循环清理绑定关系 worker-os_thread_=nullptr; os_thread-owning_thread_pool_worker_=nullptr; } 整个 ThreadPool 内核心中的核心便是 ThreadPool::WorkerLoop,它负责来消费处于 Pending 状态的 Task。整个方法主体只有一个 while 循环,注意循环作用域开头的 MonitorLocker ,它是一个条件变量互斥锁(详情参考上面 「锁」 小节的介绍),作用域内加锁,离开作用域后解锁。MonitorLocker 变量的存在保证了多个线程不会进入同时进入同一个 Task,即多个线程不会同时进入同一个 Isolate。 下面这段代码看起来长,但相信我它并不复杂请一定耐心看完。 v2-14f83138be18df2e39a24125c79a734b_b.webpvoidThreadPool::WorkerLoop(Worker*worker){ //在当前线程中收集dead状态的worker WorkerListdead_workers_to_join; while(true){ //声明线程锁并加锁,该锁在变量离开作用域后解锁 MonitorLockerml(&pool_monitor_); //Pending任务队列不为空,进入内部循环 if(!tasks_.IsEmpty()){ //将当前worker的状态转移至Running IdleToRunningLocked(worker); //消费task_直到列表为空 while(!tasks_.IsEmpty()){ //将task_从队列中取出 std::unique_ptrTasktask(tasks_.RemoveFirst()); //减少Pendingtask数量 pending_tasks_--; //对上面声名的锁临时解锁,允许其它线程可以继续消费其它task_ MonitorLeaveScopemls(&ml); //运行task_,消费内部的Message task- ASSERT(Isolate::Current()==nullptr); //task指针置空 task.reset(); } //task_队列为空后将当前worker的状态转移至Idle RunningToIdleLocked(worker); } //所有线程都空闲时整个线程池进入空闲状态 if(running_workers_.IsEmpty()){ OnEnterIdleLocked( if(!tasks_.IsEmpty()){ continue; } } //如果线程池关闭则将当前worker转移到Dead状态 if(shutting_down_){ //收集之前其它线程已经Dead的worker ObtainDeadWorkersLocked(&dead_workers_to_join); IdleToDeadLocked(worker); break; } //下面的代码核心逻辑是将当前线程挂起,在挂起的时间内等待被唤醒 //由于挂起前worker已进入Idle状态,如果等待超时,则当前线程进会进入Dead状态 constint64_tidle_start=OS::GetCurrentMonotonicMicros(); booldone=false; while(!done){ //线程默认挂起时长为5秒 constautoresult=ml.WaitMicros(ComputeTimeout(idle_start)); if(!tasks_.IsEmpty())break; if(shutting_down_||result==Monitor::kTimedOut){ done=true; break; } } //如果超时或关闭 if(done){ //收集之前其它线程已经Dead的worker ObtainDeadWorkersLocked(&dead_workers_to_join); //将当前worker置于Dead状态 IdleToDeadLocked(worker); break; } } //如收集到的Dead状态Worker不为空,则等待它们结束后再结束当前线程 JoinDeadWorkersLocked(&dead_workers_to_join); } 注意 while (!tasks_.IsEmpty()) 循环的存在,它表明当前线程会消费 tasks_ 队列中所有 Task,同时通过 MonitorLeaveScope 将线程锁临时解锁,也给其它线程来消费 tasks_ 队列的机会。 「所谓消费 Task 就是执行其 Run 方法,Run 方法处理完所有 Message 才结束」 在消费 Task 前后会改变 Worker 的状态(「Running/Idle」),Worker 进入 Idle 状态后马上会被挂起,直到超时或被唤醒。超时后会进入 「Dead」 状态,唤醒则变成 「Running」 状态然后继续消费 Task 。正是由这套状态机制的保证了一个线程(Worker)不会同时进入两个 Isolate。 One More Thing还有几个值得注意的细节是整个 Runtime 内并不是只有一个线程池实例,实际上 ThreadPool 还有一个派生类型 MutatorThreadPool,所以整个 Dart Runtime 「只有两个线程池实例」。MutatorThreadPool 类型的实例用来运行 Dart 代码,而 ThreadPool 类型的线程池用来做内存的 GC 操作或编译等辅助工作。并且在线程池数量限制上也有所不同,MutatorThreadPool 类型线程池默认最大线程数量是 「8」,而 ThreadPool 类型对「线程数量没有限制」。 另外线程挂起的默认超时时长是 5 秒,只要线程在 5 秒内被唤醒它仍然会苟活于世。至于为什么是 5 秒不是 10 秒,我也不知道,如果你知道勿必告诉我 ?? ThreadPool 相关的知识点不复杂,理解起来不会有太多阻碍。核心思想仍然是传统的生产者与消费者模式,再加上针对不同平台的线程抽象结合生命周期定义组成了整程线池的核心逻辑。 阅读原文

上一篇:2025-04-23_性能飞跃!Node.js 亿级文件读写优化 下一篇:2023-11-29_NBA新赛季 , 品牌如何在快手找到主场

TAG标签:

13
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设网站改版域名注册主机空间手机网站建设网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。
项目经理在线

相关阅读 更多>>

猜您喜欢更多>>

我们已经准备好了,你呢?
2022我们与您携手共赢,为您的企业营销保驾护航!

不达标就退款

高性价比建站

免费网站代备案

1对1原创设计服务

7×24小时售后支持

 

全国免费咨询:

13245491521

业务咨询:13245491521 / 13245491521

节假值班:13245491521()

联系地址:

Copyright © 2019-2025      ICP备案:沪ICP备19027192号-6 法律顾问:律师XXX支持

在线
客服

技术在线服务时间:9:00-20:00

在网站开发,您对接的直接是技术员,而非客服传话!

电话
咨询

13245491521
7*24小时客服热线

13245491521
项目经理手机

微信
咨询

加微信获取报价