Salesforce Bulk API 2.0 をマスターする:大規模データ統合のための包括的ガイド

Salesforce 統合エンジニアの視点から、大規模なデータセットを効率的に処理するための強力なツール、Bulk API 2.0 について徹底的に解説します。日々の業務で数百万件のレコードを扱う際、この API は必要不可欠です。本記事では、その基本原理から実践的なコード例、そして現場で培ったベストプラクティスまで、包括的にご紹介します。


背景と応用シーン

Salesforce は多くの企業にとってビジネスの中核をなすプラットフォームですが、その価値を最大限に引き出すには、外部システムとのスムーズなデータ連携が鍵となります。例えば、基幹システム (ERP) の顧客マスタを Salesforce の取引先オブジェクトに同期する、データウェアハウス (DWH) に蓄積された販売実績を Salesforce に取り込む、あるいは古いデータをアーカイブするために一括でエクスポートするといったシナリオが考えられます。

このような数万、数百万レコード規模のデータを扱う際に、標準の REST API や SOAP API を使用すると、API ガバナ制限に抵触しやすく、処理時間も膨大になります。そこで登場するのが、大規模データセットの非同期処理に特化した Bulk API です。

特に Bulk API 2.0 は、従来の Bulk API 1.0 と比較してフレームワークが大幅に簡素化されました。開発者はバッチの分割や管理を Salesforce プラットフォームに任せることができ、ジョブの作成、データアップロード、完了通知の受け取りという、よりシンプルなステップで統合処理を実装できます。これにより、開発工数を削減しつつ、信頼性の高いデータ連携を実現できます。

主な応用シーン:

  • 初期データ移行:レガシーシステムから Salesforce へ、数百万件の顧客データや商談データを一度に移行する。
  • 定期的データ同期:毎晩、ERP から最新の商品マスタや在庫情報を Salesforce に同期する。
  • データクレンジング:外部のクレンジングサービスで処理した後の大量の補正済みデータを Salesforce に書き戻す。
  • データアーカイブ:活動履歴やケースなど、古くなったレコードをコンプライアンス目的で外部ストレージに一括エクスポートする。

原理説明

Bulk API 2.0 は RESTful なアーキテクチャに基づいており、その処理フローは非常に直感的です。ここでは、データを取り込む Ingest (インジェスト) ジョブを例に、一連の流れを解説します。

  1. ジョブの作成 (Create a Job):
    まず、どのような操作をどのオブジェクトに対して行うかを定義した「ジョブ」を作成します。HTTP POST リクエストを /services/data/vXX.X/jobs/ingest エンドポイントに送信します。リクエストボディには、対象オブジェクト (例: `Account`)、操作種別 (Operation) (例: `insert`, `update`, `upsert`, `delete`, `hardDelete`)、データ形式 (`CSV`) などを指定します。Salesforce はこのリクエストを受け取ると、一意のジョブ ID と、データをアップロードするための専用 URL をレスポンスとして返します。

  2. データのアップロード (Upload Data):
    次に、ステップ 1 で受け取った専用 URL に対して、CSV 形式のデータを HTTP PUT リクエストでアップロードします。この時点ではまだデータ処理は開始されません。Salesforce はアップロードされたデータを一時領域に保管します。

  3. ジョブのクローズ (Close the Job):
    データのアップロードが完了したら、ジョブの状態 (State) を `UploadComplete` に変更するよう Salesforce に通知します。これは、ジョブ ID を含むエンドポイントに対して、HTTP PATCH リクエストを送信することで行います。この通知をもって、Salesforce はバックグラウンドでの非同期処理を開始します。

  4. ジョブの監視 (Monitor the Job):
    ジョブの処理は非同期で行われるため、クライアントは定期的にジョブのステータスをポーリングして確認する必要があります。ジョブ情報エンドポイントに GET リクエストを送信すると、現在の状態 (`InProgress`, `JobComplete`, `Failed` など) を取得できます。

  5. 結果の取得 (Retrieve Results):
    ジョブの状態が `JobComplete` になったら、処理結果を取得できます。Bulk API 2.0 は、成功したレコード、失敗したレコード、未処理のレコードをそれぞれ個別のエンドポイントからダウンロードできるように提供します。特に失敗したレコードの結果ファイルには、エラー理由が詳細に記載されているため、デバッグに非常に役立ちます。

この一連の流れからわかるように、Bulk API 2.0 はバッチの管理を Salesforce に完全に委任します。我々統合エンジニアは、ジョブという大きな単位でデータを投入するだけでよく、内部的な並列処理や負荷分散はプラットフォームが最適化してくれます。これは、Bulk API 1.0 のように手動でバッチを作成・管理する必要があった従来の方法と比較して、大きな進歩と言えます。


示例代码

ここでは、cURL を使用して取引先 (Account) を新規作成する一連の API コールを示します。これらの例は Salesforce Developer の公式ドキュメントに基づいています。

1. ジョブの作成 (Create a Job)

まず、取引先を `insert` するためのジョブを作成します。リクエストボディで操作内容を定義します。

curl -X POST \
https://MyDomainName.my.salesforce.com/services/data/v58.0/jobs/ingest \
-H "Authorization: Bearer 00D......." \
-H "Content-Type: application/json; charset=UTF-8" \
-H "Accept: application/json" \
-d '{
  "object" : "Account",
  "contentType" : "CSV",
  "operation" : "insert",
  "lineEnding" : "LF"
}'

解説:
上記のリクエストが成功すると、Salesforce は以下のような JSON レスポンスを返します。この中の `id` がジョブ ID であり、`contentUrl` が次にデータをアップロードする先のエンドポイントになります。

{
  "id": "750R0000000zLL3IAM",
  "operation": "insert",
  "object": "Account",
  "createdById": "005R0000000g9gFIAQ",
  "createdDate": "2023-11-20T10:00:00.000+0000",
  "systemModstamp": "2023-11-20T10:00:00.000+0000",
  "state": "Open",
  "concurrencyMode": "Parallel",
  "contentType": "CSV",
  "apiVersion": 58.0,
  "contentUrl": "services/data/v58.0/jobs/ingest/750R0000000zLL3IAM/batches",
  "lineEnding": "LF",
  "columnDelimiter": "COMMA"
}

2. ジョブデータのアップロード (Upload Job Data)

次に、先ほど取得した `contentUrl` に対して、CSV データを PUT メソッドでアップロードします。CSV のヘッダー行は Salesforce の項目 API 名と一致させる必要があります。

curl -X PUT \
https://MyDomainName.my.salesforce.com/services/data/v58.0/jobs/ingest/750R0000000zLL3IAM/batches \
-H "Authorization: Bearer 00D......." \
-H "Content-Type: text/csv" \
-d 'Name,Industry,NumberOfEmployees
Sample Account 1,Biotechnology,100
Sample Account 2,Energy,500'

解説:
このリクエストが成功すると、HTTP ステータスコード `201 Created` が返され、データの受け付けが完了したことを意味します。

3. ジョブのクローズ (Close the Job)

データのアップロードが完了したら、ジョブの状態を `UploadComplete` に変更して、Salesforce に処理開始を依頼します。

curl -X PATCH \
https://MyDomainName.my.salesforce.com/services/data/v58.0/jobs/ingest/750R0000000zLL3IAM \
-H "Authorization: Bearer 00D......." \
-H "Content-Type: application/json; charset=UTF-8" \
-H "Accept: application/json" \
-d '{
  "state" : "UploadComplete"
}'

解説:
このリクエストが成功すると、`state` が `UploadComplete` に更新されたジョブ情報が返されます。これ以降、Salesforce 内部で非同期処理が開始されます。

4. ジョブ情報の確認 (Check Job Info)

ジョブの処理状況を確認するために、ジョブ情報エンドポイントに GET リクエストを送信します。

curl -X GET \
https://MyDomainName.my.salesforce.com/services/data/v58.0/jobs/ingest/750R0000000zLL3IAM/ \
-H "Authorization: Bearer 00D......."

解説:
レスポンス内の `state` を確認します。`InProgress` であれば処理中、`JobComplete` であれば完了、`Failed` であればジョブ全体が失敗したことを示します。また、`numberRecordsProcessed` や `numberRecordsFailed` といったフィールドで進捗状況も確認できます。

5. 成功結果の取得 (Get Successful Results)

ジョブが `JobComplete` になったら、成功したレコードの結果を取得します。結果には Salesforce が自動採番したレコード ID が含まれます。

curl -X GET \
https://MyDomainName.my.salesforce.com/services/data/v58.0/jobs/ingest/750R0000000zLL3IAM/successfulResults/ \
-H "Authorization: Bearer 00D......."

解説:
レスポンスは CSV 形式で返されます。`sf__Id` 列に作成されたレコードの ID が、`sf__Created` 列に作成フラグが含まれます。同様に、失敗したレコードは `failedResults/` エンドポイントから取得でき、その CSV にはエラーメッセージを含む `sf__Error` 列が追加されています。


注意事項

Bulk API 2.0 を利用する上で、以下の点に注意する必要があります。

  • 権限 (Permissions):
    API を実行するユーザには、「API の有効化」システム権限が必要です。また、対象オブジェクトに対する適切な作成・参照・更新・削除権限 (CRUD) も必要です。`hardDelete` 操作を行う場合は「Bulk API ハードデリート」権限が追加で必要になります。

  • API 制限 (API Limits):
    Bulk API 2.0 には、Salesforce 組織の健全性を保つためのガバナ制限が存在します。
    • 24時間あたりのレコード数: 組織のエディションとライセンス数に応じて、24時間以内に処理できるレコード総数に上限があります (例: Enterprise Edition では最大1億レコード)。
    • ジョブあたりのファイルサイズ: アップロードする CSV ファイルのサイズは 150MB 未満である必要があります。
    • 1日のジョブ数: 24時間以内に作成できるジョブの数にも 100,000 件という上限があります。
    これらの制限は変更される可能性があるため、常に最新の Salesforce Developer ドキュメントを確認してください。

  • エラー処理 (Error Handling):
    ジョブが完了したら、必ず `failedResults/` エンドポイントを確認し、失敗したレコードがないかチェックするべきです。失敗理由 (入力規則違反、必須項目欠落など) を分析し、データを修正して再実行するためのロジックを統合ツール側に実装することが重要です。

  • トランザクション管理 (Transaction Management):
    Bulk API は、アップロードされたデータを内部的に複数のチャンク (塊) に分割して処理します。各チャンクは個別のトランザクションとして扱われます。つまり、1つのジョブに含まれる全レコードが成功するか、全レコードが失敗するかの「All or None」の動作は保証されません。一部のレコードが成功し、一部が失敗する可能性があります。この点を理解した上で、データの一貫性を保つための設計を行う必要があります。

まとめとベストプラクティス

Salesforce Bulk API 2.0 は、大規模データ統合の複雑さを大幅に軽減し、開発者が本来のビジネスロジックに集中できるようにする強力なツールです。そのシンプルなワークフローとプラットフォームによる自動最適化は、現代の Salesforce 開発・統合プロジェクトにおいて不可欠な要素です。

最後に、統合エンジニアとしてのベストプラクティスをいくつか共有します。

  • 適切な API の選択:
    数千件を超えるデータを扱う場合は Bulk API 2.0 が最適です。しかし、数百件程度のリアルタイムに近い処理が求められる場合は、標準の REST API (Composite API など) の方がレイテンシが低く適している場合があります。要件に応じて使い分けましょう。

  • データの前処理:
    Salesforce にアップロードする前に、可能な限りデータをクレンジング・検証しておくことが重要です。これにより、処理の失敗率が劇的に低下し、手戻りやデバッグの工数を削減できます。

  • クエリジョブの活用:
    データ抽出 (Query) でも Bulk API 2.0 は非常に強力です。数百万件のレコードをエクスポートする際、SOAP API の `queryMore()` をループさせるよりも遥かに効率的です。Salesforce が自動的に結果セットを分割し、複数のファイルとしてダウンロードリンクを提供してくれます。

  • 堅牢な監視とリトライ機構の実装:
    本番環境の連携処理では、ジョブのステータスを自動で監視し、失敗時には管理者に通知する仕組みを必ず構築してください。また、一時的なネットワークエラーなどで失敗した場合に備え、インテリジェントなリトライ (再試行) ロジックを組み込むことを推奨します。

Bulk API 2.0 を正しく理解し活用することで、スケーラブルで信頼性の高い Salesforce データ統合基盤を構築することが可能です。本記事がその一助となれば幸いです。

コメント