服务器端要推送数据给客户端,有多种不同的方式,SSE 是其中一种实现简单易用的方式,但 Delphi 并没有内置的支持。实际上要实现这一点,并不是很复杂,我们可以直接基于 THttpClient 来实现,这里我们给出一个简单的教程。
我做一个完整的封装,有需要的朋友可以联系购买,价格人民币 99.00 元,包含了完整的 SSE 协议支持。
1. 定义一个支持 SSE 的数据流对象,方便实时接收服务器端推送的数据。因为是示例,我们简化下实现:
type
TSSEStream=class(TBytesStream)
public
function Write(const Buffer; Count: Longint): Longint; override;
end;
2. 使用 THttpClient 发送请求:
procedure TfrmSSE.Button1Click(Sender: TObject);
var
AHttpClient:THttpClient;
ASource,AResp:TStream;
begin
AHttpClient:=THttpClient.Create;
ASource:=TStringStream.Create('{}');
AResp:=TSSEStream.Create;
Memo1.Lines.Add(FormatDateTime('hh:nn:ss.zzz',Now)+':开始请求');
var Async:=AHttpCLient.BeginPost('http://100.64.0.12/test',ASource,AResp);
while not Async.IsCompleted do
Application.ProcessMessages;
Memo1.Lines.Add(FormatDateTime('hh:nn:ss.zzz',Now)+':结束请求');
FreeAndNil(ASource);
FreeAndNil(AResp);
FreeAndNil(AHttpClient);
end;
3. 实现 TSSEStream.Write 方法,我们来处理收到的数据库:
{ TSSEStream }
function TSSEStream.Write(const Buffer; Count: Longint): Longint;
var
APos:Int64;
begin
APos:=Position;
inherited;
var S:=TEncoding.ANSI.GetString(Bytes,APos,Count);
var AOffset:Integer:=5;
var AJson:=TJsonObject.ParseJSONFragment(S,AOffset,[]);
TThread.ForceQueue(nil,
procedure
begin
TfrmSSE(Application.MainForm).Memo1.Lines.Add(FormatDateTime('hh:nn:ss.zzz',Now)+':'+AJson.ToJSON([]));
end
);
end;
实际运行效果:
付费版提供一个接口,供用户使用直接使用:
IZSSESession = interface
['{D5D7CCAA-EB76-46F7-B733-07014170C1D6}']
/// <summary>通过GET投寄一个Server Sent Event 请求</summary>
/// <param name="AUrl">请求的URL</param>
/// <param name="AEventCallback">通知回调函数</param>
/// <param name="AfterDone">事件通知结束回调函数</param>
/// <param name="AHeaders">附加的HTTP请求头,方便传 token 等信息</param>
/// <remark>所有的请求都是异步执行的,请保证回调关联的对象是有效的。关联的对象释放前,请调用 Cancel或 WaitForComplete 保证没有未执行完成的请求</remark>
procedure Get(const AUrl: String; const AEventCallback: TZSSEMessageEvent; const AfterDone: TZSSENotifyEvent;
const AHeaders: TNetHeaders = []); overload;
/// <summary>通过POST投寄一个Server Sent Event 请求</summary>
/// <param name="AUrl">请求的URL</param>
/// <param name="ARequestStream">POST内容数据流</param>
/// <param name="AEventCallback">通知回调函数</param>
/// <param name="AfterDone">事件通知结束回调函数</param>
/// <param name="AHeaders">附加的HTTP请求头,方便传 token 等信息</param>
/// <param name="AStreamOwner">是否由SSE会话来管理流的释放,默认为 false,由用户在 afterDone中释放</param>
/// <remark>所有的请求都是异步执行的,请保证回调关联的对象是有效的。关联的对象释放前,请调用 Cancel或 WaitForComplete 保证没有未执行完成的请求</remark>
procedure Post(const AUrl: String; ARequestStream: TStream; const AEventCallback: TZSSEMessageEvent;
const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders = []; AStreamOwner: Boolean = false); overload;
/// <summary>通过GET投寄一个Server Sent Event 请求</summary>
/// <param name="AUrl">请求的URL</param>
/// <param name="AText">请求内容文本</param>
/// <param name="AEventCallback">通知回调函数</param>
/// <param name="AfterDone">事件通知结束回调函数</param>
/// <param name="AHeaders">附加的HTTP请求头,方便传 token 等信息</param>
/// <remark>所有的请求都是异步执行的,请保证回调关联的对象是有效的。关联的对象释放前,请调用 Cancel或 WaitForComplete 保证没有未执行完成的请求</remark>
procedure Post(const AUrl: String; AText: String; const AEventCallback: TZSSEMessageEvent;
const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders = []); overload;
/// <summary>取消正在执行的SSE请求</summary>
/// <returns>成功,返回true,失败,返回false</returns>
function Cancel: Boolean;
/// <summary>取消正在执行的SSE请求</summary>
/// <param name="ATimeout">等待超时,默认一直等待</param>
/// <returns>成功,返回true,失败,返回false</returns>
/// <remark>注意如果等待超时,不会触发AfterDone 回调</remark>
procedure WaitForComplete(ATimeout: Cardinal = INFINITE);
/// <summary>获取派发的通知数量,属性 EventCount 的读取函数</summary>
function GetEventCount: Integer;
/// <summary>派发的通知数量</summary>
property EventCount: Integer read GetEventCount;
end;
TZSSESession = class(TInterfacedObject, IZSSESession)
private
FRequest: TStream;
FResponse: TBytesStream;
FHttpClient: THttpClient;
FTimer: TTimer;
FHeaders: TNetHeaders;
FAsync: IAsyncResult;
FHttpResp: IHttpResponse;
FLastItem: TZSSEItem;
FUrl: String;
FLastPos: Int64;
FEventCount: Integer;
FRequestStreamOwner, FIsPost, FIsEventStream: Boolean;
FEncoding: TEncoding;
FOnMessage: TZSSEMessageEvent;
FAfterDone: TZSSENotifyEvent;
procedure Prepare(const AUrl: String; ARequestStream: TStream; AStreamOwner: Boolean;
const AEventCallback: TZSSEMessageEvent; const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders);
procedure NeedTimer(AInterval: Cardinal);
procedure DoTimer(Sender: TObject);
procedure DoAfterDone;
procedure DoRetry;
function DoEvent: Boolean;
procedure DoRecvData(const Sender: TObject; AContentLength: Int64; AReadCount: Int64; var AAbort: Boolean);
protected
procedure Get(const AUrl: String; const AEventCallback: TZSSEMessageEvent; const AfterDone: TZSSENotifyEvent;
const AHeaders: TNetHeaders = []); overload;
procedure Post(const AUrl: String; ARequestStream: TStream; const AEventCallback: TZSSEMessageEvent;
const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders = []; AStreamOwner: Boolean = false); overload;
procedure Post(const AUrl: String; AText: String; const AEventCallback: TZSSEMessageEvent;
const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders = []); overload;
function Cancel: Boolean;
procedure WaitForComplete(ATimeout: Cardinal = INFINITE);
function GetEventCount: Integer;
procedure StopTimer;
constructor Create;
destructor Destroy; override;
public
class function SSEGet(const AUrl: String; const AEventCallback: TZSSEMessageEvent;
const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders = []): IZSSESession; overload; static;
class function SSEPost(const AUrl: String; ARequestStream: TStream; const AEventCallback: TZSSEMessageEvent;
const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders = []; AStreamOwner: Boolean = false): IZSSESession;
overload; static;
class function SSEPost(const AUrl: String; AText: String; const AEventCallback: TZSSEMessageEvent;
const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders = []): IZSSESession; overload; static;
end;
使用封装后的代码,我们调用时,只需要如下代码即可:
FSession := TZSSESession.SSEPost(edtUrl.Text, String('{}'), TZSSEMessageEvent(
procedure(const AData: TZSSEItem; var AContinue: Boolean)
begin
TThread.Queue(nil,
procedure
begin
Memo1.Lines.Add('EventId:' + AData.EventId + ',EventName:' + AData.EventName + ',Retry:' +
AData.Retry.ToString + ',Data:');
Memo1.Lines.Add(' ' + AData.Data);
end);
end), TZSSENotifyEvent(
procedure
begin
Memo1.Lines.Add(FormatDateTime('hh:nn:ss.zzz', Now) + ':结束请求');
FSession := nil;
end), [TNetHeader.Create('Content-Type', 'application/json;charset=utf8')]);
如果是 GET 请求,替换为 SSEGet 函数即可:
FSession := TZSSESession.SSEGet(edtUrl.Text, TZSSEMessageEvent(
procedure(const AData: TZSSEItem; var AContinue: Boolean)
begin
TThread.Queue(nil,
procedure
begin
Memo1.Lines.Add('EventId:' + AData.EventId + ',EventName:' + AData.EventName + ',Retry:' +
AData.Retry.ToString + ',Data:');
Memo1.Lines.Add(' ' + AData.Data);
end);
end), TZSSENotifyEvent(
procedure
begin
Memo1.Lines.Add(FormatDateTime('hh:nn:ss.zzz', Now) + ':结束请求');
FSession := nil;
end));
在上面的示例代码中,我们访问了 Memo1 这个组件,所以在 Form 的 OnDestroy 中,我们需要保证它执行完成,如果未完成,我们中止它。
procedure TfrmSSE.FormDestroy(Sender: TObject);
begin
if Assigned(FSession) then
FSession.Cancel;
end;
