基于 QWorker 的多线程编程 – For 并行

看到这个标题,你可能会感觉到疑惑:难道我们前面用 Post 或 TQJobGroup 提交的作业不是并行的吗?为什么又会冒出一个 For 并行?

首先,第一个问题的答案是:它们当然是并行的,但在特定的场合下,由于调度会引起一定的性能损失。

接下来,第二个问题的答案是:For 并行是在特定场合下的解决方案,它的目的是按指定的次数重复执行指定的作业处理函数。由于它不需要重复投寄作业,所以调度开销更小,在满足条件的前提下,性能更佳。

我们来看看 For 并行有什么限制:

(1)、它只会每个CPU使用一个工作者工作。

(2)、它必需等待所有的作业完成或调用BreakIt中止时,才会退出执行。

(3)、它的所有作业执行函数都是在后台线程中的执行的,不允许调度主线程作业。

(4)、同时只有一个作业函数在并发执行,不能同时运行不同的作业函数。

而与 For 并行相比,TQJobGroup 的主要优势体现在:

(1)、更大的并发处理能力,实际的工作者数量仅受作业数量和 MaxWorkers 属性限制。

(2)、更灵活。

(2.1)、作业可以工作在主线程和后台线程中;

(2.2)、作业可以顺序执行,也可以并行执行;

(2.3)、可以同时运行不同的作业函数。

综上所述,在能够使用 For 并行的场合,性能要比 TQJobGroup 更高。我们根据上面的比较可以基本确定一些原则:

(1)、For 不能用于需要在主线程中执行的作业;

(2)、For 适用于同时执行同一个函数的作业,同时运行不同的作业处理函数需要使用 TQJobGroup;

(3)、For 不能要求作业函数的执行顺序;

(4)、For 不应用于慢速 IO 牵涉到的计算;

好了,说了这么多,我们来做一个实际的 For 并行的例子,首先来看 For 并行的声明:

class function TQWorkers.&For(const AStartIndex, AStopIndex: NativeInt;
  AWorkerProc: TQForJobProc; AMsgWait: Boolean; AData: Pointer;
  AFreeType: TQJobDataFreeType): TWaitResult;inline;
class function TQWorkers.&For(const AStartIndex, AStopIndex: NativeInt;
  AWorkerProc: TQForJobProcA; AMsgWait: Boolean; AData: Pointer;
  AFreeType: TQJobDataFreeType): TWaitResult;inline;
class function TQWorkers.&For(const AStartIndex, AStopIndex: NativeInt;
  AWorkerProc: TQForJobProcG; AMsgWait: Boolean; AData: Pointer;
  AFreeType: TQJobDataFreeType): TWaitResult;inline

你可能注意到了,这三个 For 函数都是 inline 内联函数,它实际上触发的是 TQForJobs 的方法,这只是统一接口的语法糖,两者本身没有任何区别。我们来了解下各个参数的含义:

AStartIndex:循环起始值

AStopIndex:循环结束值

AWorkerProc:作业处理过程,有三种重载,分别对应类成员函数、匿名函数和全局函数版本。

AMsgWait:是否在等待时允许消息循环,如果为True,则主线程不会被阻塞,消息能够正常处理,程序自己处理必要的阻塞,否则,主线程被阻塞。

AData:作业附加数据,被当做作业的Data成员传递给作业处理函数。

AFreeType:作业附加数据类型,用于决定是否自动释放作业的附加数据,默认由用户负责释放。

在执行完成后,会返回一个等待结果,如果成功从AStartIndex循环到AStopIndex,则返回wrSignaled,如果被中途取消(调用Break,或者程序退出),则返回wrAbandoned。

【注意】实际循环的执行范围是集合 [AStartIndex,AStopIndex],这意味着AStopIndex应该大于等于AStartIndex,否则循环过程将不会被执行。

那么,接下来,我们看下For作业函数的声明:

TQForJobProc = procedure(ALoopMgr: TQForJobs; AJob: PQJob; AIndex: NativeInt) of object;
TQForJobProcG = procedure(ALoopMgr: TQForJobs; AJob: PQJob;AIndex: NativeInt);
TQForJobProcA = reference to procedure(ALoopMgr: TQForJobs; AJob: PQJob;AIndex: NativeInt);

与普通的作业过程相比,For循环作业过程多了两个参数:

(1)、ALoopMgr参数是循环作业的控制对象,它提供了一个BreakIt方法用于中断循环的执行;

(2)、AIndex参数指明了当前循环的索引值,表明它是第几个循环。

至于AJob参数,其含义与普通的作业没有任何区别。

下在我们来看具体的例子,同前面一样,我们省略了匿名函数和全局函数的版本,需要的话,您 只需要将相关函数直接替换即可:

【Delphi】

procedure TForm1.DoForJobProc(AMgr: TQForJobs; AJob: PQJob; AIndex: NativeInt);
begin
InterlockedIncrement(PInteger(AJob.Data)^);
end;

procedure TForm1.Button1Click(Sender:TObject);
var
  ARuns:NativeInt;
  T,T1:Cardinal;
  I: Integer;
begin
ARuns:=0;
T:=GetTickCount;
TQForJobs.For(0,999999,DoForJobProc,False,@ARuns);
T:=GetTickCount-T;
ShowMessage(IntToStr(T)+'ms');
end;

【C++ Builder】

void __fastcall TForm1::DoForJobProc(TQForJobs *AMgr,PQJob *AJob,NativeInt AIndex)
{
InterlockedIncrement(*((int *)AJob.Data));
}

void __fastcall TForm1::Button1Click(System::TObject *Sender);
{
NativeInt ARuns;
DWORD T,T1;
Integer I;
ARuns=0;
T=GetTickCount();
TQWorkers::For(0,999999,DoForJobProc,false,&ARuns);
T=GetTickCount()-T;
ShowMessage(IntToStr(T)+'ms');
}

关于 For 并行,本文暂时只说这么多,如果有什么疑问,欢迎在官方群或者后面留言。

分享到: