[教程]为 THttpClient 增加 Server-Sent Event(SSE) 支持

服务器端要推送数据给客户端,有多种不同的方式,SSE 是其中一种实现简单易用的方式,但 Delphi 并没有内置的支持。实际上要实现这一点,并不是很复杂,我们可以直接基于 THttpClient 来实现,这里我们给出一个简单的教程。

我做一个完整的封装,有需要的朋友可以联系购买,价格人民币 99.00 元,包含了完整的 SSE 协议支持。

1. 定义一个支持 SSE 的数据流对象,方便实时接收服务器端推送的数据。因为是示例,我们简化下实现:

type
  TSSEStream=class(TBytesStream)
  public
    function Write(const Buffer; Count: Longint): Longint; override;
  end;

2. 使用 THttpClient 发送请求:

procedure TfrmSSE.Button1Click(Sender: TObject);
var
  AHttpClient:THttpClient;
  ASource,AResp:TStream;
begin
  AHttpClient:=THttpClient.Create;
  ASource:=TStringStream.Create('{}');
  AResp:=TSSEStream.Create;
  Memo1.Lines.Add(FormatDateTime('hh:nn:ss.zzz',Now)+':开始请求');
  var Async:=AHttpCLient.BeginPost('http://100.64.0.12/test',ASource,AResp);
  while not Async.IsCompleted do
    Application.ProcessMessages;
  Memo1.Lines.Add(FormatDateTime('hh:nn:ss.zzz',Now)+':结束请求');
  FreeAndNil(ASource);
  FreeAndNil(AResp);
  FreeAndNil(AHttpClient);
end;

3. 实现 TSSEStream.Write 方法,我们来处理收到的数据库:

{ TSSEStream }

function TSSEStream.Write(const Buffer; Count: Longint): Longint;
var
  APos:Int64;
begin
  APos:=Position;
  inherited;
  var S:=TEncoding.ANSI.GetString(Bytes,APos,Count);
  var AOffset:Integer:=5;
  var AJson:=TJsonObject.ParseJSONFragment(S,AOffset,[]);
  TThread.ForceQueue(nil,
    procedure
    begin
      TfrmSSE(Application.MainForm).Memo1.Lines.Add(FormatDateTime('hh:nn:ss.zzz',Now)+':'+AJson.ToJSON([]));
    end
  );
end;

实际运行效果:

付费版提供一个接口,供用户使用直接使用:

IZSSESession = interface
    ['{D5D7CCAA-EB76-46F7-B733-07014170C1D6}']
    /// <summary>通过GET投寄一个Server Sent Event 请求</summary>
    /// <param name="AUrl">请求的URL</param>
    /// <param name="AEventCallback">通知回调函数</param>
    /// <param name="AfterDone">事件通知结束回调函数</param>
    /// <param name="AHeaders">附加的HTTP请求头,方便传 token 等信息</param>
    /// <remark>所有的请求都是异步执行的,请保证回调关联的对象是有效的。关联的对象释放前,请调用 Cancel或 WaitForComplete 保证没有未执行完成的请求</remark>
    procedure Get(const AUrl: String; const AEventCallback: TZSSEMessageEvent; const AfterDone: TZSSENotifyEvent;
      const AHeaders: TNetHeaders = []); overload;

    /// <summary>通过POST投寄一个Server Sent Event 请求</summary>
    /// <param name="AUrl">请求的URL</param>
    /// <param name="ARequestStream">POST内容数据流</param>
    /// <param name="AEventCallback">通知回调函数</param>
    /// <param name="AfterDone">事件通知结束回调函数</param>
    /// <param name="AHeaders">附加的HTTP请求头,方便传 token 等信息</param>
    /// <param name="AStreamOwner">是否由SSE会话来管理流的释放,默认为 false,由用户在 afterDone中释放</param>
    /// <remark>所有的请求都是异步执行的,请保证回调关联的对象是有效的。关联的对象释放前,请调用 Cancel或 WaitForComplete 保证没有未执行完成的请求</remark>

    procedure Post(const AUrl: String; ARequestStream: TStream; const AEventCallback: TZSSEMessageEvent;
      const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders = []; AStreamOwner: Boolean = false); overload;

    /// <summary>通过GET投寄一个Server Sent Event 请求</summary>
    /// <param name="AUrl">请求的URL</param>
    /// <param name="AText">请求内容文本</param>
    /// <param name="AEventCallback">通知回调函数</param>
    /// <param name="AfterDone">事件通知结束回调函数</param>
    /// <param name="AHeaders">附加的HTTP请求头,方便传 token 等信息</param>
    /// <remark>所有的请求都是异步执行的,请保证回调关联的对象是有效的。关联的对象释放前,请调用 Cancel或 WaitForComplete 保证没有未执行完成的请求</remark>

    procedure Post(const AUrl: String; AText: String; const AEventCallback: TZSSEMessageEvent;
      const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders = []); overload;
    /// <summary>取消正在执行的SSE请求</summary>
    /// <returns>成功,返回true,失败,返回false</returns>
    function Cancel: Boolean;
    /// <summary>取消正在执行的SSE请求</summary>
    /// <param name="ATimeout">等待超时,默认一直等待</param>
    /// <returns>成功,返回true,失败,返回false</returns>
    /// <remark>注意如果等待超时,不会触发AfterDone 回调</remark>
    procedure WaitForComplete(ATimeout: Cardinal = INFINITE);
    /// <summary>获取派发的通知数量,属性 EventCount 的读取函数</summary>
    function GetEventCount: Integer;
    /// <summary>派发的通知数量</summary>
    property EventCount: Integer read GetEventCount;
  end;

  TZSSESession = class(TInterfacedObject, IZSSESession)
  private
    FRequest: TStream;
    FResponse: TBytesStream;
    FHttpClient: THttpClient;
    FTimer: TTimer;
    FHeaders: TNetHeaders;
    FAsync: IAsyncResult;
    FHttpResp: IHttpResponse;
    FLastItem: TZSSEItem;
    FUrl: String;
    FLastPos: Int64;
    FEventCount: Integer;
    FRequestStreamOwner, FIsPost, FIsEventStream: Boolean;
    FEncoding: TEncoding;
    FOnMessage: TZSSEMessageEvent;
    FAfterDone: TZSSENotifyEvent;
    procedure Prepare(const AUrl: String; ARequestStream: TStream; AStreamOwner: Boolean;
      const AEventCallback: TZSSEMessageEvent; const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders);
    procedure NeedTimer(AInterval: Cardinal);
    procedure DoTimer(Sender: TObject);
    procedure DoAfterDone;
    procedure DoRetry;
    function DoEvent: Boolean;
    procedure DoRecvData(const Sender: TObject; AContentLength: Int64; AReadCount: Int64; var AAbort: Boolean);
  protected
    procedure Get(const AUrl: String; const AEventCallback: TZSSEMessageEvent; const AfterDone: TZSSENotifyEvent;
      const AHeaders: TNetHeaders = []); overload;
    procedure Post(const AUrl: String; ARequestStream: TStream; const AEventCallback: TZSSEMessageEvent;
      const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders = []; AStreamOwner: Boolean = false); overload;
    procedure Post(const AUrl: String; AText: String; const AEventCallback: TZSSEMessageEvent;
      const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders = []); overload;
    function Cancel: Boolean;
    procedure WaitForComplete(ATimeout: Cardinal = INFINITE);
    function GetEventCount: Integer;
    procedure StopTimer;
    constructor Create;
    destructor Destroy; override;

  public
    class function SSEGet(const AUrl: String; const AEventCallback: TZSSEMessageEvent;
      const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders = []): IZSSESession; overload; static;
    class function SSEPost(const AUrl: String; ARequestStream: TStream; const AEventCallback: TZSSEMessageEvent;
      const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders = []; AStreamOwner: Boolean = false): IZSSESession;
      overload; static;
    class function SSEPost(const AUrl: String; AText: String; const AEventCallback: TZSSEMessageEvent;
      const AfterDone: TZSSENotifyEvent; const AHeaders: TNetHeaders = []): IZSSESession; overload; static;
  end;

使用封装后的代码,我们调用时,只需要如下代码即可:

FSession := TZSSESession.SSEPost(edtUrl.Text, String('{}'), TZSSEMessageEvent(
    procedure(const AData: TZSSEItem; var AContinue: Boolean)
    begin
      TThread.Queue(nil,
        procedure
        begin
          Memo1.Lines.Add('EventId:' + AData.EventId + ',EventName:' + AData.EventName + ',Retry:' +
            AData.Retry.ToString + ',Data:');
          Memo1.Lines.Add('    ' + AData.Data);
        end);
    end), TZSSENotifyEvent(
    procedure
    begin
      Memo1.Lines.Add(FormatDateTime('hh:nn:ss.zzz', Now) + ':结束请求');
      FSession := nil;
    end), [TNetHeader.Create('Content-Type', 'application/json;charset=utf8')]);

如果是 GET 请求,替换为 SSEGet 函数即可:

FSession := TZSSESession.SSEGet(edtUrl.Text, TZSSEMessageEvent(
    procedure(const AData: TZSSEItem; var AContinue: Boolean)
    begin
      TThread.Queue(nil,
        procedure
        begin
          Memo1.Lines.Add('EventId:' + AData.EventId + ',EventName:' + AData.EventName + ',Retry:' +
            AData.Retry.ToString + ',Data:');
          Memo1.Lines.Add('    ' + AData.Data);
        end);
    end), TZSSENotifyEvent(
    procedure
    begin
      Memo1.Lines.Add(FormatDateTime('hh:nn:ss.zzz', Now) + ':结束请求');
      FSession := nil;
    end));

在上面的示例代码中,我们访问了 Memo1 这个组件,所以在 Form 的 OnDestroy 中,我们需要保证它执行完成,如果未完成,我们中止它。

procedure TfrmSSE.FormDestroy(Sender: TObject);
begin
  if Assigned(FSession) then
    FSession.Cancel;
end;

滚动至顶部