背景と適用シナリオ
Salesforce 統合エンジニアとして、私たちは日々、Salesforce と外部システム間のデータ同期という課題に直面しています。従来、この課題を解決する最も一般的な方法は、API Polling (API ポーリング) でした。つまり、外部システムが定期的に Salesforce API を呼び出し、データの変更を確認するというアプローチです。しかし、この方法にはいくつかの大きな欠点があります。
第一に、リアルタイム性に欠けることです。ポーリング間隔を短くすればリアルタイムに近づきますが、それは API コール数の増大に直結します。第二に、その API コール数の増大が Salesforce のガバナ制限に抵触するリスクを高め、システムのパフォーマンスにも影響を与えます。そして第三に、変更がない場合でも API を呼び出すため、リソースの無駄遣いが発生します。
これらの課題を解決するために Salesforce が提供するのが、Change Data Capture (CDC)、日本語では変更データキャプチャと呼ばれる強力な機能です。CDC は、Salesforce 内のデータ変更をイベントとしてリアルタイムにストリーミング配信する仕組みです。ポーリングに代わるイベント駆動型アーキテクチャ (Event-Driven Architecture) を採用することで、Salesforce と外部システムの連携をより効率的、スケーラブル、かつリアルタイムに実現します。
統合エンジニアの視点から見た CDC の主な適用シナリオは以下の通りです。
データウェアハウスへのリアルタイム同期
Salesforce の取引先、商談、ケースなどのデータを、Snowflake、Google BigQuery、Amazon Redshift といったデータウェアハウス (DWH) にリアルタイムで複製します。これにより、分析チームは常に最新のデータに基づいたインサイトを得ることができます。
外部システムのキャッシュ更新
外部アプリケーションが Salesforce のデータをキャッシュとして保持している場合、CDC を利用して Salesforce 側でのデータ変更を即座に検知し、外部システムのキャッシュを無効化または更新することができます。これにより、データの一貫性が保たれます。
外部システムでのビジネスプロセス起動
Salesforce で「商談」が「成立」になった瞬間に、その変更イベントを CDC が捉え、外部の ERP システムに通知します。ERP 側では、そのイベントをトリガーとして、請求書発行や在庫引き当てといった後続プロセスを自動的に開始できます。
監査とコンプライアンス
誰が、いつ、どのレコードを、どのように変更したか、という詳細な変更履歴をイベントストリームとして受け取り、監査ログシステムに永続的に保存します。これにより、コンプライアンス要件を満たすための追跡が容易になります。
原理説明
Change Data Capture の中核は、Salesforce の堅牢なPlatform Events (プラットフォームイベント) フレームワークに基づいています。CDC を有効にすると、指定した Salesforce オブジェクト (取引先、取引先責任者など) に対するレコードの作成 (Create)、更新 (Update)、削除 (Delete)、復元 (Undelete) 操作がキャプチャされます。
これらの操作が発生すると、Salesforce は対応する変更イベント (Change Event) を生成し、イベントバス (Event Bus) と呼ばれるメッセージングチャネルにパブリッシュ (公開) します。外部のクライアントアプリケーション (サブスクライバー) は、このイベントバスを購読することで、ほぼリアルタイムに変更イベントを受け取ることができます。
各変更イベントメッセージは、JSON 形式のペイロードを持ち、2つの主要な部分から構成されます。
1. ChangeEventHeader
イベントのメタデータを含むヘッダー部分です。統合を実装する上で特に重要なフィールドは以下の通りです。
- entityName: 変更が発生したオブジェクト名 (例: `Account`)。
- changeType: 変更の種類 (`CREATE`, `UPDATE`, `DELETE`, `UNDELETE`)。
- changedFields: `UPDATE` 操作の場合に変更された項目のリスト (例: `["Name", "Phone"]`)。
- commitTimestamp: データベースに変更がコミットされた日時のタイムスタンプ (ミリ秒単位)。
- transactionKey: 同じトランザクション内で発生した全ての変更イベントをグループ化するためのユニークなキー。
- recordIds: 変更されたレコードの ID のリスト。
2. ペイロード本体
ヘッダーに続き、変更されたレコードのデータが含まれます。デフォルトでは、`UPDATE` の場合、`changedFields` にリストされた項目のみが含まれます。しかし、項目エンリッチメント (Field Enrichment) を有効にすることで、変更されていない項目も含め、レコードの全項目をペイロードに含めることが可能です。これにより、サブスクライバー側で別途 Salesforce API を呼び出してレコードの全体像を取得する手間を省くことができます。
サブスクライバーは、Streaming API を使用してこれらのイベントを購読します。一般的には、CometD プロトコルを実装したクライアントライブラリ (Java の Emp-Connector など) を使用して、イベントバスに接続し、メッセージを待ち受けます。イベントは発生順に配信され、各イベントには ReplayID (リプレイ ID) と呼ばれる一意の識別子が割り当てられます。クライアントは最後に処理したイベントの ReplayID を保存しておくことで、接続が切断された場合でも、切断時点からイベントの受信を再開でき、メッセージの欠落を防ぎます。
サンプルコード
ここでは、CDC を利用するための具体的なステップとコード例を、統合エンジニアがよく利用する Java と、Salesforce プラットフォーム内で完結する Apex の両方で紹介します。
1. CDC の有効化
まず、対象オブジェクトの CDC を有効にする必要があります。これはコーディング不要で、Salesforce の設定画面から行います。
- [設定] から、[クイック検索] ボックスに「変更データキャプチャ」と入力し、[変更データキャプチャ] を選択します。
- 利用可能なオブジェクトのリストから、CDC を有効にしたいオブジェクト (例: `Account` (取引先)) を選択し、`>` 矢印をクリックして [選択されたエンティティ] 列に移動します。
- [保存] をクリックします。
これで、取引先オブジェクトに対する CUD (Create, Update, Delete) 操作がイベントとして発行されるようになります。
2. Java クライアント (Emp-Connector) でのイベント購読
Emp-Connector は、Salesforce が提供する Streaming API のための公式 Java クライアントライブラリです。外部システムからイベントを購読する際の標準的な方法です。
import java.net.URL;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.cometd.bayeux.Channel;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import com.salesforce.emp.connector.BayeuxParameters;
import com.salesforce.emp.connector.EmpConnector;
import com.salesforce.emp.connector.LoginHelper;
import com.salesforce.emp.connector.TopicSubscription;
public class ChangeDataCaptureSubscriber {
public static void main(String[] argv) throws Exception {
// Salesforce のログイン情報
String username = "your_username@example.com";
String password = "your_password_and_security_token";
String loginUrl = "https://login.salesforce.com"; // 本番環境の場合
// BayeuxParameters を介してログイン情報を設定
BayeuxParameters params = LoginHelper.login(new URL(loginUrl), username, password);
// Consumer を定義し、受信したイベントを処理するロジックを実装
Consumer<Map<String, Object>> consumer = event -> {
// 受信したイベントをコンソールに出力
System.out.println(String.format("Received event: %s", event));
};
// EmpConnector インスタンスを作成
EmpConnector connector = new EmpConnector(params);
// 接続を開始
connector.start().get(5, TimeUnit.SECONDS);
System.out.println("Connector is running...");
// 購読したい変更イベントのトピック名を指定
// フォーマットは /data/ObjectNameChangeEvent
String topicName = "/data/AccountChangeEvent";
long replayFrom = EmpConnector.REPLAY_FROM_TIP; // 最新のイベントから購読開始
// トピックを購読
TopicSubscription subscription = connector.subscribe(topicName, replayFrom, consumer).get(5, TimeUnit.SECONDS);
System.out.println(String.format("Subscribed to: %s", subscription.getTopic()));
}
}
3. Apex トリガでのイベント処理
Salesforce プラットフォーム内で変更イベントを処理したい場合、Apex トリガを使用できます。例えば、取引先の名前が変更されたときに、関連するカスタムオブジェクトのレコードを更新する、といったシナリオで有効です。
変更イベントオブジェクト (例: `AccountChangeEvent`) に対して `after insert` トリガを作成します。
trigger AccountChangeEventTrigger on AccountChangeEvent (after insert) {
// 受信した変更イベントをループ処理
for (AccountChangeEvent event : Trigger.New) {
// イベントヘッダーから詳細情報を取得
EventBus.ChangeEventHeader header = event.ChangeEventHeader;
System.debug('Received a change event for ' + header.entityName);
System.debug('Change Type: ' + header.changeType);
// 変更が UPDATE の場合のみ特定の処理を実行
if (header.changeType == 'UPDATE') {
System.debug('Record IDs that were updated: ' + header.recordIds);
// 変更された項目を確認
if (header.changedFields.contains('Name')) {
// 取引先名が変更された場合のビジネスロジックをここに記述
// 例: ログをカスタムオブジェクトに記録する
System.debug('Account Name was changed. Transaction key: ' + header.transactionKey);
// ペイロードから新しい名前を取得
System.debug('New Account Name: ' + event.Name);
}
}
// 変更が CREATE の場合
if (header.changeType == 'CREATE') {
System.debug('A new Account was created with ID: ' + header.recordIds[0]);
System.debug('New Account Name: ' + event.Name);
System.debug('New Account Phone: ' + event.Phone);
}
}
}
注意事項
Change Data Capture を本番環境で運用する際には、いくつかの重要な点に注意する必要があります。
権限 (Permissions)
イベントを購読するインテグレーションユーザーには、適切な権限が必要です。
- 「すべてのデータの参照」権限: 変更イベントストリームにアクセスするために、この権限が必須です。
- 「変更データキャプチャイベントのソース項目の参照」権限: `ChangeEventHeader` の `changeOrigin` 項目(API、Apexなど変更の発生元を示す)にアクセスする場合に必要です。
- オブジェクト権限: ユーザーは、CDC を有効にしたオブジェクトに対する参照権限を持っている必要があります。
API 制限 (API Limits)
CDC は Salesforce のガバナ制限の影響を受けます。特に注意すべきはイベント配信の割り当てです。
- 24時間の配信上限: 組織の Edition によって、24時間以内に配信できるイベント数に上限が設けられています (例: Enterprise Edition では 50,000件)。この上限は、Platform Events、CDC イベント、PushTopic イベントなど、すべての高頻度イベントで共有されます。データ量の多いオブジェクトで CDC を有効にする場合は、この上限を消費し尽くさないよう注意が必要です。
- イベント保持期間: 生成された変更イベントは、イベントバス上で最大3日間 (72時間) 保持されます。この期間内であれば、クライアントは任意の `ReplayID` を指定して過去のイベントを再取得できます。クライアント側のシステムがダウンした場合でも、この保持期間内に復旧すれば、データを取りこぼすことなく処理を再開できます。
エラー処理と信頼性
堅牢なインテグレーションを構築するためには、エラー処理が不可欠です。
- ReplayID の永続化: クライアントは、正常に処理した最後のイベントの `ReplayID` を必ずデータベースやファイルなどの永続的なストレージに保存してください。これにより、クライアントの再起動や予期せぬシャットダウンが発生した際に、どこから処理を再開すればよいかを正確に把握できます。
- 再接続ロジック: ネットワークの問題などで Salesforce への接続が失われた場合に備え、指数バックオフ (Exponential Backoff) などのアルゴリズムを用いた自動再接続ロジックをクライアントに実装することが強く推奨されます。
まとめとベストプラクティス
Change Data Capture は、Salesforce と外部システム間のデータ連携を、従来のポーリングベースのアプローチから、モダンで効率的なイベント駆動型アーキテクチャへと進化させるための鍵となる機能です。リアルタイム性、スケーラビリティ、API コール数の削減といった多大なメリットをもたらします。
統合エンジニアとして CDC を最大限に活用するためのベストプラクティスを以下にまとめます。
- 対象オブジェクトの慎重な選定: ビジネス要件を精査し、リアルタイム連携が本当に必要なオブジェクトにのみ CDC を有効にしましょう。すべてのオブジェクトで有効にすると、イベント配信の割り当てを不必要に消費し、システムのノイズを増やす原因となります。
- 回復力のあるサブスクライバーの設計: クライアントアプリケーションは、一時的な障害が発生することを前提に設計する必要があります。`ReplayID` の確実な管理、堅牢な再接続ロジック、そして予期せぬイベントペイロードに対応できる柔軟なパーサーを実装してください。
- ガバナ制限の監視: [設定] > [会社の情報] で、現在のイベント配信使用量を確認できます。定期的にこの数値を監視し、上限に近づいていないかを確認するプロセスを導入しましょう。必要に応じて、Salesforce からアドオンライセンスを購入することも検討します。
- トランザクションの考慮: `transactionKey` を活用して、同じトランザクション内で発生した複数の変更をグループとして処理することを検討してください。これにより、データの一貫性を保ちやすくなります。
- 項目エンリッチメントの活用: サブスクライバー側で、変更されたレコードの全項目が必要になる場合は、項目エンリッチメントを有効にしましょう。これにより、イベント受信後に再度 Salesforce API を呼び出す `callback` 処理が不要になり、全体のレイテンシが改善され、API コール数も節約できます。
Change Data Capture を正しく理解し、これらのベストプラクティスを適用することで、Salesforce をハブとした、応答性が高く、信頼性のあるエンタープライズ統合ソリューションを構築することが可能になります。
コメント
コメントを投稿