
Class StreamingClient

API wrapper to support Salesforce streaming. The client contains an internal implementation of the CometD specification.

Salesforce client and timeout information

Streaming API imposes two timeouts, as supported in the Bayeux protocol.

Socket timeout: 110 seconds. A client receives events (JSON-formatted HTTP responses) while it waits on a connection. If no events are generated and the client is still waiting, the connection times out after 110 seconds and the server closes the connection. Clients should reconnect within two minutes to avoid the connection timeout.

Reconnect timeout: 40 seconds. After receiving the events, a client needs to reconnect to receive the next set of events. If the reconnection doesn't happen within 40 seconds, the server expires the subscription and the connection is closed. If this happens, the client must start again and handshake, subscribe, and connect. Each Streaming API client logs into an instance and maintains a session. When the client handshakes, connects, or subscribes, the session timeout is restarted. A client session times out if the client doesn’t reconnect to the server within 40 seconds after receiving a response (an event, subscribe result, and so on).

Note that these timeouts apply to the Streaming API client session and not the Salesforce authentication session. If the client session times out, the authentication session remains active until the organization-specific timeout policy goes into effect.
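
As a rough illustration of how the two timeouts relate to client-side bookkeeping, the sketch below classifies a session from timestamps. The helper names (sessionState, connectionOpen) are hypothetical and are not part of StreamingClient; treating the exact boundary as already timed out is also an assumption.

```typescript
// Timeout values quoted in the text above, in milliseconds.
const SOCKET_TIMEOUT_MS = 110 * 1000;
const RECONNECT_TIMEOUT_MS = 40 * 1000;

type SessionState = 'active' | 'expired';

// Hypothetical helper: classify a client session from the time of the last
// server response. Once the reconnect window has passed, the client has to
// handshake, subscribe, and connect again.
function sessionState(lastResponseAt: number, now: number): SessionState {
  return now - lastResponseAt < RECONNECT_TIMEOUT_MS ? 'active' : 'expired';
}

// Hypothetical helper: true while an idle connect request is still inside
// the socket timeout.
function connectionOpen(connectStartedAt: number, now: number): boolean {
  return now - connectStartedAt < SOCKET_TIMEOUT_MS;
}
```

Both helpers take plain millisecond timestamps (for example from Date.now()), so they can be checked without a live connection.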

const streamProcessor = (message: JsonMap): StatusResult => {
    const payload = ensureJsonMap(message.payload);
    const id = ensureString(payload.id);

    if (payload.status !== 'Active') {
        return { completed: false };
    }

    return {
        completed: true,
        payload: id
    };
};

const org = await Org.create({});
const options = new StreamingClient.DefaultOptions(org, 'MyPushTopics', streamProcessor);

const asyncStatusClient = await StreamingClient.create(options);

await asyncStatusClient.handshake();

const info: RequestInfo = {
    method: 'POST',
    url: `${org.getField(OrgFields.INSTANCE_URL)}/SomeService`,
    headers: { HEADER: 'HEADER_VALUE' },
    body: 'My content'
};

await asyncStatusClient.subscribe(async () => {
    const connection = await org.getConnection();
    // Now that we are subscribed, we can initiate the request that will cause the events to start streaming.
    const requestResponse: JsonCollection = await connection.request(info);
    const id = ensureJsonMap(requestResponse).id;
    console.log(`this.id: ${JSON.stringify(ensureString(id), null, 4)}`);
});
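
To see how a processor like the one in the example responds to different messages, here is a stand-alone sketch that runs without the library. The StatusResult and Message types below are local stand-ins whose shapes are assumed from the example, and the manual checks replace ensureJsonMap and ensureString.

```typescript
// Local stand-in for the library's StatusResult (shape assumed from the example).
interface StatusResult {
  completed: boolean;
  payload?: string;
}

// Hypothetical message shape, used only for this sketch.
interface Message {
  payload?: { id?: unknown; status?: unknown };
}

function streamProcessor(message: Message): StatusResult {
  const payload = message.payload;
  if (payload === undefined) {
    throw new Error('Expected the message to carry a payload');
  }
  const id = payload.id;
  if (typeof id !== 'string') {
    throw new Error('Expected payload.id to be a string');
  }

  // Keep listening until the record reports the status we are waiting for.
  if (payload.status !== 'Active') {
    return { completed: false };
  }

  // Returning completed: true hands the id back as the result payload.
  return { completed: true, payload: id };
}
```

A message whose status is not 'Active' yields { completed: false }, so the client keeps waiting; the first 'Active' message ends the wait and returns the id.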

Hierarchy

  • AsyncOptionalCreatable<Options>
    • StreamingClient

Index

Constructors

constructor

Methods

handshake

init

  • init(): Promise<void>

replay

  • replay(replayId: number): void
  • Replays streaming events, starting with replayId.

    Parameters

    • replayId: number

      The starting message id to replay from.

    Returns void
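
    One possible way to choose the replayId is sketched below. The Replayable interface and the resumeFrom helper are hypothetical and exist only so the snippet runs on its own. The special values -1 (new events only) and -2 (all retained events) come from Salesforce's durable streaming documentation, not from this class; whether this client accepts them is an assumption.

```typescript
// Special replay values described in Salesforce's durable streaming docs.
const REPLAY_NEW_EVENTS_ONLY = -1;
const REPLAY_ALL_RETAINED = -2;

// Minimal stand-in for the part of the client used here (hypothetical).
interface Replayable {
  replay(replayId: number): void;
}

// Hypothetical helper: resume from the last replay id the caller stored,
// or fall back to receiving only new events.
function resumeFrom(client: Replayable, lastSeenReplayId?: number): number {
  const replayId =
    lastSeenReplayId !== undefined ? lastSeenReplayId : REPLAY_NEW_EVENTS_ONLY;
  client.replay(replayId);
  return replayId;
}
```

    In this sketch, replay is set up before subscribing, so the chosen starting point applies to the subscription that follows.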

subscribe

  • subscribe(streamInit?: undefined | function): Promise<AnyJson>
  • Subscribes to streaming events. When the streaming processor set in the options completes execution, it returns a payload in the StatusResult object. That payload is echoed back as the result of subscribe for convenience.

    Throws SfdxError { name: 'StreamingClient.TimeoutErrorType.SUBSCRIBE' } when the subscribe timeout occurs.

    Parameters

    • Optional streamInit: undefined | function

      This function should call the platform APIs that result in streaming updates on push topics. See StatusResult.

    Returns Promise<AnyJson>
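
    A caller may want to tell a subscribe timeout apart from other failures. The sketch below checks the error name. It assumes the name matches the string in the Throws line literally; the real value may instead be whatever the TimeoutErrorType.SUBSCRIBE member resolves to, so the expected name is left as a parameter. The isSubscribeTimeout helper is hypothetical.

```typescript
// Name taken verbatim from the Throws line above (assumed to be literal).
const SUBSCRIBE_TIMEOUT_NAME = 'StreamingClient.TimeoutErrorType.SUBSCRIBE';

// Hypothetical helper: true when err looks like the subscribe timeout error.
function isSubscribeTimeout(
  err: unknown,
  expectedName: string = SUBSCRIBE_TIMEOUT_NAME
): boolean {
  return err instanceof Error && err.name === expectedName;
}
```

    In a try/catch around await client.subscribe(...), a true result can trigger a retry or a friendlier message, while any other error is rethrown.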

Static create

  • create<P, T>(this: object, options?: P): Promise<T>
  • Asynchronously constructs and initializes a new instance of a concrete subclass with the optional options.

    Type parameters

    • P

    • T: AsyncOptionalCreatable<P>

    Parameters

    • this: object
    • Optional options: P

      An options object providing initialization params to the async constructor.

    Returns Promise<T>