Salesforce ストリーミング API をマスターする:リアルタイムイベント駆動型アプリケーション開発者ガイド


背景と適用シナリオ

Salesforce 開発者の皆さん、こんにちは!本日は、現代のアプリケーション開発において不可欠なリアルタイムデータ連携を実現する Salesforce Streaming API について、開発者の視点から深く掘り下げていきたいと思います。

従来のシステム連携では、クライアントが定期的にサーバーの状態を問い合わせる「ポーリング」方式が一般的でした。しかし、このアプローチは非効率です。データの変更がない場合でもリクエストを送信し続けるため、サーバーリソースと API コール数を無駄に消費します。特に、即時性が求められるアプリケーションにおいては、ポーリング間隔を短くする必要があり、その代償はさらに大きくなります。

ここで登場するのが、Streaming API です。これは、サーバーサイドプッシュ技術を利用して、Salesforce 内でデータが変更された際に、そのイベントをリアルタイムでクライアントに通知する仕組みです。この「Publish-Subscribe (Pub/Sub) モデル」により、クライアントは常にサーバーを監視する必要がなくなり、イベントが発生したときにのみデータを受け取ることができます。これにより、API の消費を大幅に削減し、よりスケーラブルで応答性の高いアプリケーションを構築することが可能になります。

主な適用シナリオ

  • リアルタイムダッシュボード: Lightning Web Component で構築されたダッシュボードが、ページの再読み込みなしに最新の商談状況やケースの進捗を即座に反映します。
  • 外部システムとのデータ同期: Salesforce で取引先情報が更新された瞬間に、その変更を ERP やマーケティングオートメーションツールに即時連携します。
  • カスタム通知機能: 特定の条件を満たすレコードが作成された際(例:高額な商談が成立した時)、Slack や Microsoft Teams、またはカスタムのモバイルアプリにプッシュ通知を送信します。
  • イベント駆動型マイクロサービス: Salesforce 内でのイベントをトリガーとして、外部のマイクロサービス(AWS Lambda など)を起動させ、複雑なビジネスプロセスを非同期で実行します。

Salesforce は、様々なユースケースに対応するため、以下の4種類のストリーミングイベントを提供しています。

  • PushTopic Events: SOQL クエリに基づいてレコードの作成、更新、削除、復元を通知します。最も古くからあるシンプルな仕組みです。
  • Change Data Capture (CDC): Salesforce レコードの項目変更を詳細に追跡し、変更前後の値を含むリッチなイベントを送信します。データレプリケーションに最適です。
  • Platform Events (プラットフォームイベント): 事前に定義したカスタムスキーマを持つイベントです。Salesforce 内部だけでなく、外部システムとの連携にも使える、柔軟でスケーラブルなイベント駆動型アーキテクチャの中核を担います。
  • Generic Events: 任意の文字列データを送信できる汎用的なイベントですが、現在は Platform Events の利用が推奨されています。

本記事では、特に柔軟性と拡張性に優れた Platform Events を中心に、その原理と実装方法を解説していきます。


原理説明

Streaming API の心臓部には、Bayeux (バイヨー) プロトコル とその実装である CometD という技術が使われています。これらは、HTTP を介してサーバーからのプッシュ通信をシミュレートするための標準的なアプローチです。

クライアントとサーバー間の通信は、一般的に Long Polling (ロングポーリング) という技術で行われます。通常のポーリングと異なり、クライアントがリクエストを送信すると、サーバーはイベントが発生するまでレスポンスを保留します。イベントが発生した時点でサーバーはレスポンスを返し、クライアントはすぐに新しいリクエストを送信して次のイベントを待ちます。これにより、常に接続が維持されているかのようなリアルタイム通信が実現されます。

開発者として理解すべきプロセスは以下の通りです。

  1. Handshake (ハンドシェイク): クライアントはまず Salesforce の CometD エンドポイント (/cometd/) に対して Handshake リクエストを送信し、セッションを確立します。サーバーは、サポートする接続タイプやクライアント ID などを返します。
  2. Connect (接続): Handshake が成功すると、クライアントは Connect リクエストを送信してサーバーからのイベント通知を待ち受け始めます。これが Long Polling の実体です。
  3. Subscribe (購読): クライアントは、特定のチャネル(例: /event/My_Event__e)を購読 (Subscribe) するリクエストを送信します。これにより、Salesforce はこのクライアントがどのイベントに関心があるかを認識します。
  4. Event Publishing (イベント発行): Apex、Flow、または API を介して Salesforce 内で Platform Event が発行されると、Bayeux サーバーはそのイベントを購読しているすべてのクライアントにプッシュ通知します。
  5. Unsubscribe/Disconnect (購読解除/切断): クライアントは不要になったチャネルの購読を解除 (Unsubscribe) したり、セッションを終了 (Disconnect) したりできます。

このアーキテクチャにより、クライアントはステートフルな接続を維持しつつ、効率的にイベントを受信することができます。特に Lightning Web Components (LWC) で開発する場合、Salesforce が提供する lightning/empApi モジュールがこれらの複雑なプロセスを抽象化してくれるため、開発者はイベントの購読とハンドリングに集中できます。


示例代码

ここでは、カスタムの Platform Event を定義し、Apex で発行し、LWC で購読するまでの一連の流れを、Salesforce 公式ドキュメントに基づいたコードで見ていきましょう。

1. Platform Event の定義

まず、[設定] > [インテグレーション] > [プラットフォームイベント] から、新しいプラットフォームイベントを定義します。

  • 表示ラベル: Cloud News
  • オブジェクト名: Cloud_News
  • API 参照名: Cloud_News__e
  • 公開動作: トランザクションのコミット後に公開

次に、以下のカスタム項目を追加します。

  • Location__c (テキスト, 255)
  • Urgent__c (チェックボックス)
  • News__c (ロングテキストエリア, 32768)

2. Apex による Platform Event の発行

以下の Apex コードは、Cloud_News__e イベントを作成し、EventBus.publish() メソッドを使ってイベントバスに発行します。このコードは、例えばトリガーやバッチクラス、または REST API から呼び出すことができます。

// イベントのリストを作成
List<Cloud_News__e> newsEvents = new List<Cloud_News__e>();

// 発行したいイベントインスタンスを生成し、項目に値を設定
newsEvents.add(new Cloud_News__e(
    Location__c='San Francisco, CA',
    Urgent__c=true,
    News__c='Dreamforce \'24 is approaching! Get ready for innovation.'
));

newsEvents.add(new Cloud_News__e(
    Location__c='Tokyo, JP',
    Urgent__c=false,
    News__c='New Trailhead modules for AI development are now available.'
));

// EventBus.publish メソッドを呼び出してイベントを発行
List<Database.SaveResult> results = EventBus.publish(newsEvents);

// 結果を反復処理して、成功したかどうかを確認
for (Database.SaveResult sr : results) {
    if (sr.isSuccess()) {
        System.debug('Successfully published event.');
    } else {
        for(Database.Error err : sr.getErrors()) {
            System.debug('Error returned: ' +
                        err.getStatusCode() +
                        ' - ' +
                        err.getMessage());
        }
    }
}

コードの注釈:

  • 1-2行目: 発行するイベントを格納するためのリストを初期化します。複数のイベントを一度に発行することがベストプラクティスです。
  • 5-14行目: new Cloud_News__e(...) のように、SObject と同様の構文でイベントインスタンスを作成し、定義したカスタム項目に値を設定します。
  • 17行目: EventBus.publish() がイベントを発行する中心的なメソッドです。引数にはイベントのリストを渡します。
  • 20-29行目: 発行処理は非同期で行われることがありますが、このメソッドは即座に結果を返します。Database.SaveResult をチェックすることで、発行が正常にキューに追加されたかを確認できます。

3. LWC による Platform Event の購読

次に、この Cloud_News__e イベントを購読し、受信したメッセージを表示する LWC を作成します。lightning/empApi モジュールが CometD 通信の複雑さをすべて隠蔽してくれます。

empApiLwc.js

import { LightningElement } from 'lwc';
// empApi モジュールから必要なメソッドをインポート
import { subscribe, unsubscribe, onError, setDebugFlag } from 'lightning/empApi';

export default class EmpApiLWC extends LightningElement {
    // 購読するチャネル名。プラットフォームイベントの API 参照名の前に /event/ を付けます。
    channelName = '/event/Cloud_News__e';
    isSubscribeDisabled = false;
    isUnsubscribeDisabled = true;

    subscription = {}; // 購読情報を保持するオブジェクト

    // コンポーネントが DOM に接続されたときに呼び出されるライフサイクルフック
    connectedCallback() {
        // エラーリスナーを登録
        this.registerErrorListener();
        // 購読処理を開始
        this.handleSubscribe();
    }

    // 購読ボタンのハンドラ
    handleSubscribe() {
        // subscribe メソッドを呼び出す
        // 第1引数: チャネル名
        // 第2引数: replayId (-1 は新しいイベントのみ受信)
        // 第3引数: イベント受信時のコールバック関数
        const messageCallback = (response) => {
            console.log('New message received: ', JSON.stringify(response));
            // ここで受信したイベントデータを使ってUIを更新する処理を実装
        };

        // channelName を購読し、コールバックを登録
        subscribe(this.channelName, -1, messageCallback).then(response => {
            // response には、チャネルと購読IDが含まれる
            console.log('Subscription request sent to: ', JSON.stringify(response.channel));
            this.subscription = response;
            this.toggleSubscriptionButtons(true);
        });
    }

    // 購読解除ボタンのハンドラ
    handleUnsubscribe() {
        this.toggleSubscriptionButtons(false);

        // 購読を解除
        unsubscribe(this.subscription, response => {
            console.log('unsubscribe() response: ', JSON.stringify(response));
        });
    }

    // ボタンの有効/無効を切り替えるヘルパーメソッド
    toggleSubscriptionButtons(isSubscribed) {
        this.isSubscribeDisabled = isSubscribed;
        this.isUnsubscribeDisabled = !isSubscribed;
    }

    // エラー発生時に呼び出されるリスナーを登録
    registerErrorListener() {
        onError(error => {
            console.log('Received error from server: ', JSON.stringify(error));
        });
    }
}

コードの注釈:

  • 3行目: lightning/empApi モジュールから、subscribe, unsubscribe, onError などの主要な関数をインポートします。
  • 8行目: 購読するチャネル名を指定します。Platform Event の場合、/event/<EventApiName> という形式になります。
  • 26行目: subscribe() 関数が購読処理の核です。第二引数の replayId は、どの時点からのイベントを受信するかを指定します。-1 は「購読を開始した後の新しいイベントのみ」を意味します。過去のイベントを再取得したい場合は、特定の replayId を指定します。
  • 28行目: イベントを受信した際に実行されるコールバック関数です。response オブジェクトには、イベントのペイロード (response.data.payload) が含まれています。
  • 46行目: unsubscribe() 関数で購読を解除します。コンポーネントが破棄される際には必ず呼び出すべきです。
  • 58行目: onError() でリスナーを登録しておくことで、接続エラーなどを検知し、再接続ロジックなどを実装できます。


注意事項

Streaming API を本番環境で利用する際には、以下の点に注意する必要があります。

権限 (Permissions)

  • Platform Event を購読または発行するには、ユーザのプロファイルまたは権限セットで、対象の Platform Event オブジェクトに対する「参照」権限(購読)および「作成」権限(発行)が必要です。
  • PushTopic を利用する場合は、PushTopic オブジェクトへの参照権限と、SOQL クエリで参照されるオブジェクトおよび項目へのアクセス権が必要です。
  • 接続には、API 接続用のユーザに「API の有効化」権限が必要です。

API 制限 (API Limits)

  • Streaming API には、24時間あたりのイベント配信数に上限があります。この上限は Salesforce のエディションによって異なり、[設定] > [組織情報] > [利用状況に基づくエンタイトルメント] で確認できます。上限を超えると、イベント配信が停止するため、監視が不可欠です。
  • 一度に発行できるイベントのペイロードサイズにも制限があります(通常 1MB)。大きなデータを扱う場合は、イベントに主要な情報のみを含め、詳細は別途 REST API などで取得する「Claim Check Pattern」の採用を検討してください。
  • 購読クライアントの数にも制限があるため、設計時には注意が必要です。

エラー処理と再接続 (Error Handling and Reconnection)

  • ネットワークの問題などでクライアントとサーバー間の接続が切断されることは常にあり得ます。クライアント側には、接続が切れたことを検知し、自動的に再接続(Handshake からやり直す)を試みるロジックを堅牢に実装する必要があります。
  • Platform Events や CDC のイベントには ReplayId という一意の ID が付与されます。クライアントは最後に正常に処理したイベントの ReplayId を保存しておくことで、再接続時にその ID を指定して、オフライン中に発生したイベントを漏れなく再取得(リプレイ)できます。イベントは通常24時間保持されます。

まとめとベストプラクティス

Salesforce Streaming API は、ポーリングに代わる効率的でスケーラブルなリアルタイムデータ連携ソリューションです。特に Platform Events を活用することで、Salesforce を中心とした柔軟なイベント駆動型アーキテクチャを構築できます。

ベストプラクティス

  1. 適切なイベントタイプを選択する:
    • PushTopic: 既存のレコードに対する単純な SOQL ベースの通知で十分な場合。
    • Change Data Capture (CDC): レコードのすべての項目変更を忠実に外部システムに複製したい場合。
    • Platform Events: カスタムデータ構造を持つイベントを定義し、システム間の疎結合な連携を実現したい場合。最も柔軟性が高い選択肢です。
  2. イベントペイロードを設計する: イベントには、後続の処理に必要な最小限のデータを含めるように設計します。ID や重要なステータスのみを通知し、詳細は受信側が必要に応じて Salesforce API をコールして取得する設計は、ペイロードサイズを抑え、制限に達しにくくします。
  3. クライアントの堅牢性を確保する: クライアント側の実装では、必ずエラーハンドリング、再接続ロジック、そして ReplayId を利用したイベントの再取得メカニズムを組み込み、メッセージの欠落を防ぎます。
  4. ガバナ制限を監視する: 定期的にイベント配信の使用状況を監視し、アーキテクチャが組織の制限内で持続可能であることを確認します。必要に応じて、Salesforce に上限緩和を依頼するか、アーキテクチャの見直しを検討します。

Streaming API を正しく理解し活用することで、これまで実現が難しかった応答性の高い、真にリアルタイムなアプリケーションを構築できます。ぜひ、皆さんのプロジェクトでもこの強力な機能を活用してみてください。

コメント