基于 QWorker 的多线程编程 – 状态机:信号与广播

前面我们讨论了定时作业(Post/Delay/At)和直接用 Post 来触发作业,也就是说,作业触发和执行之间是紧密耦合的。作业的触发者知道作业的响应者在那儿,要干什么,但如果我们要将作业的触发与执行分离,该怎么做呢?信号!QWorker 提供了信号机制来解决这一问题。

QWorker 中信号的作用就在于建立触发者和响应者之间松散耦合的中介,前面我们在作业类型的讨论中,知道了信号作业的几个步骤,我们先来回顾下:

(1)、注册信号

(2)、注册信号对应的作业

(3)、触发信号

实际上,除了第一步是必需首先执行的,步骤(2)和步骤(3)并没有顺序要求。信号的触发并不要求有信号处理作业,相应的,注册信号对应的作业与触发信号的执行当然也没多大关系。信号实际上是一种状态机,只有两种状态:

(1)、等待状态

信号一旦注册,它就已经处于等待状态。同样如果一个信号没有被触发,那么它也处于等待状态。

(2)、触发状态

信号一旦被触发,它就处于触发状态。此时,它会触发所有已注册的作业执行一遍。注意,这些被触发的作业是被并行执行的,所以作业执行的顺序与其注册的顺序无关。

一盏灯点亮了,屋里所有的人都看到了光明。信号也一样,它可以有多个作业做为信号的接收者,一个信号一旦被触发了,所有注册的作业都会被触发执行,形成广播的效果。

在设计基于信号的 QWorker 作业时,我希望提醒大家以下几点:

(1)、信息的处理作业可以运行在主线程,也可以运行在后台线程,这点和普通的作业没什么两样,只需要在使用 Wait 注册信号等待作业时,标记作业需要在主线程中执行即可。

我们来看下 Wait 函数的声明:

    /// <summary>投寄一个等待信号才开始的作业</summary>
    /// <param name="AProc">要执行的作业过程</param>
    /// <param name="ASignalId">等待的信号编码,该编码由RegisterSignal函数返回</param>
    /// <param name="ARunInMainThread">作业要求在主线程中执行</param>
    /// <returns>成功投寄返回句柄,否则返回0</returns>
    function Wait(AProc: TQJobProc; ASignalId: Integer;
      ARunInMainThread: Boolean = False): IntPtr; overload;

ARunInMainThread 参数决定了作业在主线程和后台线程中执行。

(2)、作业触发时,参数通过信号触发函数 Signal 传递,如果指定了 AFreeType 方式,则参数会在不需要时,由 QWorker 负责释放。由于信号作业是多播并发的,具有通知的特性,所以要注意:

  • 不要更改参数的值,也就是说,参数值只能当常量访问,否则,可能会造成与其它信号处理作业之间的冲突。
  • 不要使用会引发参数状态变化的参数。因为它同样会影响其它作业的处理,比如 TStream 在 Read 或 Write 时都会调整当前位置,如果使用它做为参数,显示,如果你Read或Write时,其它作业处理函数如果也 Read 或 Write 处理,就会得不到正确的结果。
  • 不要传递栈上的变量作为参数,信号作业的处理过程是异步的,不是同步的,所以不要指望你触发信号结束后,对应的作业也结束了。如果用栈上的变量,那么由于函数退出后,栈上的变量就被释放了,所以会造成意外的结果,这显然是不安全的。

综上所述,我们一定要注意信号作业的参数不能用于传出值,只能用于传入值。如果要传递返回参数,那么应该通过新的作业来传递(如触发一个新的信号,将返回值作为参数传递回去,这需要规范好作业双方的协议)。

关于信号作业的一个实例,在 QWorkerDemo 和 StateWorker 两个演示程序中,我们进行了展示。我们摘抄一段 QWorkerDemo 示例中多播的代码来进行简单的说明:

1、首先,是注册信号名称,获得一个唯一的信号ID,这个ID用于唯一标记这个信号,并在整个程序的生命周期内不会变动。

【Delphi】

FMulticastSignal := Workers.RegisterSignal('Multicase.Start');

【C++ Builder】

FMulticastSignal = Workers->RegisterSignal(L"Multicase.Start");

RegisterSignal 接受的唯一一个参数是信号名称,该信号名称是整个程序唯一的,同名的重复注册会被认为是同一个信号,返回同一个ID。我们将其值保存到 FMulticastSignal 变量中。由此,我们的作业响应函数注册的地方,就可以简单的重复调用 RegisterSignal 来获取这个ID。

2、接下来,我们注册这个信号的作业响应函数:

【Delphi】

Workers.Wait(DoMulticastSingal1, FMulticastSignal);
Workers.Wait(DoMulticastSingal2, FMulticastSignal);

【C++ Builder】

Workers->Wait(DoMulticastSingal1, FMulticastSignal);
Workers->Wait(DoMulticastSingal2, FMulticastSignal);

这样,我们在触发 Multicase.Start 信号时,将会同时触发 DoMulticastSingal1 和 DoMulticastSingal2 两项作业。

3、最后,我们人为的来触发下这个信号。

触发信号可以按 ID 或名称来触发,我们推荐按 ID 来触发,因为按名称触发实际上触发了内部的一个额外的查找过程,效率上有所损失。我们看一下两个声明:

    /// <summary>触发一个信号</summary>
    /// <param name="AId">信号编码,由RegisterSignal返回</param>
    /// <param name="AData">附加给作业的用户数据指针地址</param>
    /// <param name="AFreeType">附加数据指针释放方式</param>
    /// <remarks>触发一个信号后,QWorkers会触发所有已注册的信号关联处理过程的执行</remarks>
    procedure Signal(AId: Integer; AData: Pointer = nil;
      AFreeType: TQJobDataFreeType = jdfFreeByUser); overload;
    /// <summary>按名称触发一个信号</summary>
    /// <param name="AName">信号名称</param>
    /// <param name="AData">附加给作业的用户数据指针地址</param>
    /// <param name="AFreeType">附加数据指针释放方式</param>
    /// <remarks>触发一个信号后,QWorkers会触发所有已注册的信号关联处理过程的执行</remarks>
    procedure Signal(const AName: QStringW; AData: Pointer = nil;
      AFreeType: TQJobDataFreeType = jdfFreeByUser); overload;

好了,我们看下触发的过程:

【Delphi】

procedure TForm1.Button24Click(Sender: TObject);
var
  AParams: TQMsgPack;
begin
AParams := TQMsgPack.Create;
AParams.Add('TimeStamp').AsDateTime := Now;
AParams.Add('Sender', 'Button24');
Workers.Signal(FMulticastSignal, AParams, jdfFreeAsObject);
end;

【C++ Builder】

void __fastcall TForm1::Button24Click(System::TObject *Sender);
{
TQMsgPack *AParams = new TQMsgPack;
AParams->Add(L"TimeStamp")->AsDateTime = Now();
AParams->Add(L"Sender", "Button24");
Workers->Signal(FMulticastSignal, AParams, jdfFreeAsObject);
}

话说,有人可能发现,信号没有取消注册函数(注意等待信号的作业本身可以直接用 Clear 系列函数取消与信号的关联)。这里这么设计有以下几个原因:

(1)、信号的注册、触发和响应分离的机制,决定了信号的反注册时机的把握对程序来说,是不好把握的。

(2)、一个信号注册信息所占用的资源并不大,而一个程序中,信号的数量是相对有限的,其由此带来的资源额外开销可以忽略不计。

当然,对于追求完美主义者来说,确实不够完美,但目前的实现就是这样,而在目前的规划中,我并不打算加入它。

分享到: