背景と適用シナリオ
Salesforce インテグレーションエンジニアとして、私が日々直面する最も大きな課題の一つは、大規模なデータセットをいかに効率的かつ確実に Salesforce との間でやり取りするかということです。企業の基幹システム(ERP)からのマスターデータ同期、データウェアハウスへの実績データのエクスポート、あるいは新規 Salesforce 組織への初期データ移行など、何十万、何百万ものレコードを扱うシナリオは珍しくありません。
このような大規模データボリューム(Large Data Volumes, LDV)を扱う際に、標準の REST API や SOAP API のような同期的な API を使用すると、API コール数の制限、タイムアウト、そしてパフォーマンスの低下といった問題に直面します。レコードを一つずつ処理する方法では、現実的な時間内に処理を完了させることが困難です。
ここで登場するのが Bulk API です。Bulk API は、大量のレコードを非同期で処理するために特別に設計された RESTful API です。Salesforce は、この Bulk API のバージョンとして 1.0 と 2.0 を提供していますが、現在では Bulk API 2.0 の利用が強く推奨されています。Bulk API 1.0 がバッチの作成と管理を開発者側で行う必要があったのに対し、Bulk API 2.0 はこのプロセスを大幅に簡素化しました。データをアップロードするだけで、Salesforce が自動的に最適なバッチ(チャンク)に分割し、並列処理を実行してくれます。これにより、開発者はインテグレーションロジックそのものに集中できるようになりました。
主な適用シナリオ
- 初期データ移行:レガシーシステムから Salesforce へ、数百万件のアカウント、取引先責任者、商談レコードを移行する。
- 定期的なデータ同期:毎晩、ERP システムから最新の製品マスタや価格表を Salesforce に同期する。
- データクレンジング:Salesforce 内の重複レコードや古いデータを一括で更新・削除する。
- データアーカイブ:活動履歴やケースなど、古くなったレコードを外部ストレージにエクスポートしてアーカイブする。
この記事では、Salesforce インテグレーションエンジニアの視点から、Bulk API 2.0 の原理、具体的な使用方法、そして実践的な注意事項について詳しく解説していきます。
原理説明
Bulk API 2.0 のアーキテクチャは、シンプルさと効率性を追求して設計されています。その中核をなすのは「ジョブ(Job)」という概念です。データロードのプロセス全体が、一つのジョブとして管理されます。以下に、Bulk API 2.0 を利用した一般的なデータ挿入(insert)オペレーションのワークフローをステップごとに説明します。
1. ジョブの作成 (Create a Job)
まず、どのようなデータを、どのオブジェクトに対して、どのような操作(挿入、更新、アップサート、削除など)を行いたいかを定義した「ジョブ」を作成します。これは、Salesforce の /services/data/vXX.X/jobs/ingest
エンドポイントに対して POST リクエストを送信することで行います。リクエストボディには、対象オブジェクト(例:Account
)、操作タイプ(例:insert
)、データ形式(CSV
)などの情報を含めます。
Salesforce はこのリクエストを受け取ると、ジョブを一意に識別するための Job ID と、データをアップロードするための専用 URL (contentUrl
) をレスポンスとして返します。
2. データのアップロード (Upload Data)
次に、ステップ 1 で取得した contentUrl
に対して、処理したいレコードデータを含む CSV ファイルの内容を PUT リクエストで送信します。この時点では、データはまだ Salesforce の一時領域にアップロードされるだけで、実際のレコード処理は開始されません。ファイルサイズは最大 150 MB まで許容されます。
3. ジョブのクローズ (Close the Job)
データのアップロードが完了したら、Salesforce に処理を開始するよう指示を出す必要があります。これは、ジョブのエンドポイント (/services/data/vXX.X/jobs/ingest/{jobId}
) に対して、ジョブの状態を UploadComplete
に変更する PATCH リクエストを送信することで行います。このリクエストが受理されると、Salesforce はアップロードされた CSV データをキューに追加し、内部で自動的に最適なサイズのチャンクに分割(チャンキング)し、並列処理を開始します。
4. ジョブステータスの監視 (Monitor Job Status)
ジョブは非同期で処理されるため、即座に完了するわけではありません。インテグレーションクライアントは、定期的にジョブ情報のエンドポイント (/services/data/vXX.X/jobs/ingest/{jobId}
) に GET リクエストを送信し、ジョブのステータスをポーリング(確認)する必要があります。ジョブのステータスは、Open
→ UploadComplete
→ InProgress
→ JobComplete
(成功)、Failed
(失敗)、または Aborted
(中止)と遷移していきます。
5. 結果の取得 (Retrieve Results)
ジョブのステータスが JobComplete
または Failed
になったら、処理結果を取得できます。Bulk API 2.0 は、結果を取得するための 3 つの専用エンドポイントを提供しています。
/jobs/ingest/{jobId}/successfulResults/
: 正常に処理されたレコードのリストを取得します。/jobs/ingest/{jobId}/failedResults/
: 何らかの理由で処理に失敗したレコードのリストと、そのエラー理由を取得します。/jobs/ingest/{jobId}/unprocessedRecords/
: サーバーエラーなどで処理されなかったレコードのリストを取得します。
サンプルコード
ここでは、cURL を用いて Bulk API 2.0 を操作する一連のコード例を示します。これらの例は Salesforce 公式ドキュメントに基づいています。
1. ジョブの作成 (Account オブジェクトへの挿入ジョブ)
まず、取引先 (Account) を挿入するためのジョブを作成します。
# YOUR_ACCESS_TOKEN は有効な OAuth 2.0 アクセストークンに置き換えてください # MyDomainName.my.salesforce.com は自身の組織のドメインに置き換えてください # vXX.X は使用する API バージョン (例: v58.0) に置き換えてください curl --location --request POST 'https://MyDomainName.my.salesforce.com/services/data/vXX.X/jobs/ingest' \ --header 'Authorization: Bearer YOUR_ACCESS_TOKEN' \ --header 'Content-Type: application/json; charset=UTF-8' \ --header 'Accept: application/json' \ --data-raw '{ "object" : "Account", "contentType" : "CSV", "operation" : "insert", "lineEnding" : "LF" }'
注释 (コメント):
object
: 操作対象の Salesforce オブジェクトを指定します。contentType
: アップロードするデータの形式。Bulk API 2.0 ではCSV
が標準です。operation
: 実行する DML 操作 (insert
,update
,upsert
,delete
,hardDelete
) を指定します。lineEnding
: CSV ファイルの改行コードを指定します。LF
(Line Feed) またはCRLF
(Carriage Return Line Feed) が選択できます。一般的にはLF
が推奨されます。
id
(Job ID) と contentUrl
を後続のステップで使用します。
2. データのアップロード
次に、作成したジョブに対して CSV データをアップロードします。ここでは account-data.csv
というファイルにデータが記述されていると仮定します。
# contentUrl はステップ1で返却された URL に置き換えてください # account-data.csv はアップロードする CSV ファイルのパスを指定してください curl --location --request PUT 'contentUrl' \ --header 'Authorization: Bearer YOUR_ACCESS_TOKEN' \ --header 'Content-Type: text/csv' \ --data-binary '@account-data.csv'
account-data.csv の内容例:
Name,AnnualRevenue,Industry "Test Account 1",1000000,"Technology" "Test Account 2",5000000,"Finance"注释 (コメント):
Content-Type: text/csv
: ヘッダーでデータ形式が CSV であることを明示します。--data-binary '@account-data.csv'
: ファイルの内容をリクエストボディとして送信します。
3. ジョブのクローズ
データアップロード後、ジョブをクローズして Salesforce に処理を開始させます。
# jobId はステップ1で返却された ID に置き換えてください curl --location --request PATCH 'https://MyDomainName.my.salesforce.com/services/data/vXX.X/jobs/ingest/jobId' \ --header 'Authorization: Bearer YOUR_ACCESS_TOKEN' \ --header 'Content-Type: application/json; charset=UTF-8' \ --header 'Accept: application/json' \ --data-raw '{ "state" : "UploadComplete" }'
注释 (コメント):
state: "UploadComplete"
を指定することで、Salesforce の処理キューにジョブが投入されます。
4. ジョブステータスの取得
ジョブの進行状況を確認します。
# jobId はステップ1で返却された ID に置き換えてください curl --location --request GET 'https://MyDomainName.my.salesforce.com/services/data/vXX.X/jobs/ingest/jobId' \ --header 'Authorization: Bearer YOUR_ACCESS_TOKEN' \ --header 'Accept: application/json'
レスポンス内の state
フィールドの値が JobComplete
になるまで、一定間隔でこのリクエストを繰り返します。
注意事項
権限 (Permissions)
Bulk API 2.0 を利用するユーザーには、適切な権限が必要です。
- API の有効化 (API Enabled): プロファイルまたは権限セットでこのシステム権限が必須です。
- オブジェクト・項目レベルのセキュリティ: ユーザーは対象オブジェクトへの作成、参照、更新、削除権限と、アクセスする項目への参照・編集権限を持っている必要があります。Bulk API はこれらの権限設定をすべて順守します。
- 一括 API の物理削除 (Bulk API Hard Delete):
hardDelete
操作を実行する場合は、この追加の権限が必要です。この権限を持つユーザーは、ごみ箱を経由せずにレコードを物理削除できてしまうため、付与には細心の注意が必要です。
API 制限 (API Limits)
Bulk API 2.0 は大規模データ処理に最適化されていますが、無制限ではありません。主要な制限を理解しておくことが重要です。
- レコード処理数: 24時間あたりに処理できるレコード数には上限があります。例えば、Enterprise Edition では 1億5000万レコードが上限です。この上限は組織のエディションによって異なります。
- ジョブ作成数: 24時間あたりに作成できるジョブの数は 100,000 件に制限されています。
- ファイルサイズ: アップロードする CSV ファイルのサイズは 150MB までです。
- ジョブのタイムアウト: ジョブが 24 時間以上キューイングされた状態が続くと、Salesforce によって中止される場合があります。
エラーハンドリング (Error Handling)
インテグレーションを堅牢にするためには、エラーハンドリングが不可欠です。ジョブが完了したら、必ず failedResults
エンドポイントを確認し、失敗したレコードがないかチェックしてください。
失敗結果の CSV ファイルには、元のデータに加えて sf__Error
という列が追加されています。この列には、「REQUIRED_FIELD_MISSING(必須項目がありません)」や「DUPLICATE_VALUE(重複した値です)」といった具体的なエラーメッセージが記載されています。
エラー内容に応じて、データを修正して再実行する、あるいは手動で対応するといったロジックをインテグレーション側に実装する必要があります。特に、一時的なエラー(例:ロック競合)の可能性を考慮し、リトライメカニズムを組み込むことも有効です。
まとめとベストプラクティス
Bulk API 2.0 は、Salesforce との間に発生する大規模データ連携の課題を解決するための、非常に強力でシンプルなツールです。自動的なチャンキングと並列処理により、インテグレーションエンジニアは複雑なバッチ管理から解放され、ビジネスロジックの実装に専念できます。
最後に、Bulk API 2.0 を最大限に活用するためのベストプラクティスをいくつか紹介します。
- 可能な限り Bulk API 2.0 を選択する: XML 形式のサポートや大規模クエリなど、Bulk API 1.0 でしか実現できない特定の要件がない限り、常にシンプルで効率的な Bulk API 2.0 を優先して使用すべきです。
- API 使用状況を監視する: [設定] > [一括データ読み込みジョブ] ページや、
/services/data/vXX.X/limits
REST API リソースを活用して、日々の API 制限の消費状況を常に把握し、制限超過のリスクを管理してください。 - アップロード前にデータを検証する: Salesforce にアップロードする前に、データ側で型チェック、必須項目の存在チェック、フォーマットの統一などの事前クレンジングを行うことで、Salesforce 側でのバリデーションルールエラーを最小限に抑え、処理成功率を高めることができます。
- 賢明なポーリング戦略を実装する: ジョブステータスを監視する際、固定間隔で頻繁にポーリングすると API コールを無駄に消費します。最初は短い間隔で、徐々に間隔を広げていく「エクスポネンシャルバックオフ」のような戦略を採用することを推奨します。
- NULL 値の扱いに注意する: 既存のレコードの項目値を NULL に更新したい場合、CSV ファイルでその項目値に
#N/A
という特殊な値を指定する必要があります。空のままにすると、その項目は更新対象から無視されます。
これらの原理とベストプラクティスを理解し、適切に適用することで、Salesforce を中心としたデータエコシステムをより堅牢かつスケーラブルに構築することができるでしょう。
コメント
コメントを投稿