在前面的章节中,我们讨论了常见的简单作业类型的处理,这种作业如果有多步,我们就需要自己手动在作业中投递新的作业,那么我们有没有一种办法来简化这种操作呢?这就是我们接下来要讨论的内容。
作业分组在 QWorker 中使用 TQJobGroup 来管理,用于将一到多个作业给组合在一起当做一个对象来处理。这些作业有可能是串行执行,也可能是并行执行。他们的区别在于,对于并行执行的作业,相互之间没有啥顺序要求,谁先谁后无所谓,而串行执行的作业,必需挨个执行,一个执行完成后,才能轮到下一个执行(这不废话嘛:))。
好吧,废话说了一堆,我们先来看看 TQJobGroup 的接口声明:
/// <summary>构造函数</summary> /// <param name="AByOrder">指定是否是顺序作业,如果为True,则作业会按依次执行</param> constructor Create(AByOrder: Boolean = False); overload; /// <summary>析构函数</summary> destructor Destroy; override; /// <summary>取消剩下未执行的作业执行</summary> /// <param name="AWaitRunningDone">是否等待正在执行的作业执行完成,默认为True</param> /// <remark>如果在当前作业中等待执行的作业完成,则请不要设置AWaitRunningDone为True, /// 否则,会出现死等</remark> procedure Cancel(AWaitRunningDone: Boolean = True); /// <summary>要准备添加作业,实际增加内部计数器</summary> /// <remarks>Prepare和Run必需匹配使用,否则可能造成作业不会被执行</remarks> procedure Prepare; /// <summary>减少内部计数器,如果计数器减为0,则开始实际执行作业</summary> /// <param name="ATimeout">等待时长,单位为毫秒</param> procedure Run(ATimeout: Cardinal = INFINITE); /// <summary>添加一个作业过程,如果准备内部计数器为0,则直接执行,否则只添加到列表</summary> /// <param name="AProc">要执行的作业过程</param> /// <param name="AData">附加数据指针</param> /// <param name="AInMainThread">作业是否需要在主线程中执行</param> /// <param name="AFreeType">AData指定的附加数据指针释放方式</param> /// <returns>成功,返回True,失败,返回False</returns> /// <remarks>添加到分组中的作业,要么执行完成,要么被取消,不运行通过句柄取消</remarks> function Add(AProc: TQJobProc; AData: Pointer; AInMainThread: Boolean = False; AFreeType: TQJobDataFreeType = jdfFreeByUser): Boolean; overload; /// <summary>添加一个作业过程,如果准备内部计数器为0,则直接执行,否则只添加到列表</summary> /// <param name="AProc">要执行的作业过程</param> /// <param name="AData">附加数据指针</param> /// <param name="AInMainThread">作业是否需要在主线程中执行</param> /// <param name="AFreeType">AData指定的附加数据指针释放方式</param> /// <returns>成功,返回True,失败,返回False</returns> /// <remarks>添加到分组中的作业,要么执行完成,要么被取消,不运行通过句柄取消</remarks> function Add(AProc: TQJobProcG; AData: Pointer; AInMainThread: Boolean = False; AFreeType: TQJobDataFreeType = jdfFreeByUser): Boolean; overload; {$IFDEF UNICODE} /// <summary>添加一个作业过程,如果准备内部计数器为0,则直接执行,否则只添加到列表</summary> /// <param name="AProc">要执行的作业过程</param> /// <param name="AData">附加数据指针</param> /// <param name="AInMainThread">作业是否需要在主线程中执行</param> /// <param name="AFreeType">AData指定的附加数据指针释放方式</param> /// <returns>成功,返回True,失败,返回False</returns> /// <remarks>添加到分组中的作业,要么执行完成,要么被取消,不运行通过句柄取消</remarks> function Add(AProc: TQJobProcA; AData: Pointer; AInMainThread: Boolean = False; AFreeType: TQJobDataFreeType = jdfFreeByUser): Boolean; overload; {$ENDIF} /// <summary>插入一个作业过程,如果准备内部计数器为0,则直接执行,否则只添加到列表</summary> /// <param name="AIndex">要插入的的位置索引</param> /// <param name="AProc">要执行的作业过程</param> /// <param name="AData">附加数据指针</param> /// <param name="AInMainThread">作业是否需要在主线程中执行</param> /// <param name="AFreeType">AData指定的附加数据指针释放方式</param> /// <returns>成功,返回True,失败,返回False</returns> /// <remarks>添加到分组中的作业,要么执行完成,要么被取消,不运行通过句柄取消</remarks> function Insert(AIndex:Integer;AProc: TQJobProc; AData: Pointer; AInMainThread: Boolean = False; AFreeType: TQJobDataFreeType = jdfFreeByUser): Boolean; overload; /// <summary>插入一个作业过程,如果准备内部计数器为0,则直接执行,否则只添加到列表</summary> /// <param name="AIndex">要插入的的位置索引</param> /// <param name="AProc">要执行的作业过程</param> /// <param name="AData">附加数据指针</param> /// <param name="AInMainThread">作业是否需要在主线程中执行</param> /// <param name="AFreeType">AData指定的附加数据指针释放方式</param> /// <returns>成功,返回True,失败,返回False</returns> /// <remarks>添加到分组中的作业,要么执行完成,要么被取消,不运行通过句柄取消</remarks> function Insert(AIndex:Integer;AProc: TQJobProcG; AData: Pointer; AInMainThread: Boolean = False; AFreeType: TQJobDataFreeType = jdfFreeByUser): Boolean; overload; {$IFDEF UNICODE} /// <summary>插入一个作业过程,如果准备内部计数器为0,则直接执行,否则只添加到列表</summary> /// <param name="AIndex">要插入的的位置索引</param> /// <param name="AProc">要执行的作业过程</param> /// <param name="AData">附加数据指针</param> /// <param name="AInMainThread">作业是否需要在主线程中执行</param> /// <param name="AFreeType">AData指定的附加数据指针释放方式</param> /// <returns>成功,返回True,失败,返回False</returns> /// <remarks>添加到分组中的作业,要么执行完成,要么被取消,不运行通过句柄取消</remarks> function Insert(AIndex:Integer;AProc: TQJobProcA; AData: Pointer; AInMainThread: Boolean = False; AFreeType: TQJobDataFreeType = jdfFreeByUser): Boolean; overload; {$ENDIF} /// <summary>等待作业完成</summary> /// <param name="ATimeout">最长等待时间,单位为毫秒</param> /// <returns>返回等待结果</returns> /// <remarks>WaitFor会阻塞当前线程的执行,如果是主线程中调用,建议使用MsgWaitFor /// 以保证在主线中的作业能够被执行</remarks> function WaitFor(ATimeout: Cardinal = INFINITE): TWaitResult; /// <summary>等待作业完成</summary> /// <param name="ATimeout">最长等待时间,单位为毫秒</param> /// <param name="AMsgWait">是否检查消息队列,如果不在主线程中执行,则忽略该参数</param> /// <returns>返回等待结果</returns> /// <remarks>WaitFor会阻塞当前线程的执行,如果是主线程中调用,建议使用MsgWaitFor /// 以保证在主线中的作业能够被执行</remarks> function WaitFor(AMsgWait:Boolean;ATimeout: Cardinal = INFINITE): TWaitResult;overload; /// <summary>等待作业完成</summary> /// <param name="ATimeout">最长等待时间,单位为毫秒</param> /// <returns>返回等待结果</returns> /// <remarks>如果当前在主线程中执行,MsgWaitFor会检查是否有消息需要处理,而 /// WaitFor不会,如果在后台线程中执行,会直接调用WaitFor。因此,在主线程中调用 /// WaitFor会影响主线程中作业的执行,而MsgWaitFor不会 /// </remarks> function MsgWaitFor(ATimeout: Cardinal = INFINITE): TWaitResult; /// <summary>未完成的作业数量</summary> property Count: Integer read GetCount; /// <summary>全部作业执行完成时触发的回调事件</summary> property AfterDone: TNotifyEvent read FAfterDone write FAfterDone; /// <summary>是否是按顺序执行,只能在构造函数中指定,此处只读</summary> property ByOrder: Boolean read FByOrder; /// <summary>用户自定的分组附加标签</summary> property Tag: Pointer read FTag write FTag; /// <summary>是否在作业完成后自动释放自身</summary> property FreeAfterDone: Boolean read FFreeAfterDone write FFreeAfterDone;
1、构造函数 Create
TQJobGroup 的构造函数接受一个唯一的参数 AByOrder 来决定分组内的作业是否需要按顺序执行。如果该值为 True ,则后面 Add 时添加的子作业将都是顺序执行的。
2、析构函数 Destroy
TQJobGroup 的析构函数这个唯一要说的是,它在析构时会取消所有未执行的作业,并等待正在执行的作业完成,所以,嘿嘿,你懂的,不要在 TQJobGroup 的子作业里释放分组对象,否则后果就是死循环。
3、取消未执行的分组作业函数 Cancel
Cancel 取消所有后续未进入执行状态的作业,如果作业已经执行,则是否等待执行完成,取决于 AWaitRunningDone 参数,默认是等待。但如果你在分组作业的子作业中取消,那么你就必需传入False,以避免死循环。
4、准备批量添加作业函数 Prepare
Prepare 将使 TQJobGroup 对象进入批量添加作业状态,它会增加内部的计数器,以避免此后调用 Add 函数加的作业在调用 Run 函数之前执行。这样可以减少资源冲突,加快添加速度。在这里要注意一点,Prepare 和 Run 必需配对使用,否则,你的作业可能永远得不到执行机会。
5、开始运行分组内作业函数 Run
Run 将减少由于 Prepare 增加的引用计数,如果这个计数器减为0,则会将作业提交给 Workers 这个包工头去分配给实际的工人处理。注意其中有一个超时设置,这个超时值的单位为毫秒,面向的是整个作业分组,也就是说,所有的作业总的执行时间不能超过这个时间。一旦超时,后续未执行的作业将会被取消,得不到执行的机会,并触发 AfterDone 事件。如果调用 WaitFor 等待中,则会触发结束等待并返回 wrTimeout。
6、添加作业处理函数 Add / Insert
Add 为分组添加一个子作业,它添加到这个作业分组的末尾。注意这个作业只能是立即执行类型的作业,不能是定时或基于信号的作业。作业的具体参数可以参考普通作业 Post 函数的声明,它们是统一的。
如果未进入批量添加状态,则新增的作业根据构造函数的 AByOrder 参数,可能会立即执行( AByOrder=False或是第一个作业),也可能会被计划到后面执行。
Insert 用于顺序执行作业中,临时插入特定的作业到分组中。如果是非顺序作业,那么它和 Add 实际上没有啥区别。
一个提示是 Add / Insert 函数可以在内部作业过程中使用,而不用担心线程安全问题。关于 Insert 的一个例子在更新日志中做了详细说明,请参考:https://blog.qdac.cc/?p=2051 来获得相关示例。
7、等待作业处理完成函数 WaitFor/MsgWaitFor
这两个函数的作业都是等待作业完成直到超时,默认超时值单位为毫秒。注意它与 Run 里那个超时的区别是一致的,任意一个超时都会造成分组超时并取消后续作业的执行。
8、未执行的作业数量 Count
用于统计分组中尚未交付给 Workers 执行的作业数量。
9、分组所有作业执行完成事件 AfterDone
在所有作业执行完成时会触发该事件,如果指定了,则在所有作业执行完成或者被清队干净时会被触发。
10、是否按顺序执行 ByOrder
这个属性是只读的,值由构造函数传入。
11、用户附加标签 Tag
这个 Tag 的值及其管理由用户自行负责,它可以用于传递一些额外的信息给分组中的作业。
12、是否在所有作业完成后自动释放自身 FreeAfterDone
这一属性一般和 Run 结合在一起使用,以便在所有作业结束时,自动释放对象自身。
好了,上面说了一堆,下面说一下分组作业的一般调用流程:
1、创建 TQJobGroup 对象,并确定它的类型(是顺序还是并行),假设我们初始化 AByOrder 参数为True,按顺序执行一组作业,则创建的对应代码如下:
【Delphi】
var AGroup:TQJobGroup; begin AGroup:=TQJobGroup.Create(true) ...
【C++ Builder】
TQJobGroup *AGroup=new TQJobGroup(true);
当然,接下来就可以设置其它属性,如设置 AfterDone、FreeAfterDone 等属性的值。
2、调用 Prepare 和 Add 添加作业:
【Delphi】
AGroup.Prepare; AGroup.Add(DoStep1,nil); AGroup.Add(DoStep2,nil); ... AGroup.Add(DoStepn,nil);
【C++ Builder】
AGroup->Prepare(); AGroup->Add(DoStep1,NULL); AGroup->Add(DoStep2,NULL); ... AGroup->Add(DoStepn,NULL);
3、调用 Run 来启动作业,一般情况下不需要提供 Run 里的超时参数(默认永远等待)。
【Delphi】
AGroup.Run;
【C++ Builder】
AGroup->Run();
4、如果是设置了 AfterDone 事件并准备异步等待所有作业处理完,则从步骤开始可以省略。如果不是,那么我们就需要调用 WaitFor 或 MsgWaitFor 来等待分组作业执行完成。
选择使用 WaitFor 或 MsgWaitFor 取决于下面两步判断的结果:
(1)、当前代码是否运行在主线程,如果不是,WaitFor 和 MsgWaitFor 没有任何区别,MsgWaitFor 会直接调用WaitFor 死等,此时两者皆可。
(2)、当前程序是否有运行在主线程中的作业,如果有,那么使用 MsgWaitFor 以避免阻塞主线程作业的处理,如果没有,可以选择 WaitFor,但显然由于WaitFor也会阻塞其它主线程消息的处理,所以会造成UI在等待结束前假死的现象,请斟酌选择。
【Delphi】
AGroup.WaitFor;
【C++ Builder】
AGroup->WaitFor();
5、如果没有指定 FreeAfterDone 为 True,则在需要时释放 TQJobGroup 对象实例,以避免内存泄露。
下面是两个常问的问题:
1、如果没有调用 Prepare 直接调用 Add 添加会有什么后果?
没啥后果,只是相当于添加后立即就调度执行,如果添加多个作业,有可能产生多次触发 AfterDone 事件的问题。当然,如果你再同时设置了 FreeAfterDone,那就可能会出问题了,AV错误在等着你(有可能在你添加后一条记录之前,触发了释放代码)。
2、Run 的超时和 WaitFor 的超时有啥区别?
认真看前面 Run 函数的说明。