Salesforce Bulk API 徹底解説:開発者のための大容量データ処理ガイド

背景と適用シーン

Salesforce開発者として、私たちは日々様々なデータ操作に直面します。少量のレコードを作成・更新する場合、標準の REST APISOAP API は非常に効果的です。しかし、数万、数十万、あるいは数百万件のレコードを一度に処理する必要がある場合、これらの同期的なAPIでは性能の限界やガバナ制限(Governor Limits)の問題に直面します。例えば、1回のAPIコールで処理できるレコード数には限りがあり、大量のデータを処理しようとすると、多数のAPIコールが必要となり、結果として時間とAPIコール数を大量に消費してしまいます。

このような課題を解決するために設計されたのが Bulk API です。Bulk APIは、大量のデータを効率的に、かつ非同期(Asynchronous)で処理するために最適化されています。主な適用シーンは以下の通りです。

  • データ移行(Data Migration): レガシーシステムからSalesforceへ、あるいは他のSalesforce組織から大量の初期データを移行するケース。
  • 初期データロード(Initial Data Load): 新しいアプリケーションを導入する際に、マスタデータやトランザクションデータを一括で投入するケース。
  • 定期的なデータ同期(Periodic Data Synchronization): ERPやデータウェアハウスといった外部システムとSalesforce間で、夜間バッチなどを利用して大量のデータを定期的に同期するケース。
  • データのアーカイブと削除(Data Archiving and Deletion): 古くなったレコードを大量に外部システムへエクスポートしたり、一括で物理削除したりするケース。

Bulk APIを利用することで、開発者はAPIコール数を大幅に削減し、Salesforceのガバナ制限を効率的に回避しながら、大規模なデータセットを安定して処理することが可能になります。この記事では、特にモダンでシンプルな Bulk API 2.0 を中心に、その仕組みと具体的な利用方法を開発者視点で解説します。


原理説明

Bulk APIの核心は、その非同期(Asynchronous)処理モデルにあります。標準のREST APIがリクエストを送信すると即座に処理結果を返す同期的(Synchronous)な動作であるのに対し、Bulk APIはまず「ジョブ(Job)」を作成し、データをアップロードした後、Salesforceがバックグラウンドで処理を行います。開発者はジョブのステータスを定期的に確認し、処理が完了したら結果をダウンロードするという流れになります。

Bulk APIにはバージョン1.0と2.0が存在しますが、現在ではBulk API 2.0の使用が推奨されています。

Bulk API 1.0

Bulk API 1.0では、開発者が手動でデータを小さな「バッチ(Batch)」に分割してアップロードする必要がありました。フローは「ジョブの作成」→「バッチの作成とデータアップロード(複数可)」→「ジョブのクローズ」→「バッチ結果の確認」というステップを踏み、より細かい制御が可能でしたが、その分実装が複雑でした。XML形式のデータもサポートしていました。

Bulk API 2.0

Bulk API 2.0は、このプロセスを大幅に簡素化しました。RESTfulなアプローチを採用し、CSV形式のデータのみをサポートします。開発者はデータを分割する必要がなく、CSVファイル全体を一度にアップロードするだけです。Salesforceが内部で自動的にデータを最適なサイズのチャンクに分割し、並列処理(Parallel Processing)を行ってくれます。これにより、開発者は煩雑なバッチ管理から解放され、よりシンプルに大量のデータ処理を実装できます。

Bulk API 2.0の基本的な処理フローは以下の通りです。

  1. ジョブの作成(Create a Job): insert, update, upsert, delete, query といった操作と対象オブジェクトを指定して、データ処理のジョブを作成します。このリクエストへのレスポンスとして、ジョブIDとデータアップロード用のURLが返されます。
  2. データのアップロード(Upload Job Data): ステップ1で取得したURLに対して、処理したいデータを含むCSVファイルをHTTP PUT リクエストでアップロードします。
  3. ジョブのクローズ(Close the Job): データのアップロードが完了したことをSalesforceに通知するため、ジョブのステータスを UploadComplete に更新します。これにより、Salesforceはデータの処理を開始します。
  4. ジョブステータスの確認(Check Job Status): ジョブIDを使って定期的にジョブのステータスをポーリングします。ステータスが JobComplete または Failed になるまで待機します。
  5. 結果の取得(Get Job Results): 処理が完了したら、成功したレコードの結果、失敗したレコードの結果、そして未処理のレコードの結果をそれぞれ対応するエンドポイントからダウンロードします。

このシンプルなフローにより、Bulk API 2.0は大規模データ連携の実装を容易にし、開発者の生産性を向上させます。


サンプルコード

ここでは、Bulk API 2.0を使用して取引先(Account)オブジェクトに新しいレコードを挿入する例を見ていきましょう。以下の例では、cURLコマンドを使用していますが、任意のHTTPクライアント(Java, Python, Node.jsなど)で同様のリクエストを実装できます。

ステップ1: ジョブの作成

まず、取引先を挿入するためのジョブを作成します。HTTP POST リクエストを /services/data/vXX.X/jobs/ingest エンドポイントに送信します。

# cURL command to create a Bulk API 2.0 job
# -X POST: HTTP POSTメソッドを指定
# -H "Authorization: Bearer YOUR_SESSION_ID": 認証用のセッションID(アクセストークン)
# -H "Content-Type: application/json; charset=UTF-8": リクエストボディがJSON形式であることを指定
# -H "Accept: application/json": レスポンス形式としてJSONを要求
# -d @job.json: リクエストボディの内容をjob.jsonファイルから読み込む

curl https://YOUR_INSTANCE.salesforce.com/services/data/v58.0/jobs/ingest -X POST -H "Authorization: Bearer YOUR_SESSION_ID" -H "Content-Type: application/json; charset=UTF-8" -H "Accept: application/json" -d @job.json

リクエストボディとなる job.json ファイルの内容は以下の通りです。

{
  "object" : "Account",
  "contentType" : "CSV",
  "operation" : "insert",
  "lineEnding" : "LF"
}

コードの解説:

  • object: 処理対象のSalesforceオブジェクト(この場合は `Account`)。
  • contentType: アップロードするデータの形式。Bulk API 2.0では `CSV` を指定します。
  • operation: 実行するデータ操作(`insert`, `update`, `upsert`, `delete`, `hardDelete`, `query`, `queryAll`)。
  • lineEnding: CSVデータの改行コードを指定します。`LF` (Line Feed) または `CRLF` (Carriage Return Line Feed) を指定できます。

このリクエストが成功すると、Salesforceから以下のようなJSONレスポンスが返ってきます。

{
  "id": "750R0000000z0fIIAQ",
  "operation": "insert",
  "object": "Account",
  "createdById": "005R0000000iB0gIAE",
  "createdDate": "2023-11-20T10:00:00.000+0000",
  "systemModstamp": "2023-11-20T10:00:00.000+0000",
  "state": "Open",
  "concurrencyMode": "Parallel",
  "contentType": "CSV",
  "apiVersion": 58.0,
  "contentUrl": "jobs/ingest/750R0000000z0fIIAQ/batches",
  "lineEnding": "LF",
  "columnDelimiter": "COMMA"
}

このレスポンスに含まれる id (ジョブID) と contentUrl (データアップロード用URL) を次のステップで使用します。

ステップ2: CSVデータのアップロード

次に、ステップ1で取得した contentUrl に対して、CSVデータを PUT メソッドでアップロードします。

# cURL command to upload CSV data to the job
# -X PUT: HTTP PUTメソッドを指定
# -H "Content-Type: text/csv": アップロードするデータがCSV形式であることを指定
# -T "request.csv": アップロードするファイル名を指定

curl https://YOUR_INSTANCE.salesforce.com/services/data/v58.0/jobs/ingest/750R0000000z0fIIAQ/batches -X PUT -H "Authorization: Bearer YOUR_SESSION_ID" -H "Content-Type: text/csv" -T "request.csv"

アップロードする request.csv ファイルの内容は以下の通りです。1行目はヘッダー行で、API参照名と一致させる必要があります。

Name,Industry,NumberOfEmployees
Bulk API Account 1,Technology,500
Bulk API Account 2,Finance,1200

ステップ3: ジョブのクローズ

データのアップロードが完了したら、ジョブのステータスを `UploadComplete` に変更して、Salesforceに処理を開始するよう伝えます。これはジョブIDに対して PATCH リクエストを送信することで行います。

# cURL command to close the job
# -X PATCH: HTTP PATCHメソッドを指定
# -d '{"state" : "UploadComplete"}': リクエストボディでジョブの状態を更新

curl https://YOUR_INSTANCE.salesforce.com/services/data/v58.0/jobs/ingest/750R0000000z0fIIAQ -X PATCH -H "Authorization: Bearer YOUR_SESSION_ID" -H "Content-Type: application/json; charset=UTF-8" -d '{"state" : "UploadComplete"}'

このリクエストが成功すると、ジョブのステータスが `UploadComplete` に更新され、Salesforceがバックグラウンドでデータの処理を開始します。この後のステップでは、ジョブのステータスをポーリングし、完了後に結果を取得します。


注意事項

Bulk APIを効果的かつ安全に利用するためには、いくつかの重要な点に注意する必要があります。

権限(Permissions)

  • APIの有効化: Bulk APIを利用するユーザーのプロファイルまたは権限セットで「APIの有効化(API Enabled)」権限が必要です。
  • オブジェクト権限: 処理対象のオブジェクトに対する適切なCRUD(作成、参照、更新、削除)権限が必要です。
  • 一括APIのハードデリート: 物理削除(Hard Delete)操作を行う場合は、「一括APIのハードデリート(Bulk API Hard Delete)」権限が別途必要になります。

API制限(API Limits)

  • ジョブ数の制限: Bulk API 2.0では、24時間以内に作成できるジョブの数に制限があります。通常は100,000ジョブです。これは組織全体での制限です。
  • データ量の制限: 1つのジョブでアップロードできるデータは、ファイルサイズが100MBまでという制限があります。
  • 処理時間の制限: Salesforceはバッチの処理に最大2時間かけることがあります。それ以上時間がかかる場合、タイムアウトになる可能性があります。
  • APIコール数の消費: Bulk APIは標準APIとは異なり、レコード数ではなくバッチ単位やジョブ単位でAPIコールを消費するため、大量のデータを扱う際に非常に効率的です。しかし、ジョブの作成やステータス確認自体はAPIコールを消費しますので、過度なポーリングは避けるべきです。

エラー処理(Error Handling)

Bulk APIは非同期であるため、エラーハンドリングは非常に重要です。ジョブが完了(`JobComplete`)または失敗(`Failed`)した後、必ず結果ファイルを取得して、どのレコードが成功し、どのレコードが失敗したかを確認する必要があります。失敗したレコードの結果ファイル(`failedResults`)には、エラーメッセージが含まれているため、原因を特定してデータを修正し、再実行するための重要な情報源となります。

例えば、「REQUIRED_FIELD_MISSING」や「INVALID_FIELD_FOR_INSERT_UPDATE」といった一般的なエラーが返されます。堅牢な連携処理を構築するためには、これらのエラーを解析し、必要に応じて自動でリトライする仕組みを実装することが推奨されます。


まとめとベストプラクティス

Salesforce Bulk APIは、大規模なデータセットを扱う開発者にとって不可欠なツールです。その非同期処理モデルと効率的なリソース消費により、データ移行やシステム連携のパフォーマンスと信頼性を大幅に向上させることができます。

最後に、Bulk APIを最大限に活用するためのベストプラクティスをいくつか紹介します。

  1. Bulk API 2.0を優先的に使用する: ほとんどのユースケースにおいて、Bulk API 2.0は実装がシンプルで、Salesforceによる自動的な並列処理の恩恵を受けられるため、第一選択肢となります。XML形式のサポートや直列モード(Serial Mode)での処理が必要な場合に限り、Bulk API 1.0を検討します。
  2. 適切なデータチャンクサイズを考慮する: Bulk API 2.0は自動でチャンク分割を行いますが、1つのジョブで処理するデータが極端に大きい場合(数百万レコード以上)、ジョブを論理的に分割することを検討してください。これにより、エラー発生時の影響範囲を限定し、再処理を容易にすることができます。
  3. 並列処理を最大限に活用する: Bulk APIはデフォルトで並列モード(Parallel Mode)で動作します。これにより、複数のチャンクが同時に処理され、スループットが向上します。しかし、レコードロックの競合が発生しやすいオブジェクト(親レコードが同じ子レコードを大量に更新する場合など)では、デッドロックを避けるために直列モード(Serial Mode、Bulk API 1.0のみ)の使用を検討する必要があります。
  4. 堅牢なポーリングとエラーハンドリングを実装する: ジョブのステータスを確認するためのポーリングロジックは、指数バックオフ(Exponential Backoff)のような戦略を取り入れ、APIコールを無駄に消費しないように設計します。また、失敗したレコードは必ずログに記録し、修正・再処理のフローを確立しておくことが重要です。
  5. PK Chunkingを活用する: `query` 操作で数百万件以上の非常に大きなデータセットを抽出する場合、PK Chunking ヘッダーを有効にすることを検討してください。これにより、主キー(ID)の範囲に基づいてクエリが自動的に分割され、タイムアウトを回避しながら効率的にデータを抽出できます。

これらの原理とベストプラクティスを理解し、適切にBulk APIを設計・実装することで、Salesforceプラットフォーム上での大規模データ処理を成功に導くことができるでしょう。

コメント