说明
——————————————————————–
YxdWorker 基于QDAC项目的QWorker ,并且绝大部分代码来自于此,
感谢swish和他的QWorker,QDAC,YxdWorker 版权归swish, YangYxd所有
QWorker来自QDAC项目,版权归swish(QQ:109867294)所有
QDAC官方群:250530692
——————————————————————–
感谢Swish兄长,为了QWorker付出了很多时间和精力。
{*******************************************************}
{ }
{ YxdWorker 后台工作者管理库 }
{ }
{ 版权所有 (C) 2013 GXT YangYxd }
{ }
{*******************************************************}
{
--------------------------------------------------------------------
说明
--------------------------------------------------------------------
YxdWorker 基于QDAC项目的QWorker ,并且绝大部分代码来自于此,
感谢swish和他的QWorker,QDAC,YxdWorker 版权归swish, YangYxd所有
QWorker来自QDAC项目,版权归swish(QQ:109867294)所有
QDAC官方群:250530692
--------------------------------------------------------------------
更新记录
--------------------------------------------------------------------
2014.08.23 ver 1.0.4
--------------------------------------------------------------------
- 解决Busy计数器BUG
2014.08.22 ver 1.0.3
--------------------------------------------------------------------
- 解决JobGroup超时和Cancel的问题,解决某些原因引起测速很慢的问题
- 提取合并部分代码,减少体积
2014.08.16 ver 1.0.2
--------------------------------------------------------------------
- 改进长时间任务处理方式 ,TSimpleJobs增加 FLongFirst,FLongLast 专
门应对长时间任务,解决长时间任务导致Clear失败BUG
- 同步QWorker修改TQJobGroup.AfterDone改为除了在完成时,在中断或超时
时仍然触发
- 同步QWorker增加TQJobGroup.Run函数加入超时设置,超过指定的时间如果
仍未执行完成,则中止后续执行
- 同步QWorker增加TQJobGroup.Cancel函数用于取消未执行的作业执行
2014.08.16 ver 1.0.1
--------------------------------------------------------------------
- 增加 FOnErrorNotify通知事件,以便使用者可以记录相关日志
- 将原QWorker中的Delay,At,Post合并为Post方法。
- 将原QWorker中的时间精度由0.1ms调整为1ms.
- 将原QWorker中TJobHelper的功能直接放入TJob中,以便在D2007中还能保
持良好的语法提示
- 将原QWorker中Worker类设置Flags相关功能改为GetValue,SetValue,减小
单元大小
- 对JobGroup的Add功能增加参数AFreeType, 并默认AInMainThread=False
- 提取合并部分代码,减少单元大小
- 删除Job中的Owner字段
--------------------------------------------------------------------
}
unit YxdWorker;
{.$DEFINE WORKER_SIMPLE_LOCK} // 是否使用原子自旋锁?
interface
uses
{$IFDEF UNICODE}Generics.Collections, {$ENDIF}
{$IFDEF NEXTGEN}Fmx.Forms, System.Diagnostics, {$ENDIF}
{$IFDEF POSIX}Posix.Unistd, Posix.Pthread, {$ENDIF}
{$IFDEF MSWINDOWS}Windows, Messages, Forms, Activex, {$ENDIF}
YxdHash, SysUtils, Classes, Types, SyncObjs;
const
// Job flag bits (tested against TJob.Flags by TJob.GetValue/SetValue)
JOB_RUN_ONCE = $0001; // The job runs only once
JOB_IN_MAINTHREAD = $0002; // The job may only run in the main thread
JOB_MAX_WORKERS = $0004; // Start as many worker threads as possible to process the job; not supported yet
JOB_LONGTIME = $0008; // The job needs a long time to finish, so the scheduler can reduce its impact on other jobs
JOB_SIGNAL_WAKEUP = $0010; // The job is woken up by a signal
JOB_TERMINATED = $0020; // The job does not need to continue and may end
JOB_FREE_OBJECT = $0040; // Data refers to an object, released when the job completes or is cleaned up
JOB_FREE_RECORD = $0080; // Data refers to a record, released when the job completes or is cleaned up
JOB_FREE_INTERFACE = $0100; // Data refers to an interface; _Release is called when the job completes
JOB_GROUPED = $0200; // The job is a member of a job group
JOB_ANONPROC = $0400; // The job procedure is an anonymous function
JOB_DATA_OWNER = JOB_FREE_OBJECT + JOB_FREE_RECORD + JOB_FREE_INTERFACE;
// Mask of the three flags above: the job is the owner of its Data member
// Worker flag bits (tested by TYXDWorker.GetValue/SetValue)
WORKER_ISBUSY = $01; // The worker is busy
WORKER_PROCESSLONG = $02; // The worker is currently processing a long-running job
WORKER_RESERVED = $04; // The worker is a reserved worker
WORKER_COM_INITED = $08; // The worker has been initialized for COM support (Windows only)
WORKER_LOOKUP = $10; // The worker is looking for a job
WORKER_EXECUTING = $20; // The worker is executing a job
WORKER_EXECUTED = $40; // The worker has finished a job
WAITJOB_TIMEOUT = 15000; // How long a worker waits for a job before timing out (15 seconds)
const
// Convenience time spans, expressed in milliseconds
WOSecond = 1000; // 1 second
WOMinute = 60000; // 60 seconds / 1 minute
WOHour = 3600000; // 3600 seconds / 60 minutes / 1 hour
WODay = Int64(86400000); // 1 day
type
/// <summary>Reason why a worker became idle; for internal use</summary>
TWorkerIdleReason = (
irTimeout, // The worker has timed out while waiting and may be released
irNoJob // There is no job to process; the worker enters a waiting state of
// WAITJOB_TIMEOUT. If a new job arrives within WAITJOB_TIMEOUT the worker
// is woken up, otherwise it is released after the timeout
);
type
/// <summary>How the Data member is handled when a job ends</summary>
TJobDataFreeType = (
jdfFreeByUser, // The user manages the release of the data
jdfFreeAsObject, // Data is a TObject descendant; released as an object when the job completes
jdfFreeAsRecord, // Data is a record; released with Dispose when the job completes
jdfFreeAsInterface // Data is an interface; its reference count is increased when added and decreased when the job completes
);
type
// Forward declarations of the classes defined later in this unit
TJobBase = class;
TJobGroup = class;
TSimpleJobs = class;
TRepeatJobs = class;
TYXDWorker = class;
TYXDWorkers = class;
{$IFNDEF UNICODE}
// Pre-Unicode compilers do not declare IntPtr
IntPtr = Integer;
{$ENDIF}
PJob = ^TJob;
// Job handler callback types:
// TJobProc - method of an object
// TJobProcG - global (stand-alone) procedure
// TJobProcA - anonymous procedure (Unicode compilers only)
TJobProc = procedure(AJob: PJob) of object;
TJobProcG = procedure(AJob: PJob);
{$IFDEF UNICODE}
TJobProcA = reference to procedure(AJob: PJob);
{$ENDIF}
// Parameter block passed to TYXDWorkers.WaitRunningDone; WaitType presumably
// selects which variant field is meaningful — TODO confirm against the implementation
TWorkerWaitParam = record
WaitType: Byte;
Data: Pointer;
case Integer of
0:
(Bound: Pointer); // Clear by bound object
1:
(WorkerProc: TMethod);
2:
(SourceJob: PJob);
3:
(Group: Pointer);
end;
// Internal definition of a signal
PSignal = ^TSignal;
TSignal = packed record
Id: Integer; // Index of the signal
Fired: Integer; // Number of times the signal has been fired
Name: string; // Name of the signal
First: PJob; // First job attached to the signal
end;
// A single job (work item): its handler, scheduling times, run statistics,
// flag bits and attached user data.
TJob = record
private
function GetAvgTime: Integer; inline;
function GetElapseTime: Int64; inline;
function GetValue(Index: Integer): Boolean; inline;
procedure SetValue(Index: Integer; const Value: Boolean); inline;
function GetIsTerminated: Boolean; inline;
procedure SetIsTerminated(const Value: Boolean); inline;
procedure AfterRun(AUsedTime: Int64);
procedure UpdateNextTime;
public
// Stores AProc as the handler and marks the job as run-once
procedure Create(AProc: TJobProc);
/// <summary>Value-copy from another job</summary>
/// <remarks>The whole record is copied, then Worker/Next/Source are reset to nil.
/// NOTE(review): the original remark also claimed Owner is not copied — verify.</remarks>
procedure Assign(const ASource: PJob);
/// <summary>Zeroes the record, preparing it for being popped from the pool</summary>
procedure Reset; inline;
/// <summary>Average time per run, in units of 1 ms</summary>
property AvgTime: Integer read GetAvgTime;
/// <summary>Time elapsed in the current run, in units of 1 ms</summary>
property ElapseTime: Int64 read GetElapseTime;
/// <summary>Whether the job runs only once; set automatically when the job is posted</summary>
property Runonce: Boolean index JOB_RUN_ONCE read GetValue;
/// <summary>Whether the job must run in the main thread; the effect is similar to Windows PostMessage</summary>
property InMainThread: Boolean index JOB_IN_MAINTHREAD read GetValue;
/// <summary>Whether this is a long-running job (the original comment says it is set via Workers.LongtimeWork — TODO confirm the current API name)</summary>
property IsLongtimeJob: Boolean index JOB_LONGTIME read GetValue;
/// <summary>Whether this job is triggered by a signal</summary>
property IsSignalWakeup: Boolean index JOB_SIGNAL_WAKEUP read GetValue;
/// <summary>Whether this job is a member of a job group</summary>
property IsGrouped: Boolean index JOB_GROUPED read GetValue;
/// <summary>Whether the current job has been asked to end</summary>
property IsTerminated: Boolean read GetIsTerminated write SetIsTerminated;
/// <summary>Whether the job owns its Data member (any of the JOB_FREE_* flags is set)</summary>
property IsDataOwner: Boolean index JOB_DATA_OWNER read GetValue;
/// <summary>Whether Data points to an object that must be freed automatically when the job completes</summary>
property IsObjectOwner: Boolean index JOB_FREE_OBJECT read GetValue write SetValue;
/// <summary>Whether Data points to a record that must be freed automatically when the job completes</summary>
property IsRecordOwner: Boolean index JOB_FREE_RECORD read GetValue write SetValue;
/// <summary>Whether Data points to an interface that must be released automatically when the job completes</summary>
property IsInterfaceOwner: Boolean index JOB_FREE_INTERFACE read GetValue write SetValue;
/// <summary>Whether the job handler is an anonymous function</summary>
property IsAnonWorkerProc: Boolean index JOB_ANONPROC read GetValue write SetValue;
public
FirstTime: Int64; // Time the job first started
StartTime: Int64; // Start time of the current run
PushTime: Int64; // Time the job was enqueued
PopTime: Int64; // Time the job was dequeued
NextTime: Int64; // Time of the next run
WorkerProc: TJobProc; // Job handler
{$IFDEF UNICODE}
WorkerProcA: TJobProcA;
{$ENDIF}
Owner: TJobBase; // Queue the job belongs to
Next: PJob; // Next node in the linked list
Worker: TYXDWorker; // Worker currently handling the job
Runs: Integer; // Number of completed runs
MinUsedTime: Integer; // Shortest run time
TotalUsedTime: Integer; // Total time spent running; TotalUsedTime/Runs gives the average run time
MaxUsedTime: Integer; // Longest run time
Flags: Integer; // Job flag bits (JOB_* constants)
Data: Pointer; // Attached user data
// The variants below share storage; which one is valid depends on the kind of job
case Integer of
0:
(
SignalId: Integer; // Signal id
Source: PJob; // Address of the source job
RefCount: PInteger; // Source data
);
1:
(
// NOTE(review): the original comments give the unit as 0.1 ms, but the unit's
// change log says precision was changed to 1 ms — confirm
Interval: Int64; // Interval between runs; actual precision is limited by the operating system
FirstDelay: Int64; // Delay before the first run, default 0
);
2:
(
Group: Pointer; // Job group support
);
end;
// Base class of the job queue objects; provides the basic interface wrappers
TJobBase = class(TObject)
protected
FOwner: TYXDWorkers;
function InternalPush(AJob: PJob): Boolean; virtual; abstract;
function InternalPop: PJob; virtual; abstract;
function GetCount: Integer; virtual; abstract;
function GetEmpty: Boolean;
public
constructor Create(AOwner: TYXDWorkers); virtual;
destructor Destroy; override;
// Posts a job (external code should not push jobs to the queue directly; this is
// called internally by the corresponding Workers functions)
function Push(AJob: PJob): Boolean; virtual;
// Pops a job
function Pop: PJob; virtual;
// Removes all jobs
procedure Clear; overload; virtual;
// Removes the specified jobs
function Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; overload; virtual; abstract;
// Removes all jobs associated with an object
function Clear(AObject: Pointer; AMaxTimes: Integer): Integer; overload; virtual; abstract;
/// Unreliability warning: Count and Empty are only indicative; in a multi-threaded
/// environment they may no longer hold when the next statement executes
property Empty: Boolean read GetEmpty; // Whether the queue is currently empty
property Count: Integer read GetCount; // Current number of queued items
end;
{$IFDEF WORKER_SIMPLE_LOCK}
// A simple lock based on a flag bit, set with atomic functions
TSimpleLock = class
private
FFlags: Integer;
public
constructor Create;
procedure Enter; inline;
procedure Leave; inline;
end;
{$ELSE}
// Without WORKER_SIMPLE_LOCK the lock is an alias for TCriticalSection
TSimpleLock = TCriticalSection;
{$ENDIF}
/// <summary>
/// Worker thread. The original remark states that workers are managed with a singly
/// linked list rather than a sorted lookup, because with a limited number of workers
/// extra processing is no more effective than the simplest loop.
/// NOTE(review): FNext is commented out below and TYXDWorkers keeps workers in an
/// array, so that remark may be outdated.
/// </summary>
TYXDWorker = class(TThread)
private
FOwner: TYXDWorkers;
FEvent: TEvent;
//FNext: TYXDWorker;
FFlags: Integer;
FTimeout: Integer;
FTerminatingJob: PJob;
function GetValue(Index: Integer): Boolean; inline;
procedure SetValue(Index: Integer; const Value: Boolean); inline;
function GetIsIdle: Boolean; inline;
protected
FActiveJob: PJob;
// Per the original author: these copies are kept instead of reading through
// FActiveJob so that external code can access them in a thread-safe way
FActiveJobProc: TJobProc;
FActiveJobData: Pointer;
FActiveJobSource: PJob;
FActiveJobGroup: TJobGroup;
FActiveJobFlags: Integer;
procedure Execute; override;
procedure FireInMainThread;
procedure DoJob(AJob: PJob);
public
constructor Create(AOwner: TYXDWorkers); overload;
destructor Destroy; override;
procedure ComNeeded(AInitFlags: Cardinal = 0);
// Whether the worker has been initialized for COM support
property ComInitialized: Boolean index WORKER_COM_INITED read GetValue;
// Whether the worker is currently processing a long-running job
property InLongtimeJob: Boolean index WORKER_PROCESSLONG read GetValue;
// Whether the worker is currently idle
property IsIdle: Boolean read GetIsIdle;
// Whether the worker is currently busy
property IsBusy: Boolean index WORKER_ISBUSY read GetValue;
// Whether the worker is an internally reserved worker
property IsReserved: Boolean index WORKER_RESERVED read GetValue;
// State flags: looking for a job / executing a job / finished a job
property IsLookuping: Boolean index WORKER_LOOKUP read GetValue;
property IsExecuting: Boolean index WORKER_EXECUTING read GetValue;
property IsExecuted: Boolean index WORKER_EXECUTED read GetValue;
end;
// Worker error notification event
TWorkerErrorNotify = procedure(AJob: PJob; E: Exception; const ErrSource: string) of object;
// Custom data release event
TCustomFreeDataEvent = procedure(ASender: TYXDWorkers; AFreeType: TJobDataFreeType; var AData: Pointer) of object;
/// <summary>
/// Worker manager, used to manage workers and jobs
/// </summary>
TYXDWorkers = class(TObject)
private
FWorkers: array of TYXDWorker;
FWorkerCount: Integer;
FDisableCount: Integer;
FBusyCount: Integer;
FMinWorkers: Integer;
FMaxWorkers: Integer;
FMaxSignalId: Integer;
FLongTimeWorkers: Integer; // Number of workers running long-time jobs; such jobs hold resources for a long time and may keep other jobs from being handled promptly
FMaxLongtimeWorkers: Integer; // Maximum number of long-time jobs allowed to run simultaneously; may not exceed half of MaxWorkers
FTerminating: Boolean;
FCPUNum: Integer;
FLocker: TCriticalSection;
FSimpleJobs: TSimpleJobs;
FRepeatJobs: TRepeatJobs;
FSignalJobs: TYXDHashTable;
FOnErrorNotify: TWorkerErrorNotify;
FOnCustomFreeData: TCustomFreeDataEvent;
{$IFDEF MSWINDOWS}
FMainWorker: HWND;
procedure DoMainThreadWork(var AMsg: TMessage);
{$ENDIF}
function GetEnabled: Boolean;
function PostWaitJob(AJob: PJob; ASignalId: Integer): Boolean;
function ClearSignalJobs(ASource: PJob): Integer;
function ClearJobs(AObject: Pointer; AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer;
function ClearWaitJobs(ASignalId: Integer; const ASignalName: string): Integer;
procedure SetEnabled(const Value: Boolean);
procedure SetMaxLongtimeWorkers(const Value: Integer);
procedure SetMaxWorkers(const Value: Integer);
procedure SetMinWorkers(const Value: Integer);
procedure EnableWorkers;
procedure DisableWorkers;
procedure ClearWorkers;
procedure FreeJob(AJob: PJob);
procedure FreeJobData(AData: Pointer; AFreeType: TJobDataFreeType);
protected
function Popup: PJob;
function Post(AJob: PJob): Boolean; overload;
function LookupIdleWorker: Boolean;
function SignalIdByName(const AName: string): Integer;
procedure SignalWorkDone(AJob: PJob; AUsedTime: Int64);
procedure WorkerIdle(AWorker: TYXDWorker; AReason: TWorkerIdleReason);
procedure WorkerTerminate(AWorker: TObject);
procedure WaitRunningDone(const AParam: TWorkerWaitParam);
procedure FireSignalJob(ASignal: PSignal; AData: Pointer; AFreeType: TJobDataFreeType);
procedure DoJobFree(ATable: TObject; AHash: Cardinal; AData: Pointer);
procedure DoCustomFreeData(AFreeType: TJobDataFreeType; var AData: Pointer);
public
constructor Create(AMinWorkers: Integer = 2); overload;
destructor Destroy; override;
// Returns the size of the job pool
class function JobPoolCount(): Integer;
// Returns the instance
class function GetInstance: TYXDWorkers;
// Removes all jobs
procedure Clear; overload;
/// <summary>Removes all jobs related to an object</summary>
/// <param name="AObject">Object associated with the job handlers to remove</param>
/// <param name="AMaxTimes">Maximum number of jobs to remove; if <0, remove all</param>
/// <returns>Number of jobs actually removed</returns>
/// <remarks>An object that has scheduled jobs should call this function before it is
/// freed to remove its associated jobs; otherwise unfinished jobs may raise exceptions.</remarks>
function Clear(AObject: Pointer; AMaxTimes: Integer = -1): Integer; overload;
/// <summary>Removes all posted jobs that use the specified handler</summary>
/// <param name="AProc">Job handler to remove</param>
/// <param name="AData">Attached data pointer of the jobs to remove; if the value is
/// Pointer(-1), all jobs with this handler are removed, otherwise only those whose
/// attached data address matches</param>
/// <param name="AMaxTimes">Maximum number of jobs to remove; if <0, remove all</param>
/// <returns>Number of jobs actually removed</returns>
function Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer = -1): Integer; overload;
/// <summary>Removes all jobs associated with the specified signal</summary>
/// <param name="ASignalName">Name of the signal to clear</param>
/// <returns>Number of jobs actually removed</returns>
function Clear(const ASignalName: string): Integer; overload;
/// <summary>Removes all jobs associated with the specified signal</summary>
/// <param name="ASignalId">Id of the signal to clear</param>
/// <returns>Number of jobs actually removed</returns>
function Clear(ASignalId: Integer): Integer; overload;
/// <summary>Posts a job</summary>
/// <param name="AJobProc">Job handler to execute</param>
/// <param name="ADelay">Delay before the first execution; less than or equal to 0 means run immediately</param>
/// <param name="AInterval">Interval between repeated runs; if less than or equal to 0 the job runs only once</param>
/// <param name="ARunInMainThread">Whether the job must run in the main thread</param>
function Post(AJobProc: TJobProc; AData: Pointer; ARunInMainThread: Boolean = False;
const ADelay: Int64 = 0; const AInterval: Int64 = 0; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
function Post(AJobProc: TJobProcG; AData: Pointer; ARunInMainThread: Boolean = False;
const ADelay: Int64 = 0; const AInterval: Int64 = 0; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
{$IFDEF UNICODE}
function Post(AJobProc: TJobProcA; AData: Pointer; ARunInMainThread: Boolean = False;
const ADelay: Int64 = 0; const AInterval: Int64 = 0; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
{$ENDIF}
/// <summary>Posts a repeating job that starts at a specified time</summary>
/// <param name="AJobProc">Job handler to execute</param>
/// <param name="ATime">Execution time; only the time part is used, the date is ignored</param>
/// <param name="AInterval">Interval between repeated runs; if less than or equal to 0 the job runs only once</param>
/// <param name="ARunInMainThread">Whether the job must run in the main thread</param>
function Post(AJobProc: TJobProc; const ATime: TDateTime; const AInterval: Int64;
AData: Pointer; ARunInMainThread: Boolean = False; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
function Post(AJobProc: TJobProcG; const ATime: TDateTime; const AInterval: Int64;
AData: Pointer; ARunInMainThread: Boolean = False; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
{$IFDEF UNICODE}
function Post(AJobProc: TJobProcA; const ATime: TDateTime; const AInterval: Int64;
AData: Pointer; ARunInMainThread: Boolean = False; AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
{$ENDIF}
/// <summary>Posts a long-running background job</summary>
/// <param name="AJobProc">Job handler to execute</param>
/// <param name="AData">User data pointer attached to the job</param>
/// <returns>True if the job was posted successfully, otherwise False</returns>
/// <remarks>Long-running jobs are forced to run in a background thread and may not be posted to the main thread</remarks>
function PostLongJob(AJobProc: TJobProc; AData: Pointer;
AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
function PostLongJob(AJobProc: TJobProcG; AData: Pointer;
AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
{$IFDEF UNICODE}
function PostLongJob(AJobProc: TJobProcA; AData: Pointer;
AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean; overload;
{$ENDIF}
/// <summary>Posts a job that starts only when a signal is received</summary>
/// <param name="AJobProc">Job handler to execute</param>
/// <param name="ASignalId">Id of the signal to wait for, as returned by RegisterSignal</param>
/// <param name="ARunInMainThread">Whether the job must run in the main thread</param>
/// <returns>True if the job was posted successfully, otherwise False</returns>
function PostWait(AJobProc: TJobProc; ASignalId: Integer;
ARunInMainThread: Boolean = False): Boolean; overload;
function PostWait(AJobProc: TJobProcG; ASignalId: Integer;
ARunInMainThread: Boolean = False): Boolean; overload;
{$IFDEF UNICODE}
function PostWait(AJobProc: TJobProcA; ASignalId: Integer;
ARunInMainThread: Boolean = False): Boolean; overload;
{$ENDIF}
/// <summary>Fires a signal</summary>
/// <param name="AId">Signal id, as returned by RegisterSignal</param>
/// <param name="AData">User data pointer attached to the jobs</param>
/// <remarks>After a signal is fired, Workers triggers execution of all handlers registered for that signal</remarks>
procedure SendSignal(AId: Integer; AData: Pointer = nil; AFreeType: TJobDataFreeType = jdfFreeByUser); overload;
/// <summary>Fires a signal by name</summary>
/// <param name="AName">Signal name</param>
/// <param name="AData">User data pointer attached to the jobs</param>
/// <remarks>After a signal is fired, Workers triggers execution of all handlers registered for that signal</remarks>
procedure SendSignal(const AName: string; AData: Pointer = nil; AFreeType: TJobDataFreeType = jdfFreeByUser); overload;
/// <summary>Registers a signal</summary>
/// <param name="AName">Signal name</param>
/// <remarks>
/// 1. Registering the same name repeatedly returns the same id
/// 2. Once registered, a signal is only released automatically when the program exits
/// </remarks>
function RegisterSignal(const AName: string): Integer;
// Maximum number of workers allowed; may not be less than 2
property MaxWorkers: Integer read FMaxWorkers write SetMaxWorkers;
// Minimum number of workers; may not be less than 2
property MinWorkers: Integer read FMinWorkers write SetMinWorkers;
// Maximum number of long-time job workers allowed, i.e. the number of long-time jobs that may be started
property MaxLongtimeWorkers: Integer read FMaxLongtimeWorkers write SetMaxLongtimeWorkers;
// Whether jobs are allowed to start; if False, posted jobs are not executed until it is set back to True
// (jobs already running when Enabled becomes False keep running; it only affects jobs not yet started)
property Enabled: Boolean read GetEnabled write SetEnabled;
// Whether the workers object itself is being destroyed
property Terminating: Boolean read FTerminating;
// Number of CPUs in the current system
property CPUNum: Integer read FCPUNum;
// Number of busy workers
property BusyWorkerCount: Integer read FBusyCount;
// Current number of workers
property WorkerCount: Integer read FWorkerCount;
// Worker error callback notification event
property OnErrorNotify: TWorkerErrorNotify read FOnErrorNotify write FOnErrorNotify;
// User-specified way of releasing a job's Data object
property OnCustomFreeData: TCustomFreeDataEvent read FOnCustomFreeData write FOnCustomFreeData;
end;
// List type used by TJobGroup for its jobs: a typed generic list on Unicode
// compilers, a plain TList otherwise
{$IFDEF UNICODE}
TJobItemList = TList<PJob>;
{$ELSE}
TJobItemList = TList;
{$ENDIF}
/// <summary>
/// Job group: jobs put together to run in order or out of order; WaitFor can be used
/// to wait until all of them have completed
/// </summary>
TJobGroup = class(TObject)
private
FOwner: TYXDWorkers;
FLocker: TSimpleLock;
FEvent: TEvent; // Event used to wait for the jobs to complete
FCount: Integer;
FByOrder: Boolean;
FWaitResult: TWaitResult;
FAfterDone: TNotifyEvent; // Notification event fired when the jobs are done
FTimeoutCheck: Boolean; // Whether job timeout is checked
FTag: Pointer;
protected
FItems: TJobItemList; // List of jobs
FPrepareCount: Integer; // Prepare counter
procedure DoJobExecuted(AJob: PJob);
procedure DoJobsTimeout(AJob: PJob);
procedure DoAfterDone;
public
constructor Create(AByOrder: Boolean = False); overload;
constructor Create(AOwner: TYXDWorkers; AByOrder: Boolean = False); overload;
destructor Destroy; override;
// Cancels the jobs that have not completed
procedure Cancel;
// Prepares for adding jobs; actually increments the internal counter
procedure Prepare;
// Decrements the internal counter; when it reaches 0 the jobs actually start executing
procedure Run(ATimeout: Cardinal = INFINITE);
// Adds a job handler; if the internal prepare counter is 0 it is executed directly,
// otherwise it is only added to the list
function Add(AProc: TJobProc; AData: Pointer; AInMainThread: Boolean = False;
AFreeType: TJobDataFreeType = jdfFreeByUser): Boolean;
// Waits for the jobs to complete; ATimeout is the maximum waiting time
function WaitFor(ATimeout: Cardinal = INFINITE): TWaitResult;
// Waits for the jobs to complete; ATimeout is the maximum waiting time. Unlike
// WaitFor, MsgWaitFor does not block message processing
function MsgWaitFor(ATimeout: Cardinal = INFINITE): TWaitResult;
// Number of jobs not yet completed
property Count: Integer read FCount;
// Callback event fired when all jobs have finished executing
property AfterDone: TNotifyEvent read FAfterDone write FAfterDone;
// Whether jobs run in order (each job waits for the previous one to complete)
property ByOrder: Boolean read FByOrder;
property Tag: Pointer read FTag write FTag;
end;
/// <summary>
/// Manages scheduled jobs that must fire at specified points in time
/// </summary>
TRepeatJobs = class(TJobBase)
private
FLocker: TCriticalSection;
FFirstFireTime: Int64; // NextTime of the first tree entry, or 0 when the tree is empty (see ClearJobs)
function ClearJobs(AObject: Pointer; AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer;
procedure AfterJobRun(AJob: PJob; AUsedTime: Int64);
protected
FItems: TRBTree; // Jobs ordered by DoTimeCompare; each node holds a Next-linked chain of jobs
function InternalPush(AJob: PJob): Boolean; override;
function InternalPop: PJob; override;
function DoTimeCompare(P1, P2: Pointer): Integer;
procedure DoJobDelete(ATree: TRBTree; ANode: PRBNode);
function GetCount: Integer; override;
public
constructor Create(AOwner: TYXDWorkers); override;
destructor Destroy; override;
procedure Clear; override;
function Clear(AObject: Pointer; AMaxTimes: Integer): Integer; overload; override;
function Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; overload; override;
end;
/// <summary>
/// Manages simple asynchronous calls: jobs with no trigger-time requirement
/// </summary>
TSimpleJobs = class(TJobBase)
private
FFirst, FLast: PJob; // Head and tail of the singly linked job queue
FCount: Integer; // Number of queued jobs
FLocker: TSimpleLock; // Guards FFirst/FLast/FCount
function ClearJobs(AObject: Pointer; AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer;
protected
function InternalPush(AJob: PJob): Boolean; override;
function InternalPop: PJob; override;
function GetCount: Integer; override;
public
constructor Create(AOwner: TYXDWorkers); override;
destructor Destroy; override;
procedure Clear; overload; override;
function Clear(AObject: Pointer; AMaxTimes: Integer): Integer; overload; override;
function Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer; overload; override;
end;
var
Workers: TYXDWorkers = nil; // Initialized when needed; you may also create your own instances, several are allowed
// Returns a timestamp with a precision of 1 ms
function GetTimestamp: Int64;
// Returns the number of CPUs
function GetCPUCount: Integer;
{$IF RTLVersion<26}
// Atomic operation functions, for compatibility with D2007
function AtomicCmpExchange(var Target: Integer; Value, Comparand: Integer): Integer; inline;
function AtomicExchange(var Target: Integer; Value: Integer): Integer; inline;
function AtomicIncrement(var Target: Integer): Integer; inline;
function AtomicDecrement(var Target: Integer): Integer; inline;
{$IFEND}
{$IFDEF WORKER_SIMPLE_LOCK}
// Atomic bitwise operation functions
function AtomicAnd(var Dest: Integer; const AMask: Integer): Integer; inline;
function AtomicOr(var Dest: Integer; const AMask: Integer): Integer; inline;
{$ENDIF}
// Converts a global job handler to the TJobProc type so it can be scheduled normally
function MakeJobProc(const AProc: TJobProcG): TJobProc; inline;
// Sets the CPU a thread runs on
procedure SetThreadCPU(AHandle: THandle; ACpuNo: Integer); inline;
implementation
resourcestring
// User-facing messages (kept in their original language):
// SNotSupportNow - the named feature is not supported yet (%s = feature name)
// SNotInitWorkers - no TYXDWorkers manager object has been initialized
// STooFewWorkers - the requested minimum worker count is too small (must be >= 1)
// STooManyLongtimeWorker - too many long-time job threads requested (at most half of the workers)
// SBadWaitDoneParam - unknown way of waiting for running jobs to finish (%d = wait type)
SNotSupportNow = '当前尚未支持功能 %s';
SNotInitWorkers = '当前尚未初始化有工作者管理对象 TYXDWorkers';
STooFewWorkers = '指定的最小工作者数量太少(必需大于等于1)。';
STooManyLongtimeWorker = '不能允许太多长时间作业线程(最多允许工作者一半)。';
SBadWaitDoneParam = '未知的等待正在执行作业完成方式:%d';
{$IFNDEF UNICODE}
const
// Declared here for pre-Unicode compilers, presumably because their TWaitResult
// lacks this member — TODO confirm
wrIOCompletion = TWaitResult(4);
{$ENDIF}
{$IFDEF MSWINDOWS}
type
// Signature used for the optional GetTickCount64 function pointer.
// NOTE(review): the Windows API function is stdcall; the directive is omitted here,
// which looks harmless only because the function takes no arguments — confirm
TGetTickCount64 = function: Int64;
{$ENDIF MSWINDOWS}
type
// Pool of recycled TJob records kept as a Next-linked list, so that posting a job
// can reuse a record instead of allocating a new one
TJobPool = class
protected
FFirst: PJob; // Head of the list of pooled jobs
FCount: Integer; // Number of jobs currently pooled
FSize: Integer; // Maximum number of jobs the pool keeps
FLocker: TSimpleLock; // Guards FFirst/FCount
public
constructor Create(AMaxSize: Integer);
destructor Destroy; override;
// Returns a job to the pool, or frees it when the pool is full
procedure Push(AJob: PJob);
// Takes a job from the pool (allocating one if the pool is empty) and zeroes it
function Pop: PJob;
property Count: Integer read FCount;
property Size: Integer read FSize write FSize;
end;
var
JobPool: TJobPool; // Unit-wide pool of recycled job records
_CPUCount: Integer; // CPU count consulted by GetCPUCount; 0 means not determined yet
{$IFDEF NEXTGEN}
_Watch: TStopWatch; // Time source used by GetTimestamp on NEXTGEN compilers
{$ELSE}
GetTickCount64: TGetTickCount64; // Optional function pointer; GetTimestamp checks Assigned() before calling it
_PerfFreq: Int64; // Performance counter frequency; GetTimestamp uses the counter when this is > 0
{$ENDIF}
// Returns the current timestamp in milliseconds.
// NEXTGEN: derived from the _Watch stopwatch (Ticks div 10000).
// Otherwise: the high-resolution performance counter when _PerfFreq > 0,
// else GetTickCount64 when available, else GetTickCount.
function GetTimestamp: Int64;
begin
{$IFDEF NEXTGEN}
  Result := _Watch.Elapsed.Ticks div 10000;
{$ELSE}
  if _PerfFreq > 0 then begin
    QueryPerformanceCounter(Result);
    // Convert whole seconds and the sub-second remainder separately: multiplying the
    // raw counter by 1000 first can overflow Int64 when the counter frequency is high
    // (e.g. a GHz-range counter after a few weeks of uptime).
    Result := (Result div _PerfFreq) * 1000 +
      ((Result mod _PerfFreq) * 1000) div _PerfFreq;
  end else if Assigned(GetTickCount64) then
    Result := GetTickCount64
  else
    Result := GetTickCount;
{$ENDIF}
end;
// Returns the number of CPUs.
// The value is queried from the operating system on first use and then cached in
// _CPUCount, so later calls do not hit the OS again. The result is never less than 1,
// even if the underlying query fails (e.g. sysconf returning -1).
function GetCPUCount: Integer;
{$IFDEF MSWINDOWS}
var
  si: SYSTEM_INFO;
{$ENDIF}
begin
  Result := _CPUCount;
  if Result > 0 then
    Exit;
{$IFDEF MSWINDOWS}
  GetSystemInfo(si);
  Result := si.dwNumberOfProcessors;
{$ELSE}// Linux, MacOS, iOS, Android {POSIX}
  {$IFDEF POSIX}
  Result := sysconf(_SC_NPROCESSORS_ONLN);
  {$ELSE}// Unknown operating system: default to one CPU
  Result := 1;
  {$ENDIF !POSIX}
{$ENDIF !MSWINDOWS}
  if Result < 1 then
    Result := 1;
  _CPUCount := Result;
end;
// Wraps a global job procedure in a TJobProc method pointer: the code pointer is
// the procedure itself and the data (Self) pointer is nil.
function MakeJobProc(const AProc: TJobProcG): TJobProc;
var
  AMethod: TMethod;
begin
  AMethod.Code := @AProc;
  AMethod.Data := nil;
  Result := TJobProc(AMethod);
end;
// True when both method pointers refer to the same code address and the same
// data (Self) pointer.
function SameWorkerProc(const P1, P2: TJobProc): Boolean; inline;
var
  ALeft, ARight: TMethod;
begin
  ALeft := TMethod(P1);
  ARight := TMethod(P2);
  Result := (ALeft.Code = ARight.Code) and (ALeft.Data = ARight.Data);
end;
// Hints the preferred CPU for a thread. Only implemented on Windows
// (SetThreadIdealProcessor); a no-op elsewhere.
procedure SetThreadCPU(AHandle: THandle; ACpuNo: Integer);
begin
{$IFNDEF MSWINDOWS}
  // Linux/Android/iOS: ignored for now, XE6 does not declare sched_setaffinity
{$ELSE}
  SetThreadIdealProcessor(AHandle, ACpuNo);
{$ENDIF}
end;
// Atomic intrinsics for compilers older than XE5 (RTLVersion < 26), e.g. D2007:
// each one simply forwards to the matching Interlocked* routine.
{$IF RTLVersion<26}
// Stores Value into Target if Target equals Comparand; returns the previous value.
function AtomicCmpExchange(var Target: Integer; Value: Integer; Comparand: Integer): Integer; inline;
begin
  Result := InterlockedCompareExchange(Target, Value, Comparand);
end;

// Stores Value into Target; returns what InterlockedExchange returns.
function AtomicExchange(var Target: Integer; Value: Integer): Integer;
begin
  Result := InterlockedExchange(Target, Value);
end;

// Increments Target; returns what InterlockedIncrement returns.
function AtomicIncrement(var Target: Integer): Integer; inline;
begin
  Result := InterlockedIncrement(Target);
end;

// Decrements Target; returns what InterlockedDecrement returns.
function AtomicDecrement(var Target: Integer): Integer; inline;
begin
  Result := InterlockedDecrement(Target);
end;
{$IFEND <XE5}
{$IFDEF WORKER_SIMPLE_LOCK}
// Atomically ANDs AMask into Dest and returns the value Dest held before.
// Retries the compare-exchange until no other thread changed Dest in between.
function AtomicAnd(var Dest: Integer; const AMask: Integer): Integer; inline;
var
  ANewValue: Integer;
begin
  while True do begin
    Result := Dest;
    ANewValue := Result and AMask;
    if AtomicCmpExchange(Dest, ANewValue, Result) = Result then
      Break;
  end;
end;

// Atomically ORs AMask into Dest and returns the value Dest held before.
// Retries the compare-exchange until no other thread changed Dest in between.
function AtomicOr(var Dest: Integer; const AMask: Integer): Integer; inline;
var
  ANewValue: Integer;
begin
  while True do begin
    Result := Dest;
    ANewValue := Result or AMask;
    if AtomicCmpExchange(Dest, ANewValue, Result) = Result then
      Break;
  end;
end;
{$ENDIF}
{ TJobPool }
// Creates an empty pool that keeps at most AMaxSize recycled jobs.
constructor TJobPool.Create(AMaxSize: Integer);
begin
  FLocker := TSimpleLock.Create;
  FSize := AMaxSize;
  FCount := 0;
end;
// Releases every pooled job record and then the lock.
// The lock is left before it is freed: destroying a critical section that is
// still owned is not a supported operation.
destructor TJobPool.Destroy;
var
  ANext: PJob;
begin
  if Assigned(FLocker) then
    FLocker.Enter;
  try
    while FFirst <> nil do begin
      ANext := FFirst.Next;
      Dispose(FFirst);
      FFirst := ANext;
    end;
    FCount := 0;
  finally
    if Assigned(FLocker) then
      FLocker.Leave;
  end;
  FreeAndNil(FLocker);
  inherited;
end;
// Takes a job record from the pool, allocating a new one when the pool is empty.
// The returned record is always zeroed via Reset.
function TJobPool.Pop: PJob;
begin
  FLocker.Enter;
  Result := FFirst;
  if Assigned(Result) then begin
    Dec(FCount);
    FFirst := Result.Next;
  end;
  FLocker.Leave;
  // Nothing pooled: allocate outside the lock
  if not Assigned(Result) then
    GetMem(Result, SizeOf(TJob));
  Result.Reset;
end;
// Returns a job record to the pool, or frees it when the pool already holds Size
// records.
// The fullness test uses >= rather than =: Size is a writable property, and once it
// is lowered below the current Count an equality test would never be true again,
// letting the pool grow without bound.
procedure TJobPool.Push(AJob: PJob);
var
  ADoFree: Boolean;
begin
{$IFDEF UNICODE}
  // Drop the anonymous-method reference before the record is recycled or freed
  if AJob.IsAnonWorkerProc then
    AJob.WorkerProcA := nil;
{$ENDIF}
  FLocker.Enter;
  ADoFree := (FCount >= FSize);
  if not ADoFree then begin
    AJob.Next := FFirst;
    FFirst := AJob;
    Inc(FCount);
  end;
  FLocker.Leave;
  // Free outside the lock
  if ADoFree then
    FreeMem(AJob);
end;
{ TJobBase }
// Default implementation: pops jobs one by one and hands each to the owner's
// FreeJob until the queue reports no more jobs.
procedure TJobBase.Clear;
var
  AJob: PJob;
begin
  AJob := Pop;
  while AJob <> nil do begin
    FOwner.FreeJob(AJob);
    AJob := Pop;
  end;
end;
// Remembers the workers manager that owns this queue.
constructor TJobBase.Create(AOwner: TYXDWorkers);
begin
  Self.FOwner := AOwner;
end;
// Empties the queue (through the virtual Clear) before the object goes away.
destructor TJobBase.Destroy;
begin
  Clear;
  inherited Destroy;
end;
// Getter for Empty: True when Count reports zero jobs.
function TJobBase.GetEmpty: Boolean;
begin
  Result := not (Count <> 0);
end;
// Pops a job by delegating to the queue-specific InternalPop.
function TJobBase.Pop: PJob;
begin
  Result := Self.InternalPop;
end;
// Stamps the job with its owner queue and enqueue time, then hands it to
// InternalPush. When the queue rejects the job it is detached and freed.
function TJobBase.Push(AJob: PJob): Boolean;
begin
  AJob.PushTime := GetTimestamp;
  AJob.Owner := Self;
  Result := InternalPush(AJob);
  if Result then
    Exit;
  // Rejected: make sure only this job is released
  AJob.Next := nil;
  FOwner.FreeJob(AJob);
end;
{ TJob }
// Updates the run statistics after one execution: always counts the run, and for a
// positive AUsedTime also updates the total, minimum and maximum used time
// (a stored 0 means "not recorded yet").
procedure TJob.AfterRun(AUsedTime: Int64);
begin
  Inc(Runs);
  if AUsedTime <= 0 then
    Exit;
  Inc(TotalUsedTime, AUsedTime);
  if (MinUsedTime = 0) or (MinUsedTime > AUsedTime) then
    MinUsedTime := AUsedTime;
  if (MaxUsedTime = 0) or (MaxUsedTime < AUsedTime) then
    MaxUsedTime := AUsedTime;
end;
// Copies the whole record from ASource, then clears the three members that must
// not be carried over: Source, Next and Worker.
procedure TJob.Assign(const ASource: PJob);
begin
  Self := ASource^;
  Source := nil;
  Next := nil;
  Worker := nil;
end;
// Getter for AvgTime: total used time divided by the number of runs, or 0 when the
// job has not run yet.
function TJob.GetAvgTime: Integer;
begin
  Result := 0;
  if Runs > 0 then
    Result := TotalUsedTime div Runs;
end;
// Getter for ElapseTime: time passed since StartTime, measured with GetTimestamp.
function TJob.GetElapseTime: Int64;
begin
  Result := GetTimestamp;
  Dec(Result, StartTime);
end;
// Getter for IsTerminated. The job counts as terminated when its JOB_TERMINATED
// flag is set, or - if it has a worker with an owner - when the owner is
// terminating, the worker thread is terminated, or the worker is terminating
// exactly this job.
function TJob.GetIsTerminated: Boolean;
begin
  Result := (Flags and JOB_TERMINATED) <> 0;
  if Result then
    Exit;
  if Assigned(Worker) and Assigned(Worker.FOwner) then
    Result := Worker.FOwner.Terminating or Worker.Terminated or
      (Worker.FTerminatingJob = @Self);
end;
// Indexed getter for the flag properties: True when any bit of Index is set in Flags.
function TJob.GetValue(Index: Integer): Boolean;
begin
  Result := not ((Flags and Index) = 0);
end;
// Initializes the job with its handler and marks it as run-once.
procedure TJob.Create(AProc: TJobProc);
begin
  WorkerProc := AProc;
  Flags := Flags or JOB_RUN_ONCE;
end;
// Zero-fills the entire record.
procedure TJob.Reset;
begin
  FillChar(Self, SizeOf(Self), 0);
end;
// Setter for IsTerminated: sets or clears the JOB_TERMINATED flag.
procedure TJob.SetIsTerminated(const Value: Boolean);
begin
  if Value then
    Flags := Flags or JOB_TERMINATED
  else
    Flags := Flags and (not JOB_TERMINATED);
end;
// Indexed setter for the flag properties: sets the Index bits when Value is True,
// clears them otherwise.
procedure TJob.SetValue(Index: Integer; const Value: Boolean);
begin
  if not Value then
    Flags := Flags and (not Index)
  else
    Flags := Flags or Index;
end;
// Computes NextTime:
//  - first run with a first-delay: enqueue time plus FirstDelay;
//  - no interval: now;
//  - otherwise: now plus Interval when NextTime is still 0, else the previous
//    NextTime advanced by Interval.
procedure TJob.UpdateNextTime;
begin
  if (Runs = 0) and (FirstDelay <> 0) then begin
    NextTime := PushTime + FirstDelay;
    Exit;
  end;
  if Interval = 0 then begin
    NextTime := GetTimestamp;
    Exit;
  end;
  if NextTime = 0 then
    NextTime := GetTimestamp + Interval
  else
    Inc(NextTime, Interval);
end;
{ TSimpleLock }
{$IFDEF WORKER_SIMPLE_LOCK}
// Creates the lock in the unlocked state.
constructor TSimpleLock.Create;
begin
  inherited;
  FFlags := 0;
end;

// Acquires the lock: atomically sets bit 0 and, while the bit was already set by
// someone else, yields the CPU and tries again.
procedure TSimpleLock.Enter;
begin
  while True do begin
    if (AtomicOr(FFlags, $01) and $01) = 0 then
      Break;
{$IFDEF MSWINDOWS}
    SwitchToThread;
{$ELSE}
    TThread.Yield;
{$ENDIF}
  end;
end;

// Releases the lock by atomically clearing bit 0.
procedure TSimpleLock.Leave;
begin
  AtomicAnd(FFlags, not Integer($01));
end;
{$ENDIF}
{ TSimpleJobs }
// Removes queued jobs that match either an object or a handler/data pair.
//   AObject   - when not nil, jobs whose handler's Self pointer equals AObject match
//   AProc     - otherwise, jobs with this handler match ...
//   AData     - ... if their Data equals AData, or AData is Pointer(-1) (any data)
//   AMaxTimes - maximum number of jobs to remove; a negative value means no limit
// Returns the number of jobs removed.
//
// The whole queue is detached under the lock so no worker can pop a job while it is
// being filtered; the jobs that are kept are then put back in front of whatever was
// pushed in the meantime, in their original order.
//
// Fixes relative to the previous implementation:
//  - every job is unlinked (Next := nil) before FreeJob, so that freeing one job can
//    never release the jobs that follow it (NOTE(review): FreeJob appears to free a
//    whole Next-linked chain, as TSimpleJobs.Clear relies on);
//  - the kept chain is re-linked through its tail instead of overwriting the head's
//    Next, which used to drop every kept job after the first;
//  - reaching AMaxTimes before any job was kept no longer dereferences nil;
//  - Pointer(-1) is honoured as "any data", consistent with TRepeatJobs.ClearJobs.
function TSimpleJobs.ClearJobs(AObject: Pointer; AProc: TJobProc;
  AData: Pointer; AMaxTimes: Integer): Integer;
var
  AKeepFirst, AKeepLast, AJob, ANext: PJob;
  AKeepCount: Integer;
  AMatch: Boolean;
begin
  Result := 0;
  FLocker.Enter;
  AJob := FFirst;
  FFirst := nil;
  FLast := nil;
  FCount := 0;
  FLocker.Leave;

  AKeepFirst := nil;
  AKeepLast := nil;
  AKeepCount := 0;
  while AJob <> nil do begin
    ANext := AJob.Next;
    AJob.Next := nil;
    if AMaxTimes = 0 then
      AMatch := False // limit reached: keep everything that is left
    else if AObject <> nil then
      AMatch := TMethod(AJob.WorkerProc).Data = AObject
    else
      AMatch := SameWorkerProc(AJob.WorkerProc, AProc) and
        ((AData = Pointer(-1)) or (AJob.Data = AData));
    if AMatch then begin
      FOwner.FreeJob(AJob);
      if AMaxTimes > 0 then
        Dec(AMaxTimes);
      Inc(Result);
    end else begin
      // Append to the chain of kept jobs
      if AKeepLast = nil then
        AKeepFirst := AJob
      else
        AKeepLast.Next := AJob;
      AKeepLast := AJob;
      Inc(AKeepCount);
    end;
    AJob := ANext;
  end;

  if AKeepFirst <> nil then begin
    FLocker.Enter;
    // Kept jobs were queued earlier than anything pushed meanwhile: put them in front
    AKeepLast.Next := FFirst;
    FFirst := AKeepFirst;
    if FLast = nil then
      FLast := AKeepLast;
    Inc(FCount, AKeepCount);
    FLocker.Leave;
  end;
end;
// Removes every queued job: detaches the whole chain under the lock, then passes
// its head to the owner's FreeJob outside the lock.
procedure TSimpleJobs.Clear;
var
  AChain: PJob;
begin
  FLocker.Enter;
  AChain := FFirst;
  FCount := 0;
  FLast := nil;
  FFirst := nil;
  FLocker.Leave;
  FOwner.FreeJob(AChain);
end;
// Removes up to AMaxTimes queued jobs selected by handler and data; forwards to
// ClearJobs with no object filter. Returns the number of jobs removed.
function TSimpleJobs.Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer;
begin
  Result := Self.ClearJobs(nil, AProc, AData, AMaxTimes);
end;
// Removes up to AMaxTimes queued jobs selected by object; forwards to ClearJobs
// with no handler/data filter. Returns the number of jobs removed.
function TSimpleJobs.Clear(AObject: Pointer; AMaxTimes: Integer): Integer;
begin
  Result := Self.ClearJobs(AObject, nil, nil, AMaxTimes);
end;
// Creates the queue and the lock that guards it.
constructor TSimpleJobs.Create(AOwner: TYXDWorkers);
begin
  inherited;
  FLocker := TSimpleLock.Create;
end;
// The inherited destructor runs first because it calls Clear, which still needs
// the lock; only afterwards is the lock released.
destructor TSimpleJobs.Destroy;
begin
  inherited Destroy;
  FLocker.Free;
end;
// Getter for Count: the current value of FCount, read without taking the lock.
function TSimpleJobs.GetCount: Integer;
begin
  Result := Self.FCount;
end;
// Removes and returns the job at the head of the queue, or nil when the queue is
// empty. The returned job is detached (Next = nil) and stamped with its pop time.
function TSimpleJobs.InternalPop: PJob;
begin
  FLocker.Enter;
  Result := FFirst;
  if Assigned(Result) then begin
    Dec(FCount);
    FFirst := Result.Next;
    // Queue became empty: the tail pointer must be reset as well
    if not Assigned(FFirst) then
      FLast := nil;
  end;
  FLocker.Leave;
  if not Assigned(Result) then
    Exit;
  Result.Next := nil;
  Result.PopTime := GetTimestamp;
end;
// Appends the job at the tail of the queue under the lock. Always returns True.
function TSimpleJobs.InternalPush(AJob: PJob): Boolean;
begin
  FLocker.Enter;
  if Assigned(FLast) then
    FLast.Next := AJob
  else
    FFirst := AJob;
  FLast := AJob;
  Inc(FCount);
  FLocker.Leave;
  Result := True;
end;
{ TRepeatJobs }
// Called after a repeat job has run: locates the source job (AJob.Source) in the
// tree and either updates its run statistics or, when AJob reports IsTerminated,
// removes and frees it.
//   AJob      - the job that has just run; AJob.Source identifies the queued original
//   AUsedTime - time the run took, forwarded to TJob.AfterRun
procedure TRepeatJobs.AfterJobRun(AJob: PJob; AUsedTime: Int64);
var
ANode: PRBNode;
// Searches the job chain of the current ANode for AJob.Source.
// Returns True when the source job was found (and updated or removed).
function UpdateSource: Boolean;
var
ATemp, APrior: PJob;
begin
Result := False;
ATemp := ANode.Data;
APrior := nil;
while ATemp <> nil do begin
if ATemp = AJob.Source then begin
if AJob.IsTerminated then begin
// Unlink the source job from the node's chain before freeing it
if APrior <> nil then
APrior.Next := ATemp.Next
else
ANode.Data := ATemp.Next;
ATemp.Next := nil;
FOwner.FreeJob(ATemp);
// The node holds no more jobs: remove it from the tree
if ANode.Data = nil then
FItems.Delete(ANode);
end else
ATemp.AfterRun(AUsedTime);
Result := True;
Break;
end;
APrior := ATemp;
ATemp := ATemp.Next;
end;
end;
begin
FLocker.Enter;
try
// Fast path: look in the node that FItems.Find returns for AJob
ANode := FItems.Find(AJob);
if ANode <> nil then begin
if UpdateSource then
Exit;
end;
// Slow path: scan every node until the source job is found
ANode := FItems.First;
while ANode <> nil do begin
if UpdateSource then
Break;
ANode := ANode.Next;
end;
finally
FLocker.Leave;
end;
end;
// Removes scheduled jobs that match the given filter and returns how many
// were removed.
//   AObject   - when non-nil, a job matches if its handler is bound to this
//               object (TMethod.Data); AProc/AData are ignored in that case.
//   AProc     - handler to match when AObject is nil.
//   AData     - data pointer to match; Pointer(-1) matches any data.
//   AMaxTimes - maximum number of jobs to remove; the loops stop when the
//               remaining budget reaches exactly 0 (a negative value never
//               reaches 0 and therefore acts as "no limit").
// Fix: the inner loop now also honours AMaxTimes. Previously a node holding
// several matching jobs could push the budget below zero, after which the
// "<> 0" test never stopped and every matching job was removed.
function TRepeatJobs.ClearJobs(AObject: Pointer; AProc: TJobProc;
  AData: Pointer; AMaxTimes: Integer): Integer;
var
  ANode, ANext: PRBNode;
  APriorJob, AJob, ANextJob: PJob;
  AMatched: Boolean;
begin
  Result := 0;
  FLocker.Enter;
  try
    ANode := FItems.First;
    while (ANode <> nil) and (AMaxTimes <> 0) do begin
      ANext := ANode.Next;    // fetch before the node may be deleted
      AJob := ANode.Data;
      APriorJob := nil;
      while (AJob <> nil) and (AMaxTimes <> 0) do begin
        ANextJob := AJob.Next;
        if AObject <> nil then
          AMatched := TMethod(AJob.WorkerProc).Data = AObject
        else
          AMatched := SameWorkerProc(AJob.WorkerProc, AProc) and
            ((AData = Pointer(-1)) or (AData = AJob.Data));
        if AMatched then begin
          // Unlink from the chain (head and/or predecessor) and free
          if ANode.Data = AJob then
            ANode.Data := ANextJob;
          if Assigned(APriorJob) then
            APriorJob.Next := ANextJob;
          AJob.Next := nil;
          FOwner.FreeJob(AJob);
          Dec(AMaxTimes);
          Inc(Result);
        end else
          APriorJob := AJob;
        AJob := ANextJob;
      end;
      // Only drop the node when no job is left on it; jobs skipped because
      // the budget ran out must stay scheduled.
      if ANode.Data = nil then
        FItems.Delete(ANode);
      ANode := ANext;
    end;
    // Refresh the cached earliest fire time
    if FItems.Count > 0 then
      FFirstFireTime := PJob(FItems.First.Data).NextTime
    else
      FFirstFireTime := 0;
  finally
    FLocker.Leave;
  end;
end;
// Clears scheduled jobs selected by handler procedure and data pointer.
// Forwards to ClearJobs with no object filter and returns its result.
function TRepeatJobs.Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer;
begin
  Result := ClearJobs(
    nil,        // no bound-object filter
    AProc,
    AData,
    AMaxTimes);
end;
// Clears scheduled jobs whose handler is bound to AObject.
// Forwards to ClearJobs with no procedure/data filter and returns its result.
function TRepeatJobs.Clear(AObject: Pointer; AMaxTimes: Integer): Integer;
begin
  Result := ClearJobs(
    AObject,
    nil,        // no handler procedure filter
    nil,        // no data filter
    AMaxTimes);
end;
// Removes every scheduled job from the tree.
// Fix: the cached earliest fire time is reset as well, matching what
// ClearJobs does when the tree ends up empty. Leaving a stale non-zero
// FFirstFireTime makes TYXDWorker.Execute believe a job is due.
procedure TRepeatJobs.Clear;
begin
  FLocker.Enter;
  try
    FItems.Clear;
    FFirstFireTime := 0;    // nothing is scheduled any more
  finally
    FLocker.Leave;
  end;
end;
// Builds the repeating/delayed job store for AOwner: a critical section plus
// a tree ordered by DoTimeCompare whose node deletions are routed to
// DoJobDelete.
constructor TRepeatJobs.Create(AOwner: TYXDWorkers);
begin
  inherited Create(AOwner);

  FLocker := TCriticalSection.Create;

  FItems := TRBTree.Create(DoTimeCompare);
  FItems.OnDelete := DoJobDelete;
end;
// Destroys the scheduled-job store.
// Fix: the tree is now released AFTER the inherited destructor has run.
// The original freed FItems first and left the field dangling while
// `inherited` executed.
// NOTE(review): the base destructor is not visible here; if it calls the
// virtual Clear (which touches FItems and FLocker) the old order was a
// use-after-free. The new order is safe either way.
destructor TRepeatJobs.Destroy;
begin
  FLocker.Enter;
  try
    inherited;              // may still need FItems / FLocker
    FreeAndNil(FItems);     // fires OnDelete for any remaining nodes
  finally
    FLocker.Leave;
    FreeAndNil(FLocker);
  end;
end;
// Tree OnDelete handler: hands the job chain stored in the node being
// removed back to the owner via FreeJob.
procedure TRepeatJobs.DoJobDelete(ATree: TRBTree; ANode: PRBNode);
var
  AChain: PJob;
begin
  AChain := ANode.Data;
  FOwner.FreeJob(AChain);
end;
// Tree comparison callback ordering two jobs by NextTime.
// Returns -1, 0 or 1 when P1 fires before, at the same time as, or after P2.
// Fix: the original returned the raw difference of the two times squeezed
// into an Integer. NextTime is handled as a 64-bit tick elsewhere in this
// file, so a large gap could be truncated and yield the wrong sign (or a
// range-check error). Comparing directly avoids the subtraction.
function TRepeatJobs.DoTimeCompare(P1, P2: Pointer): Integer;
begin
  if PJob(P1).NextTime < PJob(P2).NextTime then
    Result := -1
  else if PJob(P1).NextTime > PJob(P2).NextTime then
    Result := 1
  else
    Result := 0;
end;
// Returns the number of nodes held by the tree (FItems.Count). Jobs that
// share a fire time are chained on one node, so this counts nodes.
function TRepeatJobs.GetCount: Integer;
begin
  Result := FItems.Count;
end;
// Returns a job that is due (NextTime <= now), or nil when nothing is due.
//  - A run-once job is detached from the tree and returned itself.
//  - A repeating job stays scheduled: its NextTime is advanced by Interval,
//    it is re-inserted, and a copy (with Source pointing at the scheduled
//    job) is returned for execution.
// Fix: before re-inserting a repeating job its Next link is cleared. The
// original left Next pointing at the siblings of the old time slot, so when
// the job went into a brand-new node two nodes shared the same chain.
function TRepeatJobs.InternalPop: PJob;
var
  ANode: PRBNode;
  ATick: Int64;
  AJob: PJob;
begin
  Result := nil;
  ATick := GetTimestamp;
  FLocker.Enter;
  try
    if FItems.Count > 0 then begin
      ANode := FItems.First;
      if PJob(ANode.Data).NextTime <= ATick then begin
        AJob := ANode.Data;
        if AJob.Next <> nil then
          // More jobs share this slot: keep the node, advance its head
          ANode.Data := AJob.Next
        else begin
          // Last job of the slot: remove the node and refresh the cached
          // earliest fire time
          ANode.Data := nil;
          FItems.Delete(ANode);
          ANode := FItems.First;
          if ANode <> nil then
            FFirstFireTime := PJob(ANode.Data).NextTime
          else
            FFirstFireTime := 0;    // no scheduled jobs remain
        end;
        if AJob.Runonce then
          Result := AJob
        else begin
          Inc(AJob.NextTime, AJob.Interval);
          Result := JobPool.Pop;
          Result.Assign(AJob);
          Result.Source := AJob;
          // Detach from the old slot's chain before rescheduling
          AJob.Next := nil;
          ANode := FItems.Find(AJob);
          if ANode = nil then begin
            FItems.Insert(AJob);
            FFirstFireTime := PJob(FItems.First.Data).NextTime;
          end else begin
            // A node for that time already exists: become its new head
            AJob.Next := PJob(ANode.Data);
            ANode.Data := AJob;
          end;
        end;
      end;
    end;
  finally
    FLocker.Leave;
  end;
  if Result <> nil then begin
    Result.PopTime := ATick;
    Result.Next := nil;
  end;
end;
// Schedules AJob. Its next fire time is computed first; the job then either
// gets a tree node of its own or is placed at the head of the chain of an
// existing node with the same fire time. Always returns True.
function TRepeatJobs.InternalPush(AJob: PJob): Boolean;
var
  ASlot: PRBNode;
begin
  AJob.UpdateNextTime;
  FLocker.Enter;
  try
    ASlot := FItems.Find(AJob);
    if Assigned(ASlot) then begin
      // Same fire time already present: prepend to that node's chain
      AJob.Next := PJob(ASlot.Data);
      ASlot.Data := AJob;
    end else begin
      FItems.Insert(AJob);
      FFirstFireTime := PJob(FItems.First.Data).NextTime;
    end;
  finally
    FLocker.Leave;
  end;
  Result := True;
end;
{ TYXDWorker }
// Initializes COM for this worker once (Windows only; no-op elsewhere).
// AInitFlags = 0 uses CoInitialize, any other value is passed to
// CoInitializeEx. The WORKER_COM_INITED flag records that it was done.
procedure TYXDWorker.ComNeeded(AInitFlags: Cardinal);
begin
{$IFDEF MSWINDOWS}
  if ComInitialized then
    Exit;

  if AInitFlags <> 0 then
    CoInitializeEx(nil, AInitFlags)
  else
    CoInitialize(nil);
  SetValue(WORKER_COM_INITED, True);
{$ENDIF MSWINDOWS}
end;
// Creates a suspended worker thread owned by AOwner. The worker starts out
// flagged busy (and is counted in the owner's busy counter), frees itself on
// termination and gets an auto-reset wake-up event.
constructor TYXDWorker.Create(AOwner: TYXDWorkers);
begin
  inherited Create(True);     // created suspended
  FreeOnTerminate := True;

  FOwner := AOwner;
  FTimeout := 1000;
  FEvent := TEvent.Create(nil, False, False, '');

  // A fresh worker is considered busy by default
  FFlags := WORKER_ISBUSY;
  AtomicIncrement(AOwner.FBusyCount);
end;
// Releases the wake-up event, then runs the inherited thread destructor.
destructor TYXDWorker.Destroy;
begin
  FreeAndNil(FEvent);

  inherited;
end;
// Invokes the job's handler. On UNICODE builds a job flagged
// IsAnonWorkerProc is dispatched through WorkerProcA; every other job goes
// through WorkerProc.
procedure TYXDWorker.DoJob(AJob: PJob);
begin
{$IFDEF UNICODE}
  if AJob.IsAnonWorkerProc then begin
    AJob.WorkerProcA(AJob);
    Exit;
  end;
{$ENDIF}
  AJob.WorkerProc(AJob);
end;
// Worker thread main loop. Until the thread or its owner is terminating it:
//  1. picks a wait timeout (0 when simple jobs are queued, time until the
//     first scheduled job otherwise, WAITJOB_TIMEOUT when idle/disabled),
//  2. waits on FEvent for that long,
//  3. when signaled or a scheduled job is due, pops and runs jobs until
//     none is left, then reports idle to the owner,
//  4. on a plain WAITJOB_TIMEOUT expiry, non-reserved workers report a
//     timeout to the owner.
// On exit the worker unregisters from the owner and, on Windows,
// uninitializes COM if it was initialized and frees its sync event.
procedure TYXDWorker.Execute;
var
wr: TWaitResult;
{$IFDEF MSWINDOWS}
SyncEvent: TEvent;
{$ENDIF}
begin
{$IFDEF MSWINDOWS}
// Event the worker waits on while a job runs in the main thread
SyncEvent := TEvent.Create(nil, False, False, '');
{$ENDIF}
try
while not(Terminated or FOwner.FTerminating) do begin
if FOwner.Enabled then begin
if (FOwner.FSimpleJobs.FFirst <> nil) then begin
// Simple jobs are pending: yield once, then do not wait at all
{$IFDEF MSWINDOWS}SwitchToThread; {$ELSE} TThread.Yield;{$ENDIF}
FTimeout := 0;
end else if (FOwner.FRepeatJobs.FFirstFireTime <> 0) then begin
// Wait until the earliest scheduled job is due
FTimeout := FOwner.FRepeatJobs.FFirstFireTime - GetTimestamp;
if FTimeout < 0 then // Already due? Then run immediately
FTimeout := 0;
end else
FTimeout := WAITJOB_TIMEOUT;
end else
FTimeout := WAITJOB_TIMEOUT; // If still no job arrives, release this worker unless it is a reserved thread
if FTimeout <> 0 then begin
wr := FEvent.WaitFor(FTimeout);
if Terminated or FOwner.FTerminating then
Break;
end else
wr := wrSignaled;
// Proceed when woken up, or when a scheduled job is due (1 tick tolerance)
if (wr = wrSignaled) or ((FOwner.FRepeatJobs.FFirstFireTime <> 0) and
(GetTimestamp >= FOwner.FRepeatJobs.FFirstFireTime - 1)) then
begin
if FOwner.FTerminating then
Break;
// Mark the worker busy (counting it once) and flag that it is looking up a job
if IsIdle then begin
SetValue(WORKER_ISBUSY or WORKER_LOOKUP, true);
AtomicIncrement(FOwner.FBusyCount);
end else
SetValue(WORKER_LOOKUP, true);
repeat
FActiveJob := FOwner.Popup;
if FActiveJob <> nil then begin
FActiveJob.Worker := Self;
FActiveJobProc := FActiveJob.WorkerProc;
// Cache fields for the Clear(AObject) checks so FActiveJob need not be read from other threads
FActiveJobData := FActiveJob.Data;
if FActiveJob.IsSignalWakeup then
FActiveJobSource := FActiveJob.Source
else
FActiveJobSource := nil;
if FActiveJob.IsGrouped then
FActiveJobGroup := FActiveJob.Group
else
FActiveJobGroup := nil;
FActiveJobFlags := FActiveJob.Flags;
// FirstTime is recorded only on the very first start of the job
if FActiveJob.StartTime = 0 then begin
FActiveJob.StartTime := GetTimestamp;
FActiveJob.FirstTime := FActiveJob.StartTime;
end else
FActiveJob.StartTime := GetTimestamp;
try
// Switch state from "looking up" to "executing"
FFlags := (FFlags or WORKER_EXECUTING) and (not WORKER_LOOKUP);
if FActiveJob.InMainThread then
{$IFDEF MSWINDOWS}
begin
// Hand the job to the main-thread window and wait for completion
if PostMessage(FOwner.FMainWorker, WM_APP, WPARAM(FActiveJob), LPARAM(SyncEvent)) then
SyncEvent.WaitFor(INFINITE);
end
{$ELSE}
Synchronize(Self, FireInMainThread)
{$ENDIF}
else
DoJob(FActiveJob);
except
// Exceptions raised by the job are reported to the owner's handler, if any
if Assigned(FOwner.FOnErrorNotify) then
FOwner.FOnErrorNotify(FActiveJob, Exception(ExceptObject), 'TYXDWorker.Execute');
end;
// Post-run bookkeeping depends on the kind of job
if not FActiveJob.Runonce then
FOwner.FRepeatJobs.AfterJobRun(FActiveJob, GetTimestamp - FActiveJob.StartTime)
else begin
if FActiveJob.IsSignalWakeup then
FOwner.SignalWorkDone(FActiveJob, GetTimestamp - FActiveJob.StartTime)
else if FActiveJob.IsLongtimeJob then
AtomicDecrement(FOwner.FLongTimeWorkers)
else if FActiveJob.IsGrouped then
FActiveJobGroup.DoJobExecuted(FActiveJob);
FActiveJob.Worker := nil;
end;
FOwner.FreeJob(FActiveJob);
// Reset the cached active-job state
FActiveJobProc := nil;
FActiveJobSource := nil;
FActiveJobFlags := 0;
FActiveJobGroup := nil;
FTerminatingJob := nil;
FFlags := FFlags and (not WORKER_EXECUTING);
end else
FFlags := FFlags and (not WORKER_LOOKUP);
until (FActiveJob = nil) or FOwner.FTerminating or Terminated or (not FOwner.Enabled);
SetValue(WORKER_ISBUSY, False);
FOwner.WorkerIdle(Self, irNoJob);
end else if (not IsReserved) and (FTimeout = WAITJOB_TIMEOUT) then begin
// Idle timeout of a non-reserved worker: let the owner decide its fate
SetValue(WORKER_ISBUSY, False);
FOwner.WorkerIdle(Self, irTimeout);
end;
end;
finally
FOwner.WorkerTerminate(Self);
{$IFDEF MSWINDOWS}
if ComInitialized then
CoUninitialize;
FreeAndNil(SyncEvent);
{$ENDIF}
end;
end;
// Runs the worker's currently active job through DoJob. Execute passes this
// method to Synchronize on non-Windows builds.
procedure TYXDWorker.FireInMainThread;
begin
  DoJob(FActiveJob);
end;
// A worker is idle exactly when it is not busy.
function TYXDWorker.GetIsIdle: Boolean;
begin
  if IsBusy then
    Result := False
  else
    Result := True;
end;
// Returns True when any bit of the mask Index is set in FFlags.
function TYXDWorker.GetValue(Index: Integer): Boolean;
var
  AMasked: Integer;
begin
  AMasked := FFlags and Index;
  Result := AMasked <> 0;
end;
// Sets (Value = True) or clears (Value = False) the bits of mask Index
// in FFlags.
procedure TYXDWorker.SetValue(Index: Integer; const Value: Boolean);
begin
  if not Value then
    FFlags := FFlags and (not Index)
  else
    FFlags := FFlags or Index;
end;
{ TYXDWorkers }
// Clears the jobs waiting on the signal with the given name.
// Forwards to ClearWaitJobs with signal id 0 so that the name is used.
function TYXDWorkers.Clear(const ASignalName: string): Integer;
begin
  Result := ClearWaitJobs(
    0,              // no id: select by name
    ASignalName);
end;
// Clears the jobs waiting on the signal with the given id.
// Forwards to ClearWaitJobs with an empty name.
function TYXDWorkers.Clear(ASignalId: Integer): Integer;
begin
  Result := ClearWaitJobs(
    ASignalId,
    '');            // no name: select by id
end;
// Clears jobs selected by handler procedure and data pointer.
// Forwards to ClearJobs with no object filter and returns its result.
function TYXDWorkers.Clear(AProc: TJobProc; AData: Pointer; AMaxTimes: Integer): Integer;
begin
  Result := ClearJobs(
    nil,        // no bound-object filter
    AProc,
    AData,
    AMaxTimes);
end;
// Clears jobs whose handler is bound to AObject.
// Forwards to ClearJobs with no procedure/data filter and returns its result.
function TYXDWorkers.Clear(AObject: Pointer; AMaxTimes: Integer): Integer;
begin
  Result := ClearJobs(
    AObject,
    nil,        // no handler procedure filter
    nil,        // no data filter
    AMaxTimes);
end;
// Removes every pending job: simple jobs, scheduled jobs and the waiting
// chain of each signal bucket, then waits until no worker is running a job.
// Workers are disabled for the duration so they cannot pick up new jobs.
procedure TYXDWorkers.Clear;
var
  ABucket: Integer;
  AWaitAll: TWorkerWaitParam;
  ASignal: PSignal;
begin
  DisableWorkers;
  try
    FSimpleJobs.Clear;
    FRepeatJobs.Clear;

    FLocker.Enter;
    try
      for ABucket := 0 to FSignalJobs.BucketCount - 1 do begin
        if FSignalJobs.Buckets[ABucket] = nil then
          Continue;
        ASignal := FSignalJobs.Buckets[ABucket].Data;
        FreeJob(ASignal.First);
        ASignal.First := nil;
      end;
    finally
      FLocker.Leave;
    end;

    // $FF = wait for any running job
    AWaitAll.WaitType := $FF;
    WaitRunningDone(AWaitAll);
  finally
    EnableWorkers;
  end;
end;
// Removes pending jobs matching the filter from the simple queue, then the
// scheduled jobs, then the signal waiting lists, and finally waits for
// matching jobs that are currently executing. Returns the number removed.
//   AObject   - when non-nil, match jobs whose handler is bound to it.
//   AProc     - handler to match when AObject is nil.
//   AData     - data pointer to match; Pointer(-1) matches any data.
//   AMaxTimes - removal budget; processing stops once it reaches exactly 0.
// Fix: when clearing by object the wait step now uses WaitType 0 with
// Bound = AObject. The original always used WaitType 1 (proc + data), which
// with a nil AProc matched no running job, so Clear(AObject) returned while
// jobs of that object were still executing.
function TYXDWorkers.ClearJobs(AObject: Pointer; AProc: TJobProc;
  AData: Pointer; AMaxTimes: Integer): Integer;
var
  ACleared: Integer;
  AWaitParam: TWorkerWaitParam;

  // Removes matching jobs from every signal's waiting list. Consumes the
  // outer AMaxTimes budget directly.
  function ClearSignalJobs(IsClearObject: Boolean): Integer;
  var
    i: Integer;
    AJob, ANext, APrior: PJob;
    AList: PHashList;
    ASignal: PSignal;
    B: Boolean;
  begin
    Result := 0;
    FLocker.Enter;
    try
      for i := 0 to FSignalJobs.BucketCount - 1 do begin
        AList := FSignalJobs.Buckets[i];
        if AList <> nil then begin
          ASignal := AList.Data;
          if ASignal.First <> nil then begin
            AJob := ASignal.First;
            APrior := nil;
            while (AJob <> nil) and (AMaxTimes <> 0) do begin
              ANext := AJob.Next;
              if IsClearObject then
                B := TMethod(AJob.WorkerProc).Data = AObject
              else
                B := SameWorkerProc(AJob.WorkerProc, AProc) and
                  ((AData = Pointer(-1)) or (AJob.Data = AData));
              if B then begin
                // Unlink from the waiting list and free
                if ASignal.First = AJob then
                  ASignal.First := ANext;
                if Assigned(APrior) then
                  APrior.Next := ANext;
                AJob.Next := nil;
                FreeJob(AJob);
                Inc(Result);
                Dec(AMaxTimes);
              end else
                APrior := AJob;
              AJob := ANext;
            end;
            if AMaxTimes = 0 then
              Break;
          end;
        end;
      end;
    finally
      FLocker.Leave;
    end;
  end;

begin
  Result := 0;
  if Self <> nil then begin
    ACleared := FSimpleJobs.ClearJobs(AObject, AProc, AData, AMaxTimes);
    Dec(AMaxTimes, ACleared);
    Inc(Result, ACleared);
    if AMaxTimes <> 0 then begin
      ACleared := FRepeatJobs.ClearJobs(AObject, AProc, AData, AMaxTimes);
      Dec(AMaxTimes, ACleared);
      Inc(Result, ACleared);
      if AMaxTimes <> 0 then begin
        ACleared := ClearSignalJobs(AObject <> nil);
        Inc(Result, ACleared);
        if AMaxTimes <> 0 then begin
          // Wait for matching jobs that are executing right now
          if AObject <> nil then begin
            AWaitParam.WaitType := 0;   // by bound object
            AWaitParam.Bound := AObject;
          end else begin
            AWaitParam.WaitType := 1;   // by handler + data
            AWaitParam.Data := AData;
            AWaitParam.WorkerProc := TMethod(AProc);
          end;
          WaitRunningDone(AWaitParam);
        end;
      end;
    end;
  end;
end;
// Detaches the waiting-job chain of one signal and clears it through
// ClearSignalJobs. The signal is selected by ASignalId when that is > 0,
// otherwise by ASignalName. Returns ClearSignalJobs' result, or 0 when no
// signal matched or its waiting list was empty.
function TYXDWorkers.ClearWaitJobs(ASignalId: Integer; const ASignalName: string): Integer;
var
  ABucket: Integer;
  ASignal: PSignal;
  AChain: PJob;
  AIsMatch: Boolean;
begin
  AChain := nil;
  FLocker.Enter;
  try
    for ABucket := 0 to FSignalJobs.BucketCount - 1 do begin
      if FSignalJobs.Buckets[ABucket] = nil then
        Continue;
      ASignal := FSignalJobs.Buckets[ABucket].Data;
      if ASignalId > 0 then
        AIsMatch := ASignal.Id = ASignalId
      else
        AIsMatch := ASignal.Name = ASignalName;
      if AIsMatch then begin
        // Take ownership of the whole waiting list
        AChain := ASignal.First;
        ASignal.First := nil;
        Break;
      end;
    end;
  finally
    FLocker.Leave;
  end;

  if AChain = nil then
    Result := 0
  else
    Result := ClearSignalJobs(AChain);
end;
// Removes from the simple queue every signal-triggered job whose Source is
// ASource, waits for running jobs with that source to finish, then frees
// ASource itself (FreeJob walks its Next chain). Returns the number of
// queued jobs removed.
function TYXDWorkers.ClearSignalJobs(ASource: PJob): Integer;
var
ACount: Integer;
AFirst, ALast, APrior, ANext: PJob;
AWaitParam: TWorkerWaitParam;
begin
Result := 0;
AFirst := nil;
APrior := nil;
// Step 1: take the whole simple queue out under its lock
FSimpleJobs.FLocker.Enter;
try
ALast := FSimpleJobs.FFirst;
ACount := FSimpleJobs.Count;
FSimpleJobs.FFirst := nil;
FSimpleJobs.FLast := nil;
FSimpleJobs.FCount := 0;
finally
FSimpleJobs.FLocker.Leave;
end;
// Step 2: filter the detached list; ALast is the cursor, APrior the last
// job kept, AFirst the first job kept
while ALast <> nil do begin
if (ALast.IsSignalWakeup) and (ALast.Source = ASource) then begin
ANext := ALast.Next;
ALast.Next := nil;
FreeJob(ALast);
ALast := ANext;
if APrior <> nil then
APrior.Next := ANext;
Dec(ACount);
Inc(Result);
end else begin
if AFirst = nil then
AFirst := ALast;
APrior := ALast;
ALast := ALast.Next;
end;
end;
// Step 3: put the surviving jobs back in front of anything queued meanwhile
if ACount > 0 then begin
FSimpleJobs.FLocker.Enter;
try
APrior.Next := FSimpleJobs.FFirst;
FSimpleJobs.FFirst := AFirst;
Inc(FSimpleJobs.FCount, ACount);
if FSimpleJobs.FLast = nil then
FSimpleJobs.FLast := APrior;
finally
FSimpleJobs.FLocker.Leave;
end;
end;
// Step 4: wait for executing jobs that originate from ASource (WaitType 2)
AWaitParam.WaitType := 2;
AWaitParam.SourceJob := ASource;
WaitRunningDone(AWaitParam);
FreeJob(ASource);
end;
// Shuts all workers down: sets FTerminating, clears the cached first fire
// time, wakes every worker through its event and then spins (yielding)
// until the worker list is empty.
procedure TYXDWorkers.ClearWorkers;
var
i: Integer;
{$IFDEF MSWINDOWS}
// Windows only: returns True while the first worker's thread is still
// active. Workers whose thread is gone are freed and removed from the list.
function WorkerExists: Boolean;
var
J: Integer;
ACode: Cardinal;
begin
Result := False;
FLocker.Enter;
try
while FWorkerCount > 0 do begin
if GetExitCodeThread(FWorkers[0].Handle, ACode) then begin
if ACode = STILL_ACTIVE then begin
Result := True;
Break;
end;
end;
// The worker no longer exists; it may have been ended by an external thread
FreeAndNil(FWorkers[0]);
if FWorkerCount > 0 then begin
// Shift the remaining workers down by one slot
for J := 1 to FWorkerCount - 1 do
FWorkers[J - 1] := FWorkers[J];
Dec(FWorkerCount);
FWorkers[FWorkerCount] := nil;
end;
end;
finally
FLocker.Leave;
end;
end;
{$ENDIF}
begin
FTerminating := True;
FLocker.Enter;
try
FRepeatJobs.FFirstFireTime := 0;
// Wake every worker so it can observe FTerminating
for i := 0 to FWorkerCount - 1 do
FWorkers[i].FEvent.SetEvent;
finally
FLocker.Leave;
end;
// Spin until the worker list is empty (FWorkerCount reaches 0)
while (FWorkerCount > 0) {$IFDEF MSWINDOWS} and WorkerExists {$ENDIF} do
{$IFDEF MSWINDOWS}SwitchToThread; {$ELSE} TThread.Yield;{$ENDIF}
end;
// Creates the worker manager: job containers, signal table, lock, and
// AMinWorkers reserved worker threads which are started immediately.
//   AMinWorkers - number of reserved workers; values below 1 select 2.
// The worker limit starts at 2 * CPU count + 1 (raised to
// 2 * FMinWorkers + 1 when that is not above the minimum); half of it may
// be used by long-running jobs. On Windows a hidden window is allocated for
// main-thread jobs.
constructor TYXDWorkers.Create(AMinWorkers: Integer);
var
  AIndex: Integer;
  AWorker: TYXDWorker;
begin
  FBusyCount := 0;
  FSimpleJobs := TSimpleJobs.Create(Self);
  FRepeatJobs := TRepeatJobs.Create(Self);
  FSignalJobs := TYXDHashTable.Create(1361);
  FSignalJobs.OnDelete := DoJobFree;
  FSignalJobs.AutoSize := True;
  FLocker := TCriticalSection.Create;
  FCPUNum := GetCPUCount;

  // Fall back to two reserved workers when the request is invalid
  if AMinWorkers >= 1 then
    FMinWorkers := AMinWorkers
  else
    FMinWorkers := 2;

  FMaxWorkers := (FCPUNum shl 1) + 1;
  if FMaxWorkers <= FMinWorkers then
    FMaxWorkers := (FMinWorkers shl 1) + 1;

  FTerminating := False;
  FDisableCount := 0;
  FMaxSignalId := 0;

  // Create and start the default (reserved) workers
  FWorkerCount := FMinWorkers;
  SetLength(FWorkers, FMaxWorkers);
  for AIndex := 0 to FMinWorkers - 1 do begin
    AWorker := TYXDWorker.Create(Self);
    FWorkers[AIndex] := AWorker;
    AWorker.SetValue(WORKER_RESERVED, True);   // reserved: exempt from idle checks
    AWorker.Suspended := False;
    {$IFDEF MSWINDOWS}
    SetThreadCPU(AWorker.Handle, AIndex mod FCPUNum);
    {$ELSE}
    SetThreadCPU(AWorker.ThreadID, AIndex mod FCPUNum);
    {$ENDIF}
  end;

  FMaxLongtimeWorkers := (FMaxWorkers shr 1);
  {$IFDEF MSWINDOWS}
  FMainWorker := AllocateHWnd(DoMainThreadWork);
  {$ENDIF}
end;
// Stops all workers, releases the three job containers under the lock,
// frees the lock, and on Windows releases the main-thread window.
destructor TYXDWorkers.Destroy;
begin
  ClearWorkers;

  FLocker.Enter;
  try
    FreeAndNil(FSimpleJobs);
    FreeAndNil(FRepeatJobs);
    FreeAndNil(FSignalJobs);
  finally
    FLocker.Leave;
    FLocker.Free;
  end;

  {$IFDEF MSWINDOWS}
  DeallocateHWnd(FMainWorker);
  {$ENDIF}

  inherited;
end;
// Atomically increments the disable counter. GetEnabled reports True only
// while the counter is 0, so each call must be balanced by EnableWorkers.
procedure TYXDWorkers.DisableWorkers;
begin
  AtomicIncrement(FDisableCount);
end;
// Forwards to the OnCustomFreeData handler when one is assigned;
// otherwise does nothing.
procedure TYXDWorkers.DoCustomFreeData(AFreeType: TJobDataFreeType;
  var AData: Pointer);
begin
  if not Assigned(FOnCustomFreeData) then
    Exit;

  FOnCustomFreeData(Self, AFreeType, AData);
end;
// Signal table OnDelete handler: frees the signal's waiting-job chain
// (if any) and then disposes the signal record itself.
procedure TYXDWorkers.DoJobFree(ATable: TObject; AHash: Cardinal; AData: Pointer);
var
  AEntry: PSignal;
begin
  AEntry := AData;
  if Assigned(AEntry.First) then
    FreeJob(AEntry.First);
  Dispose(AEntry);
end;
{$IFDEF MSWINDOWS}
// Window procedure of the hidden main-thread window. A WM_APP message
// carries a job in WPARAM and, optionally, a TEvent in LPARAM: the job is
// run through its worker's DoJob and the event is then signaled (even if
// the job raises). Every other message goes to DefWindowProc.
procedure TYXDWorkers.DoMainThreadWork(var AMsg: TMessage);
var
  APosted: PJob;
begin
  if AMsg.Msg <> WM_APP then begin
    AMsg.Result := DefWindowProc(FMainWorker, AMsg.Msg, AMsg.WPARAM, AMsg.LPARAM);
    Exit;
  end;

  try
    APosted := PJob(AMsg.WPARAM);
    APosted.Worker.DoJob(APosted);
  finally
    // Release the worker thread waiting for completion
    if AMsg.LPARAM <> 0 then
      TEvent(AMsg.LPARAM).SetEvent;
  end;
end;
{$ENDIF}
// Atomically decrements the disable counter. When it reaches 0 and jobs are
// pending, calls LookupIdleWorker once per pending job (simple count plus
// scheduled count), stopping early as soon as a lookup fails.
procedure TYXDWorkers.EnableWorkers;
var
  APending: Integer;
begin
  if AtomicDecrement(FDisableCount) <> 0 then
    Exit;

  if (FSimpleJobs.Count > 0) or (FRepeatJobs.Count > 0) then begin
    APending := FSimpleJobs.Count + FRepeatJobs.Count;
    while APending > 0 do begin
      if not LookupIdleWorker then
        Break;
      Dec(APending);
    end;
  end;
end;
// Records on AJob how its data must be released.
//  - AData <> nil: sets the owner flag matching AFreeType (object, record or
//    interface); for interfaces a reference is added as well. Other free
//    types set nothing.
//  - AData = nil: for any free type other than jdfFreeByUser the owner's
//    DoCustomFreeData is invoked.
// NOTE(review): calling the custom free handler for nil data looks unusual -
// confirm this is intended and not a misplaced `else` of the case statement.
procedure InitJobFreeType(AOwner: TYXDWorkers; AJob: PJob; AData: Pointer; AFreeType: TJobDataFreeType); inline;
begin
  if AData = nil then begin
    if AFreeType <> jdfFreeByUser then
      AOwner.DoCustomFreeData(AFreeType, AData);
  end else begin
    case AFreeType of
      jdfFreeAsObject:
        AJob.IsObjectOwner := True;
      jdfFreeAsRecord:
        AJob.IsRecordOwner := True;
      jdfFreeAsInterface:
        begin
          AJob.IsInterfaceOwner := True;
          IUnknown(AData)._AddRef;
        end;
    end;
  end;
end;
// Fires a signal: for every job waiting on ASignal a run-once copy carrying
// AData is pushed onto the simple queue. When AData is non-nil the copies
// share a heap-allocated reference counter; the reference held by this
// routine is dropped at the end and, if it was the last one, the data is
// freed here.
procedure TYXDWorkers.FireSignalJob(ASignal: PSignal; AData: Pointer;
AFreeType: TJobDataFreeType);
var
AJob, ACopy: PJob;
ACount: PInteger;
begin
Inc(ASignal.Fired);
if AData <> nil then begin
New(ACount);
ACount^ := 1; // initial value: the reference held by this routine
end else
ACount := nil;
AJob := ASignal.First;
while AJob <> nil do begin
// Clone the waiting job; Source links the copy back to the waiting job
ACopy := JobPool.Pop;
ACopy.Assign(AJob);
ACopy.SetValue(JOB_RUN_ONCE, True);
ACopy.Source := AJob;
ACopy.Data := AData;
InitJobFreeType(Self, ACopy, AData, AFreeType);
if ACount <> nil then begin
// One more holder of the shared data
AtomicIncrement(ACount^);
ACopy.RefCount := ACount;
end;
FSimpleJobs.Push(ACopy);
AJob := AJob.Next;
end;
if AData <> nil then begin
// Drop our own reference; free the data if nobody else holds it
if AtomicDecrement(ACount^) = 0 then begin
Dispose(ACount);
FreeJobData(AData, AFreeType);
end;
end;
end;
// Releases a chain of jobs (following Next) back to the job pool.
// For a job with data: signal-wakeup jobs decrement the shared reference
// counter and release the data only when it reaches 0 (disposing the
// counter); other jobs release the data when IsDataOwner is set. The data
// is released according to the job's object/record/interface owner flag.
procedure TYXDWorkers.FreeJob(AJob: PJob);
var
  AFollowing: PJob;
  AMustFree: Boolean;
begin
  while Assigned(AJob) do begin
    AFollowing := AJob.Next;    // read before the job goes back to the pool
    if Assigned(AJob.Data) then begin
      if not AJob.IsSignalWakeup then
        AMustFree := AJob.IsDataOwner
      else begin
        AMustFree := AtomicDecrement(AJob.RefCount^) = 0;
        if AMustFree then
          Dispose(AJob.RefCount);
      end;

      if AMustFree then begin
        if AJob.IsObjectOwner then begin
          FreeJobData(AJob.Data, jdfFreeAsObject);
          AJob.Data := nil;
        end else if AJob.IsRecordOwner then begin
          FreeJobData(AJob.Data, jdfFreeAsRecord);
          AJob.Data := nil;
        end else if AJob.IsInterfaceOwner then begin
          FreeJobData(AJob.Data, jdfFreeAsInterface);
          AJob.Data := nil;
        end;
      end;
    end;
    JobPool.Push(AJob);
    AJob := AFollowing;
  end;
end;
// Releases AData according to AFreeType: as an object (FreeAndNil), as a
// record (Dispose) or as an interface (_Release). Other free types are
// ignored. An exception raised while releasing is swallowed and reported to
// OnErrorNotify when a handler is assigned.
procedure TYXDWorkers.FreeJobData(AData: Pointer; AFreeType: TJobDataFreeType);
begin
  try
    case AFreeType of
      jdfFreeAsObject:
        FreeAndNil(TObject(AData));
      jdfFreeAsRecord:
        Dispose(AData);
      jdfFreeAsInterface:
        IUnknown(AData)._Release;
    end;
  except
    if Assigned(FOnErrorNotify) then
      FOnErrorNotify(nil, Exception(ExceptObject), 'Workers.FreeJobData');
  end;
end;
// Workers are enabled while the disable counter is 0.
function TYXDWorkers.GetEnabled: Boolean;
begin
  Result := not (FDisableCount <> 0);
end;
// Returns the shared Workers instance, creating it on first use.
// No locking is performed around the creation.
class function TYXDWorkers.GetInstance: TYXDWorkers;
begin
  if Workers = nil then
    Workers := TYXDWorkers.Create();

  Result := Workers;
end;
// Returns the Count reported by the shared job pool.
class function TYXDWorkers.JobPoolCount: Integer;
begin
  Result := JobPool.Count;
end;
// Tries to get a worker going for a newly available job. Returns True when
// an idle worker was woken or a new worker was created, False when workers
// are disabled, all MaxWorkers are busy, the manager is terminating, or no
// worker could be found/created.
function TYXDWorkers.LookupIdleWorker: Boolean;
var
  ASlot: Integer;
  ACandidate: TYXDWorker;
begin
  Result := False;
  if (FDisableCount <> 0) or (FBusyCount = MaxWorkers) or FTerminating then
    Exit;

  FLocker.Enter;
  try
    // First choice: wake an existing idle worker
    if FBusyCount < FWorkerCount then begin
      for ASlot := 0 to FWorkerCount - 1 do begin
        ACandidate := FWorkers[ASlot];
        if (ACandidate <> nil) and (ACandidate.IsIdle) then begin
          ACandidate.Suspended := False;
          ACandidate.SetValue(WORKER_ISBUSY, true);
          AtomicIncrement(FBusyCount);
          ACandidate.FEvent.SetEvent;
          Result := true;
          Break;
        end;
      end;
    end;

    // Second choice: start an additional worker if the limit allows
    if (not Result) and (FWorkerCount < MaxWorkers) then begin
      ACandidate := TYXDWorker.Create(Self);
      {$IFDEF MSWINDOWS}
      SetThreadCPU(ACandidate.Handle, FWorkerCount mod FCPUNum);
      {$ELSE}
      SetThreadCPU(ACandidate.ThreadID, FWorkerCount mod GetCPUCount);
      {$ENDIF}
      ACandidate.Suspended := False;
      ACandidate.FEvent.SetEvent;
      FWorkers[FWorkerCount] := ACandidate;
      Inc(FWorkerCount);
      Result := true;
    end;
  finally
    FLocker.Leave;
  end;
end;
// Fetches the next job to run: the simple queue is tried first and the
// scheduled jobs only when it yields nothing. Returns nil if both are empty.
function TYXDWorkers.Popup: PJob;
begin
  Result := FSimpleJobs.Pop;
  if Assigned(Result) then
    Exit;

  Result := FRepeatJobs.Pop;
end;
// Fills the common fields of a freshly popped job: data pointer, first
// delay and main-thread flag. A positive AInterval stores the interval and
// clears the run-once flag; otherwise the run-once flag is set.
procedure InitJob(AJob: PJob; AData: Pointer;
  ARunInMainThread: Boolean; const ADelay, AInterval: Int64); inline;
var
  ARepeats: Boolean;
begin
  AJob.Data := AData;

  ARepeats := AInterval > 0;
  if ARepeats then
    AJob.Interval := AInterval;
  AJob.SetValue(JOB_RUN_ONCE, not ARepeats);

  AJob.FirstDelay := ADelay;
  AJob.SetValue(JOB_IN_MAINTHREAD, ARunInMainThread);
end;
// Posts a job handled by a method (TJobProc).
//   ADelay / AInterval - forwarded to InitJob as first delay and repeat
//                        interval.
//   AFreeType          - how AData is released (see InitJobFreeType).
// Returns the result of Post(AJob).
function TYXDWorkers.Post(AJobProc: TJobProc; AData: Pointer;
  ARunInMainThread: Boolean; const ADelay, AInterval: Int64;
  AFreeType: TJobDataFreeType): Boolean;
var
  ANewJob: PJob;
begin
  ANewJob := JobPool.Pop;
  ANewJob.WorkerProc := AJobProc;

  InitJob(ANewJob, AData, ARunInMainThread, ADelay, AInterval);
  InitJobFreeType(Self, ANewJob, AData, AFreeType);

  Result := Post(ANewJob);
end;
// Posts a job handled by a global procedure (TJobProcG); the procedure is
// wrapped with MakeJobProc. Other parameters as for the TJobProc overload.
// Returns the result of Post(AJob).
function TYXDWorkers.Post(AJobProc: TJobProcG; AData: Pointer;
  ARunInMainThread: Boolean; const ADelay, AInterval: Int64;
  AFreeType: TJobDataFreeType): Boolean;
var
  ANewJob: PJob;
begin
  ANewJob := JobPool.Pop;
  ANewJob.WorkerProc := MakeJobProc(AJobProc);

  InitJob(ANewJob, AData, ARunInMainThread, ADelay, AInterval);
  InitJobFreeType(Self, ANewJob, AData, AFreeType);

  Result := Post(ANewJob);
end;
{$IFDEF UNICODE}
// Posts a job handled by an anonymous procedure (TJobProcA, UNICODE builds
// only). Other parameters as for the TJobProc overload.
// Returns the result of Post(AJob).
function TYXDWorkers.Post(AJobProc: TJobProcA; AData: Pointer;
  ARunInMainThread: Boolean; const ADelay, AInterval: Int64;
  AFreeType: TJobDataFreeType): Boolean;
var
  ANewJob: PJob;
begin
  ANewJob := JobPool.Pop;
  ANewJob.WorkerProcA := AJobProc;

  InitJob(ANewJob, AData, ARunInMainThread, ADelay, AInterval);
  InitJobFreeType(Self, ANewJob, AData, AFreeType);

  Result := Post(ANewJob);
end;
{$ENDIF}
// Converts a time of day into the delay from now until that time next
// occurs. Only the fractional (time) part of ATime is used; the date part
// is ignored. The result is the day fraction multiplied by WODay.
// NOTE(review): WODay is defined elsewhere; presumably the number of 1 ms
// ticks per day - confirm.
// Fix: when today's occurrence has already passed, the delay until
// tomorrow is (1 - now + target). The original computed (1 + now - target),
// i.e. more than a full day (now 13:00, target 12:00 gave 25 h instead
// of 23 h).
function TimeToDelay(const ATime: TDateTime): Int64; inline;
var
  ANow, ATemp: TDateTime;
begin
  ANow := Now;
  ANow := ANow - Trunc(ANow);     // keep the time-of-day part only
  ATemp := ATime - Trunc(ATime);
  if ANow > ATemp then
    // Today's slot has passed: schedule for the same time tomorrow
    Result := Trunc(((1 + ATemp) - ANow) * WODay)
  else
    Result := Trunc((ATemp - ANow) * WODay);
end;
// Posts a job handled by a global procedure to run at the time of day
// given by ATime (converted with TimeToDelay), repeating every AInterval
// when that is positive. Returns the result of Post(AJob).
function TYXDWorkers.Post(AJobProc: TJobProcG; const ATime: TDateTime;
  const AInterval: Int64; AData: Pointer; ARunInMainThread: Boolean;
  AFreeType: TJobDataFreeType): Boolean;
var
  ANewJob: PJob;
  AFirstDelay: Int64;
begin
  ANewJob := JobPool.Pop;
  ANewJob.WorkerProc := MakeJobProc(AJobProc);

  AFirstDelay := TimeToDelay(ATime);
  InitJob(ANewJob, AData, ARunInMainThread, AFirstDelay, AInterval);
  InitJobFreeType(Self, ANewJob, AData, AFreeType);

  Result := Post(ANewJob);
end;
// Posts a job handled by a method to run at the time of day given by ATime
// (converted with TimeToDelay), repeating every AInterval when that is
// positive. Returns the result of Post(AJob).
function TYXDWorkers.Post(AJobProc: TJobProc; const ATime: TDateTime;
  const AInterval: Int64; AData: Pointer; ARunInMainThread: Boolean;
  AFreeType: TJobDataFreeType): Boolean;
var
  ANewJob: PJob;
  AFirstDelay: Int64;
begin
  ANewJob := JobPool.Pop;
  ANewJob.WorkerProc := AJobProc;

  AFirstDelay := TimeToDelay(ATime);
  InitJob(ANewJob, AData, ARunInMainThread, AFirstDelay, AInterval);
  InitJobFreeType(Self, ANewJob, AData, AFreeType);

  Result := Post(ANewJob);
end;
{$IFDEF UNICODE}
// Posts a job handled by an anonymous procedure (UNICODE builds only) to
// run at the time of day given by ATime (converted with TimeToDelay),
// repeating every AInterval when that is positive.
// Returns the result of Post(AJob).
function TYXDWorkers.Post(AJobProc: TJobProcA; const ATime: TDateTime;
  const AInterval: Int64; AData: Pointer; ARunInMainThread: Boolean;
  AFreeType: TJobDataFreeType): Boolean;
var
  ANewJob: PJob;
  AFirstDelay: Int64;
begin
  ANewJob := JobPool.Pop;
  ANewJob.WorkerProcA := AJobProc;

  AFirstDelay := TimeToDelay(ATime);
  InitJob(ANewJob, AData, ARunInMainThread, AFirstDelay, AInterval);
  InitJobFreeType(Self, ANewJob, AData, AFreeType);

  Result := Post(ANewJob);
end;
{$ENDIF}
// Prepares a long-running job: stores the data pointer and sets both the
// long-time and the run-once flags (a long job runs only once).
procedure InitLogJob(AJob: PJob; AData: Pointer); inline;
begin
  AJob.Data := AData;

  AJob.SetValue(JOB_LONGTIME, True);
  AJob.SetValue(JOB_RUN_ONCE, True);
end;
// Posts a long-running job handled by a method. At most
// FMaxLongtimeWorkers long jobs may be outstanding; a slot is reserved in
// FLongTimeWorkers before posting.
// Returns True when the job was accepted, False when the limit is reached
// or the post itself failed.
// Fix: the result of Post is now honoured. Previously a rejected post (for
// example while terminating) still returned True and kept the reserved
// slot forever, because only an executed long job releases it.
function TYXDWorkers.PostLongJob(AJobProc: TJobProc; AData: Pointer;
  AFreeType: TJobDataFreeType): Boolean;
var
  AJob: PJob;
begin
  if AtomicIncrement(FLongTimeWorkers) <= FMaxLongtimeWorkers then begin
    AJob := JobPool.Pop;
    AJob.WorkerProc := AJobProc;
    InitLogJob(AJob, AData);
    InitJobFreeType(Self, AJob, AData, AFreeType);
    Result := Post(AJob);
    if not Result then
      AtomicDecrement(FLongTimeWorkers);   // give the reserved slot back
  end else begin
    // The limit of long-running jobs has been reached
    AtomicDecrement(FLongTimeWorkers);
    Result := False;
  end;
end;
// Posts a long-running job handled by a global procedure by wrapping it
// with MakeJobProc and delegating to the TJobProc overload.
function TYXDWorkers.PostLongJob(AJobProc: TJobProcG; AData: Pointer;
  AFreeType: TJobDataFreeType): Boolean;
var
  AWrapped: TJobProc;
begin
  AWrapped := MakeJobProc(AJobProc);
  Result := PostLongJob(AWrapped, AData, AFreeType);
end;
{$IFDEF UNICODE}
// Posts a long-running job handled by an anonymous procedure (UNICODE
// builds only). At most FMaxLongtimeWorkers long jobs may be outstanding;
// a slot is reserved in FLongTimeWorkers before posting.
// Returns True when the job was accepted, False when the limit is reached
// or the post itself failed.
// Fix: the result of Post is now honoured. Previously a rejected post still
// returned True and kept the reserved slot forever.
function TYXDWorkers.PostLongJob(AJobProc: TJobProcA; AData: Pointer;
  AFreeType: TJobDataFreeType): Boolean;
var
  AJob: PJob;
begin
  if AtomicIncrement(FLongTimeWorkers) <= FMaxLongtimeWorkers then begin
    AJob := JobPool.Pop;
    AJob.WorkerProcA := AJobProc;
    InitLogJob(AJob, AData);
    InitJobFreeType(Self, AJob, AData, AFreeType);
    Result := Post(AJob);
    if not Result then
      AtomicDecrement(FLongTimeWorkers);   // give the reserved slot back
  end else begin
    // The limit of long-running jobs has been reached
    AtomicDecrement(FLongTimeWorkers);
    Result := False;
  end;
end;
{$ENDIF}
// Prepares a job that waits for a signal: no data, the signal id, the push
// timestamp, the signal-wakeup flag and the main-thread flag.
procedure InitWaitJob(AJob: PJob; ASignalId: Integer; ARunInMainThread: Boolean); inline;
begin
  AJob.Data := nil;
  AJob.SignalId := ASignalId;
  AJob.PushTime := GetTimestamp;

  AJob.SetValue(JOB_SIGNAL_WAKEUP, True);
  AJob.SetValue(JOB_IN_MAINTHREAD, ARunInMainThread);
end;
// Puts AJob at the head of the waiting list of the signal with id
// ASignalId. Returns True on success; when the signal is not registered the
// job is pushed back to the job pool and False is returned.
function TYXDWorkers.PostWaitJob(AJob: PJob; ASignalId: Integer): Boolean;
var
  ATarget: PSignal;
begin
  Result := False;
  FLocker.Enter;
  try
    ATarget := FSignalJobs.FindFirstData(ASignalId);
    if Assigned(ATarget) then begin
      AJob.Next := ATarget.First;
      ATarget.First := AJob;
      Result := True;
    end;
  finally
    FLocker.Leave;
    if not Result then
      JobPool.Push(AJob);     // not queued: recycle the job
  end;
end;
// Registers a method-handled job that runs whenever signal ASignalId is
// fired. Returns False while terminating, otherwise PostWaitJob's result.
function TYXDWorkers.PostWait(AJobProc: TJobProc; ASignalId: Integer;
  ARunInMainThread: Boolean): Boolean;
var
  AWaiter: PJob;
begin
  if FTerminating then begin
    Result := False;
    Exit;
  end;

  AWaiter := JobPool.Pop;
  AWaiter.WorkerProc := AJobProc;
  InitWaitJob(AWaiter, ASignalId, ARunInMainThread);
  Result := PostWaitJob(AWaiter, ASignalId);
end;
// Registers a signal-waiting job handled by a global procedure by wrapping
// it with MakeJobProc and delegating to the TJobProc overload.
function TYXDWorkers.PostWait(AJobProc: TJobProcG; ASignalId: Integer;
  ARunInMainThread: Boolean): Boolean;
var
  AWrapped: TJobProc;
begin
  AWrapped := MakeJobProc(AJobProc);
  Result := PostWait(AWrapped, ASignalId, ARunInMainThread);
end;
{$IFDEF UNICODE}
// Registers an anonymous-procedure job (UNICODE builds only) that runs
// whenever signal ASignalId is fired. Returns False while terminating,
// otherwise PostWaitJob's result.
function TYXDWorkers.PostWait(AJobProc: TJobProcA; ASignalId: Integer;
  ARunInMainThread: Boolean): Boolean;
var
  AWaiter: PJob;
begin
  if FTerminating then begin
    Result := False;
    Exit;
  end;

  AWaiter := JobPool.Pop;
  AWaiter.WorkerProcA := AJobProc;
  InitWaitJob(AWaiter, ASignalId, ARunInMainThread);
  Result := PostWaitJob(AWaiter, ASignalId);
end;
{$ENDIF}
// Queues a fully initialised job. Run-once jobs without a first delay go to
// the simple queue, everything else to the scheduled jobs; on success an
// idle worker is looked up. While terminating, or when the job has no
// handler, the job is freed and False is returned.
function TYXDWorkers.Post(AJob: PJob): Boolean;
var
  AAccepted: Boolean;
begin
  AAccepted := (not FTerminating) and (Assigned(AJob.WorkerProc)
    {$IFDEF UNICODE} or Assigned(AJob.WorkerProcA){$ENDIF});

  if not AAccepted then begin
    AJob.Next := nil;
    FreeJob(AJob);
    Result := False;
    Exit;
  end;

  if AJob.Runonce and (AJob.FirstDelay = 0) then
    Result := FSimpleJobs.Push(AJob)
  else
    Result := FRepeatJobs.Push(AJob);

  if Result then
    LookupIdleWorker;
end;
// Fires the signal with id AId: each waiting job is queued with AData via
// FireSignalJob and an idle worker is then looked up. Does nothing while
// terminating or when the signal is not registered.
procedure TYXDWorkers.SendSignal(AId: Integer; AData: Pointer;
  AFreeType: TJobDataFreeType);
var
  AFired: Boolean;
  ATarget: PSignal;
begin
  AFired := False;
  if FTerminating then
    Exit;

  FLocker.Enter;
  try
    ATarget := FSignalJobs.FindFirstData(AId);
    if Assigned(ATarget) then begin
      AFired := True;
      FireSignalJob(ATarget, AData, AFreeType);
    end;
  finally
    FLocker.Leave;
  end;

  if AFired then
    LookupIdleWorker;
end;
// Fires the signal named AName: each waiting job is queued with AData via
// FireSignalJob and an idle worker is then looked up. Does nothing for an
// empty name, while terminating, or when no signal has that name.
// Fix: added the FTerminating guard that the id-based overload already has,
// so no new jobs are queued during shutdown.
procedure TYXDWorkers.SendSignal(const AName: string; AData: Pointer;
  AFreeType: TJobDataFreeType);
var
  i: Integer;
  ASignal: PSignal;
  AFound: Boolean;
begin
  AFound := False;
  if FTerminating or (Length(AName) = 0) then
    Exit;
  FLocker.Enter;
  try
    for i := 0 to FSignalJobs.BucketCount - 1 do begin
      if FSignalJobs.Buckets[i] <> nil then begin
        ASignal := FSignalJobs.Buckets[i].Data;
        // Cheap length test first, then the full comparison
        if (Length(ASignal.Name) = Length(AName)) and (ASignal.Name = AName) then begin
          AFound := True;
          FireSignalJob(ASignal, AData, AFreeType);
          Break;
        end;
      end;
    end;
  finally
    FLocker.Leave;
  end;
  if AFound then
    LookupIdleWorker;
end;
// Property setter: True calls EnableWorkers, False calls DisableWorkers.
// Both adjust a counter, so calls are expected to be balanced.
procedure TYXDWorkers.SetEnabled(const Value: Boolean);
begin
  if not Value then
    DisableWorkers
  else
    EnableWorkers;
end;
// Sets the limit of simultaneous long-running jobs. The limit may not
// exceed half of MaxWorkers; a larger value raises
// Exception(STooManyLongtimeWorker). An unchanged value is ignored.
procedure TYXDWorkers.SetMaxLongtimeWorkers(const Value: Integer);
begin
  if FMaxLongtimeWorkers = Value then
    Exit;

  if Value > (MaxWorkers shr 1) then
    raise Exception.Create(STooManyLongtimeWorker);

  FMaxLongtimeWorkers := Value;
end;
// Changes the maximum number of workers. Values below 2 and unchanged
// values are ignored. When shrinking, surplus workers are removed from the
// worker list via WorkerTerminate before the array is resized.
// NOTE(review): AtomicExchange is defined elsewhere; the comments below
// assume it stores its second argument into the first - confirm. The value
// written back to FLongTimeWorkers in the finally block is AMaxLong, which
// may differ from the count saved in ATemp - verify this is intended.
procedure TYXDWorkers.SetMaxWorkers(const Value: Integer);
var
ATemp, AMaxLong: Integer;
begin
if (Value >= 2) and (FMaxWorkers <> Value) then begin
// Remember the current long-job counter ...
AtomicExchange(ATemp, FLongTimeWorkers);
AtomicExchange(FLongTimeWorkers, 0); // Force to 0 to keep new long-running jobs from being posted
AMaxLong := Value shr 1;
FLocker.Enter;
try
if FLongTimeWorkers < AMaxLong then begin // Long-running jobs in progress are fewer than half of the workers
if ATemp < AMaxLong then
AMaxLong := ATemp; // Use the number of long-running jobs that existed before the change
if FMaxWorkers > Value then begin
while Value < FWorkerCount do // Drop workers above the new maximum, last one first
WorkerTerminate(FWorkers[FWorkerCount - 1]);
FMaxWorkers := Value;
SetLength(FWorkers, Value);
end else begin
FMaxWorkers := Value;
SetLength(FWorkers, Value);
end;
end;
finally
FLocker.Leave;
AtomicExchange(FLongTimeWorkers, AMaxLong);
end;
end;
end;
// Sets the minimum number of workers. A value below 1 raises
// Exception(STooFewWorkers). An unchanged value is ignored.
procedure TYXDWorkers.SetMinWorkers(const Value: Integer);
begin
  if FMinWorkers = Value then
    Exit;

  if Value < 1 then
    raise Exception.Create(STooFewWorkers);

  FMinWorkers := Value;
end;
// Returns the id of the signal named AName, or -1 when the name is empty or
// no bucket's signal carries that name. Performs no locking itself.
function TYXDWorkers.SignalIdByName(const AName: string): Integer;
var
  ABucket, ANameLen: Integer;
  ACandidate: PSignal;
begin
  Result := -1;
  ANameLen := Length(AName);
  if ANameLen < 1 then
    Exit;

  for ABucket := 0 to FSignalJobs.BucketCount - 1 do begin
    if FSignalJobs.Buckets[ABucket] = nil then
      Continue;
    ACandidate := FSignalJobs.Buckets[ABucket].Data;
    // Cheap length test first, then the full comparison
    if (Length(ACandidate.Name) = ANameLen) and (ACandidate.Name = AName) then begin
      Result := ACandidate.Id;
      Exit;
    end;
  end;
end;
// Called after a signal-triggered job copy (AJob) has run. Locates the
// waiting job it was cloned from (AJob.Source) in the signal's waiting list
// and either removes it (when AJob.IsTerminated) or updates its run
// statistics (Runs, MinUsedTime, MaxUsedTime).
// Fixes:
//  - the search now stops as soon as the source job has been handled. The
//    original only broke out when AUsedTime > 0; after freeing a terminated
//    source it went on to read the job that had just been returned to the
//    pool.
//  - a signal that cannot be found no longer dereferences nil.
procedure TYXDWorkers.SignalWorkDone(AJob: PJob; AUsedTime: Int64);
var
  ASignal: PSignal;
  ATemp, APrior: PJob;
begin
  APrior := nil;
  FLocker.Enter;
  try
    ASignal := FSignalJobs.FindFirstData(AJob.SignalId);
    if ASignal = nil then
      Exit;
    ATemp := ASignal.First;
    while ATemp <> nil do begin
      if ATemp = AJob.Source then begin
        if AJob.IsTerminated then begin
          // Unlink the waiting job and release it
          if APrior <> nil then
            APrior.Next := ATemp.Next
          else
            ASignal.First := ATemp.Next;
          ATemp.Next := nil;
          FreeJob(ATemp);
        end else begin
          // Update the statistics of the waiting job
          Inc(ATemp.Runs);
          if AUsedTime > 0 then begin
            if (ATemp.MinUsedTime = 0) or (AUsedTime < ATemp.MinUsedTime) then
              ATemp.MinUsedTime := AUsedTime;
            if (ATemp.MaxUsedTime = 0) or (AUsedTime > ATemp.MaxUsedTime) then
              ATemp.MaxUsedTime := AUsedTime;
          end;
        end;
        Break;    // the source job was handled; ATemp may be freed
      end;
      APrior := ATemp;
      ATemp := ATemp.Next;
    end;
  finally
    FLocker.Leave;
  end;
end;
// Blocks until no worker is running a job that matches AParam. Loops while
// HasJobRunning reports a match, yielding between checks; when called from
// the main thread it also pumps messages so that main-thread jobs can run.
// AParam.WaitType selects the match rule: 0 = bound object, 1 = handler and
// data, 2 = signal source job, 3 = job group, $FF = any job.
procedure TYXDWorkers.WaitRunningDone(const AParam: TWorkerWaitParam);
var
AInMainThread: Boolean;
// Returns True when some worker is still looking up a job or is executing a
// job that matches AParam. A matching worker gets FTerminatingJob set to its
// active job.
function HasJobRunning: Boolean;
var
i: Integer;
AJob: PJob;
begin
Result := False;
DisableWorkers;
FLocker.Enter;
try
for i := 0 to FWorkerCount - 1 do begin
if FWorkers[i].IsLookuping then begin// Not ready yet, so check again on the next query
Result := True;
Break;
end else if FWorkers[i].IsExecuting then begin
AJob := FWorkers[i].FActiveJob;
case AParam.WaitType of
0: // ByObject
Result := TMethod(FWorkers[i].FActiveJobProc).Data = AParam.Bound;
1: // ByData
Result := (TMethod(FWorkers[i].FActiveJobProc).Code = TMethod(AParam.WorkerProc).Code) and
(TMethod(FWorkers[i].FActiveJobProc).Data = TMethod(AParam.WorkerProc).Data) and
((AParam.Data = Pointer(-1)) or
(FWorkers[i].FActiveJobData = AParam.Data));
2: // BySignalSource
Result := (FWorkers[i].FActiveJobSource = AParam.SourceJob);
3: // ByGroup
Result := (FWorkers[i].FActiveJobGroup = AParam.Group);
$FF: // Any running job
Result := True;
else
// Unknown wait type: report it through the error handler, if any.
// NOTE(review): the exception object created here is handed to the
// handler but neither raised nor freed in this block.
if Assigned(FOnErrorNotify) then
FOnErrorNotify(AJob, Exception.CreateFmt(SBadWaitDoneParam, [AParam.WaitType]), 'TYXDWorkers.WaitRunningDone');
end;
if Result then begin
FWorkers[i].FTerminatingJob := AJob;
Break;
end;
end;
end;
finally
FLocker.Leave;
EnableWorkers;
end;
end;
begin
AInMainThread := GetCurrentThreadId = MainThreadId;
while True do begin
if HasJobRunning then begin
if AInMainThread then begin
// When clearing from the main thread, a job may have to run in the main
// thread and may have been posted but not yet executed, so it must be
// given the chance to run
{$IFDEF NEXTGEN}
fmx.Forms.Application.ProcessMessages;
{$ELSE}
Forms.Application.ProcessMessages;
{$ENDIF}
end;
{$IFDEF MSWINDOWS}
SwitchToThread;
{$ELSE}
TThread.Yield;
{$ENDIF}
end else // No matching job found
Break;
end;
end;
// Called by a worker that has nothing to do. Decrements the busy counter;
// if the result is still above FMinWorkers and the worker timed out while
// not being one of the first two workers, the worker is terminated and
// removed from the list. For any reason other than irNoJob the busy counter
// is incremented again at the end.
procedure TYXDWorkers.WorkerIdle(AWorker: TYXDWorker; AReason: TWorkerIdleReason);
var
i, J: Integer;
begin
if AtomicDecrement(FBusyCount) > FMinWorkers then begin
FLocker.Enter;
try // When a worker goes idle and is not one of the first two permanent workers, check whether a temporary worker can be dismissed
if (AWorker <> FWorkers[0]) and (AWorker <> FWorkers[1]) and (AReason = irTimeout) then
begin
for i := FMinWorkers to FWorkerCount - 1 do begin // Search the temporary workers, starting at the first one
if AWorker = FWorkers[i] then begin
AWorker.Terminate;
// Close the gap left in the worker list
for J := i + 1 to FWorkerCount - 1 do
FWorkers[J - 1] := FWorkers[J];
FWorkers[FWorkerCount - 1] := nil;
Dec(FWorkerCount);
Break;
end;
end;
end;
finally
FLocker.Leave;
end;
end;
// Undo the decrement above unless the worker simply ran out of jobs
if AReason <> irNoJob then
AtomicIncrement(FBusyCount)
end;
// Called when a worker ends: decrements the busy counter and, under the
// lock, removes the worker from the worker list (closing the gap and
// clearing the freed last slot).
procedure TYXDWorkers.WorkerTerminate(AWorker: TObject);
var
  APos, AMove: Integer;
begin
  AtomicDecrement(FBusyCount);
  FLocker.Enter;
  APos := 0;
  while APos < FWorkerCount do begin
    if FWorkers[APos] = AWorker then begin
      // Shift the following workers down by one slot
      for AMove := APos + 1 to FWorkerCount - 1 do
        FWorkers[AMove - 1] := FWorkers[AMove];
      Dec(FWorkerCount);
      FWorkers[FWorkerCount] := nil;
      Break;
    end;
    Inc(APos);
  end;
  FLocker.Leave;
end;
// Registers a signal by name and returns its id.
//   AName - name of the signal.
// If SignalIdByName already knows the name (result >= 0) that id is returned
// unchanged; otherwise a new signal record is allocated with the next id,
// stored in FSignalJobs, and its id is returned. Runs under FLocker.
function TYXDWorkers.RegisterSignal(const AName: string): Integer;
var
  ANewSignal: PSignal;
begin
  FLocker.Enter;
  try
    Result := SignalIdByName(AName);
    if Result >= 0 then
      Exit; // already registered; the finally block still releases the lock
    Inc(FMaxSignalId);
    New(ANewSignal);
    ANewSignal.Id := FMaxSignalId;
    ANewSignal.Fired := 0;
    ANewSignal.Name := AName;
    ANewSignal.First := nil;
    FSignalJobs.Add(ANewSignal, ANewSignal.Id);
    Result := ANewSignal.Id;
  finally
    FLocker.Leave;
  end;
end;
{ TJobGroup }
// Adds a job to the group.
//   AProc         - the job procedure to run.
//   AData         - user data attached to the job.
//   AInMainThread - stored in the job's JOB_IN_MAINTHREAD flag.
//   AFreeType     - passed to InitJobFreeType together with AData.
// Returns True if the job was queued in the group (and, when it had to be
// posted immediately, the post succeeded).
//
// Behaviour by mode:
//   * Between Prepare and Run (FPrepareCount > 0): the job is only stored in
//     FItems; Run posts it later.
//   * By-order group: the job is appended to FItems and posted only when it is
//     the sole entry. Later entries are posted one at a time by DoJobExecuted.
//   * Unordered group: the job is posted right away and recorded in FItems only
//     if the post succeeded.
function TJobGroup.Add(AProc: TJobProc; AData: Pointer;
  AInMainThread: Boolean; AFreeType: TJobDataFreeType): Boolean;
var
  AJob: PJob;
begin
  AJob := JobPool.Pop;
  AJob.Group := Self;
  AJob.WorkerProc := AProc;
  AJob.Data := AData;
  AJob.SetValue(JOB_RUN_ONCE, True);
  AJob.SetValue(JOB_GROUPED, True);
  AJob.SetValue(JOB_IN_MAINTHREAD, AInMainThread);
  InitJobFreeType(FOwner, AJob, AData, AFreeType);
  FLocker.Enter;
  try
    FWaitResult := wrIOCompletion;
    if FPrepareCount > 0 then begin
      // Still collecting jobs: just remember it and wait for Run.
      FItems.Add(AJob);
      Result := True;
    end else begin
      if ByOrder then begin
        Result := True;
        FItems.Add(AJob);
        // The count is checked after the Add above, so it is at least 1.
        // A count of exactly 1 means no earlier job is pending, so this job
        // must be started now; otherwise DoJobExecuted posts it in turn.
        if FItems.Count = 1 then
          Result := FOwner.Post(AJob);
      end else begin
        Result := FOwner.Post(AJob);
        if Result then
          FItems.Add(AJob);
      end;
    end;
  finally
    FLocker.Leave;
  end;
end;
// Cancels the group's jobs:
//   1. Under the group lock, for a by-order group every listed job whose
//      PopTime is 0 is released through FOwner.FreeJob; then FItems is cleared.
//   2. Under the simple-job queue lock, every queued job that is grouped and
//      belongs to this group is unlinked from the queue and released.
//   3. Waits (WaitRunningDone, wait type 3 = by group) for jobs of this group
//      that are currently running.
procedure TJobGroup.Cancel;
var
  i: Integer;
  AJobs: TSimpleJobs;
  AJob, APrior, ANext: PJob;
  AWaitParam: TWorkerWaitParam;
begin
  FLocker.Enter;
  try
    if FByOrder then begin
      // NOTE(review): presumably PopTime = 0 means "not yet taken by a worker".
      // Confirm that a job which was already posted but not popped cannot be
      // released both here and by the queue scan below.
      for i := 0 to FItems.Count - 1 do begin
        AJob := FItems[i];
        if AJob.PopTime = 0 then
          FOwner.FreeJob(AJob);
      end;
    end;
    FItems.Clear;
  finally
    FLocker.Leave;
  end;
  // Remove every job of this group from the owner's simple-job queue.
  AJobs := FOwner.FSimpleJobs;
  AJobs.FLocker.Enter;
  try
    AJob := AJobs.FFirst;
    APrior := nil;
    while AJob <> nil do begin
      ANext := AJob.Next;
      if AJob.IsGrouped and (AJob.Group = Self) then begin
        // Unlink the node from the singly linked list.
        if APrior = nil then
          AJobs.FFirst := ANext
        else
          APrior.Next := ANext;
        // When the tail is removed, the new tail is the last surviving node
        // (nil only when the list became empty). Resetting FLast to nil while
        // earlier nodes remain would leave FFirst/FLast inconsistent.
        if AJob = AJobs.FLast then
          AJobs.FLast := APrior;
        AJob.Next := nil;
        FOwner.FreeJob(AJob);
      end else
        APrior := AJob;
      AJob := ANext;
    end;
  finally
    AJobs.FLocker.Leave;
  end;
  AWaitParam.WaitType := 3;
  AWaitParam.Group := Self;
  FOwner.WaitRunningDone(AWaitParam);
end;
// Creates a job group.
//   AOwner   - the worker manager that runs the jobs; when nil the global
//              Workers instance is used.
//   AByOrder - stored in FByOrder (True = jobs are run one after another).
// Raises Exception(SNotInitWorkers) when neither AOwner nor Workers is assigned.
constructor TJobGroup.Create(AOwner: TYXDWorkers; AByOrder: Boolean);
begin
  FOwner := AOwner;
  if FOwner = nil then
    FOwner := Workers; // fall back to the global instance
  if FOwner = nil then
    raise Exception.Create(SNotInitWorkers);
  FByOrder := AByOrder;
  FEvent := TEvent.Create(nil, False, False, '');
  FLocker := TSimpleLock.Create;
  FItems := TJobItemList.Create;
end;
// Convenience constructor: forwards to Create(AOwner, AByOrder) with a nil
// owner, which makes that constructor fall back to the global Workers instance.
//   AByOrder - passed through unchanged.
constructor TJobGroup.Create(AByOrder: Boolean);
begin
Create(nil, AByOrder);
end;
// Destroys the group: cancels its jobs, asks the owner to clear jobs bound to
// this object, marks any remaining items as abandoned and returns them to the
// job pool, then frees the lock, event and item list.
destructor TJobGroup.Destroy;
var
  i: Integer;
begin
  // Delphi calls the destructor automatically when the constructor raises
  // (e.g. SNotInitWorkers), in which case some or all of these fields are
  // still nil. Skip the cleanup that dereferences them so the original
  // exception is not replaced by an access violation.
  if Assigned(FOwner) and Assigned(FEvent) and Assigned(FLocker) and Assigned(FItems) then begin
    Cancel;
    FOwner.Clear(Self, -1);
    FLocker.Enter;
    try
      if FItems.Count > 0 then begin
        FWaitResult := wrAbandoned;
        FEvent.SetEvent; // wake up anyone blocked in WaitFor/MsgWaitFor
        for i := 0 to FItems.Count - 1 do begin
          // Only jobs with a non-zero PushTime are handed back to the pool.
          if PJob(FItems[i]).PushTime <> 0 then
            JobPool.Push(FItems[i]);
        end;
        FItems.Clear;
      end;
    finally
      FLocker.Leave;
    end;
  end;
  FreeAndNil(FLocker);
  FreeAndNil(FEvent);
  FreeAndNil(FItems);
  inherited;
end;
// Invokes the FAfterDone callback, if one is assigned. Any exception raised by
// the callback is swallowed here and reported through the owner's
// FOnErrorNotify handler (when assigned).
procedure TJobGroup.DoAfterDone;
begin
  if not Assigned(FAfterDone) then
    Exit;
  try
    FAfterDone(Self);
  except
    if Assigned(FOwner.FOnErrorNotify) then
      FOwner.FOnErrorNotify(nil, Exception(ExceptObject), 'TJobGroup.DoAfterDone');
  end;
end;
// Notification that a job of this group has finished executing.
//   AJob - the job that was executed.
// Does nothing unless the group is still waiting (FWaitResult = wrIOCompletion).
// The job is removed from FItems. When the list becomes empty the group is
// signalled as done and AfterDone fires. In a by-order group the next pending
// job is posted instead; if that post fails the group is marked abandoned.
procedure TJobGroup.DoJobExecuted(AJob: PJob);
var
  AIndex: Integer;
  AGroupDone: Boolean;
begin
  if FWaitResult <> wrIOCompletion then
    Exit;
  AGroupDone := False;
  FLocker.Enter;
  try
    AIndex := FItems.IndexOf(AJob);
    if AIndex <> -1 then begin
      FItems.Delete(AIndex);
      if FItems.Count = 0 then begin
        // Last job finished: wake up waiters.
        FWaitResult := wrSignaled;
        FEvent.SetEvent;
        AGroupDone := True;
      end else if ByOrder then begin
        // Sequential group: start the next job in line.
        if not FOwner.Post(FItems[0]) then begin
          FWaitResult := wrAbandoned;
          FEvent.SetEvent;
        end;
      end;
    end;
  finally
    FLocker.Leave;
  end;
  // The callback runs outside the lock.
  if AGroupDone then
    DoAfterDone;
end;
// Job procedure posted by Run when a timeout was requested.
//   AJob - the timeout job itself (not used here).
// Clears the timeout-pending flag and cancels the group. If the group was
// still waiting, it is marked as timed out, waiters are woken and AfterDone fires.
procedure TJobGroup.DoJobsTimeout(AJob: PJob);
begin
  FTimeoutCheck := False;
  Cancel;
  if FWaitResult <> wrIOCompletion then
    Exit;
  FWaitResult := wrTimeout;
  FEvent.SetEvent;
  DoAfterDone;
end;
// Waits for the group to finish while keeping the main thread's message loop
// alive.
//   ATimeout - maximum wait time in milliseconds.
// Returns the wait result: wrSignaled when there is nothing left to wait for,
// wrTimeout when the deadline passed (the group is cancelled in that case), or
// whatever FWaitResult holds once the group event fires.
// When called from a thread other than the main thread this simply delegates
// to WaitFor.
function TJobGroup.MsgWaitFor(ATimeout: Cardinal): TWaitResult;
var
  AEndTime: Int64;
begin
  if GetCurrentThreadId <> MainThreadId then
    Result := WaitFor(ATimeout)
  else begin
    Result := FWaitResult;
    FLocker.Enter;
    try
      if FItems.Count = 0 then
        Result := wrSignaled;
    finally
      FLocker.Leave;
    end;
    if Result = wrIOCompletion then begin
      // The unit's changelog states that timer precision is 1 ms (QWorker used
      // 0.1 ms), and Run hands ATimeout to Post unscaled, so the deadline must
      // not be multiplied by 10 any more.
      AEndTime := GetTimestamp + ATimeout;
      while GetTimestamp < AEndTime do begin
        // Wait in 10 ms slices; between slices pump messages so that jobs
        // queued for the main thread get a chance to run.
        if FEvent.WaitFor(10) = wrSignaled then begin
          Result := FWaitResult;
          Break;
        end else begin
          {$IFDEF NEXTGEN}
          fmx.Forms.Application.ProcessMessages;
          {$ELSE}
          Forms.Application.ProcessMessages;
          {$ENDIF}
        end;
      end;
      // Deadline reached without the event firing: give up on the rest.
      if Result = wrIOCompletion then begin
        Cancel;
        Result := wrTimeout;
      end;
      // NOTE(review): this calls the parameterless Clear, while Destroy uses
      // Clear(Self, -1). Confirm it only removes this group's pending timeout
      // job and not unrelated jobs.
      if FTimeoutCheck then
        FOwner.Clear;
      DoAfterDone;
    end;
  end;
end;
// Starts (or nests) a batch-add phase by atomically incrementing FPrepareCount.
// While the count is above zero, Add only stores jobs in FItems; Run decrements
// the count and posts the stored jobs once it drops back to zero.
procedure TJobGroup.Prepare;
begin
AtomicIncrement(FPrepareCount);
end;
// Ends a Prepare phase and starts the collected jobs.
//   ATimeout - optional time limit; when not INFINITE a DoJobsTimeout job is
//              posted with this delay and FTimeoutCheck is set.
// Only the call that brings FPrepareCount down to zero does any work. A
// by-order group posts just its first job, an unordered group posts all of
// them. A failed post marks the group abandoned. AfterDone fires immediately
// when the group is not left in the waiting state (empty list or failed post).
procedure TJobGroup.Run(ATimeout: Cardinal = INFINITE);
var
  AIndex: Integer;
begin
  if AtomicDecrement(FPrepareCount) <> 0 then
    Exit; // still inside a nested Prepare
  if ATimeout <> INFINITE then begin
    FTimeoutCheck := True;
    FOwner.Post(DoJobsTimeout, nil, False, ATimeout);
  end;
  FLocker.Enter;
  try
    if FItems.Count = 0 then
      FWaitResult := wrSignaled
    else begin
      FWaitResult := wrIOCompletion;
      if ByOrder then begin
        // Sequential: only the head is started; DoJobExecuted chains the rest.
        if not FOwner.Post(FItems[0]) then
          FWaitResult := wrAbandoned;
      end else begin
        for AIndex := 0 to FItems.Count - 1 do begin
          if not FOwner.Post(FItems[AIndex]) then begin
            FWaitResult := wrAbandoned;
            Break;
          end;
        end;
      end;
    end;
  finally
    FLocker.Leave;
  end;
  if FWaitResult <> wrIOCompletion then
    DoAfterDone;
end;
// Blocks the calling thread until the group finishes or the timeout elapses.
//   ATimeout - passed to FEvent.WaitFor.
// Returns wrSignaled when no items are pending, FWaitResult when the group
// event fired, and wrTimeout otherwise. If the group is not in the waiting
// state and items remain, the current FWaitResult is returned as is.
// Afterwards, if a timeout job is pending (FTimeoutCheck), the owner's Clear is
// called, and AfterDone is always invoked.
function TJobGroup.WaitFor(ATimeout: Cardinal): TWaitResult;
begin
  Result := FWaitResult;
  FLocker.Enter;
  try
    if FItems.Count = 0 then
      Result := wrSignaled; // nothing left to wait for
  finally
    FLocker.Leave;
  end;
  if Result = wrIOCompletion then begin
    case FEvent.WaitFor(ATimeout) of
      wrSignaled:
        Result := FWaitResult;
    else
      Result := wrTimeout;
    end;
  end;
  if FTimeoutCheck then
    FOwner.Clear;
  DoAfterDone;
end;
// Unit start-up: caches the CPU count, prepares the time source for the
// platform, and creates the shared job pool.
initialization
_CPUCount := GetCPUCount;
{$IFNDEF NEXTGEN}
// GetTickCount64 is looked up at run time from kernel32 rather than linked
// statically. NOTE(review): it presumably stays nil on systems that do not
// export it - confirm callers check for that.
GetTickCount64 := GetProcAddress(GetModuleHandle(kernel32), 'GetTickCount64');
// _PerfFreq = -1 records that QueryPerformanceFrequency failed.
if not QueryPerformanceFrequency(_PerfFreq) then
_PerfFreq := -1;
{$ELSE}
// NEXTGEN platforms use a running TStopWatch instead.
_Watch := TStopWatch.Create;
_Watch.Start;
{$ENDIF}
// NOTE(review): 1024 is presumably the pool capacity - confirm against TJobPool.Create.
JobPool := TJobPool.Create(1024);
// The global Workers instance is not created here (the line below is disabled).
//Workers := TYXDWorkers.Create;
// Unit shutdown: free the global worker manager first (if one was created),
// then the job pool.
finalization
if Assigned(Workers) then FreeAndNil(Workers);
if Assigned(JobPool) then FreeAndNil(JobPool);
end.
