基于 QWorker 的多线程编程 – 作业分组

在前面的章节中,我们讨论了常见的简单作业类型的处理,这种作业如果有多步,我们就需要自己手动在作业中投递新的作业,那么我们有没有一种办法来简化这种操作呢?这就是我们接下来要讨论的内容。

作业分组在 QWorker 中使用 TQJobGroup 来管理,用于将一到多个作业给组合在一起当做一个对象来处理。这些作业有可能是串行执行,也可能是并行执行。他们的区别在于,对于并行执行的作业,相互之间没有啥顺序要求,谁先谁后无所谓,而串行执行的作业,必需挨个执行,一个执行完成后,才能轮到下一个执行(这不废话嘛:))。

好吧,废话说了一堆,我们先来看看 TQJobGroup 的接口声明:

    /// <summary>构造函数</summary>
    /// <param name="AByOrder">指定是否是顺序作业,如果为True,则作业会按依次执行</param>
    constructor Create(AByOrder: Boolean = False); overload;
    /// <summary>析构函数</summary>
    destructor Destroy; override;
    /// <summary>取消剩下未执行的作业执行</summary>
    /// <param name="AWaitRunningDone">是否等待正在执行的作业执行完成,默认为True</param>
    /// <remark>如果在当前作业中等待执行的作业完成,则请不要设置AWaitRunningDone为True,
    /// 否则,会出现死等</remark>
    procedure Cancel(AWaitRunningDone: Boolean = True);
    /// <summary>要准备添加作业,实际增加内部计数器</summary>
    /// <remarks>Prepare和Run必需匹配使用,否则可能造成作业不会被执行</remarks>
    procedure Prepare;
    /// <summary>减少内部计数器,如果计数器减为0,则开始实际执行作业</summary>
    /// <param name="ATimeout">等待时长,单位为毫秒</param>
    procedure Run(ATimeout: Cardinal = INFINITE);

    /// <summary>添加一个作业过程,如果准备内部计数器为0,则直接执行,否则只添加到列表</summary>
    /// <param name="AProc">要执行的作业过程</param>
    /// <param name="AData">附加数据指针</param>
    /// <param name="AInMainThread">作业是否需要在主线程中执行</param>
    /// <param name="AFreeType">AData指定的附加数据指针释放方式</param>
    /// <returns>成功,返回True,失败,返回False</returns>
    /// <remarks>添加到分组中的作业,要么执行完成,要么被取消,不运行通过句柄取消</remarks>
    function Add(AProc: TQJobProc; AData: Pointer;
      AInMainThread: Boolean = False;
      AFreeType: TQJobDataFreeType = jdfFreeByUser): Boolean; overload;
    /// <summary>添加一个作业过程,如果准备内部计数器为0,则直接执行,否则只添加到列表</summary>
    /// <param name="AProc">要执行的作业过程</param>
    /// <param name="AData">附加数据指针</param>
    /// <param name="AInMainThread">作业是否需要在主线程中执行</param>
    /// <param name="AFreeType">AData指定的附加数据指针释放方式</param>
    /// <returns>成功,返回True,失败,返回False</returns>
    /// <remarks>添加到分组中的作业,要么执行完成,要么被取消,不运行通过句柄取消</remarks>
    function Add(AProc: TQJobProcG; AData: Pointer;
      AInMainThread: Boolean = False;
      AFreeType: TQJobDataFreeType = jdfFreeByUser): Boolean; overload;
{$IFDEF UNICODE}
    /// <summary>添加一个作业过程,如果准备内部计数器为0,则直接执行,否则只添加到列表</summary>
    /// <param name="AProc">要执行的作业过程</param>
    /// <param name="AData">附加数据指针</param>
    /// <param name="AInMainThread">作业是否需要在主线程中执行</param>
    /// <param name="AFreeType">AData指定的附加数据指针释放方式</param>
    /// <returns>成功,返回True,失败,返回False</returns>
    /// <remarks>添加到分组中的作业,要么执行完成,要么被取消,不运行通过句柄取消</remarks>
    function Add(AProc: TQJobProcA; AData: Pointer;
      AInMainThread: Boolean = False;
      AFreeType: TQJobDataFreeType = jdfFreeByUser): Boolean; overload;
{$ENDIF}
/// <summary>插入一个作业过程,如果准备内部计数器为0,则直接执行,否则只添加到列表</summary>
    /// <param name="AIndex">要插入的的位置索引</param>
    /// <param name="AProc">要执行的作业过程</param>
    /// <param name="AData">附加数据指针</param>
    /// <param name="AInMainThread">作业是否需要在主线程中执行</param>
    /// <param name="AFreeType">AData指定的附加数据指针释放方式</param>
    /// <returns>成功,返回True,失败,返回False</returns>
    /// <remarks>添加到分组中的作业,要么执行完成,要么被取消,不运行通过句柄取消</remarks>
    function Insert(AIndex:Integer;AProc: TQJobProc; AData: Pointer;
      AInMainThread: Boolean = False;
      AFreeType: TQJobDataFreeType = jdfFreeByUser): Boolean; overload;
    /// <summary>插入一个作业过程,如果准备内部计数器为0,则直接执行,否则只添加到列表</summary>
    /// <param name="AIndex">要插入的的位置索引</param>
    /// <param name="AProc">要执行的作业过程</param>
    /// <param name="AData">附加数据指针</param>
    /// <param name="AInMainThread">作业是否需要在主线程中执行</param>
    /// <param name="AFreeType">AData指定的附加数据指针释放方式</param>
    /// <returns>成功,返回True,失败,返回False</returns>
    /// <remarks>添加到分组中的作业,要么执行完成,要么被取消,不运行通过句柄取消</remarks>
    function Insert(AIndex:Integer;AProc: TQJobProcG; AData: Pointer;
      AInMainThread: Boolean = False;
      AFreeType: TQJobDataFreeType = jdfFreeByUser): Boolean; overload;
    {$IFDEF UNICODE}
    /// <summary>插入一个作业过程,如果准备内部计数器为0,则直接执行,否则只添加到列表</summary>
    /// <param name="AIndex">要插入的的位置索引</param>
    /// <param name="AProc">要执行的作业过程</param>
    /// <param name="AData">附加数据指针</param>
    /// <param name="AInMainThread">作业是否需要在主线程中执行</param>
    /// <param name="AFreeType">AData指定的附加数据指针释放方式</param>
    /// <returns>成功,返回True,失败,返回False</returns>
    /// <remarks>添加到分组中的作业,要么执行完成,要么被取消,不运行通过句柄取消</remarks>
    function Insert(AIndex:Integer;AProc: TQJobProcA; AData: Pointer;
      AInMainThread: Boolean = False;
      AFreeType: TQJobDataFreeType = jdfFreeByUser): Boolean; overload;
    {$ENDIF}
    /// <summary>等待作业完成</summary>
    /// <param name="ATimeout">最长等待时间,单位为毫秒</param>
    /// <returns>返回等待结果</returns>
    /// <remarks>WaitFor会阻塞当前线程的执行,如果是主线程中调用,建议使用MsgWaitFor
    /// 以保证在主线中的作业能够被执行</remarks>
    function WaitFor(ATimeout: Cardinal = INFINITE): TWaitResult;
    /// <summary>等待作业完成</summary>
    /// <param name="ATimeout">最长等待时间,单位为毫秒</param>
    /// <param name="AMsgWait">是否检查消息队列,如果不在主线程中执行,则忽略该参数</param>
    /// <returns>返回等待结果</returns>
    /// <remarks>WaitFor会阻塞当前线程的执行,如果是主线程中调用,建议使用MsgWaitFor
    /// 以保证在主线中的作业能够被执行</remarks>
    function WaitFor(AMsgWait:Boolean;ATimeout: Cardinal = INFINITE): TWaitResult;overload;
    /// <summary>等待作业完成</summary>
    /// <param name="ATimeout">最长等待时间,单位为毫秒</param>
    /// <returns>返回等待结果</returns>
    /// <remarks>如果当前在主线程中执行,MsgWaitFor会检查是否有消息需要处理,而
    /// WaitFor不会,如果在后台线程中执行,会直接调用WaitFor。因此,在主线程中调用
    /// WaitFor会影响主线程中作业的执行,而MsgWaitFor不会
    /// </remarks>
    function MsgWaitFor(ATimeout: Cardinal = INFINITE): TWaitResult;
    /// <summary>未完成的作业数量</summary>
    property Count: Integer read GetCount;
    /// <summary>全部作业执行完成时触发的回调事件</summary>
    property AfterDone: TNotifyEvent read FAfterDone write FAfterDone;
    /// <summary>是否是按顺序执行,只能在构造函数中指定,此处只读</summary>
    property ByOrder: Boolean read FByOrder;
    /// <summary>用户自定的分组附加标签</summary>
    property Tag: Pointer read FTag write FTag;
    /// <summary>是否在作业完成后自动释放自身</summary>
    property FreeAfterDone: Boolean read FFreeAfterDone write FFreeAfterDone;

1、构造函数 Create

TQJobGroup 的构造函数接受一个唯一的参数 AByOrder 来决定分组内的作业是否需要按顺序执行。如果该值为 True ,则后面 Add 时添加的子作业将都是顺序执行的。

2、析构函数 Destroy

TQJobGroup 的析构函数这个唯一要说的是,它在析构时会取消所有未执行的作业,并等待正在执行的作业完成,所以,嘿嘿,你懂的,不要在 TQJobGroup 的子作业里释放分组对象,否则后果就是死循环。

3、取消未执行的分组作业函数 Cancel

Cancel 取消所有后续未进入执行状态的作业,如果作业已经执行,则是否等待执行完成,取决于 AWaitRunningDone 参数,默认是等待。但如果你在分组作业的子作业中取消,那么你就必需传入False,以避免死循环。

4、准备批量添加作业函数 Prepare

Prepare 将使 TQJobGroup 对象进入批量添加作业状态,它会增加内部的计数器,以避免此后调用 Add 函数加的作业在调用 Run 函数之前执行。这样可以减少资源冲突,加快添加速度。在这里要注意一点,Prepare 和 Run 必需配对使用,否则,你的作业可能永远得不到执行机会。

5、开始运行分组内作业函数 Run

Run 将减少由于 Prepare 增加的引用计数,如果这个计数器减为0,则会将作业提交给 Workers 这个包工头去分配给实际的工人处理。注意其中有一个超时设置,这个超时值的单位为毫秒,面向的是整个作业分组,也就是说,所有的作业总的执行时间不能超过这个时间。一旦超时,后续未执行的作业将会被取消,得不到执行的机会,并触发 AfterDone 事件。如果调用 WaitFor 等待中,则会触发结束等待并返回 wrTimeout。

6、添加作业处理函数 Add / Insert

Add 为分组添加一个子作业,它添加到这个作业分组的末尾。注意这个作业只能是立即执行类型的作业,不能是定时或基于信号的作业。作业的具体参数可以参考普通作业 Post 函数的声明,它们是统一的。

如果未进入批量添加状态,则新增的作业根据构造函数的 AByOrder 参数,可能会立即执行( AByOrder=False或是第一个作业),也可能会被计划到后面执行。

Insert 用于顺序执行作业中,临时插入特定的作业到分组中。如果是非顺序作业,那么它和 Add 实际上没有啥区别。

一个提示是 Add / Insert 函数可以在内部作业过程中使用,而不用担心线程安全问题。关于 Insert 的一个例子在更新日志中做了详细说明,请参考:https://blog.qdac.cc/?p=2051 来获得相关示例。

7、等待作业处理完成函数 WaitFor/MsgWaitFor

这两个函数的作业都是等待作业完成直到超时,默认超时值单位为毫秒。注意它与 Run 里那个超时的区别是一致的,任意一个超时都会造成分组超时并取消后续作业的执行。

8、未执行的作业数量 Count

用于统计分组中尚未交付给 Workers 执行的作业数量。

9、分组所有作业执行完成事件 AfterDone

在所有作业执行完成时会触发该事件,如果指定了,则在所有作业执行完成或者被清队干净时会被触发。

10、是否按顺序执行 ByOrder

这个属性是只读的,值由构造函数传入。

11、用户附加标签 Tag

这个 Tag 的值及其管理由用户自行负责,它可以用于传递一些额外的信息给分组中的作业。

12、是否在所有作业完成后自动释放自身 FreeAfterDone

这一属性一般和 Run 结合在一起使用,以便在所有作业结束时,自动释放对象自身。

好了,上面说了一堆,下面说一下分组作业的一般调用流程:

1、创建 TQJobGroup 对象,并确定它的类型(是顺序还是并行),假设我们初始化 AByOrder 参数为True,按顺序执行一组作业,则创建的对应代码如下:

【Delphi】

var
  AGroup:TQJobGroup;
begin
AGroup:=TQJobGroup.Create(true)
...

【C++ Builder】

TQJobGroup *AGroup=new TQJobGroup(true);

当然,接下来就可以设置其它属性,如设置 AfterDone、FreeAfterDone 等属性的值。

2、调用 Prepare 和 Add 添加作业:

【Delphi】

AGroup.Prepare;
AGroup.Add(DoStep1,nil);
AGroup.Add(DoStep2,nil);
...
AGroup.Add(DoStepn,nil);

【C++ Builder】

AGroup->Prepare();
AGroup->Add(DoStep1,NULL);
AGroup->Add(DoStep2,NULL);
...
AGroup->Add(DoStepn,NULL);

3、调用 Run 来启动作业,一般情况下不需要提供 Run 里的超时参数(默认永远等待)。

【Delphi】

AGroup.Run;

【C++ Builder】

AGroup->Run();

4、如果是设置了 AfterDone 事件并准备异步等待所有作业处理完,则从步骤开始可以省略。如果不是,那么我们就需要调用 WaitFor 或 MsgWaitFor 来等待分组作业执行完成。

选择使用 WaitFor 或 MsgWaitFor 取决于下面两步判断的结果:

(1)、当前代码是否运行在主线程,如果不是,WaitFor 和 MsgWaitFor 没有任何区别,MsgWaitFor 会直接调用WaitFor 死等,此时两者皆可。

(2)、当前程序是否有运行在主线程中的作业,如果有,那么使用 MsgWaitFor 以避免阻塞主线程作业的处理,如果没有,可以选择 WaitFor,但显然由于WaitFor也会阻塞其它主线程消息的处理,所以会造成UI在等待结束前假死的现象,请斟酌选择。

【Delphi】

AGroup.WaitFor;

【C++ Builder】

AGroup->WaitFor();

5、如果没有指定 FreeAfterDone 为 True,则在需要时释放 TQJobGroup 对象实例,以避免内存泄露。

下面是两个常问的问题:

1、如果没有调用 Prepare 直接调用 Add 添加会有什么后果?

没啥后果,只是相当于添加后立即就调度执行,如果添加多个作业,有可能产生多次触发 AfterDone 事件的问题。当然,如果你再同时设置了 FreeAfterDone,那就可能会出问题了,AV错误在等着你(有可能在你添加后一条记录之前,触发了释放代码)。

2、Run 的超时和 WaitFor 的超时有啥区别?

认真看前面 Run 函数的说明。

分享到: