基于 QWorker 的多线程编程 – 状态跟踪

当我们将一个作业的执行委托给 Workers 这个包工头以后,那么这个作业实际上就处于三种状态之一:

  • 排队中:作业在队列中等待被调度执行
  • 运行中:作业正在执行
  • 已完成:作业已经执行完,此时作业及相关连的数据都已自动被释放

由于已经完成的作业会被释放掉,所以,实际上,我们能得到的状态只有排队中和运行中两种,第三种由于作业已经不存在,我们就无法检测到其状态。

QWorker 提供了两个函数来跟踪作业的状态:PeekJobState 和 EnumJobStates,前者针对的是单个具体的作业,后者针对的是全部作业。在具体了解这两个函数之前,我们先看一下我们能得到的作业状态信息定义:

【Delphi】

/// <summary>作业状态</summary>
  TQJobState = record
    Handle: IntPtr; // 作业对象句柄
    Proc: TQJobMethod; // 作业过程
    Flags: Integer; // 标志位
    IsRunning: Boolean; // 是否在运行中,如果为False,则作业处于队列中
    Runs: Integer; // 已经运行的次数
    EscapedTime: Int64; // 已经执行时间
    PushTime: Int64; // 入队时间
    PopTime: Int64; // 出队时间
    AvgTime: Int64; // 平均时间
    TotalTime: Int64; // 总执行时间
    MaxTime: Int64; // 最大执行时间
    MinTime: Int64; // 最小执行时间
    NextTime: Int64; // 重复作业的下次执行时间
  end;

【C++ Builder】

struct DECLSPEC_DRECORD TQJobState
{
public:
	NativeInt Handle;
	TQJobMethod Proc;
	int Flags;
	bool IsRunning;
	int Runs;
	__int64 EscapedTime;
	__int64 PushTime;
	__int64 PopTime;
	__int64 AvgTime;
	__int64 TotalTime;
	__int64 MaxTime;
	__int64 MinTime;
	__int64 NextTime;
};

在调用 PeekJobState 时,如果找到作业,则 Handle 与传入的作业句柄一致,Proc 指向作业对应的过程句柄,Flags 是作业的标志位,保存的是 JOB_XXX 标志位,可以通过位与的方式来检查各个标志位是否设置,具体标志位定义参考下表:

JOB_RUN_ONCE : 作业只运行一次
JOB_IN_MAINTHREAD : 作业只能在主线程中运行
JOB_MAX_WORKERS : 尽可能多的开启可能的工作者线程来处理作业,暂不支持
JOB_LONGTIME : 作业需要很长的时间才能完成,以便调度程序减少它对其它作业的影响
JOB_SIGNAL_WAKEUP : 作业根据信号需要唤醒
JOB_TERMINATED : 作业不需要继续进行,可以结束了
JOB_GROUPED : 当前作业是作业组的一员
JOB_ANONPROC : 当前作业过程是匿名函数
JOB_FREE_OBJECT : Data关联的是Object,作业完成或清理时释放
JOB_FREE_RECORD : Data关联的是Record,作业完成或清理时释放
JOB_FREE_INTERFACE : Data关联的是Interface,作业完成时调用_Release
JOB_FREE_CUSTOM1 : Data关联的成员由用户指定的方式1释放
JOB_FREE_CUSTOM2 : Data关联的成员由用户指定的方式2释放
JOB_FREE_CUSTOM3 : Data关联的成员由用户指定的方式3释放
JOB_FREE_CUSTOM4 : Data关联的成员由用户指定的方式4释放
JOB_FREE_CUSTOM5 : Data关联的成员由用户指定的方式5释放
JOB_FREE_CUSTOM6 : Data关联的成员由用户指定的方式6释放
JOB_DATA_OWNER : 作业是Data成员的所有者

好了,现在我们来看 PeekJobState 的定义:

【Delphi】

    /// <summary>获取指定作业的状态</summary>
    /// <param name="AHandle">作业对象句柄</param>
    /// <param name="AResult">作业对象状态</param>
    /// <returns>如果指定的作业存在,则返回True,否则,返回False</returns>
    /// <remarks>
    /// 1.对于只执行一次的作业,在执行完后不复存在,所以也会返回false
    /// 2.在FMX平台,如果使用了匿名函数作业过程,必需调用 ClearJobState 函数来执行清理过程,以避免内存泄露。
    /// </remarks>
    function PeekJobState(AHandle: IntPtr; var AResult: TQJobState): Boolean;

【C++ Builder】

bool __fastcall PeekJobState(NativeInt AHandle, TQJobState &AResult);

请注意其中的注释中的 Remark 部分说明,PeekJobState 的返回值为安全起见,建议使用 ClearJobState 来释放返回的结果,以使匿名函数的引用计数在移动平台工作正常。

我们看下 ClearJobState 的实现:

procedure ClearJobState(var AState: TQJobState);
begin
if IsFMXApp then
  begin
  if (AState.Flags and JOB_ANONPROC) <> 0 then
    begin
    IUnknown(AState.Proc.ProcA)._Release;
    end;
  AState.Proc.Code := nil;
  AState.Proc.Data := nil;
  end;
end;

可以看到,只是在 FMX 平台它起作业,VCL 中,由于 ProcA 定义为 TQJobProcA 所以会自动释放(FMX 平台 ProcA 的定义为 Pointer,定义为 TQJobProcA 无法编译)。

下面的代码是 PeekJobState 的一个例子:

var
  AState: TQJobState;
  S: String;
  ATime: Int64;
  ALoc: TQSymbolLocation;
begin
ATime := GetTimeStamp;
if Workers.PeekJobState(FSignalWaitHandle, AState) then
  begin
  S := '作业 ';
  if (AState.Flags and JOB_ANONPROC) = 0 then
    begin
    if LocateSymbol(AState.Proc.Code, ALoc) then
      S := S + ' - ' + ALoc.FunctionName
    else
      S := S + TObject(AState.Proc.Data).MethodName(AState.Proc.Code);
    end
  else
    S := S + ' - 匿名函数';
  if AState.IsRunning then
    S := S + ' 运行中:'#13#10
  else
    S := S + ' 计划中:'#13#10;
  case AState.Handle and $03 of
    0:
      begin
      S := S + ' 简单作业'#13#10;
      ShowMessage(S);
      Exit;
      end;
    1:
      S := S + ' 重复作业(距下次执行时间: ' + FormatFloat('0.#',
        (AState.NextTime - ATime) / 10) + 'ms)'#13#10;
    2:
      S := S + ' 信号作业'#13#10;
  end;
  S := S + ' 已运行:' + IntToStr(AState.Runs) + ' 次'#13#10 + ' 任务提交时间:' +
    RollupTime((ATime - AState.PushTime) div 10000) + ' 前'#13#10;
  if AState.PopTime <> 0 then
    S := S + ' 末次执行时间:' + RollupTime((ATime - AState.PopTime) div 10000) +
      ' 前' + #13#10;
  S := S + ' 平均每次用时:' + FormatFloat('0.#', AState.AvgTime / 10) + 'ms'#13#10
    + ' 总计用时:' + FormatFloat('0.#', AState.TotalTime / 10) + 'ms'#13#10 +
    ' 最大用时:' + FormatFloat('0.#', AState.MaxTime / 10) + 'ms'#13#10 +
    ' 最小用时:' + FormatFloat('0.#', AState.MinTime / 10) + 'ms'#13#10;
  S := S + ' 标志位:';
  if (AState.Flags and JOB_RUN_ONCE) <> 0 then
    S := S + '单次,';
  if (AState.Flags and JOB_IN_MAINTHREAD) <> 0 then
    S := S + '主线程,';
  if (AState.Flags and JOB_GROUPED) <> 0 then
    S := S + '已分组,';
  if S[Length(S)] = ',' then
    SetLength(S, Length(S) - 1);
  ShowMessage(S);
  ClearJobState(AState);
  end
else
  ShowMessage('未找到请求的句柄对应的作业,作业可能已经完成。');
end;

当然,上面的例子由于用到了 QMapSymbols 单元的函数,所以只能在 Delphi 的 Windows 下程序中运行。

与 PeekJobState 不同,EnumJobStates 是返回一个动态数组来包含所有的作业的状态,声明如下:

【Delphi】

    /// <summary>枚举所有的作业状态</summary>
    /// <returns>返回作业状态列表</summary>
    /// <remarks>在FMX平台,如果使用了匿名函数作业过程,必需调用 ClearJobStates 函数来执行清理过程</remarks>
    function EnumJobStates: TQJobStateArray;

【C++ Builder】

TQJobStateArray __fastcall EnumJobStates(void);

这个和 PeekJobState 很类似,就不再赘述,直接上示例代码:

var
  AStates: TQJobStateArray;
  I: Integer;
  ATime: Int64;
  ALoc: TQSymbolLocation;
  ABuilder:TQStringCatHelperW;
begin
ATime := GetTimeStamp;
AStates := Workers.EnumJobStates;
ABuilder:=TQStringCatHelperW.Create;
ABuilder.Cat('共发现 ').Cat(Length(AStates)).Cat(' 项作业'#13#10);
for I := 0 to High(AStates) do
  begin
  if ABuilder.Position>1000 then
    begin
    ABuilder.Cat('...(后续省略)');
    Break;
    end;
  ABuilder.Cat('作业 #').Cat(I + 1);
  if (AStates[I].Flags and JOB_ANONPROC) = 0 then
    begin
    if LocateSymbol(AStates[I].Proc.Code, ALoc) then
      ABuilder.Cat(' - ').cat(ALoc.FunctionName)
    else
      ABuilder.Cat(TObject(AStates[I].Proc.Data).MethodName(AStates[I].Proc.Code));
    end
  else
    ABuilder.Cat(' - 匿名函数');
  if AStates[I].IsRunning then
    ABuilder.Cat(' 运行中:'#13#10)
  else
    ABuilder.Cat(' 计划中:'#13#10);
  case AStates[I].Handle and $03 of
    0:
      begin
      ABuilder.Cat(' 简单作业'#13#10);
      Continue;
      end;
    1:
      ABuilder.Cat(' 重复作业(距下次执行时间: ' + FormatFloat('0.#',
        (AStates[I].NextTime - ATime) / 10) + 'ms)'#13#10);
    2:
      ABuilder.Cat(' 信号作业'#13#10);
  end;
  ABuilder.Cat(' 已运行:').Cat(AStates[I].Runs).Cat(' 次'#13#10).Cat(' 任务提交时间: ').Cat(
    RollupTime((ATime - AStates[I].PushTime) div 10000)).Cat(' 前'#13#10);
  if AStates[I].PopTime <> 0 then
    ABuilder.Cat(' 末次执行时间: ').Cat(RollupTime((ATime - AStates[I].PopTime) div 10000)).Cat(
      ' 前'#13#10);
  ABuilder.Cat(' 平均每次用时:').Cat(FormatFloat('0.#', AStates[I].AvgTime / 10)).Cat('ms'#13#10)
    .Cat(' 总计用时:').Cat(FormatFloat('0.#', AStates[I].TotalTime / 10)).Cat('ms'#13#10).Cat(
    ' 最大用时:').Cat(FormatFloat('0.#', AStates[I].MaxTime / 10)).Cat('ms'#13#10).Cat(
    ' 最小用时:').Cat(FormatFloat('0.#', AStates[I].MinTime / 10)).Cat('ms'#13#10);
  ABuilder.Cat(' 标志位:');
  if (AStates[I].Flags and JOB_RUN_ONCE) <> 0 then
    ABuilder.Cat('单次,');
  if (AStates[I].Flags and JOB_IN_MAINTHREAD) <> 0 then
    ABuilder.Cat('主线程,');
  if (AStates[I].Flags and JOB_GROUPED) <> 0 then
    ABuilder.Cat('已分组,');
  ABuilder.Cat(SLineBreak);
  end;
ClearJobStates(AStates);
ShowMessage(ABuilder.Value);
FreeObject(ABuilder);
end;

在这个示例中,限制了下最多显示前1000个字节的内容,否则太长了,用 ShowMessage 显示会造成假死的现象。

分享到: