YxdWorker 后台工作者管理库(由QWorker变异)

说明
——————————————————————–
YxdWorker 基于QDAC项目的QWorker ,并且绝大部分代码来自于此,
感谢swish和他的QWorker,QDAC,YxdWorker 版权归swish, YangYxd所有
QWorker来自QDAC项目,版权归swish(QQ:109867294)所有
QDAC官方群:250530692
——————————————————————–

感谢Swish兄长,为了QWorker负出了很多时间和精力。

{*******************************************************}
{                                                       }
{       YxdWorker 后台工作者管理库                      }
{                                                       }
{       版权所有 (C) 2013 GXT  YangYxd                  }
{                                                       }
{*******************************************************}
{
 --------------------------------------------------------------------
  说明
 --------------------------------------------------------------------
  YxdWorker 基于QDAC项目的QWorker ,并且绝大部分代码来自于此,
  感谢swish和他的QWorker,QDAC,YxdWorker 版权归swish, YangYxd所有
  QWorker来自QDAC项目,版权归swish(QQ:109867294)所有
  QDAC官方群:250530692

 --------------------------------------------------------------------
  更新记录
 --------------------------------------------------------------------

 2014.08.23 ver 1.0.4
 --------------------------------------------------------------------
  - 解决Busy计数器BUG

 2014.08.22 ver 1.0.3
 --------------------------------------------------------------------
  - 解决JobGroup超时和Cancel的问题,解决某些原因引起测速很慢的问题
  - 提取合并部分代码,减少体积

 2014.08.16 ver 1.0.2
 --------------------------------------------------------------------
  - 改进长时间任务处理方式 ,TSimpleJobs增加 FLongFirst,FLongLast 专
    门应对长时间任务,解决长时间任务导致Clear失败BUG
  - 同步QWorker修改TQJobGroup.AfterDone改为除了在完成时,在中断或超时
    时仍然触发
  - 同步QWorker增加TQJobGroup.Run函数加入超时设置,超过指定的时间如果
    仍未执行完成,则中止后续执行
  - 同步QWorker增加TQJobGroup.Cancel函数用于取消未执行的作业执行

 2014.08.16 ver 1.0.1
 --------------------------------------------------------------------
  - 增加 FOnErrorNotify通知事件,以便使用者可以记录相关日志
  - 将原QWorker中的Delay,At,Post合并为Post方法。
  - 将原QWorker中的时间精度由0.1ms调整为1ms.
  - 将原QWorker中TJobHelper的功能直接放入TJob中,以便在D2007中还能保
    持良好的语法提示
  - 将原QWorker中Worker类设置Flags相关功能改为GetValue,SetValue,减小
    单元大小
  - 对JobGroup的Add功能增加参数AFreeType, 并默认AInMainThread=False
  - 提取合并部分代码,减少单元大小
  - 删除Job中的Owner字段

 --------------------------------------------------------------------
}
unit YxdWorker;

{.$DEFINE WORKER_SIMPLE_LOCK} // 是否使用原子自旋锁?

interface

uses
  {$IFDEF UNICODE}Generics.Collections, {$ENDIF}
  {$IFDEF NEXTGEN}Fmx.Forms, System.Diagnostics, {$ENDIF}
  {$IFDEF POSIX}Posix.Unistd, Posix.Pthread, {$ENDIF}
  {$IFDEF MSWINDOWS}Windows, Messages, Forms, Activex, {$ENDIF}
  YxdHash, SysUtils, Classes, Types, SyncObjs;

const
  JOB_RUN_ONCE = $0001;         // 作业只运行一次
  JOB_IN_MAINTHREAD = $0002;    // 作业只能在主线程中运行
  JOB_MAX_WORKERS = $0004;      // 尽可能多的开启可能的工作者线程来处理作业,暂不支持
  JOB_LONGTIME = $0008;         // 作业需要很长的时间才能完成,以便调度程序减少它对其它作业的影响
  JOB_SIGNAL_WAKEUP = $0010;    // 作业根据信号需要唤醒
  JOB_TERMINATED = $0020;       // 作业不需要继续进行,可以结束了
  JOB_FREE_OBJECT = $0040;      // Data关联的是Object,作业完成或清理时释放
  JOB_FREE_RECORD = $0080;      // Data关联的是Record,作业完成或清理时释放
  JOB_FREE_INTERFACE = $0100;   // Data关联的是Interface,作业完成时调用_Release
  JOB_GROUPED = $0200;          // 当前作业是作业组的一员
  JOB_ANONPROC = $0400;         // 当前作业过程是匿名函数
  JOB_DATA_OWNER = JOB_FREE_OBJECT + JOB_FREE_RECORD + JOB_FREE_INTERFACE;
                                // 作业是Data成员的所有者

  WORKER_ISBUSY = $01;          // 工作者忙碌
  WORKER_PROCESSLONG = $02;     // 当前处理的一个长时间作业
  WORKER_RESERVED = $04;        // 当前工作者是一个保留工作者
  WORKER_COM_INITED = $08;      // 工作者已初始化为支持COM的状态(仅限Windows)
  WORKER_LOOKUP = $10;          // 工作者正在查找作业
  WORKER_EXECUTING = $20;       // 工作者正在执行作业
  WORKER_EXECUTED = $40;        // 工作者已经完成作业

  WAITJOB_TIMEOUT = 15000;      // 工作者等待作业超时时间 (15秒)

const
  WOSecond = 1000;             // 1s
  WOMinute = 60000;            // 60s/1min
  WOHour = 3600000;            // 3600s/60min/1hour
  WODay = Int64(86400000);     // 1天

type
  /// <summary>作业空闲原因,内部使用</summary>
  TWorkerIdleReason = (
    irTimeout,                  // 工作者已经等待超时,可以被释放
    irNoJob                     // 没有需要处理的作业,此时工作者会进行WAITJOB_TIMEOUT释放
                                // 等待状态,如果在WAITJOB_TIMEOUT内有新作业进来,则工作者
                                // 会被唤醒,否则超时后会被释放
  );

type
  /// <summary>作业结束时如何处理Data成员</summary>
  TJobDataFreeType = (
    jdfFreeByUser,              // 用户管理对象的释放
    jdfFreeAsObject,            // 附加的是一个TObject继承的对象,作业完成时会调用FreeObject释放
    jdfFreeAsRecord,            // 附加的是一个Record对象,作业完成时会调用Dispose释放
    jdfFreeAsInterface          // 附加的是一个接口对象,添加时会增加计数,作业完成时会减少计数
  );

type
  TJobBase = class;
  TJobGroup = class;
  TSimpleJobs = class;
  TRepeatJobs = class;
  TYXDWorker = class;
  TYXDWorkers = class;
  {$IFNDEF UNICODE}
  IntPtr = Integer;
  {$ENDIF}
  PJob = ^TJob;

  // 作业处理回调函数
  TJobProc = procedure(AJob: PJob) of object;
  TJobProcG = procedure(AJob: PJob);
  {$IFDEF UNICODE}
  TJobProcA = reference to procedure(AJob: PJob);
  {$ENDIF}

  TWorkerWaitParam = record
    WaitType: Byte;
    Data: Pointer;
    case Integer of
      0:
        (Bound: Pointer); // 按对象清除
      1:
        (WorkerProc: TMethod);
      2:
        (SourceJob: PJob);
      3:
        (Group: Pointer);
  end;

  // 信号的内部定义
  PSignal = ^TSignal;
  TSignal = packed record
    Id: Integer;    // 信号的索引
    Fired: Integer; // 信号已触发次数
    Name: string;   // 信号的名称
    First: PJob;    // 首个作业
  end;

  TJob = record
  private
    function GetAvgTime: Integer; inline;
    function GetElapseTime: Int64; inline;
    function GetValue(Index: Integer): Boolean; inline;
    procedure SetValue(Index: Integer; const Value: Boolean); inline;
    function GetIsTerminated: Boolean; inline;
    procedure SetIsTerminated(const Value: Boolean); inline;
    procedure AfterRun(AUsedTime: Int64);
    procedure UpdateNextTime;
  public
    procedure Create(AProc: TJobProc);
    /// <summary>值拷贝函数</summary>
    /// <remarks>Worker/Next/Source不会复制并会被置空,Owner不会被复制</remarks>
    procedure Assign(const ASource: PJob);
    /// <summary>重置内容,以便为从队列中弹出做准备</summary>
    procedure Reset; inline;

    /// <summary>平均每次运行时间,单位为1ms</summary>
    property AvgTime: Integer read GetAvgTime;
    /// <summmary>本次已运行时间,单位为1ms</summary>
    property ElapseTime: Int64 read GetElapseTime;

    /// <summary>是否只运行一次,投递作业时自动设置</summary>
    property Runonce: Boolean index JOB_RUN_ONCE read GetValue;
    /// <summary>是否要求在主线程执行作业,实际效果比Windows的PostMessage相似</summary>
    property InMainThread: Boolean index JOB_IN_MAINTHREAD read GetValue;
    /// <summary>是否是一个运行时间比较长的作业,用Workers.LongtimeWork设置</summary>
    property IsLongtimeJob: Boolean index JOB_LONGTIME read GetValue;
    /// <summary>是否是一个信号触发的作业</summary>
    property IsSignalWakeup: Boolean index JOB_SIGNAL_WAKEUP read GetValue;
    /// <summary>是否是分组作业的成员</summary>
    property IsGrouped: Boolean index JOB_GROUPED read GetValue;
    /// <summary>是否要求结束当前作业</summary>
    property IsTerminated: Boolean read GetIsTerminated write SetIsTerminated;
    /// <summary>判断作业是否拥有Data数据成员</summary>
    property IsDataOwner: Boolean index JOB_DATA_OWNER read GetValue;

    /// <summary>判断作业的Data指向的是一个对象且要求作业完成时自动释放</summary>
    property IsObjectOwner: Boolean index JOB_FREE_OBJECT read GetValue write SetValue;
    /// <summary>判断作业的Data指向的是一个记录且要求作业完成时自动释放</summary>
    property IsRecordOwner: Boolean index JOB_FREE_RECORD read GetValue write SetValue;
    /// <summary>判断作业的Data指向的是一个接口且要求作业完成时自动释放</summary>
    property IsInterfaceOwner: Boolean index JOB_FREE_INTERFACE read GetValue write SetValue;
    /// <summary>判断作业处理过程是否是一个匿名函数</summary>
    property IsAnonWorkerProc: Boolean index JOB_ANONPROC read GetValue write SetValue;
  public
    FirstTime: Int64;       // 作业第一次开始时间
    StartTime: Int64;       // 本次作业开始时间,8B
    PushTime: Int64;        // 入队时间
    PopTime: Int64;         // 出队时间
    NextTime: Int64;        // 下一次运行的时间,+8B=16B
    WorkerProc: TJobProc;   // 作业处理函数+8/16B
    {$IFDEF UNICODE}
    WorkerProcA: TJobProcA;
    {$ENDIF}
    Owner: TJobBase;        // 作业所隶属的队列
    Next: PJob;             // 下一个结点
    Worker: TYXDWorker;     // 当前作业工作者
    Runs: Integer;          // 已经运行的次数+4B
    MinUsedTime: Integer;   // 最小运行时间+4B
    TotalUsedTime: Integer; // 运行总计花费的时间,TotalUsedTime/Runs可以得出平均执行时间+4B
    MaxUsedTime: Integer;   // 最大运行时间+4B
    Flags: Integer;         // 作业标志位+4B
    Data: Pointer;          // 附加数据内容
    case Integer of
      0:
        (
          SignalId: Integer;  // 信号编码
          Source: PJob;       // 源作业地址
          RefCount: PInteger; // 源数据
        );
      1:
        (
          Interval: Int64;    // 运行时间间隔,单位为0.1ms,实际精度受不同操作系统限制+8B
          FirstDelay: Int64;  // 首次运行延迟,单位为0.1ms,默认为0
        );
      2:
        (
          Group: Pointer;     // 分组作业支持
        );
  end;

  // 作业队列对象的基类,提供基础的接口封装
  TJobBase = class(TObject)
  protected
    FOwner: TYXDWorkers;
    function InternalPush(AJob: PJob): Boolean; virtual; abstract;
    function InternalPop: PJob; virtual; abstract;
    function GetCount: Integer; virtual; abstract;
    function GetEmpty: Boolean;
  public
    constructor Create(AOwner: TYXDWorkers); virtual;
    destructor Destroy; override;
    // 投寄一个作业 (外部不应尝试直接投寄任务到队列,其由Workers的相应函数内部调用。)
    function Push(AJob: PJob): Boolean; virtual;
    // 弹出一个作业
    function Pop: PJob; virtual;
    // 空所有作业
    procedure Clear; overload; virtual;
    // 清空指定的作业
    function Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; overload; virtual; abstract;
    // 清空一个对象关联的所有作业
    function Clear(AObject: Pointer; AMaxTimes: Integer): Integer; overload; virtual; abstract;
    /// 不可靠警告:Count和Empty值仅是一个参考,在多线程环境下可能并不保证下一句代码执行时,会一致
    property Empty: Boolean read GetEmpty; // 当前队列是否为空
    property Count: Integer read GetCount; // 当前队列元素数量
  end;

  {$IFDEF WORKER_SIMPLE_LOCK}
  // 一个基于位锁的简单锁定对象,使用原子函数置位
  TSimpleLock = class
  private
    FFlags: Integer;
  public
    constructor Create;
    procedure Enter; inline;
    procedure Leave; inline;
  end;
  {$ELSE}
  TSimpleLock = TCriticalSection;
  {$ENDIF}

  /// <summary>
  /// 工作者线程使用单向链表管理,而不是进行排序检索是因为对于工作者数量有限,额外
  /// 的处理反而不会直接最简单的循环直接有效
  /// </summary>
  TYXDWorker = class(TThread)
  private
    FOwner: TYXDWorkers;
    FEvent: TEvent;
    //FNext: TYXDWorker;
    FFlags: Integer;
    FTimeout: Integer;
    FTerminatingJob: PJob;
    function GetValue(Index: Integer): Boolean; inline;
    procedure SetValue(Index: Integer; const Value: Boolean); inline;
    function GetIsIdle: Boolean; inline;
  protected
    FActiveJob: PJob;
    // 之所以不直接使用FActiveJob的相关方法,是因为保证外部可以线程安全的访问这两个成员
    FActiveJobProc: TJobProc;
    FActiveJobData: Pointer;
    FActiveJobSource: PJob;
    FActiveJobGroup: TJobGroup;
    FActiveJobFlags: Integer;
    procedure Execute; override;
    procedure FireInMainThread;
    procedure DoJob(AJob: PJob);
  public
    constructor Create(AOwner: TYXDWorkers); overload;
    destructor Destroy; override;
    procedure ComNeeded(AInitFlags: Cardinal = 0);
    // 判断COM是否已经初始化为支持COM
    property ComInitialized: Boolean index WORKER_COM_INITED read GetValue;
    // 判断当前是否处于长时间作业处理过程中
    property InLongtimeJob: Boolean index WORKER_PROCESSLONG read GetValue;
    // 判断当前是否空闲
    property IsIdle: Boolean read GetIsIdle;
    // 判断当前是否忙碌
    property IsBusy: Boolean index WORKER_ISBUSY read GetValue;
    // 判断当前工作者是否是内部保留的工作者
    property IsReserved: Boolean index WORKER_RESERVED read GetValue;
    property IsLookuping: Boolean index WORKER_LOOKUP read GetValue;
    property IsExecuting: Boolean index WORKER_EXECUTING read GetValue;
    property IsExecuted: Boolean index WORKER_EXECUTED read GetValue;
  end;

  // 工作者错误通知事件
  TWorkerErrorNotify = procedure(AJob: PJob; E: Exception; const ErrSource: string) of object;
  // 自定义数据释放事件
  TCustomFreeDataEvent = procedure(ASender: TYXDWorkers; AFreeType: TJobDataFreeType; var AData: Pointer) of object;

  /// <summary>
  /// 工作者管理对象,用来管理工作者和作业
  /// </summary>
  TYXDWorkers = class(TObject)
  private
    FWorkers: array of TYXDWorker;
    FWorkerCount: Integer;
    FDisableCount: Integer;
    FBusyCount: Integer;
    FMinWorkers: Integer;
    FMaxWorkers: Integer;
    FMaxSignalId: Integer;
    FLongTimeWorkers: Integer;    // 记录下长时间作业中的工作者,这种任务长时间不释放资源,可能会造成其它任务无法及时响应
    FMaxLongtimeWorkers: Integer; // 允许最多同时执行的长时间任务数,不允许超过MaxWorkers的一半
    FTerminating: Boolean;
    FCPUNum: Integer;
    FLocker: TCriticalSection;
    FSimpleJobs: TSimpleJobs;
    FRepeatJobs: TRepeatJobs;
    FSignalJobs: TYXDHashTable;

    FOnErrorNotify: TWorkerErrorNotify;
    FOnCustomFreeData: TCustomFreeDataEvent;
    {$IFDEF MSWINDOWS}
    FMainWorker: HWND;
    procedure DoMainThreadWork(var AMsg: TMessage);
    {$ENDIF}
    function GetEnabled: Boolean;
    function PostWaitJob(AJob: PJob; ASignalId: Integer): Boolean;
    function ClearSignalJobs(ASource: PJob): Integer;
    function ClearJobs(AObject: Pointer; AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer;
    function ClearWaitJobs(ASignalId: Integer; const ASignalName: string): Integer;
    procedure SetEnabled(const Value: Boolean);
    procedure SetMaxLongtimeWorkers(const Value: Integer);
    procedure SetMaxWorkers(const Value: Integer);
    procedure SetMinWorkers(const Value: Integer);
    procedure EnableWorkers;
    procedure DisableWorkers;
    procedure ClearWorkers;
    procedure FreeJob(AJob: PJob);
    procedure FreeJobData(AData: Pointer; AFreeType: TJobDataFreeType);
  protected
    function Popup: PJob;
    function Post(AJob: PJob): Boolean; overload;
    function LookupIdleWorker: Boolean;
    function SignalIdByName(const AName: string): Integer;
    procedure SignalWorkDone(AJob: PJob; AUsedTime: Int64);
    procedure WorkerIdle(AWorker: TYXDWorker; AReason: TWorkerIdleReason);
    procedure WorkerTerminate(AWorker: TObject);
    procedure WaitRunningDone(const AParam: TWorkerWaitParam);
    procedure FireSignalJob(ASignal: PSignal; AData: Pointer; AFreeType: TJobDataFreeType);
    procedure DoJobFree(ATable: TObject; AHash: Cardinal; AData: Pointer);
    procedure DoCustomFreeData(AFreeType: TJobDataFreeType; var AData: Pointer);
  public
    constructor Create(AMinWorkers: Integer = 2); overload;
    destructor Destroy; override;

    // 获取Job池大小
    class function JobPoolCount(): Integer;
    // 获取实例
    class function GetInstance: TYXDWorkers;

    // 清除所有作业
    procedure Clear; overload;
    /// <summary>清除一个对象相关的所有作业</summary>
    /// <param name="AObject">要释放的作业处理过程关联对象</param>
    /// <param name="AMaxTimes">最多清除的数量,如果<0,则全清</param>
    /// <returns>返回实际清除的作业数量</returns>
    /// <remarks>一个对象如果计划了作业,则在自己释放前应调用本函数以清除关联的作业,
    /// 否则,未完成的作业可能会触发异常。</remarks>
    function Clear(AObject: Pointer; AMaxTimes: Integer = -1): Integer; overload;
    /// <summary>清除所有投寄的指定过程作业</summary>
    /// <param name="AProc">要清除的作业执行过程</param>
    /// <param name="AData">要清除的作业附加数据指针地址,如果值为Pointer(-1),
    /// 则清除所有的相关过程,否则,只清除附加数据地址一致的过程</param>
    /// <param name="AMaxTimes">最多清除的数量,如果<0,则全清</param>
    /// <returns>返回实际清除的作业数量</returns>
    function Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer = -1): Integer; overload;
    /// <summary>清除指定信号关联的所有作业</summary>
    /// <param name="ASingalId">要清除的信号名称</param>
    /// <returns>返回实际清除的作业数量</returns>
    function Clear(const ASignalName: string): Integer; overload;
    /// <summary>清除指定信号关联的所有作业</summary>
    /// <param name="ASingalId">要清除的信号ID</param>
    /// <returns>返回实际清除的作业数量</returns>
    function Clear(ASignalId: Integer): Integer; overload;

    /// <summary>投寄一个作业</summary>
    /// <param name="AJobProc">要定时执行的作业过程</param>
    /// <param name="ADelay">第一次执行前延迟时间,小于等于0则立即执行</param>
    /// <param name="AInterval">后续重复作业间隔,如果小于等于0,则作业只执行一次</param>
    /// <param name="ARunInMainThread">是否要求作业在主线程中执行</param>
    function Post(AJobProc: TJobProc; AData: Pointer; ARunInMainThread: Boolean = False;
      const ADelay: Int64 = 0; const AInterval: Int64 = 0; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
    function Post(AJobProc: TJobProcG; AData: Pointer; ARunInMainThread: Boolean = False;
      const ADelay: Int64 = 0; const AInterval: Int64 = 0; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
    {$IFDEF UNICODE}
    function Post(AJobProc: TJobProcA; AData: Pointer; ARunInMainThread: Boolean = False;
      const ADelay: Int64 = 0; const AInterval: Int64 = 0; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
    {$ENDIF}

    /// <summary>投寄一个在指定时间才开始的重复作业</summary>
    /// <param name="AJobProc">要定时执行的作业过程</param>
    /// <param name="ATime">执行时间,只要时间部分,日期忽略</param>
    /// <param name="AInterval">后续重复作业间隔,如果小于等于0,则作业只执行一次</param>
    /// <param name="ARunInMainThread">是否要求作业在主线程中执行</param>
    function Post(AJobProc: TJobProc; const ATime: TDateTime; const AInterval: Int64;
      AData: Pointer; ARunInMainThread: Boolean = False; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
    function Post(AJobProc: TJobProcG; const ATime: TDateTime; const AInterval: Int64;
      AData: Pointer; ARunInMainThread: Boolean = False; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
    {$IFDEF UNICODE}
    function Post(AJobProc: TJobProcA; const ATime: TDateTime; const AInterval: Int64;
      AData: Pointer; ARunInMainThread: Boolean = False; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
    {$ENDIF}

    /// <summary>投寄一个后台长时间执行的作业</summary>
    /// <param name="AJobProc">要执行的作业过程</param>
    /// <param name="AData">作业附加的用户数据指针</param>
    /// <returns>成功投寄返回True,否则返回False</returns>
    /// <remarks>长时间作业强制在后台线程中执行,而不允许投递到主线程中执行</remarks>
    function PostLongJob(AJobProc: TJobProc; AData: Pointer;
      AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
    function PostLongJob(AJobProc: TJobProcG; AData: Pointer;
      AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
    {$IFDEF UNICODE}
    function PostLongJob(AJobProc: TJobProcA; AData: Pointer;
      AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
    {$ENDIF}

    /// <summary>投寄一个等待信号才开始的作业</summary>
    /// <param name="AJobProc">要执行的作业过程</param>
    /// <param name="ASignalId">等待的信号编码,该编码由RegisterSignal函数返回</param>
    /// <param name="ARunInMainThread">作业要求在主线程中执行</param>
    /// <returns>成功投寄返回True,否则返回False</returns>
    function PostWait(AJobProc: TJobProc; ASignalId: Integer;
      ARunInMainThread: Boolean = False): Boolean; overload;
    function PostWait(AJobProc: TJobProcG; ASignalId: Integer;
      ARunInMainThread: Boolean = False): Boolean; overload;
    {$IFDEF UNICODE}
    function PostWait(AJobProc: TJobProcA; ASignalId: Integer;
      ARunInMainThread: Boolean = False): Boolean; overload;
    {$ENDIF}

    /// <summary>触发一个信号</summary>
    /// <param name="AId">信号编码,由RegisterSignal返回</param>
    /// <param name="AData">附加给作业的用户数据指针地址</param>
    /// <remarks>触发一个信号后,Workers会触发所有已注册的信号关联处理过程的执行</remarks>
    procedure SendSignal(AId: Integer; AData: Pointer = nil; AFreeType: TJobDataFreeType = jdfFreeByUser); overload;
    /// <summary>按名称触发一个信号</summary>
    /// <param name="AName">信号名称</param>
    /// <param name="AData">附加给作业的用户数据指针地址</param>
    /// <remarks>触发一个信号后,Workers会触发所有已注册的信号关联处理过程的执行</remarks>
    procedure SendSignal(const AName: string; AData: Pointer = nil; AFreeType: TJobDataFreeType = jdfFreeByUser); overload;

    /// <summary>注册一个信号</summary>
    /// <param name="AName">信号名称</param>
    /// <remarks>
    /// 1.重复注册同一名称的信号将返回同一个编码
    /// 2.信号一旦注册,则只有程序退出时才会自动释放
    /// </remarks>
    function RegisterSignal(const AName: string): Integer;


    // 最大允许工作者数量,不能小于2
    property MaxWorkers: Integer read FMaxWorkers write SetMaxWorkers;
    // 最小工作者数量,不能小于2
    property MinWorkers: Integer read FMinWorkers write SetMinWorkers;
    // 大允许的长时间作业工作者数量,等价于允许开始的长时间作业数量
    property MaxLongtimeWorkers: Integer read FMaxLongtimeWorkers write SetMaxLongtimeWorkers;
    // 是否允许开始作业,如果为false,则投寄的作业都不会被执行,直到恢复为True
    // (Enabled为False时已经运行的作业将仍然运行,它只影响尚未执行的作来)
    property Enabled: Boolean read GetEnabled write SetEnabled;
    // 是否正在释放TQWorkers对象自身
    property Terminating: Boolean read FTerminating;
    // 当前系统CPU数量
    property CPUNum: Integer read FCPUNum;
    // 繁忙的工作者数量
    property BusyWorkerCount: Integer read FBusyCount;
    // 当前工作者数量
    property WorkerCount: Integer read FWorkerCount;
    // 工作者错误回调通知事件
    property OnErrorNotify: TWorkerErrorNotify read FOnErrorNotify write FOnErrorNotify;
    // 用户指定的作业的Data对象释放方式
    property OnCustomFreeData: TCustomFreeDataEvent read FOnCustomFreeData write FOnCustomFreeData;
  end;

  {$IFDEF UNICODE}
  TJobItemList = TList<PJob>;
  {$ELSE}
  TJobItemList = TList;
  {$ENDIF}

  /// <summary>
  /// 作业组,放在一起顺序执行或乱序执行,可以使用 WaitFor 等待全部完成
  /// </summary>
  TJobGroup = class(TObject)
  private
    FOwner: TYXDWorkers;
    FLocker: TSimpleLock;
    FEvent: TEvent;           // 事件,用于等待作业完成
    FCount: Integer;
    FByOrder: Boolean;
    FWaitResult: TWaitResult;
    FAfterDone: TNotifyEvent; // 作业完成事件通知
    FTimeoutCheck: Boolean; // 是否检查作业超时
    FTag: Pointer;
  protected
    FItems: TJobItemList;     // 作业列表
    FPrepareCount: Integer;   // 准备计数
    procedure DoJobExecuted(AJob: PJob);
    procedure DoJobsTimeout(AJob: PJob);
    procedure DoAfterDone;
  public
    constructor Create(AByOrder: Boolean = False); overload;
    constructor Create(AOwner: TYXDWorkers; AByOrder: Boolean = False); overload;
    destructor Destroy; override;
    // 取消未完成的作业
    procedure Cancel;
    // 准备添加作业,实际增加内部计数器
    procedure Prepare;
    // 减少内部计数器,如果计数器减为0,则开始实际执行作业
    procedure Run(ATimeout: Cardinal = INFINITE);
    // 添加一个作业过程,如果准备内部计数器为0,则直接执行,否则只添加到列表
    function Add(AProc: TJobProc; AData: Pointer; AInMainThread: Boolean = False;
      AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean;
    // 等待作业完成,ATimeout为最长等待时间
    function WaitFor(ATimeout: Cardinal = INFINITE): TWaitResult;
    // 等待作业完成,ATimeout为最长等待时间,不同的是MsgWaitFor不阻塞消息处理
    function MsgWaitFor(ATimeout: Cardinal = INFINITE): TWaitResult;
    // 未完成的作业数量
    property Count: Integer read FCount;
    // 全部作业执行完成时触发的回调事件
    property AfterDone: TNotifyEvent read FAfterDone write FAfterDone;
    // 是否是按顺序执行(即必需等待上一个作业完成后才执行下一个)
    property ByOrder: Boolean read FByOrder;
    property Tag: Pointer read FTag write FTag;
  end;

  /// <summary>
  /// 用于管理计划型任务,需要在指定的时间点触发
  /// </summary>
  TRepeatJobs = class(TJobBase)
  private
    FLocker: TCriticalSection;
    FFirstFireTime: Int64;
    function ClearJobs(AObject: Pointer; AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer;
    procedure AfterJobRun(AJob: PJob; AUsedTime: Int64);
  protected
    FItems: TRBTree;
    function InternalPush(AJob: PJob): Boolean; override;
    function InternalPop: PJob; override;
    function DoTimeCompare(P1, P2: Pointer): Integer;
    procedure DoJobDelete(ATree: TRBTree; ANode: PRBNode);
    function GetCount: Integer; override;
  public
    constructor Create(AOwner: TYXDWorkers); override;
    destructor Destroy; override;
    procedure Clear; override;
    function Clear(AObject: Pointer; AMaxTimes: Integer): Integer; overload; override;
    function Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; overload; override;
  end;

  /// <summary>
  /// 用于管理简单的异步调用,没有触发时间要求的作业
  /// </summary>
  TSimpleJobs = class(TJobBase)
  private
    FFirst, FLast: PJob;
    FCount: Integer;
    FLocker: TSimpleLock;
    function ClearJobs(AObject: Pointer; AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer;
  protected
    function InternalPush(AJob: PJob): Boolean; override;
    function InternalPop: PJob; override;
    function GetCount: Integer; override;
  public
    constructor Create(AOwner: TYXDWorkers); override;
    destructor Destroy; override;
    procedure Clear; overload; override;
    function Clear(AObject: Pointer; AMaxTimes: Integer): Integer; overload; override;
    function Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; overload; override;
  end;

var
  Workers: TYXDWorkers = nil;  // 需要时初始化,也可以自己定义,允许多个

// 返回值的时间精度为1ms
function GetTimestamp: Int64;
// 获取CPU数量
function GetCPUCount: Integer;
{$IF RTLVersion<26}
// 为与D2007兼容, 原子操作函数
function AtomicCmpExchange(var Target: Integer; Value, Comparand: Integer): Integer; inline;
function AtomicExchange(var Target: Integer; Value: Integer): Integer; inline;
function AtomicIncrement(var Target: Integer): Integer; inline;
function AtomicDecrement(var Target: Integer): Integer; inline;
{$IFEND}
{$IFDEF WORKER_SIMPLE_LOCK}
// 原子操作函数
function AtomicAnd(var Dest: Integer; const AMask: Integer): Integer; inline;
function AtomicOr(var Dest: Integer; const AMask: Integer): Integer; inline;
{$ENDIF}
// 将全局的作业处理函数转换为TJobProc类型,以便正常调度使用
function MakeJobProc(const AProc: TJobProcG): TJobProc; inline;
// 设置线程运行的CPU
procedure SetThreadCPU(AHandle: THandle; ACpuNo: Integer); inline;

implementation

resourcestring
  SNotSupportNow = '当前尚未支持功能 %s';
  SNotInitWorkers = '当前尚未初始化有工作者管理对象 TYXDWorkers';
  STooFewWorkers = '指定的最小工作者数量太少(必需大于等于1)。';
  STooManyLongtimeWorker = '不能允许太多长时间作业线程(最多允许工作者一半)。';
  SBadWaitDoneParam = '未知的等待正在执行作业完成方式:%d';

{$IFNDEF UNICODE}
const
  wrIOCompletion = TWaitResult(4);
{$ENDIF}

{$IFDEF MSWINDOWS}
type
  TGetTickCount64 = function: Int64;
{$ENDIF MSWINDOWS}
type
  TJobPool = class
  protected
    FFirst: PJob;
    FCount: Integer;
    FSize: Integer;
    FLocker: TSimpleLock;
  public
    constructor Create(AMaxSize: Integer);
    destructor Destroy; override;
    procedure Push(AJob: PJob);
    function Pop: PJob;
    property Count: Integer read FCount;
    property Size: Integer read FSize write FSize;
  end;

var
  JobPool: TJobPool;
  _CPUCount: Integer;
  {$IFDEF NEXTGEN}
  _Watch: TStopWatch;
  {$ELSE}
  GetTickCount64: TGetTickCount64;
  _PerfFreq: Int64;
  {$ENDIF}

function GetTimestamp: Int64;
begin
  {$IFDEF NEXTGEN}
  Result := _Watch.Elapsed.Ticks div 10000;
  {$ELSE}
  if _PerfFreq > 0 then begin
    QueryPerformanceCounter(Result);
    Result := Result * 1000 div _PerfFreq;
  end else if Assigned(GetTickCount64) then
    Result := GetTickCount64
  else
    Result := GetTickCount;
  {$ENDIF}
end;

function GetCPUCount: Integer;
{$IFDEF MSWINDOWS}
var
  si: SYSTEM_INFO;
{$ENDIF}
begin
  if _CPUCount = 0 then begin
  {$IFDEF MSWINDOWS}
    GetSystemInfo(si);
    Result := si.dwNumberOfProcessors;
  {$ELSE}// Linux,MacOS,iOS,Andriod{POSIX}
  {$IFDEF POSIX}
    Result := sysconf(_SC_NPROCESSORS_ONLN);
  {$ELSE}// 不认识的操作系统,CPU数默认为1
    Result := 1;
  {$ENDIF !POSIX}
  {$ENDIF !MSWINDOWS}
  end else
    Result := _CPUCount;
end;

function MakeJobProc(const AProc: TJobProcG): TJobProc;
begin
  TMethod(Result).Data := nil;
  TMethod(Result).Code := @AProc;
end;

function SameWorkerProc(const P1, P2: TJobProc): Boolean; inline;
begin
  Result := (TMethod(P1).Code = TMethod(P2).Code) and
    (TMethod(P1).Data = TMethod(P2).Data);
end;

procedure SetThreadCPU(AHandle: THandle; ACpuNo: Integer);
begin
  {$IFDEF MSWINDOWS}
  SetThreadIdealProcessor(AHandle, ACpuNo);
  {$ELSE}
  // Linux/Andriod/iOS暂时忽略,XE6未引入sched_setaffinity定义
  {$ENDIF}
end;

// 兼容2007版的原子操作接口
{$IF RTLVersion<26}
function AtomicCmpExchange(var Target: Integer; Value: Integer; Comparand: Integer): Integer; inline;
begin
  Result := InterlockedCompareExchange(Target, Value, Comparand);
end;

function AtomicIncrement(var Target: Integer): Integer; inline;
begin
  Result := InterlockedIncrement(Target);
end;

function AtomicDecrement(var Target: Integer): Integer; inline;
begin
  Result := InterlockedDecrement(Target);
end;

function AtomicExchange(var Target: Integer; Value: Integer): Integer;
begin
  Result := InterlockedExchange(Target, Value);
end;
{$IFEND <XE5}

{$IFDEF WORKER_SIMPLE_LOCK}
// 位与,返回原值
function AtomicAnd(var Dest: Integer; const AMask: Integer): Integer; inline;
var
  i: Integer;
begin
  repeat
    Result := Dest;
    i := Result and AMask;
  until AtomicCmpExchange(Dest, i, Result) = Result;
end;

// 位或,返回原值
function AtomicOr(var Dest: Integer; const AMask: Integer): Integer; inline;
var
  i: Integer;
begin
  repeat
    Result := Dest;
    i := Result or AMask;
  until AtomicCmpExchange(Dest, i, Result) = Result;
end;
{$ENDIF}

{ TJobPool }

constructor TJobPool.Create(AMaxSize: Integer);
begin
  FCount := 0;
  FSize := AMaxSize;
  FLocker := TSimpleLock.Create;
end;

destructor TJobPool.Destroy;
var
  AJob: PJob;
begin
  FLocker.Enter;
  try
    while FFirst <> nil do begin
      AJob := FFirst.Next;
      Dispose(FFirst);
      FFirst := AJob;
    end;
  finally
    FLocker.Free;
  end;
  inherited;
end;

function TJobPool.Pop: PJob;
begin
  FLocker.Enter;
  Result := FFirst;
  if Result <> nil then begin
    FFirst := Result.Next;
    Dec(FCount);
  end;
  FLocker.Leave;
  if Result = nil then
    GetMem(Result, SizeOf(TJob));
  Result.Reset;
end;

procedure TJobPool.Push(AJob: PJob);
var
  ADoFree: Boolean;
begin
  {$IFDEF UNICODE}
  if AJob.IsAnonWorkerProc then
    AJob.WorkerProcA := nil;
  {$ENDIF}
  FLocker.Enter;
  ADoFree := (FCount = FSize);
  if not ADoFree then begin
    AJob.Next := FFirst;
    FFirst := AJob;
    Inc(FCount);
  end;
  FLocker.Leave;
  if ADoFree then
    FreeMem(AJob);
end;

{ TJobBase }

procedure TJobBase.Clear;
var
  AItem: PJob;
begin
  while True do begin
    AItem := Pop;
    if AItem <> nil then
      FOwner.FreeJob(AItem)
    else
      Break;
  end;
end;

constructor TJobBase.Create(AOwner: TYXDWorkers);
begin
  FOwner := AOwner;
end;

destructor TJobBase.Destroy;
begin
  Clear;
  inherited;
end;

function TJobBase.GetEmpty: Boolean;
begin
  Result := (Count = 0);
end;

function TJobBase.Pop: PJob;
begin
  Result := InternalPop;
end;

function TJobBase.Push(AJob: PJob): Boolean;
begin
  AJob.Owner := Self;
  AJob.PushTime := GetTimestamp;
  Result := InternalPush(AJob);
  if not Result then begin
    AJob.Next := nil;
    FOwner.FreeJob(AJob);
  end;
end;

{ TJob }

procedure TJob.AfterRun(AUsedTime: Int64);
begin
  Inc(Runs);
  if AUsedTime > 0 then begin
    Inc(TotalUsedTime, AUsedTime);
    if MinUsedTime = 0 then
      MinUsedTime := AUsedTime
    else if MinUsedTime > AUsedTime then
      MinUsedTime := AUsedTime;
    if MaxUsedTime = 0 then
      MaxUsedTime := AUsedTime
    else if MaxUsedTime < AUsedTime then
      MaxUsedTime := AUsedTime;
  end;
end;

procedure TJob.Assign(const ASource: PJob);
begin
  Self := ASource^;
  // 下面三个成员不拷贝
  Worker := nil;
  Next := nil;
  Source := nil;
end;

function TJob.GetAvgTime: Integer;
begin
  if Runs > 0 then
    Result := TotalUsedTime div Runs
  else
    Result := 0;
end;

function TJob.GetElapseTime: Int64;
begin
  Result := GetTimestamp - StartTime;
end;

function TJob.GetIsTerminated: Boolean;
begin
  if Assigned(Worker) and Assigned(Worker.FOwner) then
    Result := Worker.FOwner.Terminating or Worker.Terminated or
      ((Flags and JOB_TERMINATED) <> 0) or (Worker.FTerminatingJob = @Self)
  else
    Result := (Flags and JOB_TERMINATED) <> 0;
end;

function TJob.GetValue(Index: Integer): Boolean;
begin
  Result := (Flags and Index) <> 0;
end;

procedure TJob.Create(AProc: TJobProc);
begin
  WorkerProc := AProc;
  SetValue(JOB_RUN_ONCE, True);
end;

procedure TJob.Reset;
begin
  FillChar(Self, SizeOf(TJob), 0);
end;

procedure TJob.SetIsTerminated(const Value: Boolean);
begin
  SetValue(JOB_TERMINATED, Value);
end;

procedure TJob.SetValue(Index: Integer; const Value: Boolean);
begin
  if Value then
    Flags := (Flags or Index)
  else
    Flags := (Flags and (not Index));
end;

procedure TJob.UpdateNextTime;
begin
  if (Runs = 0) and (FirstDelay <> 0) then
    NextTime := PushTime + FirstDelay
  else if Interval <> 0 then begin
    if NextTime = 0 then
      NextTime := GetTimestamp + Interval
    else
      Inc(NextTime, Interval);
  end else
    NextTime := GetTimestamp;
end;

{ TSimpleLock }

{$IFDEF WORKER_SIMPLE_LOCK}
constructor TSimpleLock.Create;
begin
  inherited;
  FFlags := 0;
end;

procedure TSimpleLock.Enter;
begin
  while (AtomicOr(FFlags, $01) and $01) <> 0 do begin
  {$IFDEF MSWINDOWS}
    SwitchToThread;
  {$ELSE}
    TThread.Yield;
  {$ENDIF}
  end;
end;

procedure TSimpleLock.Leave;
begin
  AtomicAnd(FFlags, Integer($FFFFFFFE));
end;
{$ENDIF}

{ TSimpleJobs }

function TSimpleJobs.ClearJobs(AObject: Pointer; AProc: TJobProc;
  AData: Pointer; AMaxTimes: Integer): Integer;
var
  AFirst, AJob, APrior, ANext: PJob;
  ACount: Integer;
  b: Boolean;
begin
  FLocker.Enter;     // 先将所有的异步作业清空,以防止被弹出执行
  AJob := FFirst;
  ACount := FCount;
  FFirst := nil;
  FLast := nil;
  FCount := 0;
  FLocker.Leave;

  Result := 0;
  APrior := nil;
  AFirst := nil;
  while (AJob <> nil) and (AMaxTimes <> 0) do begin
    ANext := AJob.Next;
    if AObject <> nil then
      b := TMethod(AJob.WorkerProc).Data = AObject
    else
      b := SameWorkerProc(AJob.WorkerProc, AProc) and (AJob.Data = AData);
    if b then begin
      if APrior <> nil then
        APrior.Next := ANext;
      FOwner.FreeJob(AJob);
      Dec(AMaxTimes);
      Inc(Result);
      Dec(ACount);
    end else begin
      if AFirst = nil then
        AFirst := AJob;
      APrior := AJob;
    end;
    AJob := ANext;
  end;
  if ACount > 0 then begin
    FLocker.Enter;
    AFirst.Next := FFirst;
    FFirst := AFirst;
    Inc(FCount, ACount);
    if FLast = nil then
      FLast := APrior;
    FLocker.Leave;
  end;
end;

procedure TSimpleJobs.Clear;
var
  AFirst: PJob;
begin
  FLocker.Enter;
  AFirst := FFirst;
  FFirst := nil;
  FLast := nil;
  FCount := 0;
  FLocker.Leave;
  FOwner.FreeJob(AFirst);
end;

function TSimpleJobs.Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer;
begin
  Result := ClearJobs(nil, AProc, AData, AMaxTimes);
end;

function TSimpleJobs.Clear(AObject: Pointer; AMaxTimes: Integer): Integer;
begin
  Result := ClearJobs(AObject, nil, nil, AMaxTimes);
end;

constructor TSimpleJobs.Create(AOwner: TYXDWorkers);
begin
  inherited Create(AOwner);
  FLocker := TSimpleLock.Create;
end;

destructor TSimpleJobs.Destroy;
begin
  inherited;
  FLocker.Free;
end;

function TSimpleJobs.GetCount: Integer;
begin
  Result := FCount;
end;

function TSimpleJobs.InternalPop: PJob;
begin
  FLocker.Enter;
  Result := FFirst;
  if Result <> nil then begin
    FFirst := Result.Next;
    if FFirst = nil then
      FLast := nil;
    Dec(FCount);
  end;
  FLocker.Leave;
  if Result <> nil then begin
    Result.PopTime := GetTimestamp;
    Result.Next := nil;
  end;
end;

function TSimpleJobs.InternalPush(AJob: PJob): Boolean;
begin
  FLocker.Enter;
  if FLast = nil then
    FFirst := AJob
  else
    FLast.Next := AJob;
  FLast := AJob;
  Inc(FCount);
  FLocker.Leave;
  Result := true;
end;

{ TRepeatJobs }

procedure TRepeatJobs.AfterJobRun(AJob: PJob; AUsedTime: Int64);
var
  ANode: PRBNode;

  function UpdateSource: Boolean;
  var
    ATemp, APrior: PJob;
  begin
    Result := False;
    ATemp := ANode.Data;
    APrior := nil;
    while ATemp <> nil do begin
      if ATemp = AJob.Source then begin
        if AJob.IsTerminated then begin
          if APrior <> nil then
            APrior.Next := ATemp.Next
          else
            ANode.Data := ATemp.Next;
          ATemp.Next := nil;
          FOwner.FreeJob(ATemp);
          if ANode.Data = nil then
            FItems.Delete(ANode);
        end else
          ATemp.AfterRun(AUsedTime);
        Result := True;
        Break;
      end;
      APrior := ATemp;
      ATemp := ATemp.Next;
    end;
  end;

begin
  FLocker.Enter;
  try
    ANode := FItems.Find(AJob);
    if ANode <> nil then begin
      if UpdateSource then
        Exit;
    end;
    ANode := FItems.First;
    while ANode <> nil do begin
      if UpdateSource then
        Break;
      ANode := ANode.Next;
    end;
  finally
    FLocker.Leave;
  end;
end;

function TRepeatJobs.ClearJobs(AObject: Pointer; AProc: TJobProc;
  AData: Pointer; AMaxTimes: Integer): Integer;
var
  ANode, ANext: PRBNode;
  APriorJob, AJob, ANextJob: PJob;
  ACanDelete, B: Boolean;
begin
  Result := 0;   // 现在清空重复的计划作业
  FLocker.Enter;
  try
    ANode := FItems.First;
    while (ANode <> nil) and (AMaxTimes <> 0) do begin
      ANext := ANode.Next;
      AJob := ANode.Data;
      ACanDelete := True;
      APriorJob := nil;
      while AJob <> nil do begin
        ANextJob := AJob.Next;
        if AObject <> nil then
          B := TMethod(AJob.WorkerProc).Data = AObject
        else
          B := SameWorkerProc(AJob.WorkerProc, AProc) and ((AData = Pointer(-1)) or (AData = AJob.Data));
        if B then begin
          if ANode.Data = AJob then
            ANode.Data := AJob.Next;
          if Assigned(APriorJob) then
            APriorJob.Next := AJob.Next;
          AJob.Next := nil;
          FOwner.FreeJob(AJob);
          Dec(AMaxTimes);
          Inc(Result);
        end else begin
          ACanDelete := False;
          APriorJob := AJob;
        end;
        AJob := ANextJob;
      end;
      if ACanDelete then
        FItems.Delete(ANode);
      ANode := ANext;
    end;
    if FItems.Count > 0 then
      FFirstFireTime := PJob(FItems.First.Data).NextTime
    else
      FFirstFireTime := 0;
  finally
    FLocker.Leave;
  end;
end;

function TRepeatJobs.Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer;
begin
  Result := ClearJobs(nil, AProc, AData, AMaxTimes);
end;

function TRepeatJobs.Clear(AObject: Pointer; AMaxTimes: Integer): Integer;
begin
  Result := ClearJobs(AObject, nil, nil, AMaxTimes);
end;

procedure TRepeatJobs.Clear;
begin
  FLocker.Enter;
  try
    FItems.Clear;
  finally
    FLocker.Leave;
  end;
end;

constructor TRepeatJobs.Create(AOwner: TYXDWorkers);
begin
  inherited Create(AOwner);
  FLocker := TCriticalSection.Create;
  FItems := TRBTree.Create(DoTimeCompare);
  FItems.OnDelete := DoJobDelete;
end;

destructor TRepeatJobs.Destroy;
begin
  FLocker.Enter;
  try
    FItems.Free;
  finally
    inherited;
    FLocker.Leave;
    FLocker.Free;
  end;
end;

procedure TRepeatJobs.DoJobDelete(ATree: TRBTree; ANode: PRBNode);
begin
  FOwner.FreeJob(ANode.Data);
end;

function TRepeatJobs.DoTimeCompare(P1, P2: Pointer): Integer;
begin
  Result := PJob(P1).NextTime - PJob(P2).NextTime;
end;

function TRepeatJobs.GetCount: Integer;
begin
  Result := FItems.Count;
end;

function TRepeatJobs.InternalPop: PJob;
var
  ANode: PRBNode;
  ATick: Int64;
  AJob: PJob;
begin
  Result := nil;
  ATick := GetTimestamp;
  FLocker.Enter;
  try
    if FItems.Count > 0 then begin
      ANode := FItems.First;
      if PJob(ANode.Data).NextTime <= ATick then begin
        AJob := ANode.Data;
        if AJob.Next <> nil then // 如果没有更多需要执行的作业,则删除结点,否则指向下一个
          ANode.Data := AJob.Next
        else begin
          ANode.Data := nil;
          FItems.Delete(ANode);
          ANode := FItems.First;
          if ANode <> nil then
            FFirstFireTime := PJob(ANode.Data).NextTime
          else // 没有计划作业了,不需要了
            FFirstFireTime := 0;
        end;
        if AJob.Runonce then
          Result := AJob
        else begin
          Inc(AJob.NextTime, AJob.Interval);
          Result := JobPool.Pop;
          Result.Assign(AJob);
          Result.Source := AJob;
          // 重新插入作业
          ANode := FItems.Find(AJob);
          if ANode = nil then begin
            FItems.Insert(AJob);
            FFirstFireTime := PJob(FItems.First.Data).NextTime;
          end else begin// 如果已经存在同一时刻的作业,则自己挂接到其它作业头部
            AJob.Next := PJob(ANode.Data);
            ANode.Data := AJob; // 首个作业改为自己
          end;
        end;
      end;
    end;
  finally
    FLocker.Leave;
  end;
  if Result <> nil then begin
    Result.PopTime := ATick;
    Result.Next := nil;
  end;
end;

function TRepeatJobs.InternalPush(AJob: PJob): Boolean;
var
  ANode: PRBNode;
begin
  AJob.UpdateNextTime;  // 计算作业的下次执行时间
  FLocker.Enter;
  try
    ANode := FItems.Find(AJob);
    if ANode = nil then begin
      FItems.Insert(AJob);
      FFirstFireTime := PJob(FItems.First.Data).NextTime;
    end else begin // 如果已经存在同一时刻的作业,则自己挂接到其它作业头部
      AJob.Next := PJob(ANode.Data);
      ANode.Data := AJob; // 首个作业改为自己
    end;
  finally
    FLocker.Leave;
  end;
  Result := True;
end;

{ TYXDWorker }

procedure TYXDWorker.ComNeeded(AInitFlags: Cardinal);
begin
  {$IFDEF MSWINDOWS}
  if not ComInitialized then begin
    if AInitFlags = 0 then
      CoInitialize(nil)
    else
      CoInitializeEx(nil, AInitFlags);
    SetValue(WORKER_COM_INITED, True);
  end;
  {$ENDIF MSWINDOWS}
end;

constructor TYXDWorker.Create(AOwner: TYXDWorkers);
begin
  inherited Create(True);
  FOwner := AOwner;
  FTimeout := 1000;
  FFlags := WORKER_ISBUSY; // 默认为忙碌
  AtomicIncrement(AOwner.FBusyCount);
  FEvent := TEvent.Create(nil, False, False, '');
  FreeOnTerminate := True;
end;

destructor TYXDWorker.Destroy;
begin
  FreeAndNil(FEvent);
  inherited;
end;

procedure TYXDWorker.DoJob(AJob: PJob);
begin
  {$IFDEF UNICODE}
  if AJob.IsAnonWorkerProc then
    AJob.WorkerProcA(AJob)
  else
  {$ENDIF}
  AJob.WorkerProc(AJob);
end;

procedure TYXDWorker.Execute;
var
  wr: TWaitResult;
  {$IFDEF MSWINDOWS}
  SyncEvent: TEvent;
  {$ENDIF}
begin
  {$IFDEF MSWINDOWS}
  SyncEvent := TEvent.Create(nil, False, False, '');
  {$ENDIF}
  try
    while not(Terminated or FOwner.FTerminating) do begin
      if FOwner.Enabled then begin
        if (FOwner.FSimpleJobs.FFirst <> nil) then begin
          {$IFDEF MSWINDOWS}SwitchToThread; {$ELSE} TThread.Yield;{$ENDIF}
          FTimeout := 0;
        end else if (FOwner.FRepeatJobs.FFirstFireTime <> 0) then begin
          FTimeout := FOwner.FRepeatJobs.FFirstFireTime - GetTimestamp;
          if FTimeout < 0 then // 时间已经到了?那么立刻执行
            FTimeout := 0;
        end else
          FTimeout := WAITJOB_TIMEOUT;
      end else
        FTimeout := WAITJOB_TIMEOUT; // 如果仍没有作业进入,则除非自己是保留的线程对象,否则释放工作者

      if FTimeout <> 0 then begin
        wr := FEvent.WaitFor(FTimeout);
        if Terminated or FOwner.FTerminating then
          Break;
      end else
        wr := wrSignaled;

      if (wr = wrSignaled) or ((FOwner.FRepeatJobs.FFirstFireTime <> 0) and
        (GetTimestamp >= FOwner.FRepeatJobs.FFirstFireTime - 1)) then
      begin
        if FOwner.FTerminating then
          Break;

        if IsIdle then begin
          SetValue(WORKER_ISBUSY or WORKER_LOOKUP, true);
          AtomicIncrement(FOwner.FBusyCount);
        end else
          SetValue(WORKER_LOOKUP, true);

        repeat
          FActiveJob := FOwner.Popup;
          if FActiveJob <> nil then begin
            FActiveJob.Worker := Self;
            FActiveJobProc := FActiveJob.WorkerProc;

            // 为Clear(AObject)准备判断,以避免FActiveJob线程不安全
            FActiveJobData := FActiveJob.Data;
            if FActiveJob.IsSignalWakeup then
              FActiveJobSource := FActiveJob.Source
            else
              FActiveJobSource := nil;

            if FActiveJob.IsGrouped then
              FActiveJobGroup := FActiveJob.Group
            else
              FActiveJobGroup := nil;

            FActiveJobFlags := FActiveJob.Flags;
            if FActiveJob.StartTime = 0 then begin
              FActiveJob.StartTime := GetTimestamp;
              FActiveJob.FirstTime := FActiveJob.StartTime;
            end else
              FActiveJob.StartTime := GetTimestamp;

            try
              FFlags := (FFlags or WORKER_EXECUTING) and (not WORKER_LOOKUP);
              if FActiveJob.InMainThread then
              {$IFDEF MSWINDOWS}
              begin
                if PostMessage(FOwner.FMainWorker, WM_APP, WPARAM(FActiveJob), LPARAM(SyncEvent)) then
                  SyncEvent.WaitFor(INFINITE);
              end
              {$ELSE}
                Synchronize(Self, FireInMainThread)
              {$ENDIF}
              else
                DoJob(FActiveJob);
            except
              if Assigned(FOwner.FOnErrorNotify) then
                FOwner.FOnErrorNotify(FActiveJob, Exception(ExceptObject), 'TYXDWorker.Execute');
            end;

            if not FActiveJob.Runonce then
              FOwner.FRepeatJobs.AfterJobRun(FActiveJob, GetTimestamp - FActiveJob.StartTime)
            else begin
              if FActiveJob.IsSignalWakeup then
                FOwner.SignalWorkDone(FActiveJob, GetTimestamp - FActiveJob.StartTime)
              else if FActiveJob.IsLongtimeJob then
                AtomicDecrement(FOwner.FLongTimeWorkers)
              else if FActiveJob.IsGrouped then
                FActiveJobGroup.DoJobExecuted(FActiveJob);
              FActiveJob.Worker := nil;
            end;

            FOwner.FreeJob(FActiveJob);
            FActiveJobProc := nil;
            FActiveJobSource := nil;
            FActiveJobFlags := 0;
            FActiveJobGroup := nil;
            FTerminatingJob := nil;
            FFlags := FFlags and (not WORKER_EXECUTING);

          end else
            FFlags := FFlags and (not WORKER_LOOKUP);
        until (FActiveJob = nil) or FOwner.FTerminating or Terminated or (not FOwner.Enabled);

        SetValue(WORKER_ISBUSY, False);
        FOwner.WorkerIdle(Self, irNoJob);
      end else if (not IsReserved) and (FTimeout = WAITJOB_TIMEOUT) then begin
        SetValue(WORKER_ISBUSY, False);
        FOwner.WorkerIdle(Self, irTimeout);
      end;
    end;
  finally
    FOwner.WorkerTerminate(Self);
    {$IFDEF MSWINDOWS}
    if ComInitialized then
      CoUninitialize;
    FreeAndNil(SyncEvent);
    {$ENDIF}
  end;
end;

procedure TYXDWorker.FireInMainThread;
begin
  DoJob(FActiveJob);
end;

function TYXDWorker.GetIsIdle: Boolean;
begin
  Result := not IsBusy;
end;

function TYXDWorker.GetValue(Index: Integer): Boolean;
begin
  Result := (FFlags and Index) <> 0;
end;

procedure TYXDWorker.SetValue(Index: Integer; const Value: Boolean);
begin
  if Value then
    FFlags := (FFlags or Index)
  else
    FFlags := (FFlags and (not Index));
end;

{ TYXDWorkers }

function TYXDWorkers.Clear(const ASignalName: string): Integer;
begin
  Result := ClearWaitJobs(0, ASignalName);
end;

function TYXDWorkers.Clear(ASignalId: Integer): Integer;
begin
  Result := ClearWaitJobs(ASignalId, '');
end;

function TYXDWorkers.Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer;
begin
  Result := ClearJobs(nil, AProc, AData, AMaxTimes);
end;

function TYXDWorkers.Clear(AObject: Pointer; AMaxTimes: Integer): Integer;
begin
  Result := ClearJobs(AObject, nil, nil, AMaxTimes);
end;

procedure TYXDWorkers.Clear;
var
  i: Integer;
  AParam: TWorkerWaitParam;
  ASignal: PSignal;
begin
  DisableWorkers; // 避免工作者取得新的作业
  try
    FSimpleJobs.Clear;
    FRepeatJobs.Clear;
    FLocker.Enter;
    try
      for i := 0 to FSignalJobs.BucketCount - 1 do begin
        if Assigned(FSignalJobs.Buckets[i]) then begin
          ASignal := FSignalJobs.Buckets[i].Data;
          FreeJob(ASignal.First);
          ASignal.First := nil;
        end;
      end;
    finally
      FLocker.Leave;
    end;
    AParam.WaitType := $FF;
    WaitRunningDone(AParam);
  finally
    EnableWorkers;
  end;
end;

function TYXDWorkers.ClearJobs(AObject: Pointer; AProc: TJobProc;
  AData: Pointer; AMaxTimes: Integer): Integer;
var
  ACleared: Integer;
  AWaitParam: TWorkerWaitParam;

  function ClearSignalJobs(IsClearObject: Boolean): Integer;
  var
    i: Integer;
    AJob, ANext, APrior: PJob;
    AList: PHashList;
    ASignal: PSignal;
    B: Boolean;
  begin
    Result := 0;
    FLocker.Enter;
    try
      for i := 0 to FSignalJobs.BucketCount - 1 do begin
        AList := FSignalJobs.Buckets[i];
        if AList <> nil then begin
          ASignal := AList.Data;
          if ASignal.First <> nil then begin
            AJob := ASignal.First;
            APrior := nil;
            while (AJob <> nil) and (AMaxTimes <> 0) do begin
              ANext := AJob.Next;
              if IsClearObject then
                B := TMethod(AJob.WorkerProc).Data = AObject
              else
                B := SameWorkerProc(AJob.WorkerProc, AProc) and ((AData = Pointer(-1)) or (AJob.Data = AData));
              if B then begin
                if ASignal.First = AJob then
                  ASignal.First := ANext;
                if Assigned(APrior) then
                  APrior.Next := ANext;
                AJob.Next := nil;
                FreeJob(AJob);
                Inc(Result);
                Dec(AMaxTimes);
              end else
                APrior := AJob;
              AJob := ANext;
            end;
            if AMaxTimes = 0 then
              Break;
          end;
        end;
      end;
    finally
      FLocker.Leave;
    end;
  end;

begin
  Result := 0;
  if Self <> nil then begin
    ACleared := FSimpleJobs.ClearJobs(AObject, AProc, AData, AMaxTimes);
    Dec(AMaxTimes, ACleared);
    Inc(Result, ACleared);
    if AMaxTimes <> 0 then begin
      ACleared := FRepeatJobs.ClearJobs(AObject, AProc, AData, AMaxTimes);
      Dec(AMaxTimes, ACleared);
      Inc(Result, ACleared);
      if AMaxTimes <> 0 then begin
        ACleared := ClearSignalJobs(AObject <> nil);
        Inc(Result, ACleared);
        if AMaxTimes <> 0 then begin
          AWaitParam.WaitType := 1;
          AWaitParam.Data := AData;
          AWaitParam.WorkerProc := TMethod(AProc);
          WaitRunningDone(AWaitParam);
        end;
      end;
    end;
  end;
end;

function TYXDWorkers.ClearWaitJobs(ASignalId: Integer; const ASignalName: string): Integer;
var
  i: Integer;
  ASignal: PSignal;
  AJob: PJob;
  B: Boolean;
begin
  AJob := nil;
  FLocker.Enter;
  try
    for i := 0 to FSignalJobs.BucketCount - 1 do begin
      if FSignalJobs.Buckets[i] <> nil then begin
        ASignal := FSignalJobs.Buckets[i].Data;
        if ASignalId > 0 then
          B := ASignal.Id = ASignalId
        else
          B := ASignal.Name = ASignalName;
        if B then begin
          AJob := ASignal.First;
          ASignal.First := nil;
          Break;
        end;
      end;
    end;
  finally
    FLocker.Leave;
  end;
  if AJob <> nil then
    Result := ClearSignalJobs(AJob)
  else
    Result := 0;
end;

function TYXDWorkers.ClearSignalJobs(ASource: PJob): Integer;
var
  ACount: Integer;
  AFirst, ALast, APrior, ANext: PJob;
  AWaitParam: TWorkerWaitParam;
begin
  Result := 0;
  AFirst := nil;
  APrior := nil;
  FSimpleJobs.FLocker.Enter;
  try
    ALast := FSimpleJobs.FFirst;
    ACount := FSimpleJobs.Count;
    FSimpleJobs.FFirst := nil;
    FSimpleJobs.FLast := nil;
    FSimpleJobs.FCount := 0;
  finally
    FSimpleJobs.FLocker.Leave;
  end;

  while ALast <> nil do begin
    if (ALast.IsSignalWakeup) and (ALast.Source = ASource) then begin
      ANext := ALast.Next;
      ALast.Next := nil;
      FreeJob(ALast);
      ALast := ANext;
      if APrior <> nil then
        APrior.Next := ANext;
      Dec(ACount);
      Inc(Result);
    end else begin
      if AFirst = nil then
        AFirst := ALast;
      APrior := ALast;
      ALast := ALast.Next;
    end;
  end;

  if ACount > 0 then begin
    FSimpleJobs.FLocker.Enter;
    try
      APrior.Next := FSimpleJobs.FFirst;
      FSimpleJobs.FFirst := AFirst;
      Inc(FSimpleJobs.FCount, ACount);
      if FSimpleJobs.FLast = nil then
        FSimpleJobs.FLast := APrior;
    finally
      FSimpleJobs.FLocker.Leave;
    end;
  end;
  AWaitParam.WaitType := 2;
  AWaitParam.SourceJob := ASource;
  WaitRunningDone(AWaitParam);
  FreeJob(ASource);
end;

procedure TYXDWorkers.ClearWorkers;
var
  i: Integer;

  {$IFDEF MSWINDOWS}
  function WorkerExists: Boolean;
  var
    J: Integer;
    ACode: Cardinal;
  begin
    Result := False;
    FLocker.Enter;
    try
      while FWorkerCount > 0 do begin
        if GetExitCodeThread(FWorkers[0].Handle, ACode) then begin
          if ACode = STILL_ACTIVE then begin
            Result := True;
            Break;
          end;
        end;
        // 工作者已经不存在,可能被外部线程结束
        FreeAndNil(FWorkers[0]);
        if FWorkerCount > 0 then begin
          for J := 1 to FWorkerCount - 1 do
            FWorkers[J - 1] := FWorkers[J];
          Dec(FWorkerCount);
          FWorkers[FWorkerCount] := nil;
        end;
      end;
    finally
      FLocker.Leave;
    end;
  end;
  {$ENDIF}
begin
  FTerminating := True;
  FLocker.Enter;
  try
    FRepeatJobs.FFirstFireTime := 0;
    for i := 0 to FWorkerCount - 1 do
      FWorkers[i].FEvent.SetEvent;
  finally
    FLocker.Leave;
  end;
  while (FWorkerCount > 0) {$IFDEF MSWINDOWS} and WorkerExists {$ENDIF} do
    {$IFDEF MSWINDOWS}SwitchToThread; {$ELSE} TThread.Yield;{$ENDIF}
end;

constructor TYXDWorkers.Create(AMinWorkers: Integer);
var
  i: Integer;
begin
  FBusyCount := 0;
  FSimpleJobs := TSimpleJobs.Create(Self);
  FRepeatJobs := TRepeatJobs.Create(Self);
  FSignalJobs := TYXDHashTable.Create(1361);
  FSignalJobs.OnDelete := DoJobFree;
  FSignalJobs.AutoSize := True;
  FLocker := TCriticalSection.Create;

  FCPUNum := GetCPUCount;
  if AMinWorkers < 1 then
    FMinWorkers := 2
  else
    FMinWorkers := AMinWorkers; // 最少工作者为2个
  FMaxWorkers := (FCPUNum shl 1) + 1;
  if FMaxWorkers <= FMinWorkers then
    FMaxWorkers := (FMinWorkers shl 1) + 1;
  FTerminating := False;

  // 创建默认工作者
  FDisableCount := 0;
  FMaxSignalId := 0;
  FWorkerCount := FMinWorkers;
  SetLength(FWorkers, FMaxWorkers);
  for i := 0 to FMinWorkers - 1 do begin
    FWorkers[i] := TYXDWorker.Create(Self);
    FWorkers[i].SetValue(WORKER_RESERVED, True); // 保留,不需要空闲检查
    FWorkers[i].Suspended := False;
    {$IFDEF MSWINDOWS}
    SetThreadCPU(FWorkers[i].Handle, i mod FCPUNum);
    {$ELSE}
    SetThreadCPU(FWorkers[i].ThreadID, i mod FCPUNum);
    {$ENDIF}
  end;
  FMaxLongtimeWorkers := (FMaxWorkers shr 1);
  {$IFDEF MSWINDOWS}
  FMainWorker := AllocateHWnd(DoMainThreadWork);
  {$ENDIF}
end;

destructor TYXDWorkers.Destroy;
begin
  ClearWorkers;
  FLocker.Enter;
  try
    FreeAndNil(FSimpleJobs);
    FreeAndNil(FRepeatJobs);
    FreeAndNil(FSignalJobs);
  finally
    FLocker.Leave;
    FLocker.Free;
  end;
  {$IFDEF MSWINDOWS}
  DeallocateHWnd(FMainWorker);
  {$ENDIF}
  inherited;
end;

procedure TYXDWorkers.DisableWorkers;
begin
  AtomicIncrement(FDisableCount);
end;

procedure TYXDWorkers.DoCustomFreeData(AFreeType: TJobDataFreeType;
  var AData: Pointer);
begin
  if Assigned(FOnCustomFreeData) then
    FOnCustomFreeData(Self, AFreeType, AData);
end;

procedure TYXDWorkers.DoJobFree(ATable: TObject; AHash: Cardinal; AData: Pointer);
var
  ASignal: PSignal;
begin
  ASignal := AData;
  if ASignal.First <> nil then
    FreeJob(ASignal.First);
  Dispose(ASignal);
end;

{$IFDEF MSWINDOWS}
procedure TYXDWorkers.DoMainThreadWork(var AMsg: TMessage);
var
  AJob: PJob;
begin
  if AMsg.Msg = WM_APP then begin
    try
      AJob := PJob(AMsg.WPARAM);
      AJob.Worker.DoJob(AJob);
    finally
      if AMsg.LPARAM <> 0 then
        TEvent(AMsg.LPARAM).SetEvent;
    end;
  end else
    AMsg.Result := DefWindowProc(FMainWorker, AMsg.Msg, AMsg.WPARAM, AMsg.LPARAM);
end;
{$ENDIF}

procedure TYXDWorkers.EnableWorkers;
var
  ANeedCount: Integer;
begin
  if AtomicDecrement(FDisableCount) = 0 then begin
    if (FSimpleJobs.Count > 0) or (FRepeatJobs.Count > 0) then begin
      ANeedCount := FSimpleJobs.Count + FRepeatJobs.Count;
      while ANeedCount > 0 do begin
        if not LookupIdleWorker then
          Break;
        Dec(ANeedCount);
      end;
    end;
  end;
end;

procedure InitJobFreeType(AOwner: TYXDWorkers; AJob: PJob; AData: Pointer; AFreeType: TJobDataFreeType); inline;
begin
  if AData <> nil then begin
    case AFreeType of
      jdfFreeAsObject:
        AJob.IsObjectOwner := True;
      jdfFreeAsRecord:
        AJob.IsRecordOwner := True;
      jdfFreeAsInterface:
        begin
          AJob.IsInterfaceOwner := True;
          IUnknown(AData)._AddRef;
        end;
    end;
  end else begin
    if AFreeType <> jdfFreeByUser then
      AOwner.DoCustomFreeData(AFreeType, AData);
  end;
end;

procedure TYXDWorkers.FireSignalJob(ASignal: PSignal; AData: Pointer;
  AFreeType: TJobDataFreeType);
var
  AJob, ACopy: PJob;
  ACount: PInteger;
begin
  Inc(ASignal.Fired);
  if AData <> nil then begin
    New(ACount);
    ACount^ := 1; //初始值
  end else
    ACount := nil;
  AJob := ASignal.First;
  while AJob <> nil do begin
    ACopy := JobPool.Pop;
    ACopy.Assign(AJob);
    ACopy.SetValue(JOB_RUN_ONCE, True);
    ACopy.Source := AJob;
    ACopy.Data := AData;
    InitJobFreeType(Self, ACopy, AData, AFreeType);
    if ACount <> nil then begin
      AtomicIncrement(ACount^);
      ACopy.RefCount := ACount;
    end;
    FSimpleJobs.Push(ACopy);
    AJob := AJob.Next;
  end;
  if AData <> nil then begin
    if AtomicDecrement(ACount^) = 0 then begin
      Dispose(ACount);
      FreeJobData(AData, AFreeType);
    end;
  end;
end;

procedure TYXDWorkers.FreeJob(AJob: PJob);
var
  ANext: PJob;
  AFreeData: Boolean;
begin
  while AJob <> nil do begin
    ANext := AJob.Next;
    if AJob.Data <> nil then begin
      if AJob.IsSignalWakeup then begin
        AFreeData := AtomicDecrement(AJob.RefCount^) = 0;
        if AFreeData then
          Dispose(AJob.RefCount);
      end else
        AFreeData := AJob.IsDataOwner;
      if AFreeData then begin
        if AJob.IsObjectOwner then begin
          FreeJobData(AJob.Data, jdfFreeAsObject);
          AJob.Data := nil;
        end else if AJob.IsRecordOwner then begin
          FreeJobData(AJob.Data, jdfFreeAsRecord);
          AJob.Data := nil;
        end else if AJob.IsInterfaceOwner then begin
          FreeJobData(AJob.Data, jdfFreeAsInterface);
          AJob.Data := nil;
        end;
      end;
    end;
    JobPool.Push(AJob);
    AJob := ANext;
  end;
end;

procedure TYXDWorkers.FreeJobData(AData: Pointer; AFreeType: TJobDataFreeType);
begin
  case AFreeType of
    jdfFreeAsObject:
      try
        FreeAndNil(TObject(AData));
      except
        if Assigned(FOnErrorNotify) then
          FOnErrorNotify(nil, Exception(ExceptObject), 'Workers.FreeJobData');
      end;
    jdfFreeAsRecord:
      try
        Dispose(AData);
      except
        if Assigned(FOnErrorNotify) then
          FOnErrorNotify(nil, Exception(ExceptObject), 'Workers.FreeJobData');
      end;
    jdfFreeAsInterface:
      try
        IUnknown(AData)._Release;
      except
        if Assigned(FOnErrorNotify) then
          FOnErrorNotify(nil, Exception(ExceptObject), 'Workers.FreeJobData');
      end;
  end;
end;

function TYXDWorkers.GetEnabled: Boolean;
begin
  Result := (FDisableCount = 0);
end;

class function TYXDWorkers.GetInstance: TYXDWorkers;
begin
  if not Assigned(Workers) then
    Workers := TYXDWorkers.Create();
  Result := Workers;
end;

class function TYXDWorkers.JobPoolCount: Integer;
begin
  Result := JobPool.Count;
end;

function TYXDWorkers.LookupIdleWorker: Boolean;
var
  i: Integer;
  AWorker: TYXDWorker;
begin
  Result := False;
  if (FDisableCount <> 0) or (FBusyCount = MaxWorkers) or FTerminating then
    Exit;
  FLocker.Enter;
  try
    if FBusyCount < FWorkerCount then begin
      for i := 0 to FWorkerCount - 1 do begin
        AWorker := FWorkers[i];
        if (AWorker <> nil) and (AWorker.IsIdle) then begin
          AWorker.Suspended := False;
          AWorker.SetValue(WORKER_ISBUSY, true);
          AtomicIncrement(FBusyCount);
          AWorker.FEvent.SetEvent;
          Result := true;
          Exit;
        end;
      end;
    end;
    if (not Result) and (FWorkerCount < MaxWorkers) then begin
      AWorker := TYXDWorker.Create(Self);
      {$IFDEF MSWINDOWS}
      SetThreadCPU(AWorker.Handle, FWorkerCount mod FCPUNum);
      {$ELSE}
      SetThreadCPU(AWorker.ThreadID, FWorkerCount mod GetCPUCount);
      {$ENDIF}
      AWorker.Suspended := False;
      AWorker.FEvent.SetEvent;
      FWorkers[FWorkerCount] := AWorker;
      Inc(FWorkerCount);
      Result := true;
    end;
  finally
    FLocker.Leave;
  end;
end;

function TYXDWorkers.Popup: PJob;
begin
  Result := FSimpleJobs.Pop;
  if Result = nil then
    Result := FRepeatJobs.Pop;
end;

procedure InitJob(AJob: PJob; AData: Pointer;
  ARunInMainThread: Boolean; const ADelay, AInterval: Int64); inline;
begin
  AJob.Data := AData;
  if AInterval > 0 then begin
    AJob.Interval := AInterval;
    AJob.SetValue(JOB_RUN_ONCE, False);
  end else
    AJob.SetValue(JOB_RUN_ONCE, True);
  AJob.FirstDelay := ADelay;
  AJob.SetValue(JOB_IN_MAINTHREAD, ARunInMainThread);
end;

function TYXDWorkers.Post(AJobProc: TJobProc; AData: Pointer;
  ARunInMainThread: Boolean; const ADelay, AInterval: Int64;
  AFreeType: TJobDataFreeType): Boolean;
var
  AJob: PJob;
begin
  AJob := JobPool.Pop;
  AJob.WorkerProc := AJobProc;
  InitJob(AJob, AData, ARunInMainThread, ADelay, AInterval);
  InitJobFreeType(Self, AJob, AData, AFreeType);
  Result := Post(AJob);
end;

function TYXDWorkers.Post(AJobProc: TJobProcG; AData: Pointer;
  ARunInMainThread: Boolean; const ADelay, AInterval: Int64;
  AFreeType: TJobDataFreeType): Boolean;
var
  AJob: PJob;
begin
  AJob := JobPool.Pop;
  AJob.WorkerProc := MakeJobProc(AJobProc);
  InitJob(AJob, AData, ARunInMainThread, ADelay, AInterval);
  InitJobFreeType(Self, AJob, AData, AFreeType);
  Result := Post(AJob);
end;

{$IFDEF UNICODE}
function TYXDWorkers.Post(AJobProc: TJobProcA; AData: Pointer;
  ARunInMainThread: Boolean; const ADelay, AInterval: Int64;
  AFreeType: TJobDataFreeType): Boolean;
var
  AJob: PJob;
begin
  AJob := JobPool.Pop;
  AJob.WorkerProcA := AJobProc;
  InitJob(AJob, AData, ARunInMainThread, ADelay, AInterval);
  InitJobFreeType(Self, AJob, AData, AFreeType);
  Result := Post(AJob);
end;
{$ENDIF}

function TimeToDelay(const ATime: TDateTime): Int64; inline;
var
  ANow, ATemp: TDateTime;
begin
  ANow := Now;
  ANow := ANow - Trunc(ANow); // ATime我们只要时间部分,日期忽略
  ATemp := ATime - Trunc(ATime);
  if ANow > ATemp then // 好吧,今天的点已经过了,算明天
    Result := Trunc(((1 + ANow) - ATemp) * WODay) // 延迟的时间,单位为1ms
  else
    Result := Trunc((ATemp - ANow) * WODay);
end;

function TYXDWorkers.Post(AJobProc: TJobProcG; const ATime: TDateTime;
  const AInterval: Int64; AData: Pointer; ARunInMainThread: Boolean;
  AFreeType: TJobDataFreeType): Boolean;
var
  AJob: PJob;
begin
  AJob := JobPool.Pop;
  AJob.WorkerProc := MakeJobProc(AJobProc);
  InitJob(AJob, AData, ARunInMainThread, TimeToDelay(ATime), AInterval);
  InitJobFreeType(Self, AJob, AData, AFreeType);
  Result := Post(AJob);
end;

function TYXDWorkers.Post(AJobProc: TJobProc; const ATime: TDateTime;
  const AInterval: Int64; AData: Pointer; ARunInMainThread: Boolean;
  AFreeType: TJobDataFreeType): Boolean;
var
  AJob: PJob;
begin
  AJob := JobPool.Pop;
  AJob.WorkerProc := AJobProc;
  InitJob(AJob, AData, ARunInMainThread, TimeToDelay(ATime), AInterval);
  InitJobFreeType(Self, AJob, AData, AFreeType);
  Result := Post(AJob);
end;

{$IFDEF UNICODE}
function TYXDWorkers.Post(AJobProc: TJobProcA; const ATime: TDateTime;
  const AInterval: Int64; AData: Pointer; ARunInMainThread: Boolean;
  AFreeType: TJobDataFreeType): Boolean;
var
  AJob: PJob;
begin
  AJob := JobPool.Pop;
  AJob.WorkerProcA := AJobProc;
  InitJob(AJob, AData, ARunInMainThread, TimeToDelay(ATime), AInterval);
  InitJobFreeType(Self, AJob, AData, AFreeType);
  Result := Post(AJob);
end;
{$ENDIF}

procedure InitLogJob(AJob: PJob; AData: Pointer); inline;
begin
  AJob.Data := AData;
  AJob.SetValue(JOB_LONGTIME, True);
  AJob.SetValue(JOB_RUN_ONCE, True); // 长作业只运行一次
end;

function TYXDWorkers.PostLongJob(AJobProc: TJobProc; AData: Pointer;
  AFreeType: TJobDataFreeType): Boolean;
var
  AJob: PJob;
begin
  if AtomicIncrement(FLongTimeWorkers) <= FMaxLongtimeWorkers then begin
    Result := True;
    AJob := JobPool.Pop;
    AJob.WorkerProc := AJobProc;
    InitLogJob(AJob, AData);
    InitJobFreeType(self, AJob, AData, AFreeType);
    Post(AJob);
  end else begin // 长期作业数已经达到极限
    AtomicDecrement(FLongTimeWorkers);
    Result := False;
  end;
end;

function TYXDWorkers.PostLongJob(AJobProc: TJobProcG; AData: Pointer;
  AFreeType: TJobDataFreeType): Boolean;
begin
  Result := PostLongJob(MakeJobProc(AJobProc), AData, AFreeType);
end;

{$IFDEF UNICODE}
function TYXDWorkers.PostLongJob(AJobProc: TJobProcA; AData: Pointer;
  AFreeType: TJobDataFreeType): Boolean;
var
  AJob: PJob;
begin
  if AtomicIncrement(FLongTimeWorkers) <= FMaxLongtimeWorkers then begin
    Result := True;
    AJob := JobPool.Pop;
    AJob.WorkerProcA := AJobProc;
    InitLogJob(AJob, AData);
    InitJobFreeType(self, AJob, AData, AFreeType);
    Post(AJob);
  end else begin // 长期作业数已经达到极限
    AtomicDecrement(FLongTimeWorkers);
    Result := False;
  end;
end;
{$ENDIF}

procedure InitWaitJob(AJob: PJob; ASignalId: Integer; ARunInMainThread: Boolean); inline;
begin
  AJob.Data := nil;
  AJob.SignalId := ASignalId;
  AJob.PushTime := GetTimestamp;
  AJob.SetValue(JOB_SIGNAL_WAKEUP, True);
  AJob.SetValue(JOB_IN_MAINTHREAD, ARunInMainThread);
end;

function TYXDWorkers.PostWaitJob(AJob: PJob; ASignalId: Integer): Boolean;
var
  ASignal: PSignal;
begin
  Result := False;
  FLocker.Enter;
  try
    ASignal := FSignalJobs.FindFirstData(ASignalId);
    if ASignal <> nil then begin
      AJob.Next := ASignal.First;
      ASignal.First := AJob;
      Result := True;
    end;
  finally
    FLocker.Leave;
    if not Result then
      JobPool.Push(AJob);
  end;
end;

function TYXDWorkers.PostWait(AJobProc: TJobProc; ASignalId: Integer;
  ARunInMainThread: Boolean): Boolean;
var
  AJob: PJob;
begin
  if not FTerminating then begin
    AJob := JobPool.Pop;
    AJob.WorkerProc := AJobProc;
    InitWaitJob(AJob, ASignalId, ARunInMainThread);
    Result := PostWaitJob(AJob, ASignalId);
  end else
    Result := False;
end;

function TYXDWorkers.PostWait(AJobProc: TJobProcG; ASignalId: Integer;
  ARunInMainThread: Boolean): Boolean;
begin
  Result := PostWait(MakeJobProc(AJobProc), ASignalId, ARunInMainThread);
end;

{$IFDEF UNICODE}
function TYXDWorkers.PostWait(AJobProc: TJobProcA; ASignalId: Integer;
  ARunInMainThread: Boolean): Boolean;
var
  AJob: PJob;
begin
  if not FTerminating then begin
    AJob := JobPool.Pop;
    AJob.WorkerProcA := AJobProc;
    InitWaitJob(AJob, ASignalId, ARunInMainThread);
    Result := PostWaitJob(AJob, ASignalId);
  end else
    Result := False;
end;
{$ENDIF}

function TYXDWorkers.Post(AJob: PJob): Boolean;
begin
  if (not FTerminating) and (Assigned(AJob.WorkerProc)
    {$IFDEF UNICODE} or Assigned(AJob.WorkerProcA){$ENDIF}) then
  begin
    if AJob.Runonce and (AJob.FirstDelay = 0) then
      Result := FSimpleJobs.Push(AJob)
    else
      Result := FRepeatJobs.Push(AJob);
    if Result then
      LookupIdleWorker;
  end else begin
    AJob.Next := nil;
    FreeJob(AJob);
    Result := False;
  end;
end;

procedure TYXDWorkers.SendSignal(AId: Integer; AData: Pointer;
  AFreeType: TJobDataFreeType);
var
  AFound: Boolean;
  ASignal: PSignal;
begin
  AFound := False;
  if FTerminating then Exit;
  FLocker.Enter;
  try
    ASignal := FSignalJobs.FindFirstData(AId);
    if ASignal <> nil then begin
      AFound := True;
      FireSignalJob(ASignal, AData, AFreeType);
    end;
  finally
    FLocker.Leave;
  end;
  if AFound then
    LookupIdleWorker;
end;

procedure TYXDWorkers.SendSignal(const AName: string; AData: Pointer;
  AFreeType: TJobDataFreeType);
var
  i: Integer;
  ASignal: PSignal;
  AFound: Boolean;
begin
  AFound := False;
  if Length(AName) = 0 then Exit;
  FLocker.Enter;
  try
    for i := 0 to FSignalJobs.BucketCount - 1 do begin
      if FSignalJobs.Buckets[i] <> nil then begin
        ASignal := FSignalJobs.Buckets[i].Data;
        if (Length(ASignal.Name) = Length(AName)) and (ASignal.Name = AName) then begin
          AFound := True;
          FireSignalJob(ASignal, AData, AFreeType);
          Break;
        end;
      end;
    end;
  finally
    FLocker.Leave;
  end;
  if AFound then
    LookupIdleWorker;
end;

procedure TYXDWorkers.SetEnabled(const Value: Boolean);
begin
  if Value then
    EnableWorkers
  else
    DisableWorkers;
end;

procedure TYXDWorkers.SetMaxLongtimeWorkers(const Value: Integer);
begin
  if FMaxLongtimeWorkers <> Value then begin
    if Value > (MaxWorkers shr 1) then // 长时间运行的作业不能大于最大工作线程的一半
      raise Exception.Create(STooManyLongtimeWorker);
    FMaxLongtimeWorkers := Value;
  end;
end;

procedure TYXDWorkers.SetMaxWorkers(const Value: Integer);
var
  ATemp, AMaxLong: Integer;
begin
  if (Value >= 2) and (FMaxWorkers <> Value) then begin
    AtomicExchange(ATemp, FLongTimeWorkers);
    AtomicExchange(FLongTimeWorkers, 0); // 强制置0,防止有新入的长时间作业
    AMaxLong := Value shr 1;
    FLocker.Enter;
    try
      if FLongTimeWorkers < AMaxLong then begin // 已经进行的长时间作业数小于一半的工作者
        if ATemp < AMaxLong then
          AMaxLong := ATemp;                    // 长时间作业最大值使用更改之前已经存在的长时间作业数量
        if FMaxWorkers > Value then begin
          while Value < FWorkerCount do         // 中止大于最大作业数的作业,谁是倒霉孩子?
            WorkerTerminate(FWorkers[FWorkerCount - 1]);
          FMaxWorkers := Value;
          SetLength(FWorkers, Value);
        end else begin
          FMaxWorkers := Value;
          SetLength(FWorkers, Value);
        end;
      end;
    finally
      FLocker.Leave;
      AtomicExchange(FLongTimeWorkers, AMaxLong);
    end;
  end;
end;

procedure TYXDWorkers.SetMinWorkers(const Value: Integer);
begin
  if FMinWorkers <> Value then begin
    if Value < 1 then
      raise Exception.Create(STooFewWorkers);
    FMinWorkers := Value;
  end;
end;

function TYXDWorkers.SignalIdByName(const AName: string): Integer;
var
  i, j: Integer;
  ASignal: PSignal;
begin
  Result := -1;
  j := Length(AName);
  if j < 1 then Exit;
  for i := 0 to FSignalJobs.BucketCount - 1 do begin
    if FSignalJobs.Buckets[i] <> nil then begin
      ASignal := FSignalJobs.Buckets[i].Data;
      if (Length(ASignal.Name) = j) and (ASignal.Name = AName) then begin
        Result := ASignal.Id;
        Exit;
      end;
    end;
  end;
end;

procedure TYXDWorkers.SignalWorkDone(AJob: PJob; AUsedTime: Int64);
var
  ASignal: PSignal;
  ATemp, APrior: PJob;
begin
  APrior := nil;
  FLocker.Enter;
  try
    ASignal := FSignalJobs.FindFirstData(AJob.SignalId);
    ATemp := ASignal.First;
    while ATemp <> nil do begin
      if ATemp = AJob.Source then begin
        if AJob.IsTerminated then begin
          if APrior <> nil then
            APrior.Next := ATemp.Next
          else
            ASignal.First := ATemp.Next;
          ATemp.Next := nil;
          FreeJob(ATemp);
        end else begin
          Inc(ATemp.Runs);  // 更新信号作业的统计信息
          if AUsedTime > 0 then begin
            if ATemp.MinUsedTime = 0 then
              ATemp.MinUsedTime := AUsedTime
            else if AUsedTime < ATemp.MinUsedTime then
              ATemp.MinUsedTime := AUsedTime;
            if ATemp.MaxUsedTime = 0 then
              ATemp.MaxUsedTime := AUsedTime
            else if AUsedTime > ATemp.MaxUsedTime then
              ATemp.MaxUsedTime := AUsedTime;
            Break;
          end;
        end;
      end;
      APrior := ATemp;
      ATemp := ATemp.Next;
    end;
  finally
    FLocker.Leave;
  end;
end;

procedure TYXDWorkers.WaitRunningDone(const AParam: TWorkerWaitParam);
var
  AInMainThread: Boolean;

  function HasJobRunning: Boolean;
  var
    i: Integer;
    AJob: PJob;
  begin
    Result := False;
    DisableWorkers;
    FLocker.Enter;
    try
      for i := 0 to FWorkerCount - 1 do begin
        if FWorkers[i].IsLookuping then begin// 还未就绪,所以在下次查询
          Result := True;
          Break;
        end else if FWorkers[i].IsExecuting then begin
          AJob := FWorkers[i].FActiveJob;
          case AParam.WaitType of
            0: // ByObject
              Result := TMethod(FWorkers[i].FActiveJobProc).Data = AParam.Bound;
            1: // ByData
              Result := (TMethod(FWorkers[i].FActiveJobProc).Code = TMethod(AParam.WorkerProc).Code) and
                (TMethod(FWorkers[i].FActiveJobProc).Data = TMethod(AParam.WorkerProc).Data) and
                ((AParam.Data = Pointer(-1)) or
                (FWorkers[i].FActiveJobData = AParam.Data));
            2: // BySignalSource
              Result := (FWorkers[i].FActiveJobSource = AParam.SourceJob);
            3: // ByGroup
              Result := (FWorkers[i].FActiveJobGroup = AParam.Group);
            $FF: // 所有
              Result := True;
          else
            if Assigned(FOnErrorNotify) then
              FOnErrorNotify(AJob, Exception.CreateFmt(SBadWaitDoneParam, [AParam.WaitType]), 'TYXDWorkers.WaitRunningDone');
          end;
          if Result then begin
            FWorkers[i].FTerminatingJob := AJob;
            Break;
          end;
        end;
      end;
    finally
      FLocker.Leave;
      EnableWorkers;
    end;
  end;

begin
  AInMainThread := GetCurrentThreadId = MainThreadId;
  while True do begin
    if HasJobRunning then begin
      if AInMainThread then begin
        // 如果是在主线程中清理,由于作业可能在主线程执行,可能已经投寄尚未执行,所以必需让其能够执行
        {$IFDEF NEXTGEN}
        fmx.Forms.Application.ProcessMessages;
        {$ELSE}
        Forms.Application.ProcessMessages;
        {$ENDIF}
      end;
      {$IFDEF MSWINDOWS}
      SwitchToThread;
      {$ELSE}
      TThread.Yield;
      {$ENDIF}
    end else // 没找到
      Break;
  end;
end;

procedure TYXDWorkers.WorkerIdle(AWorker: TYXDWorker; AReason: TWorkerIdleReason);
var
  i, J: Integer;
begin
  if AtomicDecrement(FBusyCount) > FMinWorkers then begin
    FLocker.Enter;
    try  // 工作者闲置时,如果不是前两个长工,就检测是否有短工可以解雇
      if (AWorker <> FWorkers[0]) and (AWorker <> FWorkers[1]) and (AReason = irTimeout) then
      begin
        for i := FMinWorkers to FWorkerCount - 1 do begin // 从第一个短工开始,将他们解雇
          if AWorker = FWorkers[i] then begin
            AWorker.Terminate;
            for J := i + 1 to FWorkerCount - 1 do
              FWorkers[J - 1] := FWorkers[J];
            FWorkers[FWorkerCount - 1] := nil;
            Dec(FWorkerCount);
            Break;
          end;
        end;
      end;
    finally
      FLocker.Leave;
    end;
  end;
  if AReason <> irNoJob then
    AtomicIncrement(FBusyCount)
end;

procedure TYXDWorkers.WorkerTerminate(AWorker: TObject);
var
  i ,J: Integer;
begin
  AtomicDecrement(FBusyCount);
  FLocker.Enter;
  for i := 0 to FWorkerCount - 1 do begin // 工作者被中止时,从列表中清除
    if FWorkers[i] = AWorker then begin
      //System.Move(FWorkers[I + 1], FWorkers[I], (FWorkerCount - I) * SizeOf(TObject));
      for J := i to FWorkerCount - 2 do
        FWorkers[J] := FWorkers[J + 1];
      FWorkers[FWorkerCount - 1] := nil;
      Dec(FWorkerCount);
      Break;
    end;
  end;
  FLocker.Leave;
end;

function TYXDWorkers.RegisterSignal(const AName: string): Integer;
var
  ASignal: PSignal;
begin
  FLocker.Enter;
  try
    Result := SignalIdByName(AName);
    if Result < 0 then begin
      Inc(FMaxSignalId);
      New(ASignal);
      ASignal.Id := FMaxSignalId;
      ASignal.Fired := 0;
      ASignal.Name := AName;
      ASignal.First := nil;
      FSignalJobs.Add(ASignal, ASignal.Id);
      Result := ASignal.Id;
    end;
  finally
    FLocker.Leave;
  end;
end;

{ TJobGroup }

function TJobGroup.Add(AProc: TJobProc; AData: Pointer;
  AInMainThread: Boolean; AFreeType: TJobDataFreeType): Boolean;
var
  AJob: PJob;
begin
  AJob := JobPool.Pop;
  AJob.Group := Self;
  AJob.WorkerProc := AProc;
  AJob.Data := AData;
  AJob.SetValue(JOB_RUN_ONCE, True);
  AJob.SetValue(JOB_GROUPED, True);
  AJob.SetValue(JOB_IN_MAINTHREAD, AInMainThread);
  InitJobFreeType(FOwner, AJob, AData, AFreeType);
  FLocker.Enter;
  try
    FWaitResult := wrIOCompletion;
    if FPrepareCount > 0 then begin // 正在添加项目,加到列表中,等待Run
      FItems.Add(AJob);
      Result := True;
    end else begin
      if ByOrder then begin // 按顺序
        Result := True;
        FItems.Add(AJob);
        if FItems.Count = 0 then
          Result := FOwner.Post(AJob);
      end else begin
        Result := FOwner.Post(AJob);
        if Result then
          FItems.Add(AJob);
      end;
    end;
  finally
    FLocker.Leave;
  end;
end;

procedure TJobGroup.Cancel;
var
  i: Integer;
  AJobs: TSimpleJobs;
  AJob, APrior, ANext: PJob;
  AWaitParam: TWorkerWaitParam;
begin
  FLocker.Enter;
  try
    if FByOrder then begin
      for i := 0 to FItems.Count - 1 do begin
        AJob := FItems[i];
        if AJob.PopTime = 0 then
          FOwner.FreeJob(AJob);
      end;
    end;
    FItems.Clear;
  finally
    FLocker.Leave;
  end;
  // 从SimpleJobs里清除关联的全部作业
  AJobs := FOwner.FSimpleJobs;
  AJobs.FLocker.Enter;
  try
    AJob := AJobs.FFirst;
    APrior := nil;
    while AJob <> nil do begin
      ANext := AJob.Next;
      if AJob.IsGrouped and (AJob.Group = Self) then begin
        if APrior = nil then
          AJobs.FFirst := AJob.Next
        else
          APrior.Next := AJob.Next;
        AJob.Next := nil;
        FOwner.FreeJob(AJob);
        if AJob = AJobs.FLast then
          AJobs.FLast := nil;
      end else
        APrior := AJob;
      AJob := ANext;
    end;
  finally
    AJobs.FLocker.Leave;
  end;
  AWaitParam.WaitType := 3;
  AWaitParam.Group := Self;
  FOwner.WaitRunningDone(AWaitParam);
end;

constructor TJobGroup.Create(AOwner: TYXDWorkers; AByOrder: Boolean);
begin
  if Assigned(AOwner) then
    FOwner := AOwner
  else
    FOwner := Workers;
  if (not Assigned(FOwner)) then
    raise Exception.Create(SNotInitWorkers);
  FEvent := TEvent.Create(nil, False, False, '');
  FLocker := TSimpleLock.Create;
  FByOrder := AByOrder;
  FItems := TJobItemList.Create;
end;

constructor TJobGroup.Create(AByOrder: Boolean);
begin
  Create(nil, AByOrder);
end;

destructor TJobGroup.Destroy;
var
  i: Integer;
begin
  Cancel;
  FOwner.Clear(Self, -1);
  FLocker.Enter;
  try
    if FItems.Count > 0 then begin
      FWaitResult := wrAbandoned;
      FEvent.SetEvent;
      for i := 0 to FItems.Count - 1 do begin
        if PJob(FItems[i]).PushTime <> 0 then
          JobPool.Push(FItems[i]);
      end;
      FItems.Clear;
    end;
  finally
    FLocker.Leave;
  end;
  FreeAndNil(FLocker);
  FreeAndNil(FEvent);
  FreeAndNil(FItems);
  inherited;
end;

procedure TJobGroup.DoAfterDone;
begin
  if Assigned(FAfterDone) then begin
    try
      FAfterDone(Self);
    except
      if Assigned(FOwner.FOnErrorNotify) then
        FOwner.FOnErrorNotify(nil, Exception(ExceptObject), 'TJobGroup.DoAfterDone');
    end;
  end;
end;

procedure TJobGroup.DoJobExecuted(AJob: PJob);
var
  i: Integer;
  AIsDone: Boolean;
begin
  if FWaitResult = wrIOCompletion then begin
    AIsDone := False;
    FLocker.Enter;
    try
      i := FItems.IndexOf(AJob);
      if i <> -1 then begin
        FItems.Delete(i);
        if FItems.Count = 0 then begin
          FWaitResult := wrSignaled;
          FEvent.SetEvent;
          AIsDone := true;
        end else if ByOrder then begin
          if not FOwner.Post(FItems[0]) then begin
            FWaitResult := wrAbandoned;
            FEvent.SetEvent;
          end;
        end;
      end;
    finally
      FLocker.Leave;
    end;
    if AIsDone then
      DoAfterDone;
  end;
end;

procedure TJobGroup.DoJobsTimeout(AJob: PJob);
begin
  FTimeoutCheck := False;
  Cancel;
  if FWaitResult = wrIOCompletion then begin
    FWaitResult := wrTimeout;
    FEvent.SetEvent;
    DoAfterDone;
  end;
end;

function TJobGroup.MsgWaitFor(ATimeout: Cardinal): TWaitResult;
var
  AEndTime: Int64;
begin
  if GetCurrentThreadId <> MainThreadId then
    Result := WaitFor(ATimeout)
  else begin
    Result := FWaitResult;
    FLocker.Enter;
    try
      if FItems.Count = 0 then
        Result := wrSignaled;
    finally
      FLocker.Leave;
    end;
    if Result = wrIOCompletion then begin
      AEndTime := GetTimestamp + ATimeout * 10;
      while GetTimestamp < AEndTime do begin
        // 每隔10毫秒检查一下是否有消息需要处理,有则处理,无则进入下一个等待
        if FEvent.WaitFor(10) = wrSignaled then begin
          Result := FWaitResult;
          Break;
        end else begin
          // 如果是在主线程中清理,由于作业可能在主线程执行,可能已经投寄尚未执行,所以必需让其能够执行
          {$IFDEF NEXTGEN}
          fmx.Forms.Application.ProcessMessages;
          {$ELSE}
          Forms.Application.ProcessMessages;
          {$ENDIF}
        end;
      end;
      if Result = wrIOCompletion then begin
        Cancel;
        if Result = wrIOCompletion then
          Result := wrTimeout;
      end;
      if FTimeoutCheck then
        FOwner.Clear;
      DoAfterDone;
    end;
  end;
end;

procedure TJobGroup.Prepare;
begin
  AtomicIncrement(FPrepareCount);
end;

procedure TJobGroup.Run(ATimeout: Cardinal = INFINITE);
var
  i: Integer;
begin
  if AtomicDecrement(FPrepareCount) = 0 then begin
    if ATimeout <> INFINITE then begin
      FTimeoutCheck := True;
      FOwner.Post(DoJobsTimeout, nil, False, ATimeout);
    end;
    FLocker.Enter;
    try
      if FItems.Count = 0 then
        FWaitResult := wrSignaled
      else begin
        FWaitResult := wrIOCompletion;
        if ByOrder then begin
          if not FOwner.Post(FItems[0]) then
            FWaitResult := wrAbandoned;
        end else begin
          for i := 0 to FItems.Count - 1 do begin
            if not FOwner.Post(FItems[i]) then begin
              FWaitResult := wrAbandoned;
              Break;
            end;
          end;
        end;
      end;
    finally
      FLocker.Leave;
    end;
    if FWaitResult <> wrIOCompletion then
      DoAfterDone;
  end;
end;

function TJobGroup.WaitFor(ATimeout: Cardinal): TWaitResult;
begin
  Result := FWaitResult;
  FLocker.Enter;
  try
    if FItems.Count = 0 then
      Result := wrSignaled;
  finally
    FLocker.Leave;
  end;
  if Result = wrIOCompletion then begin
    if FEvent.WaitFor(ATimeout) = wrSignaled then
      Result := FWaitResult
    else
      Result := wrTimeout;
  end;
  if FTimeoutCheck then
    FOwner.Clear;
  DoAfterDone;
end;

initialization
  _CPUCount := GetCPUCount;
  {$IFNDEF NEXTGEN}
  GetTickCount64 := GetProcAddress(GetModuleHandle(kernel32), 'GetTickCount64');
  if not QueryPerformanceFrequency(_PerfFreq) then
    _PerfFreq := -1;
  {$ELSE}
    _Watch := TStopWatch.Create;
    _Watch.Start;
  {$ENDIF}

  JobPool := TJobPool.Create(1024);
  //Workers := TYXDWorkers.Create;

finalization
  if Assigned(Workers) then FreeAndNil(Workers);
  if Assigned(JobPool) then FreeAndNil(JobPool);

end.

 

分享到: