Salesforce ストリーミング API:リアルタイムなイベント連携への詳細ガイド

背景と応用シナリオ

Salesforce 統合エンジニア (Salesforce Integration Engineer)として、私は日々、Salesforce と外部システム間のシームレスなデータ連携という課題に取り組んでいます。従来の連携方式の多くは、定期的に Salesforce に問い合わせを行う「ポーリング」に依存していました。しかし、このアプローチには、API コール数の消費、リアルタイム性の欠如、そしてシステムへの不必要な負荷という、看過できない欠点が存在します。

例えば、EC サイトで注文が確定した際に、その情報をリアルタイムで Salesforce の「商談」レコードに反映させたいとします。ポーリング方式では、数分おきに「新しい注文はないか?」と確認し続ける必要があり、タイムラグが発生するだけでなく、変更がない場合でも API コールを無駄に消費してしまいます。これは非効率的であり、スケーラブルなソリューションとは言えません。

ここで登場するのが、Salesforce Streaming API です。これは、Salesforce プラットフォーム上で発生したイベントを、リアルタイムで外部クライアントにプッシュ通知するための強力なソリューションです。この API は、publish-subscribe (Pub/Sub) モデル (発行/購読モデル) に基づいており、データが変更された瞬間に Salesforce が能動的に通知を送信するため、クライアントは常に最新の状態を維持できます。これは、私のような統合エンジニアにとって、イベント駆動型アーキテクチャ (Event-Driven Architecture) を構築するための根幹をなす技術です。

具体的な応用シナリオとしては、以下のようなものが考えられます。

  • 外部システムの UI のリアルタイム更新: Salesforce 上で取引先のステータスが更新された瞬間に、外部の顧客管理ダッシュボードにその変更を即座に反映させる。
  • 異種システム間のワークフロー連携: Salesforce で商談が「成立」になったことをトリガーに、外部の ERP システムに対して在庫引当や請求書作成のプロセスを自動的にキックする。
  • リアルタイムデータ同期: Salesforce をマスタデータとし、変更があったレコード情報(どの項目がどう変わったかなど)を外部のデータウェアハウス (DWH) に遅延なく連携し、データの整合性を維持する。
  • IoT デバイスとの連携: 現場の IoT デバイスから送信されたデータを Platform Events として Salesforce に取り込み、特定の閾値を超えた場合に Streaming API を通じて監視システムにアラートを送信する。

このように Streaming API は、ポーリングの限界を克服し、効率的で応答性の高い、モダンな統合ソリューションを実現するための鍵となります。


原理説明

Salesforce Streaming API の中核をなすのは、Bayeux プロトコルと、その実装である CometD というメッセージングライブラリです。これは、サーバからのプッシュ通知をシミュレートするための技術であり、具体的にはロングポーリング (Long Polling) という手法を用いています。

通常のポーリング(ショートポーリング)では、クライアントがリクエストを送信すると、サーバはすぐにレスポンスを返します。イベントがなければ空のレスポンスが返り、クライアントはまたすぐに次のリクエストを送信します。一方、ロングポーリングでは、クライアントがリクエストを送信すると、サーバはイベントが発生するまでその接続を保持します。イベントが発生した時点で初めてレスポンスを返し、接続を閉じます。クライアントはレスポンスを受け取ると、すぐに次のリクエストを送信して、再び待機状態に入ります。この仕組みにより、サーバからのプッシュ通知を擬似的に実現し、リアルタイムに近い通信を低遅延かつ効率的に行うことができます。

統合エンジニアとして理解すべき最も重要な点は、Streaming API が提供する4種類のイベントタイプです。それぞれに特徴があり、用途に応じて最適なものを選択する必要があります。

PushTopic Events

特定の SOQL クエリ条件に一致するレコードの作成、更新、削除、復元をトリガーとしてイベントを通知します。設定が比較的容易で、既存のオブジェクトに対する変更を監視するシンプルなユースケースに適しています。例えば、「年間売上が 1 億円以上の取引先が作成または更新された場合」といった条件で通知を飛ばすことができます。

Platform Events (プラットフォームイベント)

開発者が自由にスキーマ(ペイロードの構造)を定義できる、カスタムイベントです。これは非常に強力で、特定のデータモデルに縛られません。発行者と購読者が疎結合になるため、複雑なビジネスプロセスやシステム間連携に最適です。例えば、「新規顧客登録プロセス完了」や「重要インシデント発生」といった、レコードの CRUD 操作とは直接関係のないビジネスイベントを定義して通知できます。

Change Data Capture (CDC) (変更データキャプチャ)

特定の Salesforce オブジェクトに対するすべての変更(作成、更新、削除、復元)を、詳細な情報とともに通知します。どの項目が、どの値からどの値に変わったか(例: "oldValue": "Prospecting", "newValue": "Qualification")といった差分情報が含まれるため、Salesforce と外部データベースのデータレプリケーション(複製)シナリオに非常に適しています。

Generic Events

任意の文字列メッセージを通知するための、最もシンプルなイベントタイプです。特定のデータ構造を持たないため、現在はあまり使用されず、より構造化された Platform Events の利用が推奨されています。

これらのイベントタイプの中から、連携の要件(リアルタイム性、ペイロードの複雑さ、データ差分の必要性など)を考慮して、最適なものを選択することが、統合設計の第一歩となります。


示例代码

ここでは、最も基本的なユースケースである PushTopic イベントを、クライアントサイドの JavaScript を使って購読する方法を見ていきます。この例では、取引先 (Account) オブジェクトで特定の条件を満たすレコードが作成または更新された場合に通知を受け取ります。このコードは、Visualforce ページや Lightning Web Component 内、または外部の Web アプリケーションで利用できます。

まず、Salesforce 上で以下のような PushTopic を作成しておく必要があります。

// Apexを使ってPushTopicを作成する例
PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'AccountChanges';
pushTopic.Query = 'SELECT Id, Name, Industry FROM Account WHERE AnnualRevenue > 500000';
pushTopic.ApiVersion = 58.0;
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForFields = 'Referenced'; // クエリ内の全項目を通知
insert pushTopic;

次に、この PushTopic を購読するクライアント側の JavaScript コードです。このコードは、Salesforce の静的リソースとしてアップロードされた CometD ライブラリを利用することを前提としています。

<!-- このコードはVisualforceページ内で実行されることを想定しています -->
<apex:page>
    <!-- Salesforceが提供するCometDライブラリを読み込む -->
    <apex:includeScript value="/js/cometd/cometd.js"/>
    <apex:includeScript value="/js/cometd/json2.js"/>
    <apex:includeScript value="/js/cometd/jquery.js"/>
    <apex:includeScript value="/js/cometd/jquery.cometd.js"/>

    <script type="text/javascript">
        (function($){
            $(document).ready(function() {
                // CometDクライアントを設定する
                // 1. CometDの初期化と設定
                $.cometd.configure({
                    url: window.location.protocol + '//' + window.location.hostname + '/cometd/58.0/',
                    requestHeaders: { Authorization: 'OAuth {!$Api.Session_ID}'} // SalesforceセッションIDで認証
                });

                // 2. Salesforceサーバとのハンドシェイクを実行
                $.cometd.handshake(function(handshakeReply) {
                    if (handshakeReply.successful) {
                        console.log('CometD handshake successful.');
                        // 3. ハンドシェイクが成功したら、PushTopicを購読する
                        // チャンネル名は '/topic/[PushTopic名]' となる
                        var subscription = $.cometd.subscribe('/topic/AccountChanges', function(message) {
                            // 4. メッセージ(イベント通知)を受信した際の処理
                            console.log('Received message:', message);

                            // 受信したペイロードからデータを抽出
                            var data = message.data;
                            var sobject = data.sobject;
                            var eventType = data.event.type; // 'created' or 'updated'

                            // 画面に通知内容を表示する(例)
                            $('#notifications').append(
                                '<p>Event: ' + eventType + ' | Account Name: ' + sobject.Name + ' (Id: ' + sobject.Id + ')</p>'
                            );
                        });
                        console.log('Subscription to /topic/AccountChanges successful.');
                    } else {
                        console.error('CometD handshake failed: ', handshakeReply);
                    }
                });
            });
        })(jQuery);
    </script>

    <h1>Real-Time Account Notifications</h1>
    <div id="notifications">
        <!-- ここに通知が表示されます -->
    </div>

</apex:page>

上記のコードは、以下のステップで動作します。

  1. ライブラリの読み込み: CometD を動作させるために必要な JavaScript ライブラリを読み込みます。
  2. CometD の設定: CometD の接続先 URL (Salesforce の CometD エンドポイント) と、認証情報 (ここでは Visualforce コンテキストで取得できるセッション ID) を設定します。
  3. ハンドシェイク: クライアントとサーバ間で接続を確立するための「ハンドシェイク」を行います。これが成功すると、購読が可能になります。
  4. 購読 (Subscribe): `$.cometd.subscribe` メソッドを使い、目的の PushTopic チャンネル (`/topic/AccountChanges`) を購読します。コールバック関数を登録し、メッセージがプッシュされてきた際の処理を定義します。
  5. メッセージ処理: 通知が届くとコールバック関数が実行され、引数の `message` オブジェクトにイベントの詳細なペイロードが含まれています。ここから必要な情報を取り出して、画面の更新などの後続処理を行います。

注意事項

Streaming API を使った統合ソリューションを設計・実装する際には、以下の点に注意する必要があります。

権限 (Permissions)

Streaming API を利用するユーザには、適切な権限が必要です。PushTopic の場合、そのユーザは PushTopic オブジェクトへの参照アクセス権と、PushTopic のクエリで指定されているオブジェクトおよび項目への参照アクセス権が必要です。Platform Events や CDC の場合も、対応するイベントオブジェクトへの権限が付与されている必要があります。

API 制限 (API Limits)

Salesforce にはガバナ制限が存在し、Streaming API も例外ではありません。特に注意すべきは以下の制限です。

  • イベント配信上限: 24 時間以内に配信できるイベントの総数には、組織のエディションごとに上限があります(例: Enterprise Edition では 50,000 件)。これを超えると、それ以降のイベントは配信されません。
  • 同時接続クライアント数: 同時に接続できる CometD クライアントの数にも上限があります。多数のクライアントが接続するシステムを設計する場合は、この制限を考慮する必要があります。
  • 購読数上限: 1 つのクライアントが同時に購読できるチャンネル数にも制限があります。

これらの制限は、組織の「組織情報」ページで確認できます。大規模な連携を実装する前には、必ずこれらの上限値を確認し、必要に応じてアドオンライセンス(High-Volume Platform Events など)の購入を検討すべきです。

エラー処理と再接続ロジック

ネットワークは常に安定しているとは限りません。クライアントとサーバ間の接続が予期せず切断される可能性があります。そのため、クライアント側には堅牢なエラー処理と再接続ロジックの実装が不可欠です。CometD ライブラリには、接続断を検知して自動的に再ハンドシェイクを試みる機能が備わっていますが、これを適切に設定し、監視することが重要です。

イベントの永続性とリプレイ

クライアントがオフラインの間に発生したイベントはどうなるのでしょうか? Streaming API のイベントは、一定期間(PushTopic/CDC は 24 時間、Platform Events は設定により最大 72 時間)Salesforce のイベントバスに保持されます。各イベントには、一意で昇順の ReplayID (リプレイ ID) が付与されています。クライアントは、最後に正常に受信したイベントの ReplayID を保持しておくことで、再接続時にその ID を指定して、オフライン中に見逃したイベントをすべて取得(リプレイ)することができます。このリプレイ機能は、データの欠損を防ぎ、統合の信頼性を担保する上で極めて重要な機能です。


まとめとベストプラクティス

Salesforce Streaming API は、従来のポーリング方式の課題を解決し、リアルタイムでイベント駆動型の統合を実現するための強力なツールです。統合エンジニアとして、この技術を使いこなすことで、より効率的で、スケーラブルで、応答性の高いシステムを構築できます。

最後に、Streaming API を活用するためのベストプラクティスをまとめます。

  • 適切なイベントタイプを選択する:
    • PushTopic: SOQL ベースのシンプルなレコード変更通知に。
    • Platform Events: カスタムペイロードが必要な、疎結合なプロセス連携に。
    • Change Data Capture: データレプリケーションや監査目的での詳細な変更履歴の追跡に。
  • 堅牢なクライアントを実装する: ネットワークの瞬断やサーバのメンテナンスを想定し、必ず自動再接続とエラーハンドリングのロジックを組み込みます。
  • ReplayID を活用してメッセージの損失を防ぐ: クライアント側で最後に処理した ReplayID を永続化し、再接続時にそれ以降のイベントを取得するように実装することで、データの完全性を保証します。
  • API 制限を常に意識する: 設計段階で、予想されるイベント量とクライアント数がガバナ制限内に収まることを確認します。必要であれば、イベントの発行量を制御する、またはアドオンを検討します。
  • セキュリティを確保する: 認証には OAuth などのセキュアなプロトコルを使用し、接続ユーザには最小権限の原則を適用します。

これらの原理とベストプラクティスを理解し、適用することで、Salesforce Streaming API は貴社のデジタルトランスフォーメーションを加速させる、信頼性の高い連携基盤となるでしょう。

コメント