背景と適用シナリオ
現代のエンタープライズアーキテクチャでは、複数のシステムが連携し、データがリアルタイムに近い形で同期されることが不可欠です。特に、CRMのハブである Salesforce のデータは、データウェアハウス、ERP、マーケティングオートメーションツールなど、多くの外部システムで活用されます。従来、このデータ同期を実現するためには、定期的に Salesforce API を呼び出して差分を確認する「ポーリング」方式が一般的でした。
しかし、ポーリング方式には以下のような課題がありました。
- APIガバナ制限の消費: 頻繁なポーリングは、Salesforce の貴重な API コール数を大量に消費します。
- 遅延の発生: ポーリング間隔を短くしない限り、データの変更検知にタイムラグが生じます。
- 複雑な差分検出ロジック: いつ、どのデータが変更されたかを特定するためのロジックをクライアント側で実装する必要がありました。
これらの課題を解決するために登場したのが、Change Data Capture (CDC)(変更データキャプチャ)です。CDC は、Salesforce 内のデータ変更をイベントとして非同期にストリーミングする、イベント駆動型アーキテクチャ(Event-Driven Architecture)の一部です。これにより、外部システムは Salesforce のデータ変更をリアルタイムに近い形で、かつ効率的に受け取ることが可能になります。
主な適用シナリオ
- データレプリケーション: Salesforce の取引先や商談データを、分析目的で Snowflake や Google BigQuery といったデータウェアハウスへリアルタイムに複製する。
- システム間データ同期: 顧客情報が Salesforce で更新された際に、即座に外部の ERP システムや請求システムにその変更を反映させる。
- キャッシュの無効化: 外部アプリケーションが保持している Salesforce データのキャッシュを、元のデータが変更されたタイミングで正確に無効化(パージ)する。
- 監査と通知: 特定の重要オブジェクト(例:カスタムの契約オブジェクト)に対する変更をリアルタイムに監視し、変更内容を監査ログシステムに記録したり、Slack などで関係者に通知したりする。
原理説明
Change Data Capture は、Salesforce の強力なイベントバスである Platform Event(プラットフォームイベント)の仕組みを基盤としています。Salesforce レコード(例えば、取引先レコード)に対して作成、更新、削除、復元(Undelete)の操作が行われると、Salesforce プラットフォームがその変更を検知し、特定の情報を含んだ「変更イベント(Change Event)」を生成してイベントバスにパブリッシュします。
外部のクライアントアプリケーションは、Streaming API と CometD プロトコルを使用してこのイベントバスに接続し、特定のイベントチャネルをサブスクライブ(購読)します。これにより、イベントが発生した際にリアルタイムでプッシュ通知を受け取ることができます。
変更イベント(Change Event)の構造
CDC が発行するイベントメッセージは JSON 形式で、主に2つの部分から構成されます。
- Payload (ペイロード):
変更があったレコードの全項目データが含まれます。どの項目が変更されたかに関わらず、レコードのすべての非 null 項目が送信されるため、クライアント側はレコードの最新の状態を容易に把握できます。 - ChangeEventHeader (チェンジイベントヘッダー):
イベントに関するメタデータが含まれます。これは非常に重要です。- entityName: イベントが発生したオブジェクト名(例: `Account`)。
- recordIds: 変更があったレコードの ID のリスト。
- changeType: 変更の種類(`CREATE`, `UPDATE`, `DELETE`, `UNDELETE`)。
- changedFields: `UPDATE` 操作の場合のみ。変更された項目の API 参照名のリスト。
- commitTimestamp: トランザクションがデータベースにコミットされた日時のタイムスタンプ。
- transactionKey: 同一トランザクション内で発生したイベントを識別するためのキー。
クライアントは、このヘッダー情報、特に `changeType` を見ることで、受け取ったデータに対してどのような処理(新規作成、更新、削除)を行うべきかを判断できます。
他のイベントとの違い
Salesforce には CDC 以外にもイベント通知の仕組みがありますが、それぞれ用途が異なります。
- PushTopic Events: SOQL クエリに基づいてレコードの変更を通知する古い仕組みです。通知される情報がレコードIDなどに限定されており、CDC ほど詳細なデータは含まれません。
- Platform Events: 開発者が自由にスキーマを定義できるカスタムイベントです。特定のデータ変更に縛られず、カスタムのビジネスプロセスを通知するために利用されます。CDC は Salesforce のデータ変更に特化した、いわば「標準の Platform Event」です。
データレプリケーションや同期の目的においては、現在では Change Data Capture が最も推奨されるソリューションです。
サンプルコード
ここでは、Java を使用して Salesforce の取引先(Account)オブジェクトの変更イベントをサブスクライブする例を示します。Salesforce が公式に提供している EMP Connector ライブラリを使用します。このライブラリは、Streaming API への接続と CometD のハンドシェイクを簡素化してくれます。
まず、対象のオブジェクト(この例では Account)で Change Data Capture を有効化する必要があります。これは Salesforce の [設定] > [インテグレーション] > [変更データキャプチャ] から UI 操作で行います。
Java (EMP Connector) によるイベントサブスクリプション
以下のコードは、指定されたチャネル(`/data/AccountChangeEvent`)をサブスクライブし、イベントを受信するたびにその内容をコンソールに出力します。
import java.net.URL; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.cometd.bayeux.Channel; import org.cometd.bayeux.Message; import org.cometd.bayeux.client.ClientSessionChannel; import org.cometd.client.BayeuxClient; import org.cometd.client.transport.LongPollingTransport; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; import com.salesforce.emp.connector.BearerTokenProvider; import com.salesforce.emp.connector.EmpConnector; import com.salesforce.emp.connector.TopicSubscription; public class ChangeDataCaptureSubscriber { public static void main(String[] argv) throws Exception { // Salesforce のログイン情報と設定 String username = "your_username@example.com"; String password = "your_password_and_token"; String loginUrl = "https://login.salesforce.com"; // 本番環境の場合 String channel = "/data/AccountChangeEvent"; // 購読するCDCチャネル // BayeuxClient の認証プロバイダを設定 BearerTokenProvider tokenProvider = new BearerTokenProvider(() -> { try { return LoginHelper.login(new URL(loginUrl), username, password); } catch (Exception e) { throw new RuntimeException(e); } }); // EmpConnector の設定 EmpConnector.Builder builder = new EmpConnector.Builder() .withEndpoint(new URL(loginUrl + "/cometd/58.0")) // APIバージョンを指定 .withBearerTokenProvider(tokenProvider) .withMinVersion(10, 0); EmpConnector connector = builder.build(); // コネクタの起動 connector.start().get(5, TimeUnit.SECONDS); System.out.println("Connector started and connected."); // イベント受信時の処理を定義する Consumer Consumer<Map<String, Object>> consumer = event -> { // 受信したイベントデータをJSON形式で出力 System.out.println("Received event: "); System.out.println(new com.google.gson.GsonBuilder().setPrettyPrinting().create().toJson(event)); // ChangeEventHeader から情報を取得 Map<String, Object> payload = (Map<String, Object>) event.get("payload"); Map<String, Object> header = (Map<String, Object>) payload.get("ChangeEventHeader"); String changeType = (String) header.get("changeType"); String entityName = (String) header.get("entityName"); System.out.println("Change Type: " + changeType + " on Entity: " + entityName); }; // 指定したチャネルを購読開始 // -1L は最新のイベントから購読を開始することを意味する TopicSubscription subscription = connector.subscribe(channel, -1L, consumer).get(5, TimeUnit.SECONDS); System.out.println("Subscribed to channel: " + subscription.getTopic()); // アプリケーションが終了しないように待機 // 実際のアプリケーションでは、適切なシャットダウン処理が必要 // while (true) { Thread.sleep(1000); } } } // 注意: 上記コードは主要なロジックを示すためのものです。 // 実際の利用には、LoginHelperの実装や依存ライブラリ(emp-connector, jetty, gsonなど)の // pom.xmlやbuild.gradleへの追加が必要です。
このコードを実行し、Salesforce で取引先レコードを作成または更新すると、コンソールに JSON 形式のイベントデータが出力されます。クライアントは、この JSON をパースして `ChangeEventHeader` の `changeType` を確認し、ペイロードのデータを使って外部システムを更新するロジックを実装します。
注意事項
Change Data Capture を本番環境で利用する際には、いくつかの重要な点を考慮する必要があります。
権限とアクセス設定
- ユーザ権限: Streaming API に接続するインテグレーションユーザには、「すべてのデータの参照」権限と「変更データキャプチャイベントの表示」権限プロファイルが必要です。
- 項目レベルセキュリティ: CDC イベントには、インテグレーションユーザがアクセスできる項目のみが含まれます。特定の項目がイベントに含まれない場合、ユーザの項目レベルセキュリティ(FLS)を確認してください。
API 制限
- イベント配信上限: Salesforce 組織には、24時間以内に配信できるイベント数の上限があります。この上限は Salesforce のエディションや、アドオンライセンス(High-Volume Platform Events など)の購入状況によって異なります。上限に達すると、それ以降のイベントは配信されなくなります。組織の利用状況は [設定] > [プラットフォームイベント] ページで監視できます。
- イベント保持期間: パブリッシュされた変更イベントは、イベントバス上で最大3日間(72時間)保持されます。クライアントが何らかの理由でオフラインになった場合、この期間内であれば再接続して未受信のイベントを取得できます。
エラー処理と信頼性
- ReplayId: すべてのイベントには、ReplayId(リプレイID)という一意の識別子が割り当てられています。これはイベントの順序を保証する重要な要素です。クライアントは、最後に正常に処理したイベントの `ReplayId` を永続的に保存しておくべきです。
- 切断からの回復: ネットワーク障害などでクライアントが切断された場合、再接続時に保存しておいた `ReplayId` を指定してサブスクリプションを再開します。これにより、切断中に発生したイベントを見逃すことなく取得できます(これを「リプレイ機能」と呼びます)。
- Durable (永続的な) Streaming: 信頼性の高いデータ同期を実現するためには、このリプレイ機能を活用した永続的なクライアントを実装することが不可欠です。
まとめとベストプラクティス
Salesforce Change Data Capture は、従来のポーリング方式に代わる、モダンでスケーラブルなデータ連携ソリューションです。イベント駆動型のアプローチを採用することで、API コール数を節約し、ニアリアルタイムなデータ同期を実現します。
ベストプラクティス
- 適切なユースケースで利用する: データレプリケーションやシステム間のニアリアルタイム同期など、データの「状態」の変更を追跡するシナリオに最適です。
- ReplayId を必ず永続化する: クライアントアプリケーションの信頼性を確保するための最も重要なプラクティスです。最後に処理した `ReplayId` をファイルやデータベースに保存し、アプリケーションの再起動や再接続時に利用してください。
- 配信上限を監視する: 大量のデータ変更が予想される場合は、Salesforce のイベント配信上限を定期的に監視し、必要に応じてアドオンの購入を検討してください。
- クライアントの堅牢性を確保する: ネットワークの瞬断や Salesforce 側のメンテナンスに備え、自動再接続ロジックと指数バックオフなどの再試行メカニズムをクライアントに実装します。
- 中間キューの利用を検討する: 非常に大量のイベントを処理する必要がある場合や、複数のコンシューマが同じイベントを利用する場合は、Salesforce と最終的なコンシューマの間に Apache Kafka や Amazon SQS のようなメッセージキューを配置するアーキテクチャも有効です。これにより、システムの疎結合性とスケーラビリティが向上します。
Change Data Capture を正しく理解し、これらのベストプラクティスを適用することで、堅牢で効率的な Salesforce 連携基盤を構築することが可能になります。
コメント
コメントを投稿