说明
——————————————————————–
YxdWorker 基于QDAC项目的QWorker ,并且绝大部分代码来自于此,
感谢swish和他的QWorker,QDAC,YxdWorker 版权归swish, YangYxd所有
QWorker来自QDAC项目,版权归swish(QQ:109867294)所有
QDAC官方群:250530692
——————————————————————–
感谢Swish兄长,为了QWorker负出了很多时间和精力。
{*******************************************************} { } { YxdWorker 后台工作者管理库 } { } { 版权所有 (C) 2013 GXT YangYxd } { } {*******************************************************} { -------------------------------------------------------------------- 说明 -------------------------------------------------------------------- YxdWorker 基于QDAC项目的QWorker ,并且绝大部分代码来自于此, 感谢swish和他的QWorker,QDAC,YxdWorker 版权归swish, YangYxd所有 QWorker来自QDAC项目,版权归swish(QQ:109867294)所有 QDAC官方群:250530692 -------------------------------------------------------------------- 更新记录 -------------------------------------------------------------------- 2014.08.23 ver 1.0.4 -------------------------------------------------------------------- - 解决Busy计数器BUG 2014.08.22 ver 1.0.3 -------------------------------------------------------------------- - 解决JobGroup超时和Cancel的问题,解决某些原因引起测速很慢的问题 - 提取合并部分代码,减少体积 2014.08.16 ver 1.0.2 -------------------------------------------------------------------- - 改进长时间任务处理方式 ,TSimpleJobs增加 FLongFirst,FLongLast 专 门应对长时间任务,解决长时间任务导致Clear失败BUG - 同步QWorker修改TQJobGroup.AfterDone改为除了在完成时,在中断或超时 时仍然触发 - 同步QWorker增加TQJobGroup.Run函数加入超时设置,超过指定的时间如果 仍未执行完成,则中止后续执行 - 同步QWorker增加TQJobGroup.Cancel函数用于取消未执行的作业执行 2014.08.16 ver 1.0.1 -------------------------------------------------------------------- - 增加 FOnErrorNotify通知事件,以便使用者可以记录相关日志 - 将原QWorker中的Delay,At,Post合并为Post方法。 - 将原QWorker中的时间精度由0.1ms调整为1ms. - 将原QWorker中TJobHelper的功能直接放入TJob中,以便在D2007中还能保 持良好的语法提示 - 将原QWorker中Worker类设置Flags相关功能改为GetValue,SetValue,减小 单元大小 - 对JobGroup的Add功能增加参数AFreeType, 并默认AInMainThread=False - 提取合并部分代码,减少单元大小 - 删除Job中的Owner字段 -------------------------------------------------------------------- } unit YxdWorker; {.$DEFINE WORKER_SIMPLE_LOCK} // 是否使用原子自旋锁? interface uses {$IFDEF UNICODE}Generics.Collections, {$ENDIF} {$IFDEF NEXTGEN}Fmx.Forms, System.Diagnostics, {$ENDIF} {$IFDEF POSIX}Posix.Unistd, Posix.Pthread, {$ENDIF} {$IFDEF MSWINDOWS}Windows, Messages, Forms, Activex, {$ENDIF} YxdHash, SysUtils, Classes, Types, SyncObjs; const JOB_RUN_ONCE = $0001; // 作业只运行一次 JOB_IN_MAINTHREAD = $0002; // 作业只能在主线程中运行 JOB_MAX_WORKERS = $0004; // 尽可能多的开启可能的工作者线程来处理作业,暂不支持 JOB_LONGTIME = $0008; // 作业需要很长的时间才能完成,以便调度程序减少它对其它作业的影响 JOB_SIGNAL_WAKEUP = $0010; // 作业根据信号需要唤醒 JOB_TERMINATED = $0020; // 作业不需要继续进行,可以结束了 JOB_FREE_OBJECT = $0040; // Data关联的是Object,作业完成或清理时释放 JOB_FREE_RECORD = $0080; // Data关联的是Record,作业完成或清理时释放 JOB_FREE_INTERFACE = $0100; // Data关联的是Interface,作业完成时调用_Release JOB_GROUPED = $0200; // 当前作业是作业组的一员 JOB_ANONPROC = $0400; // 当前作业过程是匿名函数 JOB_DATA_OWNER = JOB_FREE_OBJECT + JOB_FREE_RECORD + JOB_FREE_INTERFACE; // 作业是Data成员的所有者 WORKER_ISBUSY = $01; // 工作者忙碌 WORKER_PROCESSLONG = $02; // 当前处理的一个长时间作业 WORKER_RESERVED = $04; // 当前工作者是一个保留工作者 WORKER_COM_INITED = $08; // 工作者已初始化为支持COM的状态(仅限Windows) WORKER_LOOKUP = $10; // 工作者正在查找作业 WORKER_EXECUTING = $20; // 工作者正在执行作业 WORKER_EXECUTED = $40; // 工作者已经完成作业 WAITJOB_TIMEOUT = 15000; // 工作者等待作业超时时间 (15秒) const WOSecond = 1000; // 1s WOMinute = 60000; // 60s/1min WOHour = 3600000; // 3600s/60min/1hour WODay = Int64(86400000); // 1天 type /// <summary>作业空闲原因,内部使用</summary> TWorkerIdleReason = ( irTimeout, // 工作者已经等待超时,可以被释放 irNoJob // 没有需要处理的作业,此时工作者会进行WAITJOB_TIMEOUT释放 // 等待状态,如果在WAITJOB_TIMEOUT内有新作业进来,则工作者 // 会被唤醒,否则超时后会被释放 ); type /// <summary>作业结束时如何处理Data成员</summary> TJobDataFreeType = ( jdfFreeByUser, // 用户管理对象的释放 jdfFreeAsObject, // 附加的是一个TObject继承的对象,作业完成时会调用FreeObject释放 jdfFreeAsRecord, // 附加的是一个Record对象,作业完成时会调用Dispose释放 jdfFreeAsInterface // 附加的是一个接口对象,添加时会增加计数,作业完成时会减少计数 ); type TJobBase = class; TJobGroup = class; TSimpleJobs = class; TRepeatJobs = class; TYXDWorker = class; TYXDWorkers = class; {$IFNDEF UNICODE} IntPtr = Integer; {$ENDIF} PJob = ^TJob; // 作业处理回调函数 TJobProc = procedure(AJob: PJob) of object; TJobProcG = procedure(AJob: PJob); {$IFDEF UNICODE} TJobProcA = reference to procedure(AJob: PJob); {$ENDIF} TWorkerWaitParam = record WaitType: Byte; Data: Pointer; case Integer of 0: (Bound: Pointer); // 按对象清除 1: (WorkerProc: TMethod); 2: (SourceJob: PJob); 3: (Group: Pointer); end; // 信号的内部定义 PSignal = ^TSignal; TSignal = packed record Id: Integer; // 信号的索引 Fired: Integer; // 信号已触发次数 Name: string; // 信号的名称 First: PJob; // 首个作业 end; TJob = record private function GetAvgTime: Integer; inline; function GetElapseTime: Int64; inline; function GetValue(Index: Integer): Boolean; inline; procedure SetValue(Index: Integer; const Value: Boolean); inline; function GetIsTerminated: Boolean; inline; procedure SetIsTerminated(const Value: Boolean); inline; procedure AfterRun(AUsedTime: Int64); procedure UpdateNextTime; public procedure Create(AProc: TJobProc); /// <summary>值拷贝函数</summary> /// <remarks>Worker/Next/Source不会复制并会被置空,Owner不会被复制</remarks> procedure Assign(const ASource: PJob); /// <summary>重置内容,以便为从队列中弹出做准备</summary> procedure Reset; inline; /// <summary>平均每次运行时间,单位为1ms</summary> property AvgTime: Integer read GetAvgTime; /// <summmary>本次已运行时间,单位为1ms</summary> property ElapseTime: Int64 read GetElapseTime; /// <summary>是否只运行一次,投递作业时自动设置</summary> property Runonce: Boolean index JOB_RUN_ONCE read GetValue; /// <summary>是否要求在主线程执行作业,实际效果比Windows的PostMessage相似</summary> property InMainThread: Boolean index JOB_IN_MAINTHREAD read GetValue; /// <summary>是否是一个运行时间比较长的作业,用Workers.LongtimeWork设置</summary> property IsLongtimeJob: Boolean index JOB_LONGTIME read GetValue; /// <summary>是否是一个信号触发的作业</summary> property IsSignalWakeup: Boolean index JOB_SIGNAL_WAKEUP read GetValue; /// <summary>是否是分组作业的成员</summary> property IsGrouped: Boolean index JOB_GROUPED read GetValue; /// <summary>是否要求结束当前作业</summary> property IsTerminated: Boolean read GetIsTerminated write SetIsTerminated; /// <summary>判断作业是否拥有Data数据成员</summary> property IsDataOwner: Boolean index JOB_DATA_OWNER read GetValue; /// <summary>判断作业的Data指向的是一个对象且要求作业完成时自动释放</summary> property IsObjectOwner: Boolean index JOB_FREE_OBJECT read GetValue write SetValue; /// <summary>判断作业的Data指向的是一个记录且要求作业完成时自动释放</summary> property IsRecordOwner: Boolean index JOB_FREE_RECORD read GetValue write SetValue; /// <summary>判断作业的Data指向的是一个接口且要求作业完成时自动释放</summary> property IsInterfaceOwner: Boolean index JOB_FREE_INTERFACE read GetValue write SetValue; /// <summary>判断作业处理过程是否是一个匿名函数</summary> property IsAnonWorkerProc: Boolean index JOB_ANONPROC read GetValue write SetValue; public FirstTime: Int64; // 作业第一次开始时间 StartTime: Int64; // 本次作业开始时间,8B PushTime: Int64; // 入队时间 PopTime: Int64; // 出队时间 NextTime: Int64; // 下一次运行的时间,+8B=16B WorkerProc: TJobProc; // 作业处理函数+8/16B {$IFDEF UNICODE} WorkerProcA: TJobProcA; {$ENDIF} Owner: TJobBase; // 作业所隶属的队列 Next: PJob; // 下一个结点 Worker: TYXDWorker; // 当前作业工作者 Runs: Integer; // 已经运行的次数+4B MinUsedTime: Integer; // 最小运行时间+4B TotalUsedTime: Integer; // 运行总计花费的时间,TotalUsedTime/Runs可以得出平均执行时间+4B MaxUsedTime: Integer; // 最大运行时间+4B Flags: Integer; // 作业标志位+4B Data: Pointer; // 附加数据内容 case Integer of 0: ( SignalId: Integer; // 信号编码 Source: PJob; // 源作业地址 RefCount: PInteger; // 源数据 ); 1: ( Interval: Int64; // 运行时间间隔,单位为0.1ms,实际精度受不同操作系统限制+8B FirstDelay: Int64; // 首次运行延迟,单位为0.1ms,默认为0 ); 2: ( Group: Pointer; // 分组作业支持 ); end; // 作业队列对象的基类,提供基础的接口封装 TJobBase = class(TObject) protected FOwner: TYXDWorkers; function InternalPush(AJob: PJob): Boolean; virtual; abstract; function InternalPop: PJob; virtual; abstract; function GetCount: Integer; virtual; abstract; function GetEmpty: Boolean; public constructor Create(AOwner: TYXDWorkers); virtual; destructor Destroy; override; // 投寄一个作业 (外部不应尝试直接投寄任务到队列,其由Workers的相应函数内部调用。) function Push(AJob: PJob): Boolean; virtual; // 弹出一个作业 function Pop: PJob; virtual; // 空所有作业 procedure Clear; overload; virtual; // 清空指定的作业 function Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; overload; virtual; abstract; // 清空一个对象关联的所有作业 function Clear(AObject: Pointer; AMaxTimes: Integer): Integer; overload; virtual; abstract; /// 不可靠警告:Count和Empty值仅是一个参考,在多线程环境下可能并不保证下一句代码执行时,会一致 property Empty: Boolean read GetEmpty; // 当前队列是否为空 property Count: Integer read GetCount; // 当前队列元素数量 end; {$IFDEF WORKER_SIMPLE_LOCK} // 一个基于位锁的简单锁定对象,使用原子函数置位 TSimpleLock = class private FFlags: Integer; public constructor Create; procedure Enter; inline; procedure Leave; inline; end; {$ELSE} TSimpleLock = TCriticalSection; {$ENDIF} /// <summary> /// 工作者线程使用单向链表管理,而不是进行排序检索是因为对于工作者数量有限,额外 /// 的处理反而不会直接最简单的循环直接有效 /// </summary> TYXDWorker = class(TThread) private FOwner: TYXDWorkers; FEvent: TEvent; //FNext: TYXDWorker; FFlags: Integer; FTimeout: Integer; FTerminatingJob: PJob; function GetValue(Index: Integer): Boolean; inline; procedure SetValue(Index: Integer; const Value: Boolean); inline; function GetIsIdle: Boolean; inline; protected FActiveJob: PJob; // 之所以不直接使用FActiveJob的相关方法,是因为保证外部可以线程安全的访问这两个成员 FActiveJobProc: TJobProc; FActiveJobData: Pointer; FActiveJobSource: PJob; FActiveJobGroup: TJobGroup; FActiveJobFlags: Integer; procedure Execute; override; procedure FireInMainThread; procedure DoJob(AJob: PJob); public constructor Create(AOwner: TYXDWorkers); overload; destructor Destroy; override; procedure ComNeeded(AInitFlags: Cardinal = 0); // 判断COM是否已经初始化为支持COM property ComInitialized: Boolean index WORKER_COM_INITED read GetValue; // 判断当前是否处于长时间作业处理过程中 property InLongtimeJob: Boolean index WORKER_PROCESSLONG read GetValue; // 判断当前是否空闲 property IsIdle: Boolean read GetIsIdle; // 判断当前是否忙碌 property IsBusy: Boolean index WORKER_ISBUSY read GetValue; // 判断当前工作者是否是内部保留的工作者 property IsReserved: Boolean index WORKER_RESERVED read GetValue; property IsLookuping: Boolean index WORKER_LOOKUP read GetValue; property IsExecuting: Boolean index WORKER_EXECUTING read GetValue; property IsExecuted: Boolean index WORKER_EXECUTED read GetValue; end; // 工作者错误通知事件 TWorkerErrorNotify = procedure(AJob: PJob; E: Exception; const ErrSource: string) of object; // 自定义数据释放事件 TCustomFreeDataEvent = procedure(ASender: TYXDWorkers; AFreeType: TJobDataFreeType; var AData: Pointer) of object; /// <summary> /// 工作者管理对象,用来管理工作者和作业 /// </summary> TYXDWorkers = class(TObject) private FWorkers: array of TYXDWorker; FWorkerCount: Integer; FDisableCount: Integer; FBusyCount: Integer; FMinWorkers: Integer; FMaxWorkers: Integer; FMaxSignalId: Integer; FLongTimeWorkers: Integer; // 记录下长时间作业中的工作者,这种任务长时间不释放资源,可能会造成其它任务无法及时响应 FMaxLongtimeWorkers: Integer; // 允许最多同时执行的长时间任务数,不允许超过MaxWorkers的一半 FTerminating: Boolean; FCPUNum: Integer; FLocker: TCriticalSection; FSimpleJobs: TSimpleJobs; FRepeatJobs: TRepeatJobs; FSignalJobs: TYXDHashTable; FOnErrorNotify: TWorkerErrorNotify; FOnCustomFreeData: TCustomFreeDataEvent; {$IFDEF MSWINDOWS} FMainWorker: HWND; procedure DoMainThreadWork(var AMsg: TMessage); {$ENDIF} function GetEnabled: Boolean; function PostWaitJob(AJob: PJob; ASignalId: Integer): Boolean; function ClearSignalJobs(ASource: PJob): Integer; function ClearJobs(AObject: Pointer; AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; function ClearWaitJobs(ASignalId: Integer; const ASignalName: string): Integer; procedure SetEnabled(const Value: Boolean); procedure SetMaxLongtimeWorkers(const Value: Integer); procedure SetMaxWorkers(const Value: Integer); procedure SetMinWorkers(const Value: Integer); procedure EnableWorkers; procedure DisableWorkers; procedure ClearWorkers; procedure FreeJob(AJob: PJob); procedure FreeJobData(AData: Pointer; AFreeType: TJobDataFreeType); protected function Popup: PJob; function Post(AJob: PJob): Boolean; overload; function LookupIdleWorker: Boolean; function SignalIdByName(const AName: string): Integer; procedure SignalWorkDone(AJob: PJob; AUsedTime: Int64); procedure WorkerIdle(AWorker: TYXDWorker; AReason: TWorkerIdleReason); procedure WorkerTerminate(AWorker: TObject); procedure WaitRunningDone(const AParam: TWorkerWaitParam); procedure FireSignalJob(ASignal: PSignal; AData: Pointer; AFreeType: TJobDataFreeType); procedure DoJobFree(ATable: TObject; AHash: Cardinal; AData: Pointer); procedure DoCustomFreeData(AFreeType: TJobDataFreeType; var AData: Pointer); public constructor Create(AMinWorkers: Integer = 2); overload; destructor Destroy; override; // 获取Job池大小 class function JobPoolCount(): Integer; // 获取实例 class function GetInstance: TYXDWorkers; // 清除所有作业 procedure Clear; overload; /// <summary>清除一个对象相关的所有作业</summary> /// <param name="AObject">要释放的作业处理过程关联对象</param> /// <param name="AMaxTimes">最多清除的数量,如果<0,则全清</param> /// <returns>返回实际清除的作业数量</returns> /// <remarks>一个对象如果计划了作业,则在自己释放前应调用本函数以清除关联的作业, /// 否则,未完成的作业可能会触发异常。</remarks> function Clear(AObject: Pointer; AMaxTimes: Integer = -1): Integer; overload; /// <summary>清除所有投寄的指定过程作业</summary> /// <param name="AProc">要清除的作业执行过程</param> /// <param name="AData">要清除的作业附加数据指针地址,如果值为Pointer(-1), /// 则清除所有的相关过程,否则,只清除附加数据地址一致的过程</param> /// <param name="AMaxTimes">最多清除的数量,如果<0,则全清</param> /// <returns>返回实际清除的作业数量</returns> function Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer = -1): Integer; overload; /// <summary>清除指定信号关联的所有作业</summary> /// <param name="ASingalId">要清除的信号名称</param> /// <returns>返回实际清除的作业数量</returns> function Clear(const ASignalName: string): Integer; overload; /// <summary>清除指定信号关联的所有作业</summary> /// <param name="ASingalId">要清除的信号ID</param> /// <returns>返回实际清除的作业数量</returns> function Clear(ASignalId: Integer): Integer; overload; /// <summary>投寄一个作业</summary> /// <param name="AJobProc">要定时执行的作业过程</param> /// <param name="ADelay">第一次执行前延迟时间,小于等于0则立即执行</param> /// <param name="AInterval">后续重复作业间隔,如果小于等于0,则作业只执行一次</param> /// <param name="ARunInMainThread">是否要求作业在主线程中执行</param> function Post(AJobProc: TJobProc; AData: Pointer; ARunInMainThread: Boolean = False; const ADelay: Int64 = 0; const AInterval: Int64 = 0; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload; function Post(AJobProc: TJobProcG; AData: Pointer; ARunInMainThread: Boolean = False; const ADelay: Int64 = 0; const AInterval: Int64 = 0; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload; {$IFDEF UNICODE} function Post(AJobProc: TJobProcA; AData: Pointer; ARunInMainThread: Boolean = False; const ADelay: Int64 = 0; const AInterval: Int64 = 0; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload; {$ENDIF} /// <summary>投寄一个在指定时间才开始的重复作业</summary> /// <param name="AJobProc">要定时执行的作业过程</param> /// <param name="ATime">执行时间,只要时间部分,日期忽略</param> /// <param name="AInterval">后续重复作业间隔,如果小于等于0,则作业只执行一次</param> /// <param name="ARunInMainThread">是否要求作业在主线程中执行</param> function Post(AJobProc: TJobProc; const ATime: TDateTime; const AInterval: Int64; AData: Pointer; ARunInMainThread: Boolean = False; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload; function Post(AJobProc: TJobProcG; const ATime: TDateTime; const AInterval: Int64; AData: Pointer; ARunInMainThread: Boolean = False; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload; {$IFDEF UNICODE} function Post(AJobProc: TJobProcA; const ATime: TDateTime; const AInterval: Int64; AData: Pointer; ARunInMainThread: Boolean = False; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload; {$ENDIF} /// <summary>投寄一个后台长时间执行的作业</summary> /// <param name="AJobProc">要执行的作业过程</param> /// <param name="AData">作业附加的用户数据指针</param> /// <returns>成功投寄返回True,否则返回False</returns> /// <remarks>长时间作业强制在后台线程中执行,而不允许投递到主线程中执行</remarks> function PostLongJob(AJobProc: TJobProc; AData: Pointer; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload; function PostLongJob(AJobProc: TJobProcG; AData: Pointer; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload; {$IFDEF UNICODE} function PostLongJob(AJobProc: TJobProcA; AData: Pointer; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload; {$ENDIF} /// <summary>投寄一个等待信号才开始的作业</summary> /// <param name="AJobProc">要执行的作业过程</param> /// <param name="ASignalId">等待的信号编码,该编码由RegisterSignal函数返回</param> /// <param name="ARunInMainThread">作业要求在主线程中执行</param> /// <returns>成功投寄返回True,否则返回False</returns> function PostWait(AJobProc: TJobProc; ASignalId: Integer; ARunInMainThread: Boolean = False): Boolean; overload; function PostWait(AJobProc: TJobProcG; ASignalId: Integer; ARunInMainThread: Boolean = False): Boolean; overload; {$IFDEF UNICODE} function PostWait(AJobProc: TJobProcA; ASignalId: Integer; ARunInMainThread: Boolean = False): Boolean; overload; {$ENDIF} /// <summary>触发一个信号</summary> /// <param name="AId">信号编码,由RegisterSignal返回</param> /// <param name="AData">附加给作业的用户数据指针地址</param> /// <remarks>触发一个信号后,Workers会触发所有已注册的信号关联处理过程的执行</remarks> procedure SendSignal(AId: Integer; AData: Pointer = nil; AFreeType: TJobDataFreeType = jdfFreeByUser); overload; /// <summary>按名称触发一个信号</summary> /// <param name="AName">信号名称</param> /// <param name="AData">附加给作业的用户数据指针地址</param> /// <remarks>触发一个信号后,Workers会触发所有已注册的信号关联处理过程的执行</remarks> procedure SendSignal(const AName: string; AData: Pointer = nil; AFreeType: TJobDataFreeType = jdfFreeByUser); overload; /// <summary>注册一个信号</summary> /// <param name="AName">信号名称</param> /// <remarks> /// 1.重复注册同一名称的信号将返回同一个编码 /// 2.信号一旦注册,则只有程序退出时才会自动释放 /// </remarks> function RegisterSignal(const AName: string): Integer; // 最大允许工作者数量,不能小于2 property MaxWorkers: Integer read FMaxWorkers write SetMaxWorkers; // 最小工作者数量,不能小于2 property MinWorkers: Integer read FMinWorkers write SetMinWorkers; // 大允许的长时间作业工作者数量,等价于允许开始的长时间作业数量 property MaxLongtimeWorkers: Integer read FMaxLongtimeWorkers write SetMaxLongtimeWorkers; // 是否允许开始作业,如果为false,则投寄的作业都不会被执行,直到恢复为True // (Enabled为False时已经运行的作业将仍然运行,它只影响尚未执行的作来) property Enabled: Boolean read GetEnabled write SetEnabled; // 是否正在释放TQWorkers对象自身 property Terminating: Boolean read FTerminating; // 当前系统CPU数量 property CPUNum: Integer read FCPUNum; // 繁忙的工作者数量 property BusyWorkerCount: Integer read FBusyCount; // 当前工作者数量 property WorkerCount: Integer read FWorkerCount; // 工作者错误回调通知事件 property OnErrorNotify: TWorkerErrorNotify read FOnErrorNotify write FOnErrorNotify; // 用户指定的作业的Data对象释放方式 property OnCustomFreeData: TCustomFreeDataEvent read FOnCustomFreeData write FOnCustomFreeData; end; {$IFDEF UNICODE} TJobItemList = TList<PJob>; {$ELSE} TJobItemList = TList; {$ENDIF} /// <summary> /// 作业组,放在一起顺序执行或乱序执行,可以使用 WaitFor 等待全部完成 /// </summary> TJobGroup = class(TObject) private FOwner: TYXDWorkers; FLocker: TSimpleLock; FEvent: TEvent; // 事件,用于等待作业完成 FCount: Integer; FByOrder: Boolean; FWaitResult: TWaitResult; FAfterDone: TNotifyEvent; // 作业完成事件通知 FTimeoutCheck: Boolean; // 是否检查作业超时 FTag: Pointer; protected FItems: TJobItemList; // 作业列表 FPrepareCount: Integer; // 准备计数 procedure DoJobExecuted(AJob: PJob); procedure DoJobsTimeout(AJob: PJob); procedure DoAfterDone; public constructor Create(AByOrder: Boolean = False); overload; constructor Create(AOwner: TYXDWorkers; AByOrder: Boolean = False); overload; destructor Destroy; override; // 取消未完成的作业 procedure Cancel; // 准备添加作业,实际增加内部计数器 procedure Prepare; // 减少内部计数器,如果计数器减为0,则开始实际执行作业 procedure Run(ATimeout: Cardinal = INFINITE); // 添加一个作业过程,如果准备内部计数器为0,则直接执行,否则只添加到列表 function Add(AProc: TJobProc; AData: Pointer; AInMainThread: Boolean = False; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; // 等待作业完成,ATimeout为最长等待时间 function WaitFor(ATimeout: Cardinal = INFINITE): TWaitResult; // 等待作业完成,ATimeout为最长等待时间,不同的是MsgWaitFor不阻塞消息处理 function MsgWaitFor(ATimeout: Cardinal = INFINITE): TWaitResult; // 未完成的作业数量 property Count: Integer read FCount; // 全部作业执行完成时触发的回调事件 property AfterDone: TNotifyEvent read FAfterDone write FAfterDone; // 是否是按顺序执行(即必需等待上一个作业完成后才执行下一个) property ByOrder: Boolean read FByOrder; property Tag: Pointer read FTag write FTag; end; /// <summary> /// 用于管理计划型任务,需要在指定的时间点触发 /// </summary> TRepeatJobs = class(TJobBase) private FLocker: TCriticalSection; FFirstFireTime: Int64; function ClearJobs(AObject: Pointer; AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; procedure AfterJobRun(AJob: PJob; AUsedTime: Int64); protected FItems: TRBTree; function InternalPush(AJob: PJob): Boolean; override; function InternalPop: PJob; override; function DoTimeCompare(P1, P2: Pointer): Integer; procedure DoJobDelete(ATree: TRBTree; ANode: PRBNode); function GetCount: Integer; override; public constructor Create(AOwner: TYXDWorkers); override; destructor Destroy; override; procedure Clear; override; function Clear(AObject: Pointer; AMaxTimes: Integer): Integer; overload; override; function Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; overload; override; end; /// <summary> /// 用于管理简单的异步调用,没有触发时间要求的作业 /// </summary> TSimpleJobs = class(TJobBase) private FFirst, FLast: PJob; FCount: Integer; FLocker: TSimpleLock; function ClearJobs(AObject: Pointer; AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; protected function InternalPush(AJob: PJob): Boolean; override; function InternalPop: PJob; override; function GetCount: Integer; override; public constructor Create(AOwner: TYXDWorkers); override; destructor Destroy; override; procedure Clear; overload; override; function Clear(AObject: Pointer; AMaxTimes: Integer): Integer; overload; override; function Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; overload; override; end; var Workers: TYXDWorkers = nil; // 需要时初始化,也可以自己定义,允许多个 // 返回值的时间精度为1ms function GetTimestamp: Int64; // 获取CPU数量 function GetCPUCount: Integer; {$IF RTLVersion<26} // 为与D2007兼容, 原子操作函数 function AtomicCmpExchange(var Target: Integer; Value, Comparand: Integer): Integer; inline; function AtomicExchange(var Target: Integer; Value: Integer): Integer; inline; function AtomicIncrement(var Target: Integer): Integer; inline; function AtomicDecrement(var Target: Integer): Integer; inline; {$IFEND} {$IFDEF WORKER_SIMPLE_LOCK} // 原子操作函数 function AtomicAnd(var Dest: Integer; const AMask: Integer): Integer; inline; function AtomicOr(var Dest: Integer; const AMask: Integer): Integer; inline; {$ENDIF} // 将全局的作业处理函数转换为TJobProc类型,以便正常调度使用 function MakeJobProc(const AProc: TJobProcG): TJobProc; inline; // 设置线程运行的CPU procedure SetThreadCPU(AHandle: THandle; ACpuNo: Integer); inline; implementation resourcestring SNotSupportNow = '当前尚未支持功能 %s'; SNotInitWorkers = '当前尚未初始化有工作者管理对象 TYXDWorkers'; STooFewWorkers = '指定的最小工作者数量太少(必需大于等于1)。'; STooManyLongtimeWorker = '不能允许太多长时间作业线程(最多允许工作者一半)。'; SBadWaitDoneParam = '未知的等待正在执行作业完成方式:%d'; {$IFNDEF UNICODE} const wrIOCompletion = TWaitResult(4); {$ENDIF} {$IFDEF MSWINDOWS} type TGetTickCount64 = function: Int64; {$ENDIF MSWINDOWS} type TJobPool = class protected FFirst: PJob; FCount: Integer; FSize: Integer; FLocker: TSimpleLock; public constructor Create(AMaxSize: Integer); destructor Destroy; override; procedure Push(AJob: PJob); function Pop: PJob; property Count: Integer read FCount; property Size: Integer read FSize write FSize; end; var JobPool: TJobPool; _CPUCount: Integer; {$IFDEF NEXTGEN} _Watch: TStopWatch; {$ELSE} GetTickCount64: TGetTickCount64; _PerfFreq: Int64; {$ENDIF} function GetTimestamp: Int64; begin {$IFDEF NEXTGEN} Result := _Watch.Elapsed.Ticks div 10000; {$ELSE} if _PerfFreq > 0 then begin QueryPerformanceCounter(Result); Result := Result * 1000 div _PerfFreq; end else if Assigned(GetTickCount64) then Result := GetTickCount64 else Result := GetTickCount; {$ENDIF} end; function GetCPUCount: Integer; {$IFDEF MSWINDOWS} var si: SYSTEM_INFO; {$ENDIF} begin if _CPUCount = 0 then begin {$IFDEF MSWINDOWS} GetSystemInfo(si); Result := si.dwNumberOfProcessors; {$ELSE}// Linux,MacOS,iOS,Andriod{POSIX} {$IFDEF POSIX} Result := sysconf(_SC_NPROCESSORS_ONLN); {$ELSE}// 不认识的操作系统,CPU数默认为1 Result := 1; {$ENDIF !POSIX} {$ENDIF !MSWINDOWS} end else Result := _CPUCount; end; function MakeJobProc(const AProc: TJobProcG): TJobProc; begin TMethod(Result).Data := nil; TMethod(Result).Code := @AProc; end; function SameWorkerProc(const P1, P2: TJobProc): Boolean; inline; begin Result := (TMethod(P1).Code = TMethod(P2).Code) and (TMethod(P1).Data = TMethod(P2).Data); end; procedure SetThreadCPU(AHandle: THandle; ACpuNo: Integer); begin {$IFDEF MSWINDOWS} SetThreadIdealProcessor(AHandle, ACpuNo); {$ELSE} // Linux/Andriod/iOS暂时忽略,XE6未引入sched_setaffinity定义 {$ENDIF} end; // 兼容2007版的原子操作接口 {$IF RTLVersion<26} function AtomicCmpExchange(var Target: Integer; Value: Integer; Comparand: Integer): Integer; inline; begin Result := InterlockedCompareExchange(Target, Value, Comparand); end; function AtomicIncrement(var Target: Integer): Integer; inline; begin Result := InterlockedIncrement(Target); end; function AtomicDecrement(var Target: Integer): Integer; inline; begin Result := InterlockedDecrement(Target); end; function AtomicExchange(var Target: Integer; Value: Integer): Integer; begin Result := InterlockedExchange(Target, Value); end; {$IFEND <XE5} {$IFDEF WORKER_SIMPLE_LOCK} // 位与,返回原值 function AtomicAnd(var Dest: Integer; const AMask: Integer): Integer; inline; var i: Integer; begin repeat Result := Dest; i := Result and AMask; until AtomicCmpExchange(Dest, i, Result) = Result; end; // 位或,返回原值 function AtomicOr(var Dest: Integer; const AMask: Integer): Integer; inline; var i: Integer; begin repeat Result := Dest; i := Result or AMask; until AtomicCmpExchange(Dest, i, Result) = Result; end; {$ENDIF} { TJobPool } constructor TJobPool.Create(AMaxSize: Integer); begin FCount := 0; FSize := AMaxSize; FLocker := TSimpleLock.Create; end; destructor TJobPool.Destroy; var AJob: PJob; begin FLocker.Enter; try while FFirst <> nil do begin AJob := FFirst.Next; Dispose(FFirst); FFirst := AJob; end; finally FLocker.Free; end; inherited; end; function TJobPool.Pop: PJob; begin FLocker.Enter; Result := FFirst; if Result <> nil then begin FFirst := Result.Next; Dec(FCount); end; FLocker.Leave; if Result = nil then GetMem(Result, SizeOf(TJob)); Result.Reset; end; procedure TJobPool.Push(AJob: PJob); var ADoFree: Boolean; begin {$IFDEF UNICODE} if AJob.IsAnonWorkerProc then AJob.WorkerProcA := nil; {$ENDIF} FLocker.Enter; ADoFree := (FCount = FSize); if not ADoFree then begin AJob.Next := FFirst; FFirst := AJob; Inc(FCount); end; FLocker.Leave; if ADoFree then FreeMem(AJob); end; { TJobBase } procedure TJobBase.Clear; var AItem: PJob; begin while True do begin AItem := Pop; if AItem <> nil then FOwner.FreeJob(AItem) else Break; end; end; constructor TJobBase.Create(AOwner: TYXDWorkers); begin FOwner := AOwner; end; destructor TJobBase.Destroy; begin Clear; inherited; end; function TJobBase.GetEmpty: Boolean; begin Result := (Count = 0); end; function TJobBase.Pop: PJob; begin Result := InternalPop; end; function TJobBase.Push(AJob: PJob): Boolean; begin AJob.Owner := Self; AJob.PushTime := GetTimestamp; Result := InternalPush(AJob); if not Result then begin AJob.Next := nil; FOwner.FreeJob(AJob); end; end; { TJob } procedure TJob.AfterRun(AUsedTime: Int64); begin Inc(Runs); if AUsedTime > 0 then begin Inc(TotalUsedTime, AUsedTime); if MinUsedTime = 0 then MinUsedTime := AUsedTime else if MinUsedTime > AUsedTime then MinUsedTime := AUsedTime; if MaxUsedTime = 0 then MaxUsedTime := AUsedTime else if MaxUsedTime < AUsedTime then MaxUsedTime := AUsedTime; end; end; procedure TJob.Assign(const ASource: PJob); begin Self := ASource^; // 下面三个成员不拷贝 Worker := nil; Next := nil; Source := nil; end; function TJob.GetAvgTime: Integer; begin if Runs > 0 then Result := TotalUsedTime div Runs else Result := 0; end; function TJob.GetElapseTime: Int64; begin Result := GetTimestamp - StartTime; end; function TJob.GetIsTerminated: Boolean; begin if Assigned(Worker) and Assigned(Worker.FOwner) then Result := Worker.FOwner.Terminating or Worker.Terminated or ((Flags and JOB_TERMINATED) <> 0) or (Worker.FTerminatingJob = @Self) else Result := (Flags and JOB_TERMINATED) <> 0; end; function TJob.GetValue(Index: Integer): Boolean; begin Result := (Flags and Index) <> 0; end; procedure TJob.Create(AProc: TJobProc); begin WorkerProc := AProc; SetValue(JOB_RUN_ONCE, True); end; procedure TJob.Reset; begin FillChar(Self, SizeOf(TJob), 0); end; procedure TJob.SetIsTerminated(const Value: Boolean); begin SetValue(JOB_TERMINATED, Value); end; procedure TJob.SetValue(Index: Integer; const Value: Boolean); begin if Value then Flags := (Flags or Index) else Flags := (Flags and (not Index)); end; procedure TJob.UpdateNextTime; begin if (Runs = 0) and (FirstDelay <> 0) then NextTime := PushTime + FirstDelay else if Interval <> 0 then begin if NextTime = 0 then NextTime := GetTimestamp + Interval else Inc(NextTime, Interval); end else NextTime := GetTimestamp; end; { TSimpleLock } {$IFDEF WORKER_SIMPLE_LOCK} constructor TSimpleLock.Create; begin inherited; FFlags := 0; end; procedure TSimpleLock.Enter; begin while (AtomicOr(FFlags, $01) and $01) <> 0 do begin {$IFDEF MSWINDOWS} SwitchToThread; {$ELSE} TThread.Yield; {$ENDIF} end; end; procedure TSimpleLock.Leave; begin AtomicAnd(FFlags, Integer($FFFFFFFE)); end; {$ENDIF} { TSimpleJobs } function TSimpleJobs.ClearJobs(AObject: Pointer; AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; var AFirst, AJob, APrior, ANext: PJob; ACount: Integer; b: Boolean; begin FLocker.Enter; // 先将所有的异步作业清空,以防止被弹出执行 AJob := FFirst; ACount := FCount; FFirst := nil; FLast := nil; FCount := 0; FLocker.Leave; Result := 0; APrior := nil; AFirst := nil; while (AJob <> nil) and (AMaxTimes <> 0) do begin ANext := AJob.Next; if AObject <> nil then b := TMethod(AJob.WorkerProc).Data = AObject else b := SameWorkerProc(AJob.WorkerProc, AProc) and (AJob.Data = AData); if b then begin if APrior <> nil then APrior.Next := ANext; FOwner.FreeJob(AJob); Dec(AMaxTimes); Inc(Result); Dec(ACount); end else begin if AFirst = nil then AFirst := AJob; APrior := AJob; end; AJob := ANext; end; if ACount > 0 then begin FLocker.Enter; AFirst.Next := FFirst; FFirst := AFirst; Inc(FCount, ACount); if FLast = nil then FLast := APrior; FLocker.Leave; end; end; procedure TSimpleJobs.Clear; var AFirst: PJob; begin FLocker.Enter; AFirst := FFirst; FFirst := nil; FLast := nil; FCount := 0; FLocker.Leave; FOwner.FreeJob(AFirst); end; function TSimpleJobs.Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; begin Result := ClearJobs(nil, AProc, AData, AMaxTimes); end; function TSimpleJobs.Clear(AObject: Pointer; AMaxTimes: Integer): Integer; begin Result := ClearJobs(AObject, nil, nil, AMaxTimes); end; constructor TSimpleJobs.Create(AOwner: TYXDWorkers); begin inherited Create(AOwner); FLocker := TSimpleLock.Create; end; destructor TSimpleJobs.Destroy; begin inherited; FLocker.Free; end; function TSimpleJobs.GetCount: Integer; begin Result := FCount; end; function TSimpleJobs.InternalPop: PJob; begin FLocker.Enter; Result := FFirst; if Result <> nil then begin FFirst := Result.Next; if FFirst = nil then FLast := nil; Dec(FCount); end; FLocker.Leave; if Result <> nil then begin Result.PopTime := GetTimestamp; Result.Next := nil; end; end; function TSimpleJobs.InternalPush(AJob: PJob): Boolean; begin FLocker.Enter; if FLast = nil then FFirst := AJob else FLast.Next := AJob; FLast := AJob; Inc(FCount); FLocker.Leave; Result := true; end; { TRepeatJobs } procedure TRepeatJobs.AfterJobRun(AJob: PJob; AUsedTime: Int64); var ANode: PRBNode; function UpdateSource: Boolean; var ATemp, APrior: PJob; begin Result := False; ATemp := ANode.Data; APrior := nil; while ATemp <> nil do begin if ATemp = AJob.Source then begin if AJob.IsTerminated then begin if APrior <> nil then APrior.Next := ATemp.Next else ANode.Data := ATemp.Next; ATemp.Next := nil; FOwner.FreeJob(ATemp); if ANode.Data = nil then FItems.Delete(ANode); end else ATemp.AfterRun(AUsedTime); Result := True; Break; end; APrior := ATemp; ATemp := ATemp.Next; end; end; begin FLocker.Enter; try ANode := FItems.Find(AJob); if ANode <> nil then begin if UpdateSource then Exit; end; ANode := FItems.First; while ANode <> nil do begin if UpdateSource then Break; ANode := ANode.Next; end; finally FLocker.Leave; end; end; function TRepeatJobs.ClearJobs(AObject: Pointer; AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; var ANode, ANext: PRBNode; APriorJob, AJob, ANextJob: PJob; ACanDelete, B: Boolean; begin Result := 0; // 现在清空重复的计划作业 FLocker.Enter; try ANode := FItems.First; while (ANode <> nil) and (AMaxTimes <> 0) do begin ANext := ANode.Next; AJob := ANode.Data; ACanDelete := True; APriorJob := nil; while AJob <> nil do begin ANextJob := AJob.Next; if AObject <> nil then B := TMethod(AJob.WorkerProc).Data = AObject else B := SameWorkerProc(AJob.WorkerProc, AProc) and ((AData = Pointer(-1)) or (AData = AJob.Data)); if B then begin if ANode.Data = AJob then ANode.Data := AJob.Next; if Assigned(APriorJob) then APriorJob.Next := AJob.Next; AJob.Next := nil; FOwner.FreeJob(AJob); Dec(AMaxTimes); Inc(Result); end else begin ACanDelete := False; APriorJob := AJob; end; AJob := ANextJob; end; if ACanDelete then FItems.Delete(ANode); ANode := ANext; end; if FItems.Count > 0 then FFirstFireTime := PJob(FItems.First.Data).NextTime else FFirstFireTime := 0; finally FLocker.Leave; end; end; function TRepeatJobs.Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; begin Result := ClearJobs(nil, AProc, AData, AMaxTimes); end; function TRepeatJobs.Clear(AObject: Pointer; AMaxTimes: Integer): Integer; begin Result := ClearJobs(AObject, nil, nil, AMaxTimes); end; procedure TRepeatJobs.Clear; begin FLocker.Enter; try FItems.Clear; finally FLocker.Leave; end; end; constructor TRepeatJobs.Create(AOwner: TYXDWorkers); begin inherited Create(AOwner); FLocker := TCriticalSection.Create; FItems := TRBTree.Create(DoTimeCompare); FItems.OnDelete := DoJobDelete; end; destructor TRepeatJobs.Destroy; begin FLocker.Enter; try FItems.Free; finally inherited; FLocker.Leave; FLocker.Free; end; end; procedure TRepeatJobs.DoJobDelete(ATree: TRBTree; ANode: PRBNode); begin FOwner.FreeJob(ANode.Data); end; function TRepeatJobs.DoTimeCompare(P1, P2: Pointer): Integer; begin Result := PJob(P1).NextTime - PJob(P2).NextTime; end; function TRepeatJobs.GetCount: Integer; begin Result := FItems.Count; end; function TRepeatJobs.InternalPop: PJob; var ANode: PRBNode; ATick: Int64; AJob: PJob; begin Result := nil; ATick := GetTimestamp; FLocker.Enter; try if FItems.Count > 0 then begin ANode := FItems.First; if PJob(ANode.Data).NextTime <= ATick then begin AJob := ANode.Data; if AJob.Next <> nil then // 如果没有更多需要执行的作业,则删除结点,否则指向下一个 ANode.Data := AJob.Next else begin ANode.Data := nil; FItems.Delete(ANode); ANode := FItems.First; if ANode <> nil then FFirstFireTime := PJob(ANode.Data).NextTime else // 没有计划作业了,不需要了 FFirstFireTime := 0; end; if AJob.Runonce then Result := AJob else begin Inc(AJob.NextTime, AJob.Interval); Result := JobPool.Pop; Result.Assign(AJob); Result.Source := AJob; // 重新插入作业 ANode := FItems.Find(AJob); if ANode = nil then begin FItems.Insert(AJob); FFirstFireTime := PJob(FItems.First.Data).NextTime; end else begin// 如果已经存在同一时刻的作业,则自己挂接到其它作业头部 AJob.Next := PJob(ANode.Data); ANode.Data := AJob; // 首个作业改为自己 end; end; end; end; finally FLocker.Leave; end; if Result <> nil then begin Result.PopTime := ATick; Result.Next := nil; end; end; function TRepeatJobs.InternalPush(AJob: PJob): Boolean; var ANode: PRBNode; begin AJob.UpdateNextTime; // 计算作业的下次执行时间 FLocker.Enter; try ANode := FItems.Find(AJob); if ANode = nil then begin FItems.Insert(AJob); FFirstFireTime := PJob(FItems.First.Data).NextTime; end else begin // 如果已经存在同一时刻的作业,则自己挂接到其它作业头部 AJob.Next := PJob(ANode.Data); ANode.Data := AJob; // 首个作业改为自己 end; finally FLocker.Leave; end; Result := True; end; { TYXDWorker } procedure TYXDWorker.ComNeeded(AInitFlags: Cardinal); begin {$IFDEF MSWINDOWS} if not ComInitialized then begin if AInitFlags = 0 then CoInitialize(nil) else CoInitializeEx(nil, AInitFlags); SetValue(WORKER_COM_INITED, True); end; {$ENDIF MSWINDOWS} end; constructor TYXDWorker.Create(AOwner: TYXDWorkers); begin inherited Create(True); FOwner := AOwner; FTimeout := 1000; FFlags := WORKER_ISBUSY; // 默认为忙碌 AtomicIncrement(AOwner.FBusyCount); FEvent := TEvent.Create(nil, False, False, ''); FreeOnTerminate := True; end; destructor TYXDWorker.Destroy; begin FreeAndNil(FEvent); inherited; end; procedure TYXDWorker.DoJob(AJob: PJob); begin {$IFDEF UNICODE} if AJob.IsAnonWorkerProc then AJob.WorkerProcA(AJob) else {$ENDIF} AJob.WorkerProc(AJob); end; procedure TYXDWorker.Execute; var wr: TWaitResult; {$IFDEF MSWINDOWS} SyncEvent: TEvent; {$ENDIF} begin {$IFDEF MSWINDOWS} SyncEvent := TEvent.Create(nil, False, False, ''); {$ENDIF} try while not(Terminated or FOwner.FTerminating) do begin if FOwner.Enabled then begin if (FOwner.FSimpleJobs.FFirst <> nil) then begin {$IFDEF MSWINDOWS}SwitchToThread; {$ELSE} TThread.Yield;{$ENDIF} FTimeout := 0; end else if (FOwner.FRepeatJobs.FFirstFireTime <> 0) then begin FTimeout := FOwner.FRepeatJobs.FFirstFireTime - GetTimestamp; if FTimeout < 0 then // 时间已经到了?那么立刻执行 FTimeout := 0; end else FTimeout := WAITJOB_TIMEOUT; end else FTimeout := WAITJOB_TIMEOUT; // 如果仍没有作业进入,则除非自己是保留的线程对象,否则释放工作者 if FTimeout <> 0 then begin wr := FEvent.WaitFor(FTimeout); if Terminated or FOwner.FTerminating then Break; end else wr := wrSignaled; if (wr = wrSignaled) or ((FOwner.FRepeatJobs.FFirstFireTime <> 0) and (GetTimestamp >= FOwner.FRepeatJobs.FFirstFireTime - 1)) then begin if FOwner.FTerminating then Break; if IsIdle then begin SetValue(WORKER_ISBUSY or WORKER_LOOKUP, true); AtomicIncrement(FOwner.FBusyCount); end else SetValue(WORKER_LOOKUP, true); repeat FActiveJob := FOwner.Popup; if FActiveJob <> nil then begin FActiveJob.Worker := Self; FActiveJobProc := FActiveJob.WorkerProc; // 为Clear(AObject)准备判断,以避免FActiveJob线程不安全 FActiveJobData := FActiveJob.Data; if FActiveJob.IsSignalWakeup then FActiveJobSource := FActiveJob.Source else FActiveJobSource := nil; if FActiveJob.IsGrouped then FActiveJobGroup := FActiveJob.Group else FActiveJobGroup := nil; FActiveJobFlags := FActiveJob.Flags; if FActiveJob.StartTime = 0 then begin FActiveJob.StartTime := GetTimestamp; FActiveJob.FirstTime := FActiveJob.StartTime; end else FActiveJob.StartTime := GetTimestamp; try FFlags := (FFlags or WORKER_EXECUTING) and (not WORKER_LOOKUP); if FActiveJob.InMainThread then {$IFDEF MSWINDOWS} begin if PostMessage(FOwner.FMainWorker, WM_APP, WPARAM(FActiveJob), LPARAM(SyncEvent)) then SyncEvent.WaitFor(INFINITE); end {$ELSE} Synchronize(Self, FireInMainThread) {$ENDIF} else DoJob(FActiveJob); except if Assigned(FOwner.FOnErrorNotify) then FOwner.FOnErrorNotify(FActiveJob, Exception(ExceptObject), 'TYXDWorker.Execute'); end; if not FActiveJob.Runonce then FOwner.FRepeatJobs.AfterJobRun(FActiveJob, GetTimestamp - FActiveJob.StartTime) else begin if FActiveJob.IsSignalWakeup then FOwner.SignalWorkDone(FActiveJob, GetTimestamp - FActiveJob.StartTime) else if FActiveJob.IsLongtimeJob then AtomicDecrement(FOwner.FLongTimeWorkers) else if FActiveJob.IsGrouped then FActiveJobGroup.DoJobExecuted(FActiveJob); FActiveJob.Worker := nil; end; FOwner.FreeJob(FActiveJob); FActiveJobProc := nil; FActiveJobSource := nil; FActiveJobFlags := 0; FActiveJobGroup := nil; FTerminatingJob := nil; FFlags := FFlags and (not WORKER_EXECUTING); end else FFlags := FFlags and (not WORKER_LOOKUP); until (FActiveJob = nil) or FOwner.FTerminating or Terminated or (not FOwner.Enabled); SetValue(WORKER_ISBUSY, False); FOwner.WorkerIdle(Self, irNoJob); end else if (not IsReserved) and (FTimeout = WAITJOB_TIMEOUT) then begin SetValue(WORKER_ISBUSY, False); FOwner.WorkerIdle(Self, irTimeout); end; end; finally FOwner.WorkerTerminate(Self); {$IFDEF MSWINDOWS} if ComInitialized then CoUninitialize; FreeAndNil(SyncEvent); {$ENDIF} end; end; procedure TYXDWorker.FireInMainThread; begin DoJob(FActiveJob); end; function TYXDWorker.GetIsIdle: Boolean; begin Result := not IsBusy; end; function TYXDWorker.GetValue(Index: Integer): Boolean; begin Result := (FFlags and Index) <> 0; end; procedure TYXDWorker.SetValue(Index: Integer; const Value: Boolean); begin if Value then FFlags := (FFlags or Index) else FFlags := (FFlags and (not Index)); end; { TYXDWorkers } function TYXDWorkers.Clear(const ASignalName: string): Integer; begin Result := ClearWaitJobs(0, ASignalName); end; function TYXDWorkers.Clear(ASignalId: Integer): Integer; begin Result := ClearWaitJobs(ASignalId, ''); end; function TYXDWorkers.Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; begin Result := ClearJobs(nil, AProc, AData, AMaxTimes); end; function TYXDWorkers.Clear(AObject: Pointer; AMaxTimes: Integer): Integer; begin Result := ClearJobs(AObject, nil, nil, AMaxTimes); end; procedure TYXDWorkers.Clear; var i: Integer; AParam: TWorkerWaitParam; ASignal: PSignal; begin DisableWorkers; // 避免工作者取得新的作业 try FSimpleJobs.Clear; FRepeatJobs.Clear; FLocker.Enter; try for i := 0 to FSignalJobs.BucketCount - 1 do begin if Assigned(FSignalJobs.Buckets[i]) then begin ASignal := FSignalJobs.Buckets[i].Data; FreeJob(ASignal.First); ASignal.First := nil; end; end; finally FLocker.Leave; end; AParam.WaitType := $FF; WaitRunningDone(AParam); finally EnableWorkers; end; end; function TYXDWorkers.ClearJobs(AObject: Pointer; AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; var ACleared: Integer; AWaitParam: TWorkerWaitParam; function ClearSignalJobs(IsClearObject: Boolean): Integer; var i: Integer; AJob, ANext, APrior: PJob; AList: PHashList; ASignal: PSignal; B: Boolean; begin Result := 0; FLocker.Enter; try for i := 0 to FSignalJobs.BucketCount - 1 do begin AList := FSignalJobs.Buckets[i]; if AList <> nil then begin ASignal := AList.Data; if ASignal.First <> nil then begin AJob := ASignal.First; APrior := nil; while (AJob <> nil) and (AMaxTimes <> 0) do begin ANext := AJob.Next; if IsClearObject then B := TMethod(AJob.WorkerProc).Data = AObject else B := SameWorkerProc(AJob.WorkerProc, AProc) and ((AData = Pointer(-1)) or (AJob.Data = AData)); if B then begin if ASignal.First = AJob then ASignal.First := ANext; if Assigned(APrior) then APrior.Next := ANext; AJob.Next := nil; FreeJob(AJob); Inc(Result); Dec(AMaxTimes); end else APrior := AJob; AJob := ANext; end; if AMaxTimes = 0 then Break; end; end; end; finally FLocker.Leave; end; end; begin Result := 0; if Self <> nil then begin ACleared := FSimpleJobs.ClearJobs(AObject, AProc, AData, AMaxTimes); Dec(AMaxTimes, ACleared); Inc(Result, ACleared); if AMaxTimes <> 0 then begin ACleared := FRepeatJobs.ClearJobs(AObject, AProc, AData, AMaxTimes); Dec(AMaxTimes, ACleared); Inc(Result, ACleared); if AMaxTimes <> 0 then begin ACleared := ClearSignalJobs(AObject <> nil); Inc(Result, ACleared); if AMaxTimes <> 0 then begin AWaitParam.WaitType := 1; AWaitParam.Data := AData; AWaitParam.WorkerProc := TMethod(AProc); WaitRunningDone(AWaitParam); end; end; end; end; end; function TYXDWorkers.ClearWaitJobs(ASignalId: Integer; const ASignalName: string): Integer; var i: Integer; ASignal: PSignal; AJob: PJob; B: Boolean; begin AJob := nil; FLocker.Enter; try for i := 0 to FSignalJobs.BucketCount - 1 do begin if FSignalJobs.Buckets[i] <> nil then begin ASignal := FSignalJobs.Buckets[i].Data; if ASignalId > 0 then B := ASignal.Id = ASignalId else B := ASignal.Name = ASignalName; if B then begin AJob := ASignal.First; ASignal.First := nil; Break; end; end; end; finally FLocker.Leave; end; if AJob <> nil then Result := ClearSignalJobs(AJob) else Result := 0; end; function TYXDWorkers.ClearSignalJobs(ASource: PJob): Integer; var ACount: Integer; AFirst, ALast, APrior, ANext: PJob; AWaitParam: TWorkerWaitParam; begin Result := 0; AFirst := nil; APrior := nil; FSimpleJobs.FLocker.Enter; try ALast := FSimpleJobs.FFirst; ACount := FSimpleJobs.Count; FSimpleJobs.FFirst := nil; FSimpleJobs.FLast := nil; FSimpleJobs.FCount := 0; finally FSimpleJobs.FLocker.Leave; end; while ALast <> nil do begin if (ALast.IsSignalWakeup) and (ALast.Source = ASource) then begin ANext := ALast.Next; ALast.Next := nil; FreeJob(ALast); ALast := ANext; if APrior <> nil then APrior.Next := ANext; Dec(ACount); Inc(Result); end else begin if AFirst = nil then AFirst := ALast; APrior := ALast; ALast := ALast.Next; end; end; if ACount > 0 then begin FSimpleJobs.FLocker.Enter; try APrior.Next := FSimpleJobs.FFirst; FSimpleJobs.FFirst := AFirst; Inc(FSimpleJobs.FCount, ACount); if FSimpleJobs.FLast = nil then FSimpleJobs.FLast := APrior; finally FSimpleJobs.FLocker.Leave; end; end; AWaitParam.WaitType := 2; AWaitParam.SourceJob := ASource; WaitRunningDone(AWaitParam); FreeJob(ASource); end; procedure TYXDWorkers.ClearWorkers; var i: Integer; {$IFDEF MSWINDOWS} function WorkerExists: Boolean; var J: Integer; ACode: Cardinal; begin Result := False; FLocker.Enter; try while FWorkerCount > 0 do begin if GetExitCodeThread(FWorkers[0].Handle, ACode) then begin if ACode = STILL_ACTIVE then begin Result := True; Break; end; end; // 工作者已经不存在,可能被外部线程结束 FreeAndNil(FWorkers[0]); if FWorkerCount > 0 then begin for J := 1 to FWorkerCount - 1 do FWorkers[J - 1] := FWorkers[J]; Dec(FWorkerCount); FWorkers[FWorkerCount] := nil; end; end; finally FLocker.Leave; end; end; {$ENDIF} begin FTerminating := True; FLocker.Enter; try FRepeatJobs.FFirstFireTime := 0; for i := 0 to FWorkerCount - 1 do FWorkers[i].FEvent.SetEvent; finally FLocker.Leave; end; while (FWorkerCount > 0) {$IFDEF MSWINDOWS} and WorkerExists {$ENDIF} do {$IFDEF MSWINDOWS}SwitchToThread; {$ELSE} TThread.Yield;{$ENDIF} end; constructor TYXDWorkers.Create(AMinWorkers: Integer); var i: Integer; begin FBusyCount := 0; FSimpleJobs := TSimpleJobs.Create(Self); FRepeatJobs := TRepeatJobs.Create(Self); FSignalJobs := TYXDHashTable.Create(1361); FSignalJobs.OnDelete := DoJobFree; FSignalJobs.AutoSize := True; FLocker := TCriticalSection.Create; FCPUNum := GetCPUCount; if AMinWorkers < 1 then FMinWorkers := 2 else FMinWorkers := AMinWorkers; // 最少工作者为2个 FMaxWorkers := (FCPUNum shl 1) + 1; if FMaxWorkers <= FMinWorkers then FMaxWorkers := (FMinWorkers shl 1) + 1; FTerminating := False; // 创建默认工作者 FDisableCount := 0; FMaxSignalId := 0; FWorkerCount := FMinWorkers; SetLength(FWorkers, FMaxWorkers); for i := 0 to FMinWorkers - 1 do begin FWorkers[i] := TYXDWorker.Create(Self); FWorkers[i].SetValue(WORKER_RESERVED, True); // 保留,不需要空闲检查 FWorkers[i].Suspended := False; {$IFDEF MSWINDOWS} SetThreadCPU(FWorkers[i].Handle, i mod FCPUNum); {$ELSE} SetThreadCPU(FWorkers[i].ThreadID, i mod FCPUNum); {$ENDIF} end; FMaxLongtimeWorkers := (FMaxWorkers shr 1); {$IFDEF MSWINDOWS} FMainWorker := AllocateHWnd(DoMainThreadWork); {$ENDIF} end; destructor TYXDWorkers.Destroy; begin ClearWorkers; FLocker.Enter; try FreeAndNil(FSimpleJobs); FreeAndNil(FRepeatJobs); FreeAndNil(FSignalJobs); finally FLocker.Leave; FLocker.Free; end; {$IFDEF MSWINDOWS} DeallocateHWnd(FMainWorker); {$ENDIF} inherited; end; procedure TYXDWorkers.DisableWorkers; begin AtomicIncrement(FDisableCount); end; procedure TYXDWorkers.DoCustomFreeData(AFreeType: TJobDataFreeType; var AData: Pointer); begin if Assigned(FOnCustomFreeData) then FOnCustomFreeData(Self, AFreeType, AData); end; procedure TYXDWorkers.DoJobFree(ATable: TObject; AHash: Cardinal; AData: Pointer); var ASignal: PSignal; begin ASignal := AData; if ASignal.First <> nil then FreeJob(ASignal.First); Dispose(ASignal); end; {$IFDEF MSWINDOWS} procedure TYXDWorkers.DoMainThreadWork(var AMsg: TMessage); var AJob: PJob; begin if AMsg.Msg = WM_APP then begin try AJob := PJob(AMsg.WPARAM); AJob.Worker.DoJob(AJob); finally if AMsg.LPARAM <> 0 then TEvent(AMsg.LPARAM).SetEvent; end; end else AMsg.Result := DefWindowProc(FMainWorker, AMsg.Msg, AMsg.WPARAM, AMsg.LPARAM); end; {$ENDIF} procedure TYXDWorkers.EnableWorkers; var ANeedCount: Integer; begin if AtomicDecrement(FDisableCount) = 0 then begin if (FSimpleJobs.Count > 0) or (FRepeatJobs.Count > 0) then begin ANeedCount := FSimpleJobs.Count + FRepeatJobs.Count; while ANeedCount > 0 do begin if not LookupIdleWorker then Break; Dec(ANeedCount); end; end; end; end; procedure InitJobFreeType(AOwner: TYXDWorkers; AJob: PJob; AData: Pointer; AFreeType: TJobDataFreeType); inline; begin if AData <> nil then begin case AFreeType of jdfFreeAsObject: AJob.IsObjectOwner := True; jdfFreeAsRecord: AJob.IsRecordOwner := True; jdfFreeAsInterface: begin AJob.IsInterfaceOwner := True; IUnknown(AData)._AddRef; end; end; end else begin if AFreeType <> jdfFreeByUser then AOwner.DoCustomFreeData(AFreeType, AData); end; end; procedure TYXDWorkers.FireSignalJob(ASignal: PSignal; AData: Pointer; AFreeType: TJobDataFreeType); var AJob, ACopy: PJob; ACount: PInteger; begin Inc(ASignal.Fired); if AData <> nil then begin New(ACount); ACount^ := 1; //初始值 end else ACount := nil; AJob := ASignal.First; while AJob <> nil do begin ACopy := JobPool.Pop; ACopy.Assign(AJob); ACopy.SetValue(JOB_RUN_ONCE, True); ACopy.Source := AJob; ACopy.Data := AData; InitJobFreeType(Self, ACopy, AData, AFreeType); if ACount <> nil then begin AtomicIncrement(ACount^); ACopy.RefCount := ACount; end; FSimpleJobs.Push(ACopy); AJob := AJob.Next; end; if AData <> nil then begin if AtomicDecrement(ACount^) = 0 then begin Dispose(ACount); FreeJobData(AData, AFreeType); end; end; end; procedure TYXDWorkers.FreeJob(AJob: PJob); var ANext: PJob; AFreeData: Boolean; begin while AJob <> nil do begin ANext := AJob.Next; if AJob.Data <> nil then begin if AJob.IsSignalWakeup then begin AFreeData := AtomicDecrement(AJob.RefCount^) = 0; if AFreeData then Dispose(AJob.RefCount); end else AFreeData := AJob.IsDataOwner; if AFreeData then begin if AJob.IsObjectOwner then begin FreeJobData(AJob.Data, jdfFreeAsObject); AJob.Data := nil; end else if AJob.IsRecordOwner then begin FreeJobData(AJob.Data, jdfFreeAsRecord); AJob.Data := nil; end else if AJob.IsInterfaceOwner then begin FreeJobData(AJob.Data, jdfFreeAsInterface); AJob.Data := nil; end; end; end; JobPool.Push(AJob); AJob := ANext; end; end; procedure TYXDWorkers.FreeJobData(AData: Pointer; AFreeType: TJobDataFreeType); begin case AFreeType of jdfFreeAsObject: try FreeAndNil(TObject(AData)); except if Assigned(FOnErrorNotify) then FOnErrorNotify(nil, Exception(ExceptObject), 'Workers.FreeJobData'); end; jdfFreeAsRecord: try Dispose(AData); except if Assigned(FOnErrorNotify) then FOnErrorNotify(nil, Exception(ExceptObject), 'Workers.FreeJobData'); end; jdfFreeAsInterface: try IUnknown(AData)._Release; except if Assigned(FOnErrorNotify) then FOnErrorNotify(nil, Exception(ExceptObject), 'Workers.FreeJobData'); end; end; end; function TYXDWorkers.GetEnabled: Boolean; begin Result := (FDisableCount = 0); end; class function TYXDWorkers.GetInstance: TYXDWorkers; begin if not Assigned(Workers) then Workers := TYXDWorkers.Create(); Result := Workers; end; class function TYXDWorkers.JobPoolCount: Integer; begin Result := JobPool.Count; end; function TYXDWorkers.LookupIdleWorker: Boolean; var i: Integer; AWorker: TYXDWorker; begin Result := False; if (FDisableCount <> 0) or (FBusyCount = MaxWorkers) or FTerminating then Exit; FLocker.Enter; try if FBusyCount < FWorkerCount then begin for i := 0 to FWorkerCount - 1 do begin AWorker := FWorkers[i]; if (AWorker <> nil) and (AWorker.IsIdle) then begin AWorker.Suspended := False; AWorker.SetValue(WORKER_ISBUSY, true); AtomicIncrement(FBusyCount); AWorker.FEvent.SetEvent; Result := true; Exit; end; end; end; if (not Result) and (FWorkerCount < MaxWorkers) then begin AWorker := TYXDWorker.Create(Self); {$IFDEF MSWINDOWS} SetThreadCPU(AWorker.Handle, FWorkerCount mod FCPUNum); {$ELSE} SetThreadCPU(AWorker.ThreadID, FWorkerCount mod GetCPUCount); {$ENDIF} AWorker.Suspended := False; AWorker.FEvent.SetEvent; FWorkers[FWorkerCount] := AWorker; Inc(FWorkerCount); Result := true; end; finally FLocker.Leave; end; end; function TYXDWorkers.Popup: PJob; begin Result := FSimpleJobs.Pop; if Result = nil then Result := FRepeatJobs.Pop; end; procedure InitJob(AJob: PJob; AData: Pointer; ARunInMainThread: Boolean; const ADelay, AInterval: Int64); inline; begin AJob.Data := AData; if AInterval > 0 then begin AJob.Interval := AInterval; AJob.SetValue(JOB_RUN_ONCE, False); end else AJob.SetValue(JOB_RUN_ONCE, True); AJob.FirstDelay := ADelay; AJob.SetValue(JOB_IN_MAINTHREAD, ARunInMainThread); end; function TYXDWorkers.Post(AJobProc: TJobProc; AData: Pointer; ARunInMainThread: Boolean; const ADelay, AInterval: Int64; AFreeType: TJobDataFreeType): Boolean; var AJob: PJob; begin AJob := JobPool.Pop; AJob.WorkerProc := AJobProc; InitJob(AJob, AData, ARunInMainThread, ADelay, AInterval); InitJobFreeType(Self, AJob, AData, AFreeType); Result := Post(AJob); end; function TYXDWorkers.Post(AJobProc: TJobProcG; AData: Pointer; ARunInMainThread: Boolean; const ADelay, AInterval: Int64; AFreeType: TJobDataFreeType): Boolean; var AJob: PJob; begin AJob := JobPool.Pop; AJob.WorkerProc := MakeJobProc(AJobProc); InitJob(AJob, AData, ARunInMainThread, ADelay, AInterval); InitJobFreeType(Self, AJob, AData, AFreeType); Result := Post(AJob); end; {$IFDEF UNICODE} function TYXDWorkers.Post(AJobProc: TJobProcA; AData: Pointer; ARunInMainThread: Boolean; const ADelay, AInterval: Int64; AFreeType: TJobDataFreeType): Boolean; var AJob: PJob; begin AJob := JobPool.Pop; AJob.WorkerProcA := AJobProc; InitJob(AJob, AData, ARunInMainThread, ADelay, AInterval); InitJobFreeType(Self, AJob, AData, AFreeType); Result := Post(AJob); end; {$ENDIF} function TimeToDelay(const ATime: TDateTime): Int64; inline; var ANow, ATemp: TDateTime; begin ANow := Now; ANow := ANow - Trunc(ANow); // ATime我们只要时间部分,日期忽略 ATemp := ATime - Trunc(ATime); if ANow > ATemp then // 好吧,今天的点已经过了,算明天 Result := Trunc(((1 + ANow) - ATemp) * WODay) // 延迟的时间,单位为1ms else Result := Trunc((ATemp - ANow) * WODay); end; function TYXDWorkers.Post(AJobProc: TJobProcG; const ATime: TDateTime; const AInterval: Int64; AData: Pointer; ARunInMainThread: Boolean; AFreeType: TJobDataFreeType): Boolean; var AJob: PJob; begin AJob := JobPool.Pop; AJob.WorkerProc := MakeJobProc(AJobProc); InitJob(AJob, AData, ARunInMainThread, TimeToDelay(ATime), AInterval); InitJobFreeType(Self, AJob, AData, AFreeType); Result := Post(AJob); end; function TYXDWorkers.Post(AJobProc: TJobProc; const ATime: TDateTime; const AInterval: Int64; AData: Pointer; ARunInMainThread: Boolean; AFreeType: TJobDataFreeType): Boolean; var AJob: PJob; begin AJob := JobPool.Pop; AJob.WorkerProc := AJobProc; InitJob(AJob, AData, ARunInMainThread, TimeToDelay(ATime), AInterval); InitJobFreeType(Self, AJob, AData, AFreeType); Result := Post(AJob); end; {$IFDEF UNICODE} function TYXDWorkers.Post(AJobProc: TJobProcA; const ATime: TDateTime; const AInterval: Int64; AData: Pointer; ARunInMainThread: Boolean; AFreeType: TJobDataFreeType): Boolean; var AJob: PJob; begin AJob := JobPool.Pop; AJob.WorkerProcA := AJobProc; InitJob(AJob, AData, ARunInMainThread, TimeToDelay(ATime), AInterval); InitJobFreeType(Self, AJob, AData, AFreeType); Result := Post(AJob); end; {$ENDIF} procedure InitLogJob(AJob: PJob; AData: Pointer); inline; begin AJob.Data := AData; AJob.SetValue(JOB_LONGTIME, True); AJob.SetValue(JOB_RUN_ONCE, True); // 长作业只运行一次 end; function TYXDWorkers.PostLongJob(AJobProc: TJobProc; AData: Pointer; AFreeType: TJobDataFreeType): Boolean; var AJob: PJob; begin if AtomicIncrement(FLongTimeWorkers) <= FMaxLongtimeWorkers then begin Result := True; AJob := JobPool.Pop; AJob.WorkerProc := AJobProc; InitLogJob(AJob, AData); InitJobFreeType(self, AJob, AData, AFreeType); Post(AJob); end else begin // 长期作业数已经达到极限 AtomicDecrement(FLongTimeWorkers); Result := False; end; end; function TYXDWorkers.PostLongJob(AJobProc: TJobProcG; AData: Pointer; AFreeType: TJobDataFreeType): Boolean; begin Result := PostLongJob(MakeJobProc(AJobProc), AData, AFreeType); end; {$IFDEF UNICODE} function TYXDWorkers.PostLongJob(AJobProc: TJobProcA; AData: Pointer; AFreeType: TJobDataFreeType): Boolean; var AJob: PJob; begin if AtomicIncrement(FLongTimeWorkers) <= FMaxLongtimeWorkers then begin Result := True; AJob := JobPool.Pop; AJob.WorkerProcA := AJobProc; InitLogJob(AJob, AData); InitJobFreeType(self, AJob, AData, AFreeType); Post(AJob); end else begin // 长期作业数已经达到极限 AtomicDecrement(FLongTimeWorkers); Result := False; end; end; {$ENDIF} procedure InitWaitJob(AJob: PJob; ASignalId: Integer; ARunInMainThread: Boolean); inline; begin AJob.Data := nil; AJob.SignalId := ASignalId; AJob.PushTime := GetTimestamp; AJob.SetValue(JOB_SIGNAL_WAKEUP, True); AJob.SetValue(JOB_IN_MAINTHREAD, ARunInMainThread); end; function TYXDWorkers.PostWaitJob(AJob: PJob; ASignalId: Integer): Boolean; var ASignal: PSignal; begin Result := False; FLocker.Enter; try ASignal := FSignalJobs.FindFirstData(ASignalId); if ASignal <> nil then begin AJob.Next := ASignal.First; ASignal.First := AJob; Result := True; end; finally FLocker.Leave; if not Result then JobPool.Push(AJob); end; end; function TYXDWorkers.PostWait(AJobProc: TJobProc; ASignalId: Integer; ARunInMainThread: Boolean): Boolean; var AJob: PJob; begin if not FTerminating then begin AJob := JobPool.Pop; AJob.WorkerProc := AJobProc; InitWaitJob(AJob, ASignalId, ARunInMainThread); Result := PostWaitJob(AJob, ASignalId); end else Result := False; end; function TYXDWorkers.PostWait(AJobProc: TJobProcG; ASignalId: Integer; ARunInMainThread: Boolean): Boolean; begin Result := PostWait(MakeJobProc(AJobProc), ASignalId, ARunInMainThread); end; {$IFDEF UNICODE} function TYXDWorkers.PostWait(AJobProc: TJobProcA; ASignalId: Integer; ARunInMainThread: Boolean): Boolean; var AJob: PJob; begin if not FTerminating then begin AJob := JobPool.Pop; AJob.WorkerProcA := AJobProc; InitWaitJob(AJob, ASignalId, ARunInMainThread); Result := PostWaitJob(AJob, ASignalId); end else Result := False; end; {$ENDIF} function TYXDWorkers.Post(AJob: PJob): Boolean; begin if (not FTerminating) and (Assigned(AJob.WorkerProc) {$IFDEF UNICODE} or Assigned(AJob.WorkerProcA){$ENDIF}) then begin if AJob.Runonce and (AJob.FirstDelay = 0) then Result := FSimpleJobs.Push(AJob) else Result := FRepeatJobs.Push(AJob); if Result then LookupIdleWorker; end else begin AJob.Next := nil; FreeJob(AJob); Result := False; end; end; procedure TYXDWorkers.SendSignal(AId: Integer; AData: Pointer; AFreeType: TJobDataFreeType); var AFound: Boolean; ASignal: PSignal; begin AFound := False; if FTerminating then Exit; FLocker.Enter; try ASignal := FSignalJobs.FindFirstData(AId); if ASignal <> nil then begin AFound := True; FireSignalJob(ASignal, AData, AFreeType); end; finally FLocker.Leave; end; if AFound then LookupIdleWorker; end; procedure TYXDWorkers.SendSignal(const AName: string; AData: Pointer; AFreeType: TJobDataFreeType); var i: Integer; ASignal: PSignal; AFound: Boolean; begin AFound := False; if Length(AName) = 0 then Exit; FLocker.Enter; try for i := 0 to FSignalJobs.BucketCount - 1 do begin if FSignalJobs.Buckets[i] <> nil then begin ASignal := FSignalJobs.Buckets[i].Data; if (Length(ASignal.Name) = Length(AName)) and (ASignal.Name = AName) then begin AFound := True; FireSignalJob(ASignal, AData, AFreeType); Break; end; end; end; finally FLocker.Leave; end; if AFound then LookupIdleWorker; end; procedure TYXDWorkers.SetEnabled(const Value: Boolean); begin if Value then EnableWorkers else DisableWorkers; end; procedure TYXDWorkers.SetMaxLongtimeWorkers(const Value: Integer); begin if FMaxLongtimeWorkers <> Value then begin if Value > (MaxWorkers shr 1) then // 长时间运行的作业不能大于最大工作线程的一半 raise Exception.Create(STooManyLongtimeWorker); FMaxLongtimeWorkers := Value; end; end; procedure TYXDWorkers.SetMaxWorkers(const Value: Integer); var ATemp, AMaxLong: Integer; begin if (Value >= 2) and (FMaxWorkers <> Value) then begin AtomicExchange(ATemp, FLongTimeWorkers); AtomicExchange(FLongTimeWorkers, 0); // 强制置0,防止有新入的长时间作业 AMaxLong := Value shr 1; FLocker.Enter; try if FLongTimeWorkers < AMaxLong then begin // 已经进行的长时间作业数小于一半的工作者 if ATemp < AMaxLong then AMaxLong := ATemp; // 长时间作业最大值使用更改之前已经存在的长时间作业数量 if FMaxWorkers > Value then begin while Value < FWorkerCount do // 中止大于最大作业数的作业,谁是倒霉孩子? WorkerTerminate(FWorkers[FWorkerCount - 1]); FMaxWorkers := Value; SetLength(FWorkers, Value); end else begin FMaxWorkers := Value; SetLength(FWorkers, Value); end; end; finally FLocker.Leave; AtomicExchange(FLongTimeWorkers, AMaxLong); end; end; end; procedure TYXDWorkers.SetMinWorkers(const Value: Integer); begin if FMinWorkers <> Value then begin if Value < 1 then raise Exception.Create(STooFewWorkers); FMinWorkers := Value; end; end; function TYXDWorkers.SignalIdByName(const AName: string): Integer; var i, j: Integer; ASignal: PSignal; begin Result := -1; j := Length(AName); if j < 1 then Exit; for i := 0 to FSignalJobs.BucketCount - 1 do begin if FSignalJobs.Buckets[i] <> nil then begin ASignal := FSignalJobs.Buckets[i].Data; if (Length(ASignal.Name) = j) and (ASignal.Name = AName) then begin Result := ASignal.Id; Exit; end; end; end; end; procedure TYXDWorkers.SignalWorkDone(AJob: PJob; AUsedTime: Int64); var ASignal: PSignal; ATemp, APrior: PJob; begin APrior := nil; FLocker.Enter; try ASignal := FSignalJobs.FindFirstData(AJob.SignalId); ATemp := ASignal.First; while ATemp <> nil do begin if ATemp = AJob.Source then begin if AJob.IsTerminated then begin if APrior <> nil then APrior.Next := ATemp.Next else ASignal.First := ATemp.Next; ATemp.Next := nil; FreeJob(ATemp); end else begin Inc(ATemp.Runs); // 更新信号作业的统计信息 if AUsedTime > 0 then begin if ATemp.MinUsedTime = 0 then ATemp.MinUsedTime := AUsedTime else if AUsedTime < ATemp.MinUsedTime then ATemp.MinUsedTime := AUsedTime; if ATemp.MaxUsedTime = 0 then ATemp.MaxUsedTime := AUsedTime else if AUsedTime > ATemp.MaxUsedTime then ATemp.MaxUsedTime := AUsedTime; Break; end; end; end; APrior := ATemp; ATemp := ATemp.Next; end; finally FLocker.Leave; end; end; procedure TYXDWorkers.WaitRunningDone(const AParam: TWorkerWaitParam); var AInMainThread: Boolean; function HasJobRunning: Boolean; var i: Integer; AJob: PJob; begin Result := False; DisableWorkers; FLocker.Enter; try for i := 0 to FWorkerCount - 1 do begin if FWorkers[i].IsLookuping then begin// 还未就绪,所以在下次查询 Result := True; Break; end else if FWorkers[i].IsExecuting then begin AJob := FWorkers[i].FActiveJob; case AParam.WaitType of 0: // ByObject Result := TMethod(FWorkers[i].FActiveJobProc).Data = AParam.Bound; 1: // ByData Result := (TMethod(FWorkers[i].FActiveJobProc).Code = TMethod(AParam.WorkerProc).Code) and (TMethod(FWorkers[i].FActiveJobProc).Data = TMethod(AParam.WorkerProc).Data) and ((AParam.Data = Pointer(-1)) or (FWorkers[i].FActiveJobData = AParam.Data)); 2: // BySignalSource Result := (FWorkers[i].FActiveJobSource = AParam.SourceJob); 3: // ByGroup Result := (FWorkers[i].FActiveJobGroup = AParam.Group); $FF: // 所有 Result := True; else if Assigned(FOnErrorNotify) then FOnErrorNotify(AJob, Exception.CreateFmt(SBadWaitDoneParam, [AParam.WaitType]), 'TYXDWorkers.WaitRunningDone'); end; if Result then begin FWorkers[i].FTerminatingJob := AJob; Break; end; end; end; finally FLocker.Leave; EnableWorkers; end; end; begin AInMainThread := GetCurrentThreadId = MainThreadId; while True do begin if HasJobRunning then begin if AInMainThread then begin // 如果是在主线程中清理,由于作业可能在主线程执行,可能已经投寄尚未执行,所以必需让其能够执行 {$IFDEF NEXTGEN} fmx.Forms.Application.ProcessMessages; {$ELSE} Forms.Application.ProcessMessages; {$ENDIF} end; {$IFDEF MSWINDOWS} SwitchToThread; {$ELSE} TThread.Yield; {$ENDIF} end else // 没找到 Break; end; end; procedure TYXDWorkers.WorkerIdle(AWorker: TYXDWorker; AReason: TWorkerIdleReason); var i, J: Integer; begin if AtomicDecrement(FBusyCount) > FMinWorkers then begin FLocker.Enter; try // 工作者闲置时,如果不是前两个长工,就检测是否有短工可以解雇 if (AWorker <> FWorkers[0]) and (AWorker <> FWorkers[1]) and (AReason = irTimeout) then begin for i := FMinWorkers to FWorkerCount - 1 do begin // 从第一个短工开始,将他们解雇 if AWorker = FWorkers[i] then begin AWorker.Terminate; for J := i + 1 to FWorkerCount - 1 do FWorkers[J - 1] := FWorkers[J]; FWorkers[FWorkerCount - 1] := nil; Dec(FWorkerCount); Break; end; end; end; finally FLocker.Leave; end; end; if AReason <> irNoJob then AtomicIncrement(FBusyCount) end; procedure TYXDWorkers.WorkerTerminate(AWorker: TObject); var i ,J: Integer; begin AtomicDecrement(FBusyCount); FLocker.Enter; for i := 0 to FWorkerCount - 1 do begin // 工作者被中止时,从列表中清除 if FWorkers[i] = AWorker then begin //System.Move(FWorkers[I + 1], FWorkers[I], (FWorkerCount - I) * SizeOf(TObject)); for J := i to FWorkerCount - 2 do FWorkers[J] := FWorkers[J + 1]; FWorkers[FWorkerCount - 1] := nil; Dec(FWorkerCount); Break; end; end; FLocker.Leave; end; function TYXDWorkers.RegisterSignal(const AName: string): Integer; var ASignal: PSignal; begin FLocker.Enter; try Result := SignalIdByName(AName); if Result < 0 then begin Inc(FMaxSignalId); New(ASignal); ASignal.Id := FMaxSignalId; ASignal.Fired := 0; ASignal.Name := AName; ASignal.First := nil; FSignalJobs.Add(ASignal, ASignal.Id); Result := ASignal.Id; end; finally FLocker.Leave; end; end; { TJobGroup } function TJobGroup.Add(AProc: TJobProc; AData: Pointer; AInMainThread: Boolean; AFreeType: TJobDataFreeType): Boolean; var AJob: PJob; begin AJob := JobPool.Pop; AJob.Group := Self; AJob.WorkerProc := AProc; AJob.Data := AData; AJob.SetValue(JOB_RUN_ONCE, True); AJob.SetValue(JOB_GROUPED, True); AJob.SetValue(JOB_IN_MAINTHREAD, AInMainThread); InitJobFreeType(FOwner, AJob, AData, AFreeType); FLocker.Enter; try FWaitResult := wrIOCompletion; if FPrepareCount > 0 then begin // 正在添加项目,加到列表中,等待Run FItems.Add(AJob); Result := True; end else begin if ByOrder then begin // 按顺序 Result := True; FItems.Add(AJob); if FItems.Count = 0 then Result := FOwner.Post(AJob); end else begin Result := FOwner.Post(AJob); if Result then FItems.Add(AJob); end; end; finally FLocker.Leave; end; end; procedure TJobGroup.Cancel; var i: Integer; AJobs: TSimpleJobs; AJob, APrior, ANext: PJob; AWaitParam: TWorkerWaitParam; begin FLocker.Enter; try if FByOrder then begin for i := 0 to FItems.Count - 1 do begin AJob := FItems[i]; if AJob.PopTime = 0 then FOwner.FreeJob(AJob); end; end; FItems.Clear; finally FLocker.Leave; end; // 从SimpleJobs里清除关联的全部作业 AJobs := FOwner.FSimpleJobs; AJobs.FLocker.Enter; try AJob := AJobs.FFirst; APrior := nil; while AJob <> nil do begin ANext := AJob.Next; if AJob.IsGrouped and (AJob.Group = Self) then begin if APrior = nil then AJobs.FFirst := AJob.Next else APrior.Next := AJob.Next; AJob.Next := nil; FOwner.FreeJob(AJob); if AJob = AJobs.FLast then AJobs.FLast := nil; end else APrior := AJob; AJob := ANext; end; finally AJobs.FLocker.Leave; end; AWaitParam.WaitType := 3; AWaitParam.Group := Self; FOwner.WaitRunningDone(AWaitParam); end; constructor TJobGroup.Create(AOwner: TYXDWorkers; AByOrder: Boolean); begin if Assigned(AOwner) then FOwner := AOwner else FOwner := Workers; if (not Assigned(FOwner)) then raise Exception.Create(SNotInitWorkers); FEvent := TEvent.Create(nil, False, False, ''); FLocker := TSimpleLock.Create; FByOrder := AByOrder; FItems := TJobItemList.Create; end; constructor TJobGroup.Create(AByOrder: Boolean); begin Create(nil, AByOrder); end; destructor TJobGroup.Destroy; var i: Integer; begin Cancel; FOwner.Clear(Self, -1); FLocker.Enter; try if FItems.Count > 0 then begin FWaitResult := wrAbandoned; FEvent.SetEvent; for i := 0 to FItems.Count - 1 do begin if PJob(FItems[i]).PushTime <> 0 then JobPool.Push(FItems[i]); end; FItems.Clear; end; finally FLocker.Leave; end; FreeAndNil(FLocker); FreeAndNil(FEvent); FreeAndNil(FItems); inherited; end; procedure TJobGroup.DoAfterDone; begin if Assigned(FAfterDone) then begin try FAfterDone(Self); except if Assigned(FOwner.FOnErrorNotify) then FOwner.FOnErrorNotify(nil, Exception(ExceptObject), 'TJobGroup.DoAfterDone'); end; end; end; procedure TJobGroup.DoJobExecuted(AJob: PJob); var i: Integer; AIsDone: Boolean; begin if FWaitResult = wrIOCompletion then begin AIsDone := False; FLocker.Enter; try i := FItems.IndexOf(AJob); if i <> -1 then begin FItems.Delete(i); if FItems.Count = 0 then begin FWaitResult := wrSignaled; FEvent.SetEvent; AIsDone := true; end else if ByOrder then begin if not FOwner.Post(FItems[0]) then begin FWaitResult := wrAbandoned; FEvent.SetEvent; end; end; end; finally FLocker.Leave; end; if AIsDone then DoAfterDone; end; end; procedure TJobGroup.DoJobsTimeout(AJob: PJob); begin FTimeoutCheck := False; Cancel; if FWaitResult = wrIOCompletion then begin FWaitResult := wrTimeout; FEvent.SetEvent; DoAfterDone; end; end; function TJobGroup.MsgWaitFor(ATimeout: Cardinal): TWaitResult; var AEndTime: Int64; begin if GetCurrentThreadId <> MainThreadId then Result := WaitFor(ATimeout) else begin Result := FWaitResult; FLocker.Enter; try if FItems.Count = 0 then Result := wrSignaled; finally FLocker.Leave; end; if Result = wrIOCompletion then begin AEndTime := GetTimestamp + ATimeout * 10; while GetTimestamp < AEndTime do begin // 每隔10毫秒检查一下是否有消息需要处理,有则处理,无则进入下一个等待 if FEvent.WaitFor(10) = wrSignaled then begin Result := FWaitResult; Break; end else begin // 如果是在主线程中清理,由于作业可能在主线程执行,可能已经投寄尚未执行,所以必需让其能够执行 {$IFDEF NEXTGEN} fmx.Forms.Application.ProcessMessages; {$ELSE} Forms.Application.ProcessMessages; {$ENDIF} end; end; if Result = wrIOCompletion then begin Cancel; if Result = wrIOCompletion then Result := wrTimeout; end; if FTimeoutCheck then FOwner.Clear; DoAfterDone; end; end; end; procedure TJobGroup.Prepare; begin AtomicIncrement(FPrepareCount); end; procedure TJobGroup.Run(ATimeout: Cardinal = INFINITE); var i: Integer; begin if AtomicDecrement(FPrepareCount) = 0 then begin if ATimeout <> INFINITE then begin FTimeoutCheck := True; FOwner.Post(DoJobsTimeout, nil, False, ATimeout); end; FLocker.Enter; try if FItems.Count = 0 then FWaitResult := wrSignaled else begin FWaitResult := wrIOCompletion; if ByOrder then begin if not FOwner.Post(FItems[0]) then FWaitResult := wrAbandoned; end else begin for i := 0 to FItems.Count - 1 do begin if not FOwner.Post(FItems[i]) then begin FWaitResult := wrAbandoned; Break; end; end; end; end; finally FLocker.Leave; end; if FWaitResult <> wrIOCompletion then DoAfterDone; end; end; function TJobGroup.WaitFor(ATimeout: Cardinal): TWaitResult; begin Result := FWaitResult; FLocker.Enter; try if FItems.Count = 0 then Result := wrSignaled; finally FLocker.Leave; end; if Result = wrIOCompletion then begin if FEvent.WaitFor(ATimeout) = wrSignaled then Result := FWaitResult else Result := wrTimeout; end; if FTimeoutCheck then FOwner.Clear; DoAfterDone; end; initialization _CPUCount := GetCPUCount; {$IFNDEF NEXTGEN} GetTickCount64 := GetProcAddress(GetModuleHandle(kernel32), 'GetTickCount64'); if not QueryPerformanceFrequency(_PerfFreq) then _PerfFreq := -1; {$ELSE} _Watch := TStopWatch.Create; _Watch.Start; {$ENDIF} JobPool := TJobPool.Create(1024); //Workers := TYXDWorkers.Create; finalization if Assigned(Workers) then FreeAndNil(Workers); if Assigned(JobPool) then FreeAndNil(JobPool); end.