背景と適用シナリオ
Salesforceは世界をリードするCRMプラットフォームとして、顧客データを中心とした膨大な情報を扱います。日々の業務において、基幹システムからのデータ移行、ETL (Extract, Transform, Load) ツールとの連携、データのバックアップやアーカイブなど、大量のレコードを一括で処理する必要がある場面は少なくありません。
このような大規模なデータ操作を効率的かつ安定的に行うために、Salesforceは Bulk API を提供しています。特に、その最新バージョンである Bulk API 2.0 は、従来のバージョン(1.0)を大幅に簡素化し、開発者にとってより直感的で使いやすいインターフェースを提供します。
Bulk API 2.0 は、REST (Representational State Transfer) の原則に基づいたシンプルなフレームワークを採用しており、数千から数百万件のレコードを非同期で処理することに特化しています。従来のBulk API 1.0では、開発者が手動で「バッチ」を作成し、管理する必要がありましたが、Bulk API 2.0ではSalesforceが自動的にデータのチャンク分割と並列処理を最適化してくれるため、開発者はデータ処理のロジックに集中できます。
主な適用シナリオ
- 初期データ移行: 他のシステムからSalesforceへ、数万件以上のマスタデータ(取引先、取引先責任者など)を初めてロードする場合。
- 定期的なデータ同期: ERPやデータウェアハウスとSalesforce間で、夜間バッチなどを利用して大量のデータを同期する場合。
- データクレンジングと更新: 既存の大量レコードに対して、外部ツールでクレンジングした結果を一括で反映する場合。
- データの削除とアーカイブ: 古いレコードや不要になったレコードを物理削除、または外部ストレージへアーカイブするために一括でエクスポートする場合。
基本的に、2,000件以上のレコードを一度に処理する場合は、標準のREST APIやSOAP APIよりもBulk API 2.0の使用が推奨されます。これにより、APIコール数の削減とパフォーマンスの向上が期待できます。
原理の説明
Bulk API 2.0 の処理フローは、非常にシンプルで直線的です。開発者は一連のHTTPリクエストを送信することで、大規模なデータ処理ジョブを管理します。全体の流れは以下のようになります。
1. ジョブの作成 (Create a Job)
まず、どのようなデータ操作を行うかを定義した「ジョブ」を作成します。これは、/services/data/vXX.X/jobs/ingest
エンドポイントへの POST リクエストによって行われます。リクエストボディには、対象オブジェクト(例:Account
)、操作種別(insert
, update
, upsert
, delete
, hardDelete
, query
, queryAll
)、データ形式(通常は CSV
)などの情報をJSON形式で含めます。
Salesforceはこのリクエストを受け取ると、ジョブIDとデータアップロード用のユニークなURL (contentUrl
) を返却します。このジョブは、この時点では「オープン」状態 (Open
) となります。
2. データのアップロード (Upload Data)
次に、作成したジョブに対して処理対象のデータをアップロードします。これは、ステップ1で取得した contentUrl
への PUT リクエストによって行われます。リクエストのボディには、CSV形式の生データを含めます。ヘッダーの Content-Type
には text/csv
を指定します。
この段階では、まだデータの処理は開始されません。単にデータがSalesforceのサーバーに転送されるだけです。
3. ジョブのクローズ (Close the Job)
データのアップロードが完了したら、Salesforceに処理を開始するよう指示します。これは、ジョブのエンドポイント (/services/data/vXX.X/jobs/ingest/{jobId}
) への PATCH リクエストによって行われます。リクエストボディで、ジョブの状態を UploadComplete
に変更します。
このリクエストが成功すると、Salesforceはジョブを処理キューに追加し、非同期でのデータ処理を開始します。
4. ジョブの状態監視 (Monitor the Job)
ジョブは非同期で処理されるため、完了までには時間がかかります。ジョブの現在の状態(InProgress
, JobComplete
, Failed
など)を確認するには、ジョブのエンドポイントへの GET リクエストを定期的に送信(ポーリング)します。処理が完了すると、成功したレコード数や失敗したレコード数などのサマリー情報がレスポンスに含まれます。
5. 結果の取得 (Get Results)
ジョブが完了 (JobComplete
) したら、処理結果の詳細を取得できます。成功したレコード、失敗したレコード、未処理のレコードは、それぞれ専用のエンドポイントからCSV形式でダウンロードできます。
- 成功したレコード:
/services/data/vXX.X/jobs/ingest/{jobId}/successfulResults/
- 失敗したレコード:
/services/data/vXX.X/jobs/ingest/{jobId}/failedResults/
- 未処理のレコード:
/services/data/vXX.X/jobs/ingest/{jobId}/unprocessedrecords/
特に失敗したレコードの結果ファイルには、エラーメッセージを含む列(sf__Error
)が追加されているため、エラーの原因特定とリトライ処理に役立ちます。
示例代码
ここでは、cURLコマンドを使用して、新しい取引先 (Account) レコードをCSVファイルから挿入する一連のプロセスを解説します。
前提
- アクセストークン:
YOUR_ACCESS_TOKEN
を有効なOAuthアクセストークンに置き換えてください。 - インスタンスURL:
YOUR_INSTANCE_URL
を組織のインスタンスURL(例: `https://yourdomain.my.salesforce.com`)に置き換えてください。 - CSVファイル: 以下の内容で `accounts.csv` ファイルを作成します。ヘッダー名はオブジェクトのAPI参照名と一致させる必要があります。
Name,Type,BillingStreet,BillingCity,BillingState,Phone "Sample Account 1","Prospect","123 Market St","San Francisco","CA","(123) 456-7890" "Sample Account 2","Customer - Direct","456 Main St","New York","NY","(987) 654-3210"
ステップ 1: ジョブの作成
取引先オブジェクトにレコードを挿入するためのジョブを作成します。
# POSTリクエストでデータ挿入ジョブを作成 curl -X POST $YOUR_INSTANCE_URL/services/data/v58.0/jobs/ingest \ -H "Authorization: Bearer $YOUR_ACCESS_TOKEN" \ -H "Content-Type: application/json; charset=UTF-8" \ -H "Accept: application/json" \ -d '{ "object" : "Account", "operation" : "insert", "contentType" : "CSV", "lineEnding" : "LF" }'
解説:
object
: 操作対象のsObjectを指定します。operation
: 実行する操作(ここでは `insert`)を指定します。contentType
: アップロードするデータの形式を指定します。lineEnding
: CSVファイルの改行コードを指定します。`LF` (Line Feed) または `CRLF` (Carriage Return Line Feed) が一般的です。
成功すると、以下のようなJSONレスポンスが返されます。この中の id
と contentUrl
を後続のステップで使用します。
{ "id": "7508W00000aFBbAQAW", "operation": "insert", "object": "Account", "createdById": "0058W0000084g1iQAA", "createdDate": "2023-10-27T05:21:09.000+0000", "systemModstamp": "2023-10-27T05:21:09.000+0000", "state": "Open", "concurrencyMode": "Parallel", "contentType": "CSV", "apiVersion": 58.0, "contentUrl": "services/v58.0/jobs/ingest/7508W00000aFBbAQAW/batches", "lineEnding": "LF", "columnDelimiter": "COMMA" }
ステップ 2: データのアップロード
ステップ1で取得した contentUrl
を使用して、CSVデータをアップロードします。
# PUTリクエストでCSVデータをアップロード curl -X PUT $YOUR_INSTANCE_URL/services/v58.0/jobs/ingest/7508W00000aFBbAQAW/batches \ -H "Authorization: Bearer $YOUR_ACCESS_TOKEN" \ -H "Content-Type: text/csv" \ --data-binary "@accounts.csv"
解説:
-H "Content-Type: text/csv"
: 送信するデータがCSV形式であることを示します。--data-binary "@accounts.csv"
: ローカルにある `accounts.csv` ファイルの内容をリクエストボディとして送信します。
このリクエストが成功すると、ステータスコード 201 Created
が返されます。
ステップ 3: ジョブのクローズ
データのアップロードが完了したので、ジョブの状態を `UploadComplete` に変更して処理を開始させます。
# PATCHリクエストでジョブをクローズし、処理を開始 curl -X PATCH $YOUR_INSTANCE_URL/services/data/v58.0/jobs/ingest/7508W00000aFBbAQAW \ -H "Authorization: Bearer $YOUR_ACCESS_TOKEN" \ -H "Content-Type: application/json; charset=UTF-8" \ -H "Accept: application/json" \ -d '{ "state" : "UploadComplete" }'
成功すると、レスポンスとしてジョブ情報が返され、state
が UploadComplete
になっていることが確認できます。
ステップ 4: ジョブの状態監視
ジョブが完了するまで、定期的に状態を確認します。
# GETリクエストでジョブの現在の状態を取得 curl -X GET $YOUR_INSTANCE_URL/services/data/v58.0/jobs/ingest/7508W00000aFBbAQAW \ -H "Authorization: Bearer $YOUR_ACCESS_TOKEN" \ -H "Accept: application/json"
処理が完了すると、state
が JobComplete
になり、numberRecordsProcessed
などの統計情報が含まれます。
{ "id": "7508W00000aFBbAQAW", "state": "JobComplete", "operation": "insert", "object": "Account", ... "numberRecordsProcessed": 2, "numberRecordsFailed": 0, "totalProcessingTime": 541 }
ステップ 5: 失敗したレコードの結果取得
もし失敗したレコードがある場合 (numberRecordsFailed
> 0)、以下のリクエストで詳細なエラー情報を取得します。
# 失敗したレコードの結果を取得 curl -X GET $YOUR_INSTANCE_URL/services/data/v58.0/jobs/ingest/7508W00000aFBbAQAW/failedResults/ \ -H "Authorization: Bearer $YOUR_ACCESS_TOKEN" \ -H "Accept: text/csv"
このエンドポイントは、エラーの詳細(例:必須項目が不足、入力規則違反など)を含むCSVデータを返します。
注意事項
Bulk API 2.0 を利用する際には、Salesforceのガバナ制限やセキュリティに関するいくつかの点に注意する必要があります。
権限 (Permissions)
- APIの有効化: APIを利用するユーザーのプロファイルまたは権限セットで「API 有効 (API Enabled)」権限が必要です。
- 一括APIの管理: Bulk APIジョブを作成・管理するには、「一括 API の管理 (Manage Data Integrations)」権限が必要です。
- オブジェクト権限: 対象オブジェクトに対する適切なCRUD(作成、参照、更新、削除)権限が必要です。
API制限 (API Limits)
Bulk API 2.0 は Governor Limits (ガバナ制限) の対象となります。これらは組織のエディションによって異なります。
- レコード数の制限: 24時間以内に処理できるレコード数には上限があります(例:Enterprise Editionでは1億5千万件)。この上限はBulk API 1.0と2.0で共有されます。
- ファイルサイズの制限: 1つのジョブでアップロードできるデータの合計サイズは、非圧縮時で150MBまでです。
- 処理時間の制限: Salesforceが内部でデータをチャンクに分割して処理する際、1チャンクあたりの処理時間には15分のタイムアウトがあります。複雑なトリガやプロセスが原因でタイムアウトが発生する可能性があります。
- ジョブ数の制限: 24時間以内に作成できるジョブ数にも上限があります(例:100,000件)。
これらの制限を超過するとAPIリクエストが失敗するため、設計段階で考慮することが重要です。
エラー処理 (Error Handling)
Bulk API 2.0 は非同期処理であるため、エラーハンドリングは非常に重要です。
- ポーリング戦略: ジョブの状態を監視する際は、むやみにリクエストを繰り返すのではなく、指数バックオフ(リクエスト間隔を徐々に長くする)などの戦略を採用し、APIコール数を節約することが推奨されます。
- 失敗結果の分析:
failedResults
エンドポイントから取得したCSVには、sf__Id
(失敗したレコードのID、Update/Delete時)とsf__Error
(エラーメッセージ)列が含まれます。この情報をログに記録し、エラーの原因を分析して、修正後に再処理するロジックを組み込むべきです。 - 冪等性: ネットワークエラーなどでリクエストが成功したか不明な場合でも、同じリクエストを再送して意図しない副作用(例:レコードの二重作成)が起きないよう、設計を考慮する必要があります。Upsert操作と外部IDを使用することは、この問題に対する一般的な解決策です。
まとめとベストプラクティス
Salesforce Bulk API 2.0は、大規模なデータ操作をシンプルかつ効率的に実行するための強力なツールです。RESTfulな設計と自動バッチ処理により、開発者は複雑なプロセスを意識することなく、大量データのインテグレーションを実装できます。
ベストプラクティス
- 適切なAPIの選択:
- Bulk API 2.0: 数千件以上のレコードを扱う場合に最適です。非同期処理を許容できる場合に選択します。
- 標準REST/SOAP API: リアルタイム性が必要な少数レコード(2,000件未満)の操作に適しています。
- データの事前準備:
ジョブを投入する前に、CSVデータのクレンジング(必須項目の確認、データ型の整合性チェックなど)を可能な限り行っておくことで、処理中のエラーを減らし、成功率を高めることができます。
- 並列処理の活用:
Bulk API 2.0 はデフォルトで並列処理 (Parallel mode) を行います。これによりスループットが向上しますが、レコードロック競合が発生する可能性があります。ロック競合が頻発する場合は、一度に投入するデータ量を調整するか、関連性の高いレコードを別々のジョブに分けるなどの対策を検討してください。(注:Bulk API 1.0にあった直列モード (Serial mode) は2.0では明示的にサポートされていません。)
- 堅牢な監視とリトライ機構の実装:
本番環境で運用するインテグレーションでは、ジョブの状態を確実に監視し、失敗したレコードを特定して自動または手動でリトライできる仕組みを必ず構築してください。
- ガバナ制限の監視:
大規模なデータ処理を計画する際は、事前に組織のAPI制限を確認し、処理が制限に抵触しないようデータ量を分割したり、処理時間を分散させたりする計画を立てることが重要です。REST APIの
/limits
リソースで現在のAPI使用状況を確認できます。
これらの原則とベストプラクティスに従うことで、Salesforce Bulk API 2.0の能力を最大限に引き出し、信頼性の高い大規模データ連携ソリューションを構築することができるでしょう。
コメント
コメントを投稿