背景と適用シナリオ
Salesforce 統合エンジニアとして、私たちは日々、外部システムと Salesforce 間で大量のデータをやり取りする課題に直面しています。例えば、基幹システム (ERP) からの受注データの同期、データウェアハウスへの Salesforce データのエクスポート、あるいは初期導入時の大規模なデータ移行など、そのシナリオは多岐にわたります。
標準の REST API や SOAP API は、単一または少数のレコードをリアルタイムに処理するのには非常に優れていますが、数万、数百万といったレコードを一度に処理しようとすると、ガバナ制限 (Governor Limits) と呼ばれる API コール数の上限に達してしまったり、パフォーマンスが著しく低下したりする問題が発生します。このような大規模データ処理の課題を解決するために Salesforce が提供しているのが Bulk API (バルク API) です。
Bulk API は、大量のレコードを非同期 (asynchronous) で処理するために最適化された REST ベースの API です。主な適用シナリオは以下の通りです。
- データ移行 (Data Migration): 他の CRM やレガシーシステムから Salesforce へ、顧客、商談、商品などのマスタデータを一括でロードする。
- データ同期 (Data Synchronization): 夜間バッチなどで、外部システムと Salesforce のデータを定期的に同期し、一貫性を保つ。
- データクレンジングと更新 (Data Cleansing and Update): 既存の大量レコードに対して、一括でデータのクレンジングや更新処理を行う。
- データアーカイブ (Data Archiving): Salesforce 内の古いデータを外部ストレージに一括でエクスポートし、ストレージ容量を確保する。
本記事では、特に最新バージョンである Bulk API 2.0 に焦点を当て、その仕組み、具体的な使用方法、そして統合エンジニアとして知っておくべき注意点やベストプラクティスを詳しく解説します。
原理説明
Bulk API 2.0 は、Bulk API 1.0 の複雑さを解消し、よりシンプルで使いやすいワークフローを提供します。Bulk API 1.0 では、ジョブ (Job) とバッチ (Batch) という2つの概念を管理する必要がありましたが、2.0 ではジョブの作成とデータアップロードだけで済むようになり、開発者の負担が大幅に軽減されました。
Bulk API 2.0 の処理フローは、以下のステップで構成されます。
- ジョブの作成 (Create a Job):
まず、どのような操作(挿入、更新、アップサート、削除、クエリなど)をどのオブジェクトに対して行うかを定義した「ジョブ」を作成します。このリクエストを Salesforce に送信すると、ジョブ ID とデータアップロード用の URL が返却されます。
- データのアップロード (Upload Data):
次に、処理対象のデータを CSV 形式で用意し、ステップ1で取得した URL に対して PUT リクエストでアップロードします。この時点ではまだデータの処理は開始されません。
- ジョブのクローズ (Close the Job):
データのアップロードが完了したら、ジョブの状態を「UploadComplete」に変更するリクエストを送信します。これにより、Salesforce はデータの処理を開始する準備ができたことを認識し、非同期処理のキューにジョブを投入します。
- ジョブステータスの監視 (Monitor Job Status):
ジョブの処理は非同期で行われるため、即座には完了しません。定期的にジョブのステータスをポーリング (polling) して、処理が完了したか(`JobComplete`)、失敗したか(`Failed`)を確認する必要があります。
- 結果の取得 (Retrieve Results):
ジョブが完了したら、結果を取得します。データロードジョブの場合は、成功したレコードと失敗したレコードのリストを取得できます。クエリジョブの場合は、抽出されたデータを含む CSV ファイルのリストが返されます。特に失敗したレコードの情報にはエラー理由が含まれているため、エラーハンドリングにおいて非常に重要です。
この一連の流れはすべて RESTful な API コールを通じて行われ、CSV データを扱うため、様々なプログラミング言語やETLツールと高い親和性を持っています。私たち統合エンジニアにとっては、スケーラブルで堅牢なデータ連携を構築するための強力な武器となります。
サンプルコード
ここでは、cURL コマンドを使用して Bulk API 2.0 の一連の操作を実行する例を示します。これらの例は、Salesforce の公式ドキュメントに基づいています。`$SESSION_ID` と `$INSTANCE_URL` は、OAuth 2.0 認証フローなどで取得したアクセストークンとインスタンス URL に置き換えてください。
ステップ1: データ挿入ジョブの作成
取引先 (Account) オブジェクトに新しいレコードを挿入するためのジョブを作成します。`operation` に `insert` を指定し、`lineEnding` で改行コードを `LF` (Line Feed) に指定しています。
curl -X POST $INSTANCE_URL/services/data/v58.0/jobs/ingest \ -H "Authorization: Bearer $SESSION_ID" \ -H "Content-Type: application/json; charset=UTF-8" \ -H "Accept: application/json" \ -d '{ "object" : "Account", "contentType" : "CSV", "operation" : "insert", "lineEnding" : "LF" }'
レスポンス (一部): レスポンスには、ジョブID (`id`) とデータアップロード先のURL (`contentUrl`) が含まれます。
{ "id": "750R0000000z0f7IAA", "operation": "insert", "object": "Account", "createdById": "005R0000000j1k2IAA", "createdDate": "2023-11-20T10:00:00.000+0000", "systemModstamp": "2023-11-20T10:00:00.000+0000", "state": "Open", "concurrencyMode": "Parallel", "contentType": "CSV", "apiVersion": 58.0, "contentUrl": "services/data/v58.0/jobs/ingest/750R0000000z0f7IAA/batches", "lineEnding": "LF", "columnDelimiter": "COMMA" }
ステップ2: CSVデータのアップロード
処理したいデータを含む CSV ファイル (`accounts.csv`) を、ステップ1で取得した `contentUrl` に PUT リクエストでアップロードします。
accounts.csv の内容:
Name,Industry,NumberOfEmployees SFDC Integration Inc,Technology,500 Bulk API Systems,Electronics,2000
cURL コマンド:
# contentUrl はステップ1のレスポンスから取得した完全なパスに置き換えます # 例: $INSTANCE_URL/services/data/v58.0/jobs/ingest/750R0000000z0f7IAA/batches curl -X PUT $INSTANCE_URL/{contentUrl} \ -H "Authorization: Bearer $SESSION_ID" \ -H "Content-Type: text/csv" \ -T "accounts.csv"
このリクエストが成功すると、HTTP ステータスコード `201 Created` が返されます。
ステップ3: ジョブのクローズ
データのアップロードが完了したら、ジョブの状態を `UploadComplete` に変更して、Salesforce に処理を開始するよう指示します。
# {jobId} はステップ1で取得したIDに置き換えます curl -X PATCH $INSTANCE_URL/services/data/v58.0/jobs/ingest/{jobId} \ -H "Authorization: Bearer $SESSION_ID" \ -H "Content-Type: application/json; charset=UTF-8" \ -d '{ "state" : "UploadComplete" }'
ステップ4: ジョブ情報の監視
ジョブの処理状況を確認するために、ジョブ情報を GET リクエストで取得します。`state` フィールドが `JobComplete` になるまで定期的に実行します。
# {jobId} はステップ1で取得したIDに置き換えます curl -X GET $INSTANCE_URL/services/data/v58.0/jobs/ingest/{jobId} \ -H "Authorization: Bearer $SESSION_ID" \ -H "Accept: application/json"
完了時のレスポンス (一部): `state` が `JobComplete` になり、成功したレコード数 (`numberRecordsProcessed`) と失敗したレコード数 (`numberRecordsFailed`) が確認できます。
{ "id": "750R0000000z0f7IAA", "state": "JobComplete", "numberRecordsProcessed": 2, "numberRecordsFailed": 0, "totalProcessingTime": 580 ... }
ステップ5: 失敗したレコードの結果を取得
もし `numberRecordsFailed` が0より大きい場合、どのレコードがなぜ失敗したのかを確認する必要があります。以下のエンドポイントから失敗したレコードとそのエラー理由を含む CSV を取得できます。
# {jobId} はステップ1で取得したIDに置き換えます curl -X GET $INSTANCE_URL/services/data/v58.0/jobs/ingest/{jobId}/failedResults/ \ -H "Authorization: Bearer $SESSION_ID" \ -H "Accept: text/csv" > failed_records.csv
取得した `failed_records.csv` には、元のデータに加えて `sf__Id` と `sf__Error` という列が追加されており、エラーの詳細を確認できます。
注意事項
Bulk API 2.0 を使用した堅牢な統合を構築するには、いくつかの重要な点に注意する必要があります。
権限 (Permissions)
Bulk API を使用するユーザーには、以下の権限が必要です。
- API の有効化 (API Enabled): API アクセスの基本となるプロファイル権限。
- データインテグレーションの管理 (Manage Data Integrations): Bulk API ジョブの監視や中止を行うための権限。
- オブジェクト権限: 処理対象オブジェクトに対する作成、参照、更新、削除 (CRUD) 権限。
- 項目レベルセキュリティ (Field-Level Security): アクセスするすべての項目に対する参照・編集権限。
これらの権限が不足していると、ジョブの作成時や実行時に権限エラーが発生します。
API 制限 (API Limits)
Salesforce Platform はマルチテナント環境であるため、リソースを公平に利用するためのガバナ制限が設けられています。Bulk API 2.0 にも以下のような制限が存在します。
- ジョブ数の制限: 24時間あたりに作成できるジョブの総数には上限があります(例: 100,000件)。
- ファイルサイズの制限: アップロードする CSV ファイルのサイズは 150MB までです。
- 処理時間の制限: Salesforce は内部的にデータをチャンクに分割して処理しますが、1つのチャンク(約10,000レコード)あたりの処理時間にはタイムアウト(約10分)が設定されています。複雑なトリガやプロセスが実行される場合、この制限に達する可能性があります。
- 文字数制限: 1つのレコードの全項目を合わせた文字数にも上限があります。
これらの制限はエディションや契約によって異なる場合があるため、常に最新の公式ドキュメント「Salesforce Developer Limits and Allocations Quick Reference」を確認することが重要です。
エラー処理 (Error Handling)
統合エンジニアリングにおいて、エラー処理は最も重要な要素の一つです。Bulk API のジョブが `JobComplete` ステータスになっても、それは「すべてのレコードが成功した」ことを意味しません。必ず `numberRecordsFailed` の値を確認し、0より大きい場合は失敗結果ファイルを取得して原因を分析する必要があります。
一般的なエラー原因としては、入力規則違反、必須項目の欠落、データ型の不一致、重複ルールの抵触などが挙げられます。失敗したレコードを特定し、データを修正して再実行するロジックを統合プロセスに組み込むことが不可欠です。
まとめとベストプラクティス
Salesforce Bulk API 2.0 は、大規模なデータセットを効率的かつ確実に処理するための強力なツールです。その非同期アーキテクチャは、ガバナ制限を回避し、システムのパフォーマンスを維持しながら、大量のデータを Salesforce にロードまたは Salesforce からエクスポートすることを可能にします。
私たち Salesforce 統合エンジニアが、Bulk API 2.0 を最大限に活用し、安定したデータ連携を実現するためのベストプラクティスを以下にまとめます。
- 適切な API の選択: 2,000 レコード未満の小規模なデータ処理であれば、リアルタイム性が高い標準の REST API の方が適している場合があります。Bulk API は、数千から数百万レコードの処理に特化して使用しましょう。
- データのチャンキング (Chunking): 巨大なデータセット(例: 100万レコード)を一度にアップロードするのではなく、より小さなチャンク(例: 1万~5万レコードずつ)に分割して複数のジョブとして実行することを推奨します。これにより、処理が並列化されやすくなり、一部のチャンクでエラーが発生しても他への影響を最小限に抑えることができます。
- PK Chunking の活用 (for Queries): 数百万レコードを超えるオブジェクトからデータを抽出(クエリ)する場合、タイムアウトを避けるために PK Chunking という機能を使用します。これは、主キー (ID) の範囲に基づいてクエリを自動的に分割してくれる機能で、非常に大きなテーブルでも安定したデータ抽出が可能になります。
- 賢明なポーリング戦略: ジョブステータスを監視する際、固定間隔で頻繁にポーリングすると API コール数を無駄に消費します。最初は短い間隔で、徐々に間隔を広げていく「エクスポネンシャルバックオフ (Exponential Backoff)」戦略を実装することが推奨されます。
- 外部ID (External ID) の活用: 外部システムとのデータ同期を行う際には、`upsert` 操作と外部IDとして機能するカスタム項目を組み合わせるのが定石です。これにより、Salesforce ID を意識することなく、レコードの新規作成と更新を単一の操作で効率的に行うことができます。
- 自動化と監視: Bulk API を利用した統合プロセスは、エラーハンドリング、リトライロジック、そして成功・失敗の通知メカニズムを含めて完全に自動化されるべきです。また、API の使用状況やジョブの実行状況を定期的に監視し、パフォーマンスのボトルネックや潜在的な問題を早期に発見する体制を整えましょう。
これらのプラクティスを遵守することで、Salesforce Bulk API 2.0 の真の力を引き出し、スケーラブルで信頼性の高いエンタープライズレベルのデータ統合ソリューションを構築することができるでしょう。
コメント
コメントを投稿