執筆者:Salesforce 統合エンジニア
背景とユースケース
Salesforce は、企業の顧客関係管理 (CRM) の中核を担うプラットフォームとして、日々膨大な量のデータを扱っています。日々の業務で発生する数件のデータ更新であれば、標準の画面操作や、SOAP/REST API を用いた小規模な連携で十分に対応可能です。しかし、システム間の大規模なデータ同期、初期データ移行、あるいは数百万件に及ぶレコードの一括クリーンアップといったシナリオでは、これらの方法は効率的ではありません。トランザクションごとに API コールを消費し、ガバナ制限に抵触しやすく、処理時間も長大になるためです。
このような大規模データ処理の課題を解決するために Salesforce が提供しているのが、Bulk API (バルク API) です。特に、その最新バージョンである Bulk API 2.0 は、従来の Bulk API 1.0 の複雑さを解消し、よりシンプルで使いやすい RESTful なインターフェースを提供します。私たち統合エンジニアにとって、Bulk API 2.0 は、外部システム(ERP, DWH, マーケティングオートメーションツールなど)と Salesforce との間で、大量のデータを効率的かつ確実に連携させるための最も重要なツールの一つです。
具体的なユースケースとしては、以下のようなものが挙げられます。
- 初期データ移行:旧システムから Salesforce へ、数百万件の取引先、取引先責任者、商談データを一度に移行する。
- 夜間バッチ同期:基幹システム(ERP)で更新された製品マスタや在庫情報を、毎晩 Salesforce に一括で反映させる。
- データアーカイブ:活動履歴など、古くなったレコードを Salesforce から抽出し、外部のデータウェアハウスへバックアップする。
- データクレンジング:外部の名寄せツールでクレンジングされたデータを、Salesforce 上のレコードに一括で適用する。
Bulk API 2.0 は、これらの処理を非同期で実行します。つまり、データを Salesforce にアップロードするリクエストを投げた後、Salesforce 側がバックグラウンドで処理を行い、完了後に結果を取得するという流れになります。これにより、クライアント側は処理完了を待ち続ける必要がなく、システムリソースを効率的に利用できます。この記事では、私たち統合エンジニアの視点から、Bulk API 2.0 の仕組み、具体的な使い方、そして実践的な注意事項を詳しく解説していきます。
原理説明
Bulk API 2.0 の最大の特長は、そのシンプルさにあります。従来の Bulk API 1.0 では、ジョブを作成し、複数のバッチにデータを分割してアップロードし、各バッチの状態を個別に管理する必要がありましたが、Bulk API 2.0 ではこのプロセスが大幅に簡素化されました。Salesforce がデータの分割(チャンキング)と並列処理を自動的に行ってくれるため、開発者はデータ全体のアップロードとジョブ全体の状態管理に集中できます。
Bulk API 2.0 の処理フローは、以下のステップで構成されます。すべて REST API のエンドポイントへのリクエストを通じて行われます。
1. ジョブの作成 (Create a Job)
まず、どのような操作(挿入、更新など)をどのオブジェクトに対して行うかを定義する「ジョブ」を作成します。これは、/services/data/vXX.X/jobs/ingest
エンドポイントへの POST リクエストによって行います。リクエストボディには、JSON 形式で以下の情報を指定します。
- object: 操作対象のオブジェクト名(例: `Account`, `Contact`, `MyCustomObject__c`)。
- operation: 実行するデータ操作の種類。`insert` (新規作成)、`update` (更新)、`upsert` (更新/挿入)、`delete` (削除)、`hardDelete` (物理削除) から選択します。
- contentType: アップロードするデータの形式。Bulk API 2.0 では `CSV` が標準です。
- externalIdFieldName: `upsert` 操作を行う場合に、キーとなる外部 ID 項目を指定します。
- lineEnding: CSV ファイルの改行コード。`LF` (Line Feed) または `CRLF` (Carriage Return + Line Feed) を指定します。
このリクエストが成功すると、Salesforce はジョブ ID や、データをアップロードするための専用 URL (`contentUrl`) を含むレスポンスを返します。
2. ジョブデータのアップロード (Upload Job Data)
次に、ステップ 1 で取得した `contentUrl` に対して、CSV 形式のデータを PUT リクエストでアップロードします。リクエストのヘッダーには `Content-Type: text/csv` を指定し、ボディに CSV データそのものを含めます。この CSV の1行目は、API 参照名(`Name`, `BillingStreet`, `AnnualRevenue` など)のヘッダー行である必要があります。
3. ジョブのクローズ (Close the Job)
データのアップロードが完了したら、Salesforce に処理を開始するよう通知する必要があります。これは、ジョブ情報のエンドポイント(例: `/services/data/vXX.X/jobs/ingest/{jobId}`)に対して、`"state": "UploadComplete"` という JSON ボディを含む PATCH リクエストを送信することで行います。このリクエストをもって、ジョブはキューに追加され、Salesforce によるデータ処理が開始されます。
4. ジョブステータスの監視 (Monitor Job Status)
ジョブは非同期で処理されるため、定期的にその状態を確認する必要があります。ジョブ情報のエンドポイントに対して GET リクエストを送信することで、現在のジョブの状態(`state`)や、処理済みのレコード数などを確認できます。主なジョブの状態は以下の通りです。
- Open: ジョブは作成されたが、データはまだアップロードされていない。
- UploadComplete: データがアップロードされ、処理キューに入っている。
- InProgress: データ処理が進行中。
- JobComplete: 処理が正常に完了した。
- Failed: 致命的なエラーによりジョブが失敗した。
- Aborted: ユーザーによってジョブが中止された。
5. 結果の取得 (Retrieve Results)
ジョブの状態が `JobComplete` または `Failed` になったら、処理結果を取得できます。ジョブ情報には、結果を取得するための3種類のエンドポイントが含まれています。
- successfulResults: 正常に処理されたレコードの一覧を取得します。
- failedResults: エラーが発生したレコードの一覧と、そのエラー内容を取得します。
- unprocessedRecords: 何らかの理由(例:ジョブがタイムアウトした)で処理されなかったレコードの一覧を取得します。
これらのエンドポイントに GET リクエストを送信すると、結果が CSV 形式で返ってきます。特に `failedResults` を確認し、エラーの原因を特定してデータを修正し、再実行するプロセスは、統合処理において非常に重要です。
サンプルコード
ここでは、cURL を使用して Bulk API 2.0 の一連のフローを実行する例を、Salesforce 公式ドキュメントに基づいて紹介します。`$SESSION_ID` と `$INSTANCE_URL` は、事前に OAuth 2.0 などで取得したアクセストークンとインスタンス URL に置き換えてください。
1. ジョブの作成(取引先オブジェクトへの insert)
まず、取引先 (Account) に新しいレコードを挿入するためのジョブを作成します。
curl -X POST $INSTANCE_URL/services/data/v58.0/jobs/ingest \ -H "Authorization: Bearer $SESSION_ID" \ -H "Content-Type: application/json; charset=UTF-8" \ -H "Accept: application/json" \ -d '{ "object" : "Account", "contentType" : "CSV", "operation" : "insert", "lineEnding" : "LF" }'
このコマンドは `/services/data/v58.0/jobs/ingest` エンドポイントに POST リクエストを送信しています。 `-H` オプションで認証トークンやコンテントタイプを指定し、`-d` オプションでジョブの定義(対象オブジェクト、操作、データ形式など)を JSON で渡しています。 成功すると、以下のような JSON が返却されます。`id` (ジョブID) と `contentUrl` が後続のステップで重要になります。
{ "id": "750R0000000zLLGIA2", "operation": "insert", "object": "Account", "createdById": "005R0000000hOMwIAM", "createdDate": "2023-11-20T10:00:00.000+0000", "systemModstamp": "2023-11-20T10:00:00.000+0000", "state": "Open", "concurrencyMode": "Parallel", "contentType": "CSV", "apiVersion": 58.0, "contentUrl": "services/data/v58.0/jobs/ingest/750R0000000zLLGIA2/batches", "lineEnding": "LF", "columnDelimiter": "COMMA" }
2. ジョブデータのアップロード
次に、先ほど取得した `contentUrl` を使って、CSV データをアップロードします。ここでは `accs.csv` というファイルにデータが格納されていると仮定します。
curl -X PUT $INSTANCE_URL/services/data/v58.0/jobs/ingest/750R0000000zLLGIA2/batches \ -H "Authorization: Bearer $SESSION_ID" \ -H "Content-Type: text/csv" \ --data-binary @accs.csv
`accs.csv` の内容は以下のようになります。1行目はヘッダーです。
Name,Description,AnnualRevenue,NumberOfEmployees "Sample Account 1","Created via Bulk API 2.0",1000000,100 "Sample Account 2","Another account",5000000,500このコマンドは PUT メソッドを使用し、`Content-Type` に `text/csv` を指定します。`--data-binary @accs.csv` で、`accs.csv` ファイルの内容をリクエストボディとして送信します。
3. ジョブのクローズ
データのアップロードが完了したら、ジョブの状態を `UploadComplete` に変更して、処理を開始させます。
curl -X PATCH $INSTANCE_URL/services/data/v58.0/jobs/ingest/750R0000000zLLGIA2 \ -H "Authorization: Bearer $SESSION_ID" \ -H "Content-Type: application/json; charset=UTF-8" \ -d '{ "state" : "UploadComplete" }'
PATCH メソッドを使い、ジョブのエンドポイントに対して `state` を `UploadComplete` に変更するようリクエストしています。これにより、Salesforce 側の処理がトリガーされます。
4. ジョブ結果の取得
ジョブが完了した後(ステータスチェックは省略)、成功したレコードの結果を取得します。
curl -X GET $INSTANCE_URL/services/data/v58.0/jobs/ingest/750R0000000zLLGIA2/successfulResults/ \ -H "Authorization: Bearer $SESSION_ID"
`successfulResults` エンドポイントに GET リクエストを送信するだけです。成功すると、Salesforce が自動で付与した ID と作成日時が追加された CSV データが返ってきます。
"sf__Id","sf__Created","Name","Description","AnnualRevenue","NumberOfEmployees" "001R0000003y4aFIAQ","true","Sample Account 1","Created via Bulk API 2.0","1000000.0","100" "001R0000003y4aGIAQ","true","Sample Account 2","Another account","5000000.0","500"同様に、エラーが発生した場合は `failedResults` エンドポイントからエラー詳細を含む CSV を取得できます。
注意事項
Bulk API 2.0 を利用した堅牢な連携を実装するためには、以下の点に注意する必要があります。
権限 (Permissions)
Bulk API 2.0 を使用するユーザーには、以下の権限が必要です。
- API の有効化 (API Enabled): API アクセスの基本となるプロファイル権限です。
- オブジェクト権限: 操作対象のオブジェクトに対する適切な CRUD(作成、参照、更新、削除)権限が必要です。
- 項目レベルセキュリティ (Field-Level Security): CSV に含まれるすべての項目に対する参照権限および編集権限が必要です。
- データインテグレーションの管理 (Manage Data Integrations): 他のユーザーが作成したジョブを含め、組織内のすべての Bulk ジョブを監視、管理、中止するために必要な権限です。通常は連携専用ユーザーに付与します。
API 制限 (API Limits)
Salesforce Platform を安定して利用するため、Bulk API 2.0 にもガバナ制限が設けられています。
- ジョブ作成数: 24 時間あたりに作成できるジョブの数は 100,000 件です (以前は 10,000 件でしたが緩和されました)。しかし、これは膨大な数であり、通常は問題になりません。
- レコード処理数: 24 時間あたりに処理できるレコードの合計数には組織ごとの制限があります。この制限はエディションによって異なります(例: Unlimited Edition で 1億5000万件)。この制限を超えると、新しいジョブが処理されなくなります。
- ファイルサイズ: アップロードする CSV ファイルのサイズは 150 MB までです。
- 処理時間: ジョブの合計処理時間は最大2時間です。2時間を超えると、ジョブは失敗ステータスになります。
エラー処理 (Error Handling)
大規模データ連携において、エラーハンドリングは最も重要な部分です。
- レコードレベルのエラー: 入力規則違反、必須項目の欠落、データ型の不一致など、個々のレコードに起因するエラーは、`failedResults` に記録されます。連携プログラムは、必ずこの結果ファイルを取得し、エラー内容を解析してログに記録する必要があります。エラーになったレコードは、データを修正した上で、別のジョブとして再実行する戦略が一般的です。
- ジョブレベルのエラー: ジョブの `state` が `Failed` になる場合があります。これは、無効な項目名を指定した場合や、Salesforce 内部で予期せぬエラーが発生した場合など、ジョブ全体が処理不可能になったことを意味します。この場合は、ジョブの作成リクエスト自体を見直す必要があります。
- タイムアウトと未処理レコード: 非常に大規模なジョブや、複雑なトリガーが設定されているオブジェクトへのロードでは、処理時間が制限を超えてしまうことがあります。その場合、一部のレコードが処理されないままジョブが終了することがあります。`unprocessedRecords` エンドポイントを確認し、未処理のレコードを再実行する必要があります。
まとめとベストプラクティス
Bulk API 2.0 は、Salesforce との間に大規模なデータをやり取りする際の、強力でシンプルなソリューションです。その非同期処理モデルと自動チャンキング機能により、開発者は複雑なバッチ管理から解放され、データ連携ロジックそのものに集中することができます。
私たち統合エンジニアが Bulk API 2.0 を最大限に活用するためのベストプラクティスを以下にまとめます。
- 適切な API の選択: 処理対象が数千件以上のレコードである場合に Bulk API 2.0 を選択します。2,000 レコード未満の場合は、トランザクション制御が容易な標準の REST API (Composite API など) の方が適している場合があります。
- ジョブの粒度: 数千万件といった極端に巨大なデータを一度に処理するのではなく、ビジネス的な意味合い(例:地域別、製品カテゴリ別など)で論理的に分割したジョブを作成することを検討します。これにより、エラー発生時の影響範囲を限定し、監視や再実行が容易になります。 - データの事前検証: Salesforce にアップロードする前に、可能な限りクライアント側でデータのクレンジングと検証(必須項目のチェック、データ型のフォーマットなど)を行います。これにより、Salesforce 側でのエラーを最小限に抑え、処理効率を高めることができます。
- ポーリング戦略: ジョブステータスを監視する際は、むやみにリクエストを繰り返すのではなく、指数関数的バックオフ(最初は短く、徐々に間隔を長くしていく)などの戦略を採用し、API コール数を節約します。
- 外部 ID の活用: 外部システムとのデータ同期を行う場合は、必ず `upsert` 操作と外部 ID を活用します。これにより、レコードの新規作成と更新を単一の操作で、冪等性(べきとうせい、何度実行しても同じ結果になること)を保ちながら実行できます。
- PK Chunking との使い分け: Bulk API 2.0 はデータのロード(書き込み)に最適化されています。一方、非常に大量のデータ(数百万件以上)を抽出(読み取り)する場合は、Bulk API 1.0 または Bulk API 2.0 のクエリジョブで PK Chunking を利用することを検討してください。これにより、テーブルロックを回避し、効率的にデータを抽出できます。
これらのプラクティスを念頭に置くことで、Salesforce Bulk API 2.0 を用いて、スケーラブルで信頼性の高いデータ統合ソリューションを構築することが可能になります。
コメント
コメントを投稿