背景と応用シナリオ
Salesforce 統合エンジニアとして、私たちは日々、様々なシステムと Salesforce との間でデータをやり取りする課題に直面しています。特に、数百万件にも及ぶレコードを扱う大規模なデータ操作は、常に大きな挑戦です。例えば、レガシーシステムからの初期データ移行、ERPとの夜間バッチ同期、あるいはコンプライアンス要件に基づく大量データのアーカイブなど、そのユースケースは多岐にわたります。
このような大規模データを扱う際に、標準の REST API や SOAP API を使用すると、Governor Limits(ガバナ制限)と呼ばれる Salesforce プラットフォームの共有リソースを保護するための制限にすぐに直面してしまいます。API コール数の上限、タイムアウト、CPU 使用時間制限などが、処理のボトルネックとなり、インテグレーションの信頼性とパフォーマンスを著しく低下させる原因となります。
この課題を解決するために Salesforce が提供しているのが、Bulk API です。特に、その最新バージョンである Bulk API 2.0 は、大規模データセットを非同期で効率的に処理するために最適化された、RESTful なインターフェースです。Bulk API 2.0 は、バックグラウンドでデータをバッチ処理するため、Governor Limits の影響を受けにくく、安定したデータロードと抽出を実現します。
統合エンジニアの視点から見た、具体的な応用シナリオは以下の通りです:
・初期データ移行: 新規に Salesforce を導入する際、既存の CRM やデータベースから数百万件の取引先、商談、ケースレコードを移行する。
・システム間の定期同期: 基幹システム(ERP)と Salesforce の商品マスタや在庫情報を、夜間バッチで一括同期する。
・データクレンジングとエンリッチメント: Salesforce 内の既存レコードに対し、外部のデータソースを用いて一括で情報を更新・補完する。
・データウェアハウス(DWH)へのエクスポート: Salesforce に蓄積された大量のデータを、分析やレポーティングのために BI ツールや DWH へ定期的にエクスポートする。
本記事では、Salesforce 統合エンジニアの視点から、Bulk API 2.0 の仕組み、具体的な使用方法、そして実践的なベストプラクティスについて、コード例を交えながら詳細に解説していきます。
原理説明
Bulk API 2.0 の中核をなすのは、Asynchronous Processing(非同期処理)という概念です。これは、リクエストを送信してから処理の完了を待つ同期的処理とは異なり、まず処理のリクエスト(ジョブ)を Salesforce に登録し、Salesforce がバックグラウンドでそのジョブを実行する仕組みです。私たちはジョブの状況を定期的に確認し、完了後に結果を取得します。このアーキテクチャにより、クライアント側は長時間の待機から解放され、Salesforce はリソースが利用可能な時に処理を実行できるため、プラットフォーム全体のスケーラビリティと安定性が保たれます。
Bulk API 2.0 は、旧バージョンである Bulk API 1.0 と比較して、いくつかの重要な改善がなされています。
・シンプルなワークフロー: Bulk API 1.0 では「ジョブを作成」→「バッチを作成」→「バッチにデータをアップロード」→「バッチを閉じる」→「ジョブを閉じる」という複数のステップが必要でした。Bulk API 2.0 では、このプロセスが「ジョブを作成」→「ジョブにデータをアップロード」→「ジョブを完了状態にする」という、より直感的でシンプルな3ステップに簡素化されています。
・自動的なバッチ処理: 1.0 では開発者が手動でデータをバッチ(塊)に分割する必要がありましたが、2.0 ではアップロードされた CSV データを Salesforce が内部で自動的に最適なサイズのチャンクに分割して処理します。これにより、クライアント側の実装が大幅に簡略化されました。
・RESTful な設計: Bulk API 2.0 は完全に REST (Representational State Transfer) アーキテクチャに基づいており、JSON 形式でのリクエストとレスポンス、標準的な HTTP メソッド(POST, PUT, PATCH, GET)を使用します。これにより、最新の Web 技術や開発ツールとの親和性が非常に高くなっています。
Bulk API 2.0 の典型的なデータロード(Insert, Update, Upsert, Delete)ジョブのライフサイクルは以下のようになります。
1. ジョブの作成 (Create a Job): POST
リクエストを /services/data/vXX.X/jobs/ingest
エンドポイントに送信します。リクエストボディで、対象オブジェクト(例:Account
)、操作(例:insert
)、データ形式(CSV
)などを指定します。Salesforce はレスポンスとして、一意のジョブ ID と、データアップロード用の URL を返します。
2. ジョブデータのアップロード (Upload Job Data): ステップ1で取得した URL に対して、PUT
リクエストで CSV 形式のデータをアップロードします。
3. ジョブのクローズ (Close the Job): データのアップロードが完了したら、ジョブ ID を指定して /services/data/vXX.X/jobs/ingest/{jobId}
エンドポイントに PATCH
リクエストを送信し、ジョブの状態を UploadComplete
に変更します。これにより、Salesforce はデータの処理を開始します。
4. ジョブ状況の確認 (Check Job Status): ジョブ ID を指定して同じエンドポイントに GET
リクエストを送信し、ジョブの状況(InProgress
, JobComplete
, Failed
など)を定期的にポーリング(確認)します。
5. 結果の取得 (Retrieve Results): ジョブが JobComplete
または Failed
になったら、成功したレコードのリスト(/successfulResults/
)と失敗したレコードのリスト(/failedResults/
)をそれぞれのエンドポイントからダウンロードします。失敗した結果には、エラーの理由が含まれており、デバッグや再処理に役立ちます。
サンプルコード
ここでは、cURL コマンドを使用して Bulk API 2.0 で取引先(Account)レコードを新規作成する一連の流れを解説します。 これらの例は Salesforce Developer Documentation に基づいています。
前提条件:
・$SESSION_ID
: 有効な OAuth 2.0 アクセストークン。
・$INSTANCE_URL
: Salesforce インスタンスの URL(例:https://yourInstance.salesforce.com
)。
ステップ1:ジョブの作成
まず、取引先オブジェクトにレコードを挿入するためのジョブを作成します。
curl -X POST $INSTANCE_URL/services/data/v58.0/jobs/ingest \ -H "Authorization: Bearer $SESSION_ID" \ -H "Content-Type: application/json; charset=UTF-8" \ -H "Accept: application/json" \ -d '{ "object" : "Account", "contentType" : "CSV", "operation" : "insert", "lineEnding" : "LF" }'
解説:
・-X POST
: 新しいリソース(ジョブ)を作成するため、POST メソッドを使用します。
・object
: 操作対象の SObject を指定します。ここでは `Account` です。
・contentType
: アップロードするデータの形式を指定します。Bulk API 2.0 は `CSV` を推奨しています。
・operation
: 実行する DML 操作を指定します(`insert`, `update`, `upsert`, `delete`, `hardDelete`)。
・lineEnding
: CSV データの改行コードを指定します。`LF` (Line Feed) または `CRLF` (Carriage Return Line Feed) が利用可能です。
このリクエストが成功すると、以下のような JSON レスポンスが返されます。id
がジョブID、contentUrl
がデータアップロード先です。
{ "id": "7508W00000aP9g0QAC", "operation": "insert", "object": "Account", "createdById": "0058W000008WoAeQAK", "createdDate": "2023-11-20T10:00:00.000+0000", "systemModstamp": "2023-11-20T10:00:00.000+0000", "state": "Open", "concurrencyMode": "Parallel", "contentType": "CSV", "apiVersion": 58.0, "contentUrl": "services/v58.0/jobs/ingest/7508W00000aP9g0QAC/batches", "lineEnding": "LF", "columnDelimiter": "COMMA" }
ステップ2:ジョブデータ(CSV)のアップロード
次に、作成したジョブに処理対象のデータをアップロードします。ここでは、`accounts.csv` というファイルにデータが格納されていると仮定します。
accounts.csv の内容:
Name,Description,Website Sample Account 1,"Description for account 1",www.example.com Sample Account 2,"Description for account 2",www.salesforce.com
ステップ1で取得した `contentUrl` を使用して、この CSV データをアップロードします。
curl -X PUT $INSTANCE_URL/services/v58.0/jobs/ingest/7508W00000aP9g0QAC/batches \ -H "Authorization: Bearer $SESSION_ID" \ -H "Content-Type: text/csv" \ -H "Accept: application/json" \ --data-binary @accounts.csv
解説:
・-X PUT
: 指定された URL にデータを配置(アップロード)するため、PUT メソッドを使用します。
・Content-Type: text/csv
: 送信するデータが CSV であることを示します。
・--data-binary @accounts.csv
: `accounts.csv` ファイルの内容をリクエストボディとして送信します。
アップロードが成功すると、HTTP ステータスコード `201 Created` が返されます。
ステップ3:ジョブのクローズ
全てのデータのアップロードが完了したら、ジョブの状態を `UploadComplete` に変更して、Salesforce に処理の開始を伝えます。
curl -X PATCH $INSTANCE_URL/services/data/v58.0/jobs/ingest/7508W00000aP9g0QAC \ -H "Authorization: Bearer $SESSION_ID" \ -H "Content-Type: application/json; charset=UTF-8" \ -H "Accept: application/json" \ -d '{ "state" : "UploadComplete" }'
解説:
・-X PATCH
: リソース(ジョブ)の一部を更新するため、PATCH メソッドを使用します。
・"state" : "UploadComplete"
: ジョブの状態を「アップロード完了」に更新します。
ステップ4:ジョブ状況の確認
ジョブの処理が完了するまで、定期的に状況を確認します。
curl -X GET $INSTANCE_URL/services/data/v58.0/jobs/ingest/7508W00000aP9g0QAC \ -H "Authorization: Bearer $SESSION_ID" \ -H "Accept: application/json"
解説:
・-X GET
: リソース(ジョブ)の現在の情報を取得するため、GET メソッドを使用します。
・レスポンスの state
フィールドを確認します。InProgress
(処理中)、JobComplete
(完了)、Failed
(失敗)などの値が返されます。JobComplete
になるまで、適切な間隔(例:指数バックオフ戦略)でポーリングを続けます。
ステップ5:結果の取得
ジョブが完了したら、成功したレコードと失敗したレコードの結果を取得します。
成功したレコードの結果を取得:
curl -X GET $INSTANCE_URL/services/data/v58.0/jobs/ingest/7508W00000aP9g0QAC/successfulResults/ \ -H "Authorization: Bearer $SESSION_ID" \ -H "Accept: text/csv" > successful_results.csv
失敗したレコードの結果を取得:
curl -X GET $INSTANCE_URL/services/data/v58.0/jobs/ingest/7508W00000aP9g0QAC/failedResults/ \ -H "Authorization: Bearer $SESSION_ID" \ -H "Accept: text/csv" > failed_results.csv
解説:
・/successfulResults/
エンドポイントは成功したレコードの情報を、/failedResults/
エンドポイントは失敗したレコードの情報を返します。
・失敗結果の CSV ファイルには、元のデータに加えて sf__Error
という列が追加され、失敗理由(例:入力規則違反、必須項目欠落)が記載されています。
注意事項
Bulk API 2.0 を利用したインテグレーションを設計・実装する際には、以下の点に注意する必要があります。
権限とアクセス
Bulk API 2.0 を使用するインテグレーションユーザーには、適切な権限が必要です。
・API Enabled: プロファイルまたは権限セットで「API の有効化」権限が必要です。
・Object Permissions: 操作対象のオブジェクトに対する適切な CRUD(作成、参照、更新、削除)権限が必要です。
・Modify All Data / View All Data: 大規模なデータ操作では、しばしば「すべてのデータの変更」や「すべてのデータの参照」権限が必要になることがあります。権限は最小権限の原則に従って慎重に割り当ててください。
API 制限
Bulk API 2.0 にも制限があります。これらを把握し、設計に組み込むことが重要です。
・レコード数の制限: 24時間以内に処理できるレコード数に上限があります。この上限は Salesforce のエディションによって異なり、例えば Unlimited Edition では1億5000万レコードです。
・ジョブ数の制限: 24時間以内に作成できるジョブの総数にも上限があります(通常100,000件)。
・データアップロードサイズ: 1つのジョブにアップロードできるデータサイズは、ファイル圧縮の有無などによって異なりますが、一般的には非常に大きなファイル(数GB)を扱うことが可能です。ただし、15分以内にアップロードを完了させる必要があります。
データ形式とガバナ制限
・CSV 形式: Bulk API 2.0 は CSV データ形式を前提としています。ヘッダー行は Salesforce の項目 API 参照名と正確に一致させる必要があります。また、データ型(日付形式、数値形式など)が Salesforce 側の定義と一致していることを事前に確認してください。
・ガバナ制限の考慮: Bulk API は非同期処理のため多くのガバナ制限を回避できますが、レコードが挿入・更新される際には、Apex Triggers、Flows、Validation Rules などの自動化処理は通常通り実行されます。非効率なトリガーや複雑なフローが存在すると、CPU 時間制限を超えてジョブが失敗する原因となります。大規模なデータロード前には、これらの自動化処理のパフォーマンスをレビューし、必要に応じて最適化するか、一時的に無効化することを検討してください。
エラーハンドリング
インテグレーションの堅牢性は、エラーハンドリングの設計に大きく依存します。
・失敗結果の解析: failedResults
を必ず取得し、sf__Error
列の内容を解析するロジックを実装してください。エラーメッセージに基づいて、データの修正や再処理の戦略を立てます。
・リトライメカニズム: 一時的なエラー(例:レコードロック)が原因で失敗したレコードについては、リトライ(再試行)メカニズムを実装することが有効です。ただし、無限にリトライすると API 制限を消費するため、リトライ回数に上限を設け、指数バックオフなどの戦略を用いるべきです。
まとめとベストプラクティス
Salesforce Bulk API 2.0 は、大規模データセットを扱う際の強力かつ不可欠なツールです。その RESTful でシンプルなワークフローは、統合エンジニアが堅牢でスケーラブルなデータ連携ソリューションを構築する上で大きな助けとなります。
最後に、Bulk API 2.0 を最大限に活用するためのベストプラクティスをまとめます。
1. 適切なツールの選択: 数百から数千件程度の小規模なデータ操作であれば、標準の REST API の方がオーバーヘッドが少なく、リアルタイム性も高いため適しています。Bulk API 2.0 は、数万件以上のレコードを扱う場合にその真価を発揮します。
2. 事前のデータ準備: Salesforce にアップロードする前に、クライアント側で可能な限りデータの検証とクレンジングを行ってください。これにより、エラー率が低下し、Salesforce 側での処理時間も短縮され、デバッグの手間が大幅に削減されます。
3. API 制限の監視: 本番環境でインテグレーションを運用する際は、Salesforce の「組織情報」ページや /limits
REST API リソースを使用して、API コールの消費量を定期的に監視し、上限に達しないように計画を立てることが重要です。
4. 並列処理の検討: 非常に巨大なデータセット(数千万件以上)を扱う場合、データを論理的な単位で分割し、複数のジョブを並行して実行することで、全体の処理時間を短縮できる可能性があります。ただし、Salesforce の同時処理ジョブ数には制限があるため、過度な並列化は避けるべきです。
5. 賢明なポーリング戦略: ジョブの状況を確認するためのポーリングは、API コールを消費します。固定間隔で頻繁にリクエストするのではなく、最初は短い間隔で、徐々に間隔を広げていく「指数バックオフ」戦略を実装し、不要な API コールを削減しましょう。
6. 自動化処理の管理: 大規模なデータ移行時には、関連するトリガー、フロー、ワークフロールールを一時的に無効化することを検討してください。処理完了後、再度有効化し、必要に応じて一括で再計算処理を実行する方が、全体のパフォーマンスが向上する場合があります。
これらの原理とベストプラクティスを理解し、適用することで、Salesforce Bulk API 2.0 を活用した、信頼性と拡張性に優れたデータ統合ソリューションを構築できるでしょう。
コメント
コメントを投稿