Salesforce ストリーミング API を活用したリアルタイム・イベント駆動型インテグレーションの実現

ご挨拶:Salesforce 統合エンジニアの視点から

こんにちは。Salesforce 統合エンジニアとして、日々様々なシステム間のデータ連携に取り組んでいます。従来のバッチ処理による定時同期も依然として有効な手段ですが、今日のビジネス環境では、顧客体験の向上や業務プロセスの迅速化のために、リアルタイムでのデータ反映が不可欠となっています。例えば、営業担当が商談を成立させたと同時に、バックエンドの基幹システム(ERP)に受注情報が即座に登録され、在庫引当や製造指示が開始される、といったシナリオです。

このような「イベント」を起点としたアーキテクチャを実現するために、Salesforce が提供する強力な機能が Streaming API (ストリーミングAPI) です。この記事では、統合エンジニアの視点から、Streaming API の基本原理、具体的な実装方法、そして本番環境で安定稼働させるための注意点までを、深く掘り下げて解説します。


Streaming API の基本原理とイベントの種類

Streaming API は、Publish-Subscribe (Pub/Sub) Model (パブリッシュ/サブスクライブモデル) に基づいています。これは、メッセージの送信者(Publisher)と受信者(Subscriber)が直接通信するのではなく、「チャネル」と呼ばれる中間的な口を介して通信するモデルです。Salesforce 内でデータの変更(イベント)が発生すると、Salesforce はそのイベントを特定のチャネルに発行(Publish)します。外部のクライアントアプリケーションは、そのチャネルを購読(Subscribe)しておくことで、イベントが発生した際にほぼリアルタイムで通知を受け取ることができます。

このアーキテクチャの最大の利点は、疎結合 (Loosely Coupled) なシステム連携が可能になる点です。Salesforce は「誰がイベントを受け取るか」を意識する必要がなく、クライアント側も Salesforce の内部実装を深く知る必要がありません。これにより、柔軟で拡張性の高いシステム統合が実現できます。

通信には、Bayeux Protocol (バイユープロトコル) というプロトコルと、その JavaScript 実装である CometD が利用されています。これは、クライアントとサーバ間で持続的な接続(Long Polling)を維持し、サーバ側でイベントが発生した際に即座にクライアントへプッシュ通知を行う技術です。

ストリーミングイベントの主要な4タイプ

Streaming API には、ユースケースに応じて選択できるいくつかのイベントタイプが存在します。統合設計において、どのイベントタイプを選択するかは非常に重要です。

1. PushTopic Events

最も古くから提供されているイベントタイプです。SOQL (Salesforce Object Query Language) クエリを事前に定義しておき、そのクエリに合致するレコードが作成または更新された際にイベントが発生します。特定の条件に合致した場合にのみ通知が欲しい、といったシンプルな要件に適しています。

2. Change Data Capture (CDC) (変更データキャプチャ)

よりモダンで強力なデータ同期のためのイベントタイプです。特定の Salesforce オブジェクトに対して有効化すると、そのオブジェクトのレコードに対するすべての変更(作成、更新、削除、復元)がイベントとして発行されます。変更前後の項目値など、豊富な情報がペイロードに含まれるため、外部データベースとのデータレプリケーション(複製)に最適です。

3. Platform Events (プラットフォームイベント)

完全にカスタムなイベントを定義できる、最も柔軟性の高いイベントタイプです。Salesforce の標準オブジェクトやカスタムオブジェクトの変更に縛られず、Apex やフローなどから任意のタイミングで、任意のデータ構造を持つイベントを発行できます。これにより、ビジネスプロセスに基づいたシステム間の連携(例:「承認プロセス完了」イベントを外部システムに通知)など、複雑なシナリオに対応できます。

4. Generic Events (ジェネリックイベント)

PushTopic と似ていますが、SOQL クエリではなく、特定の API を通じて任意のメッセージを発行できるイベントです。主に Salesforce 内部の機能で利用されることが多く、カスタムインテグレーションで選択されることは稀です。


実装サンプル:PushTopic を利用した取引先変更の購読

ここでは、最も基本的な PushTopic を例に、イベントの発行からクライアントでの受信までの一連の流れをコードで見ていきましょう。シナリオとして、「年間売上が 1,000,000 円以上の取引先が作成または更新された場合に、その情報をリアルタイムで検知する」というものを想定します。

ステップ1: PushTopic の作成 (Apex)

まず、通知の条件となる SOQL クエリを定義した PushTopic を作成します。これは Apex を使ってプログラム的に作成するのが確実です。

// 開発者コンソールなどから匿名実行します
PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'HighRevenueAccounts'; // PushTopic の API 参照名
pushTopic.Query = 'SELECT Id, Name, AnnualRevenue FROM Account WHERE AnnualRevenue > 1000000';
pushTopic.ApiVersion = 58.0;
// NotifyForFields は、どの項目の変更を監視するかを指定します
// 'Referenced' は、Query で指定されたすべての項目を対象とします
pushTopic.NotifyForFields = 'Referenced'; 
// レコードのどの操作を通知対象とするか
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForOperationUndelete = false;
pushTopic.NotifyForOperationDelete = false;

insert pushTopic;

詳細な注釈:

  • Name: この PushTopic を一意に識別するための名前です。クライアントが購読するチャネル名(`/topic/HighRevenueAccounts`)の一部になります。
  • Query: イベントを発生させるトリガーとなる SOQL クエリです。このクエリの条件を満たすレコードの変更が通知対象となります。
  • ApiVersion: 使用する API のバージョンです。最新バージョンを指定することが推奨されます。
  • NotifyForFields: 通知をトリガーする項目の変更範囲を定義します。'All', 'Referenced' (クエリ内の項目), 'Select' (クエリ内の項目を選択), 'Where' (WHERE句の項目) から選択します。'Referenced' が一般的です。
  • NotifyForOperation...: 作成、更新、削除、復元といった操作のうち、どれを通知対象にするかをブール値で指定します。

ステップ2: クライアントの実装 (JavaScript と CometD)

次に、この PushTopic チャネルを購読し、イベントを受信するクライアントを JavaScript で実装します。ここでは、静的な HTML ページで CometD ライブラリを利用する例を示します。このコードは、Salesforce 公式ドキュメントに基づいています。

<!DOCTYPE html>
<html>
<head>
    <title>Streaming API Client Example</title>
    <script type="text/javascript" src="https://your.salesforce.instance/cometd/58.0/cometd.js"></script>
    <script type="text/javascript" src="https://your.salesforce.instance/cometd/58.0/jquery.cometd.js"></script>
    <script type="text/javascript" src="https://your.salesforce.instance/cometd/58.0/json2.js"></script>
    <script type="text/javascript" src="https://your.salesforce.instance/cometd/58.0/jquery-1.11.2.min.js"></script>
</head>
<body>
    <h2>Streaming API Event Log</h2>
    <div id="event-log"></div>

    <script type="text/javascript">
    (function($) {
        $(document).ready(function() {
            // CometD ライブラリの設定
            var cometd = new $.Cometd();

            // Salesforce への接続設定
            // 実際のアプリケーションでは、OAuth 2.0 などで動的にセッションIDを取得してください
            var sessionId = 'ここにセッションIDを挿入';
            var salesforceInstanceUrl = 'https://your.salesforce.instance';
            
            function cometdConnect() {
                cometd.configure({
                    url: salesforceInstanceUrl + '/cometd/58.0/',
                    requestHeaders: { Authorization: 'Bearer ' + sessionId },
                    appendMessageTypeToURL: false,
                    logLevel: 'debug'
                });

                // Salesforce へのハンドシェイク
                cometd.handshake(function(status) {
                    if (status.successful) {
                        console.log('Successfully connected to Salesforce Streaming API.');
                        // 接続成功後、PushTopic チャネルを購読
                        var channel = '/topic/HighRevenueAccounts';
                        cometd.subscribe(channel, function(message) {
                            // イベント受信時の処理
                            console.log('Received event:', message);
                            var logDiv = $('#event-log');
                            logDiv.append('<p>Event Received: ' + JSON.stringify(message.data.sobject) + '</p>');
                        });
                    } else {
                        console.error('Failed to connect: ', status);
                    }
                });
            }

            // 接続開始
            cometdConnect();
        });
    })(jQuery);
    </script>
</body>
</html>

詳細な注釈:

  • ライブラリの読み込み: CometD の動作に必要な JavaScript ライブラリを Salesforce のエンドポイントから直接読み込んでいます。`your.salesforce.instance` は、あなたの組織のドメイン(例: `MyDomainName.my.salesforce.com`)に置き換えてください。
  • セッションID: `sessionId` は、API 経由で Salesforce に接続するための認証情報です。この例ではハードコードしていますが、本番環境では絶対に行わないでください。OAuth 2.0 などのセキュアな認証フローで取得したアクセストークンを使用します。
  • cometd.configure(): CometD の設定です。接続先 URL と、認証情報を含むリクエストヘッダーを指定します。
  • cometd.handshake(): Salesforce の Streaming API サーバーとの接続を開始(ハンドシェイク)します。
  • cometd.subscribe(): ハンドシェイク成功後、購読したいチャネル(この場合は `/topic/HighRevenueAccounts`)を指定して購読を開始します。第二引数には、イベントを受信したときに実行されるコールバック関数を渡します。
  • コールバック関数: `message` 引数には、Salesforce から送信されたイベントデータが含まれます。`message.data.sobject` に、SOQL で指定した項目の値が入っています。


統合エンジニアが考慮すべき注意事項

Streaming API を利用した統合を設計・構築する際には、以下の点を必ず考慮に入れる必要があります。

権限 (Permissions)

Streaming API に接続するユーザーには、適切な権限が必要です。

  • プロファイルまたは権限セットで「API の有効化 (API Enabled)」権限が必要です。
  • PushTopic や CDC の場合、対象オブジェクトおよび SOQL クエリに含まれるすべての項目に対する参照権限が必要です。
  • Platform Events の場合、そのプラットフォームイベントオブジェクトに対する参照権限が必要です。

API 制限 (API Limits)

Salesforce の他の API と同様に、Streaming API にもガバナ制限が存在します。

  • イベント配信数の上限: 24時間あたりに配信できるイベント数には上限があります(エディションにより異なる)。大量のデータ変更が頻繁に発生するオブジェクトに CDC を使用する場合などは、この上限に注意が必要です。
  • 同時接続クライアント数: 1つの組織に同時に接続できるクライアントの数にも上限があります。
  • サブスクリプション数の上限: 1つのクライアントが同時に購読できるチャネルの数にも制限があります。
これらの制限値は Salesforce のリリースによって変更される可能性があるため、常に最新の公式ドキュメント「Salesforce Developer Limits and Allocations Quick Reference」を確認してください。

エラー処理と耐障害性 (Error Handling & Durability)

ネットワークの問題などでクライアントと Salesforce 間の接続が切断されることは十分にあり得ます。その間に発生したイベントを見逃してしまうと、データ不整合の原因となります。

この問題に対処するため、Streaming API は ReplayId (リプレイID) という仕組みを提供しています。全てのイベントには一意の `ReplayId` が付与されており、Salesforce は過去24時間分のイベントを保持しています。クライアントは、最後に正常に受信したイベントの `ReplayId` を保存しておき、再接続する際にその ID を指定することで、切断中に見逃したイベントをすべて再受信することができます。

堅牢な統合クライアントを構築するには、この ReplayId を永続化し、再接続ロジックに組み込むことが不可欠です。


まとめとベストプラクティス

Salesforce Streaming API は、外部システムとのリアルタイムなデータ連携を実現するための強力なソリューションです。従来のポーリング方式(定期的に Salesforce に問い合わせる方式)に比べ、API コール数を劇的に削減し、より即時性の高いアーキテクチャを構築できます。

統合エンジニアとして成功裏に導入するためのベストプラクティスを以下にまとめます。

1. 適切なイベントタイプを選択する:

  • PushTopic: 既存のアプリケーションや、SOQL で表現できるシンプルな条件での通知に。
  • Change Data Capture (CDC): データのレプリケーションや監査ログなど、レコードのすべての変更を追跡する必要がある場合に。
  • Platform Events: カスタムプロセスを起点とした疎結合なシステム連携や、複雑なデータ構造を持つイベントを扱いたい場合に。

2. 耐障害性を確保する: クライアント側で ReplayId を確実に管理し、接続断からの復旧メカニズムを必ず実装してください。

3. API 制限を監視する: 特にイベント配信数の上限に注意し、組織の利用状況を監視する仕組みを導入しましょう。

4. セキュリティを徹底する: 認証情報(セッションIDやアクセストークン)の管理は厳重に行い、OAuth 2.0 などのセキュアな認証フローを利用してください。

これらの点を踏まえ、Streaming API を適切に活用することで、ビジネスの変化に迅速に対応できる、柔軟かつ強力な統合ソリューションを構築することができるでしょう。

コメント