背景と適用シナリオ
エンタープライズシステムにおいて、大規模なデータセットを扱うことは日常的な課題です。Salesforceも例外ではありません。数十万、数百万レコードに及ぶデータの移行、外部システムとの定期的な同期、あるいはデータのバックアップやアーカイブなど、大量のデータを効率的かつ確実に処理する必要があります。このような要件に対して、一件ずつレコードを処理する標準の REST API や SOAP API を使用すると、APIガバナ制限(Governor Limits)に容易に抵触し、処理時間も膨大になります。この課題を解決するためにSalesforceが提供しているのが Bulk API です。
Bulk APIは、大量のレコードを非同期(Asynchronous)で処理するために最適化されたRESTベースのAPIです。特に、以下のようなシナリオでその真価を発揮します。
- 初期データ移行:他のシステムからSalesforceへ初めてデータを投入する際。取引先、取引先責任者、商談など、関連するオブジェクトの大量のレコードを一度にロードする必要があります。
- 定期的なデータ同期:ERPやデータウェアハウスといった基幹システムとSalesforce間で、毎日あるいは毎週、大量のデータを同期させるバッチ処理。
- データクレンジングと更新:既存の大量のレコードに対して、外部のデータソースを用いて一括で情報の更新やクレンジングを行う場合。
- データのアーカイブと削除:古くなったデータをSalesforceから外部ストレージに移動させたり、一括で削除したりする場合。
本記事では、特に最新バージョンである Bulk API 2.0 に焦点を当て、その原理から具体的な使用方法、注意点、ベストプラクティスまでを技術アーキテクトの視点から詳細に解説します。
原理説明
Bulk API 2.0は、その前身であるBulk API 1.0の複雑さを解消し、よりシンプルで使いやすいフレームワークを提供します。その中核となるコンセプトは「ジョブ (Job)」と「非同期処理 (Asynchronous Processing)」です。
開発者は、まず処理したい内容を定義した「ジョブ」を作成します。ジョブには、対象オブジェクト(例:Account)、操作(例:insert, update, upsert, delete)、データ形式(CSV)などの情報が含まれます。次に、処理対象のデータをCSV形式でアップロードします。データのアップロードが完了したら、ジョBを「クローズ」します。これにより、Salesforceプラットフォームに対してデータの処理開始を依頼したことになります。
ここからがBulk APIの最大の特徴である非同期処理です。Salesforceは依頼されたジョブをキューに入れ、サーバーリソースが利用可能になった時点でバックグラウンドで処理を開始します。プラットフォームはアップロードされた巨大なCSVファイルを自動的に小さなチャンクに分割し、効率的に処理を進めます。開発者はジョブのステータスを定期的にポーリング(問い合わせ)することで、処理の進捗(処理中、完了、失敗など)を確認できます。処理が完了すると、成功したレコードの結果と、エラーが発生したレコードの詳細なエラーメッセージをそれぞれ別のファイルとしてダウンロードできます。
このアーキテクチャにより、クライアント側はAPIリクエストを投げてから処理完了まで長時間待機する必要がなく、Salesforce側はサーバーリソースを最適化しながら大規模な処理を実行できるという、双方にとってのメリットが生まれます。
Bulk API 1.0では開発者が手動でデータをバッチに分割し、各バッチの処理状況を個別に管理する必要がありましたが、Bulk API 2.0ではSalesforceがこのバッチ管理を自動で行うため、開発者はジョブ全体の管理に集中できるようになりました。これにより、インテグレーション開発の工数が大幅に削減されます。
示例コード
ここでは、Bulk API 2.0を使用して取引先(Account)レコードを新規作成する一連のフローを、cURLコマンドの例を用いて解説します。これらのコードはSalesforce Developerの公式ドキュメントに基づいています。
前提条件:
- 接続済みアプリケーション(Connected App)が設定済みで、アクセストークン(`$ACCESS_TOKEN`)が取得できていること。
- SalesforceインスタンスのURL(`$INSTANCE_URL`)が分かっていること。
- 処理対象のデータを含むCSVファイル(`accounts.csv`)が手元にあること。
Name,Description,Website,Phone Account 1,"Description for Account 1",www.account1.com,415-555-1212 Account 2,"Description for Account 2",www.account2.com,415-555-1213
ステップ1:ジョブの作成
まず、データを投入するためのジョブを作成します。このリクエストでは、どのオブジェクトに、どのような操作を行うかを定義します。
# cURLコマンドによるジョブ作成リクエスト curl -X POST $INSTANCE_URL/services/data/v58.0/jobs/ingest \ -H "Authorization: Bearer $ACCESS_TOKEN" \ -H "Content-Type: application/json; charset=UTF-8" \ -H "Accept: application/json" \ -d '{ "object" : "Account", "contentType" : "CSV", "operation" : "insert", "lineEnding" : "LF" }' # 正常なレスポンス (JSON形式) # contentUrl にデータアップロード先のエンドポイントが含まれる { "id" : "750R0000000z0f9IAA", "operation" : "insert", "object" : "Account", "createdById" : "005R0000000g9d8IAA", "createdDate" : "2023-10-27T10:30:00.000+0000", "systemModstamp" : "2023-10-27T10:30:00.000+0000", "state" : "Open", "concurrencyMode" : "Parallel", "contentType" : "CSV", "apiVersion" : 58.0, "contentUrl" : "services/data/v58.0/jobs/ingest/750R0000000z0f9IAA/batches", "lineEnding" : "LF", "columnDelimiter" : "COMMA" }
このレスポンスで返された `id` (例: `750R0000000z0f9IAA`) がジョブIDとなり、以降の操作で必須となります。また、`state` が `Open` になっており、データを受け付け可能な状態であることを示しています。
ステップ2:ジョブへのデータアップロード
次に、作成したジョブに対してCSVデータをアップロードします。リクエストの宛先は、ステップ1のレスポンスに含まれていた `contentUrl` です。
# cURLコマンドによるCSVデータアップロードリクエスト # accounts.csv ファイルの内容をリクエストボディとして送信する curl -X PUT $INSTANCE_URL/services/data/v58.0/jobs/ingest/750R0000000z0f9IAA/batches \ -H "Authorization: Bearer $ACCESS_TOKEN" \ -H "Content-Type: text/csv" \ -H "Accept: application/json" \ --data-binary @accounts.csv
このリクエストが成功すると、ステータスコード `201 Created` が返却されます。これでデータのアップロードは完了です。
ステップ3:ジョブのクローズ
データのアップロードがすべて完了したら、ジョブの状態を `UploadComplete` に変更してクローズします。これにより、Salesforceにデータの処理を開始するよう通知します。
# cURLコマンドによるジョブのクローズリクエスト curl -X PATCH $INSTANCE_URL/services/data/v58.0/jobs/ingest/750R0000000z0f9IAA \ -H "Authorization: Bearer $ACCESS_TOKEN" \ -H "Content-Type: application/json; charset=UTF-8" \ -H "Accept: application/json" \ -d '{ "state" : "UploadComplete" }' # 正常なレスポンス (JSON形式) # state が UploadComplete に変更されていることを確認 { "id" : "750R0000000z0f9IAA", "operation" : "insert", "object" : "Account", "createdById" : "005R0000000g9d8IAA", "createdDate" : "2023-10-27T10:30:00.000+0000", "systemModstamp" : "2023-10-27T10:35:00.000+0000", "state" : "UploadComplete", "concurrencyMode" : "Parallel", "contentType" : "CSV", "apiVersion" : 58.0, "contentUrl" : "services/data/v58.0/jobs/ingest/750R0000000z0f9IAA/batches", "lineEnding" : "LF", "columnDelimiter" : "COMMA" }
ステップ4:ジョブステータスの確認
ジョブがクローズされると、Salesforceはバックグラウンドで処理を開始します。処理が完了するまで、定期的にジョブのステータスを確認します。
# cURLコマンドによるジョブステータスの確認リクエスト curl -X GET $INSTANCE_URL/services/data/v58.0/jobs/ingest/750R0000000z0f9IAA \ -H "Authorization: Bearer $ACCESS_TOKEN" \ -H "Accept: application/json" # 処理完了時のレスポンス例 (JSON形式) # state が JobComplete になっている { "id" : "750R0000000z0f9IAA", "state" : "JobComplete", "createdDate" : "2023-10-27T10:30:00.000+0000", "systemModstamp" : "2023-10-27T10:40:00.000+0000", "object" : "Account", "operation" : "insert", "numberRecordsProcessed" : 2, "numberRecordsFailed" : 0, "retries" : 0, "totalProcessingTime" : 1500, ... }
レスポンスの `state` が `InProgress` であれば処理中、`JobComplete` であれば完了、`Failed` であればジョブ全体が失敗したことを意味します。`numberRecordsProcessed` と `numberRecordsFailed` で処理結果の概要を把握できます。
ステップ5:処理結果の取得
ジョブが完了したら、成功したレコードと失敗したレコードの結果を取得できます。それぞれ専用のエンドポイントが用意されています。
# 成功したレコードの結果を取得 curl -X GET $INSTANCE_URL/services/data/v58.0/jobs/ingest/750R0000000z0f9IAA/successfulResults/ \ -H "Authorization: Bearer $ACCESS_TOKEN" \ -H "Accept: text/csv" # 失敗したレコードの結果を取得(この例では0件) curl -X GET $INSTANCE_URL/services/data/v58.0/jobs/ingest/750R0000000z0f9IAA/failedResults/ \ -H "Authorization: Bearer $ACCESS_TOKEN" \ -H "Accept: text/csv"
成功結果には、Salesforceによって採番されたレコードIDが追加されたCSVが返されます。失敗結果には、元のレコードデータに加えてエラーメッセージの列が追加されたCSVが返され、どのレコードがなぜ失敗したのかを特定するのに役立ちます。
注意事項
Bulk API 2.0を効果的かつ安全に利用するためには、以下の点に注意する必要があります。
権限 (Permissions)
Bulk APIを使用するインテグレーションユーザーには、適切な権限が必要です。
- APIの有効化:プロファイルまたは権限セットで「APIの有効化 (API Enabled)」権限が必要です。
- オブジェクト権限:対象オブジェクトに対する「作成」「参照」「更新」「削除」の権限が必要です。
- 項目レベルセキュリティ:アクセスする項目に対する参照・編集権限が必要です。
- データインテグレーションの管理:Bulk APIのジョブを監視・管理するために「データインテグレーションの管理 (Manage Data Integrations)」権限が推奨されます。
API制限 (API Limits)
Bulk API 2.0には、プラットフォームの安定性を維持するためのガバナ制限が存在します。これらの制限は、標準APIよりもはるかに緩やかですが、常に意識する必要があります。
- レコード数:24時間あたりに処理できるレコード数は、組織のエディションに応じて上限があります(例: Enterprise Editionで1億5000万レコード)。
- ジョブ数:24時間あたりに作成できるジョブの数には100,000件という上限があります。
- ファイルサイズ:アップロードするCSVファイルの元のデータサイズは最大150MBです。
- 処理時間:1ジョブあたりの合計処理時間には2時間という上限があります。
これらの制限値はSalesforceのリリースによって変更される可能性があるため、常に最新の公式ドキュメントを参照してください。
エラー処理 (Error Handling)
大規模なデータを扱う以上、一部のレコードでエラーが発生することは避けられません。Bulk API 2.0では、ジョブが完了した後に `failedResults` エンドポイントからエラーの詳細を取得できます。エラーハンドリングのプロセスは以下のようになります。
- ジョブステータスが `JobComplete` であることを確認します。
- `numberRecordsFailed` が0より大きい場合、`failedResults` エンドポイントにリクエストを送信し、エラー内容が記載されたCSVを取得します。
- CSVに含まれるエラーメッセージ(例:「必須項目がありません」「不正なIDです」など)を分析します。
- 元のソースデータを修正し、失敗したレコードのみを含む新しいCSVファイルを作成します。
- 修正したデータで新しいBulk APIジョブを作成し、再実行します。
堅牢なインテグレーションを構築するためには、このエラーハンドリングとリトライのロジックを自動化することが不可欠です。
まとめとベストプラクティス
Salesforce Bulk API 2.0は、大規模なデータセットを効率的かつ確実に処理するための強力なツールです。その非同期アーキテクチャは、API制限の回避とサーバーリソースの最適化を実現し、開発者はよりシンプルなジョブ管理に集中できます。
最後に、Bulk API 2.0を最大限に活用するためのベストプラクティスを以下に示します。
- 適切なAPIの選択:数千レコード未満の小規模なデータ処理や、リアルタイムな同期が求められる場合は標準のREST/SOAP APIを使用し、数千から数百万レコードのバッチ処理にはBulk API 2.0を選択します。
- データの前処理:Salesforceにアップロードする前に、データクレンジング(必須項目のチェック、データ型の検証など)をソース側で実施することで、エラーの発生を最小限に抑え、処理効率を高めます。
- ジョブの分割:数千万件を超えるような極端に巨大なデータセットを扱う場合は、単一の巨大なジョブとして実行するのではなく、論理的な単位(例:地域別、年度別など)で複数のジョブに分割することを検討します。これにより、一部のジョブが失敗した場合の影響範囲を限定し、管理しやすくなります。
- ガバナ制限の監視:本番環境でBulk APIを定期的に使用する場合は、`/services/data/vXX.X/limits` エンドポイントなどを利用して、組織のAPIコール残量を監視する仕組みを導入します。
- PK Chunkingの活用(クエリの場合):Bulk API Queryで非常に大きなテーブル(数百万件以上)からデータを抽出する場合、PK Chunkingヘッダーを有効にすることを検討してください。これにより、主キー(ID)の範囲に基づいてクエリが自動的に分割され、タイムアウトのリスクを大幅に低減できます。
これらの原理とベストプラクティスを理解し、適切にBulk API 2.0を設計・実装することで、Salesforceにおける大規模データ連携の課題をスマートに解決することができるでしょう。
コメント
コメントを投稿