背景と応用シナリオ
Salesforceは強力なCRMプラットフォームですが、その価値を最大限に引き出すには、外部システムとのデータ連携や大量データの適切な管理が不可欠です。数万、数百万レコードに及ぶ初期データ移行、基幹システムとの夜間バッチ同期、定期的なデータクレンジングなど、大規模なデータ操作は多くのプロジェクトで共通の課題となります。これらの課題に対処するためにSalesforceが提供しているのが、Bulk APIです。
Bulk APIは、大量のレコードを非同期で効率的に処理するために設計されたRESTfulなAPIです。特に、本記事で詳説するBulk API 2.0は、従来のBulk API 1.0を簡素化し、より直感的に利用できるよう改良されたバージョンです。1.0では開発者が手動でデータをバッチに分割(チャンキング)する必要がありましたが、2.0ではSalesforceが自動的に処理を最適化してくれるため、開発者はデータ準備とジョブの管理に集中できます。このシンプルさから、現在ではBulk API 2.0の利用が強く推奨されています。
主な応用シナリオ:
- 初期データ移行:レガシーシステムからSalesforceへ数百万件の取引先や商談データを移行する。
- システム連携:ERPやデータウェアハウス(DWH)とSalesforce間で、毎日数万件のデータを同期する。
- データアーカイブ:古くなった活動履歴やケースレコードを外部ストレージに移動し、Salesforceのパフォーマンスを維持する。
- データクレンジング:外部の名寄せツールでクレンジングした結果を、一括でSalesforceに反映させる。
原理説明
Bulk API 2.0のアーキテクチャは、非同期処理とRESTの原則に基づいています。ユーザーは直接レコードを1件ずつ送信するのではなく、「ジョブ(Job)」という単位で処理をリクエストします。Salesforceはリクエストを受け付けた後、バックグラウンドでデータを処理し、完了後に結果を返します。この非同期モデルにより、クライアントは長時間待機する必要がなく、大規模な処理を安定して実行できます。
Bulk API 2.0の基本的なワークフローは、以下のステップで構成されます。
1. ジョブの作成 (Create a Job)
まず、どのような操作(insert
, update
, upsert
, delete
, hardDelete
)をどのオブジェクト(例:Account
)に対して行うかを定義した「ジョブ」を作成します。このリクエストを送信すると、SalesforceはジョブIDと、データアップロード用の専用URLを返却します。
2. データのアップロード (Upload Data)
次に、作成されたジョブの専用URLに対して、処理対象のデータをCSV形式でアップロードします。データはHTTP PUTリクエストのボディとして送信します。この時点ではまだデータの処理は開始されません。
3. ジョブのクローズ (Close the Job)
データのアップロードが完了したら、ジョブの状態を「UploadComplete
」に変更するリクエストを送信します。これにより、Salesforceはキューイングされたデータの処理を開始します。
4. ジョブステータスの監視 (Monitor Job Status)
ジョブは非同期で処理されるため、クライアントは定期的にジョブIDを使ってステータスを問い合わせる(ポーリングする)必要があります。ジョブのステータスは、Open
→ UploadComplete
→ InProgress
→ JobComplete
(またはFailed
)のように遷移します。
5. 結果の取得 (Retrieve Results)
ジョブのステータスがJobComplete
になったら、処理結果を取得できます。結果は「成功したレコード」「失敗したレコード」「未処理のレコード」の3種類に分かれており、それぞれ専用のエンドポイントからCSV形式でダウンロードできます。失敗したレコードの結果には、エラーメッセージが含まれており、原因の特定に役立ちます。
サンプルコード
ここでは、cURLコマンドを使って取引先(Account)レコードを新規登録(insert)する一連の流れを、Salesforce公式ドキュメントに準拠した形で示します。vXX.X
はAPIバージョン(例:v58.0
)、YOUR_INSTANCE
はSalesforceインスタンス名(例:MyDomainName.my.salesforce.com
)、YOUR_SESSION_ID
は有効なアクセストークンに置き換えてください。
ステップ1:ジョブの作成 (Insert)
取引先オブジェクトにレコードを挿入するためのジョブを作成します。lineEnding
はCSVの改行コードを指定しており、LF
(Line Feed)が推奨されます。
curl -X POST https://YOUR_INSTANCE/services/data/vXX.X/jobs/ingest \ -H "Authorization: Bearer YOUR_SESSION_ID" \ -H "Content-Type: application/json; charset=UTF-8" \ -H "Accept: application/json" \ -d '{ "object" : "Account", "contentType" : "CSV", "operation" : "insert", "lineEnding" : "LF" }'
レスポンス (抜粋):
SalesforceはジョブID(id
)とデータアップロード用URL(contentUrl
)を返します。これらの値は後続のステップで使用します。
{ "id": "750R0000000z0f9IAA", "operation": "insert", "object": "Account", "createdById": "005R0000000j1bFIAQ", "createdDate": "2023-11-20T10:00:00.000+0000", "systemModstamp": "2023-11-20T10:00:00.000+0000", "state": "Open", "concurrencyMode": "Parallel", "contentType": "CSV", "apiVersion": 58.0, "contentUrl": "services/data/v58.0/jobs/ingest/750R0000000z0f9IAA/batches", "lineEnding": "LF", "columnDelimiter": "COMMA" }
ステップ2:CSVデータのアップロード
ステップ1で取得したcontentUrl
に対して、CSVデータをPUTリクエストでアップロードします。CSVのヘッダー行には、オブジェクトのAPI参照名を正確に記述する必要があります。
curl -X PUT https://YOUR_INSTANCE/services/data/vXX.X/jobs/ingest/750R0000000z0f9IAA/batches \ -H "Authorization: Bearer YOUR_SESSION_ID" \ -H "Content-Type: text/csv" \ -H "Accept: application/json" \ --data-binary @request.csv
request.csv ファイルの内容:
Name,Description,Phone "Test Account 1","Created via Bulk API 2.0","111-222-3333" "Test Account 2","Another account created via Bulk API 2.0","444-555-6666"
このリクエストが成功すると、ステータスコード 201 Created
が返されます。
ステップ3:ジョブのクローズ
データのアップロードが完了したことをSalesforceに通知し、処理を開始させます。
curl -X PATCH https://YOUR_INSTANCE/services/data/vXX.X/jobs/ingest/750R0000000z0f9IAA \ -H "Authorization: Bearer YOUR_SESSION_ID" \ -H "Content-Type: application/json; charset=UTF-8" \ -H "Accept: application/json" \ -d '{ "state" : "UploadComplete" }'
レスポンスでジョブのstate
がUploadComplete
に更新されたことが確認できます。
ステップ4:ジョブ情報の確認
ジョブの処理状況を定期的に確認します。state
がJobComplete
になるまでポーリングを続けます。
curl -X GET https://YOUR_INSTANCE/services/data/vXX.X/jobs/ingest/750R0000000z0f9IAA \ -H "Authorization: Bearer YOUR_SESSION_ID" \ -H "Accept: application/json"
完了時のレスポンス (抜粋):
state
がJobComplete
になり、処理されたレコード数(numberRecordsProcessed
)や失敗したレコード数(numberRecordsFailed
)が確認できます。
{ "id": "750R0000000z0f9IAA", "state": "JobComplete", "numberRecordsProcessed": 2, "numberRecordsFailed": 0, ... }
ステップ5:成功/失敗結果の取得
処理が完了したら、結果を取得します。ここでは失敗したレコードの結果を取得する例を示します。成功した結果を取得する場合は、パスを/successfulResults/
に変更します。
curl -X GET https://YOUR_INSTANCE/services/data/vXX.X/jobs/ingest/750R0000000z0f9IAA/failedResults/ \ -H "Authorization: Bearer YOUR_SESSION_ID" \ -H "Accept: text/csv"
失敗時のレスポンス例 (CSV形式):
失敗したレコードには、元のデータに加えてsf__Error
とsf__Id
の列が付与されます。sf__Error
列に具体的なエラー理由が記載されます。
"sf__Id","sf__Error","Name","Description","Phone" "","REQUIRED_FIELD_MISSING:Required fields are missing: [Name]","","Missing Name field",""
注意事項
Bulk API 2.0を安定して運用するためには、権限、API制限、エラー処理について正しく理解しておく必要があります。
権限設定
- API Enabled: APIを利用するユーザーのプロファイルまたは権限セットで、「API 有効」権限が必要です。
- オブジェクト権限: ジョブで操作するオブジェクトに対する適切なCRUD(作成、参照、更新、削除)権限が必要です。例えば、
insert
操作には「作成」権限が必須です。 - Manage Data Integrations: すべてのBulk API 2.0ジョブ情報を参照・管理するには、「データインテグレーションの管理」権限が役立ちます。
API制限
Bulk API 2.0には、ガバナ制限と呼ばれる独自の上限値が設定されています。これらは通常のSOAP/REST APIコール数とは別にカウントされます。
- レコード処理数: 24時間以内に処理できるレコード数に上限があります。例えば、Enterprise Editionでは1億5千万レコードです。この上限は組織のエディションによって異なります。
- ジョブ作成数: 24時間以内に作成できるジョブの数は100,000件に制限されています。
- ファイルサイズ: アップロードするCSVファイルのサイズは、150MB未満である必要があります。
- ジョブの有効期間: 作成されたジョブおよびその結果データは、7日後に自動的に削除されます。監査などで結果が必要な場合は、期間内にダウンロードしておく必要があります。
エラー処理
大量データ処理では、一部のレコードでエラーが発生することが想定されます。堅牢なインテグレーションを構築するには、適切なエラーハンドリングが不可欠です。
- 失敗結果の解析: ジョブ完了後、必ず失敗結果(failedResults)エンドポイントを確認し、エラー内容をログに記録します。
sf__Error
列には、入力規則違反、必須項目欠落、データ型不一致などの具体的な理由が含まれます。 - 再処理ロジック: 一時的なロックエラー(
UNABLE_TO_LOCK_ROW
)など、再試行によって解決する可能性のあるエラーについては、再処理ロジックを実装することを検討します。 - データ品質: 最も一般的なエラーはデータそのものに起因します。Salesforceにアップロードする前に、必須項目がすべて埋まっているか、データ型が正しいかなどをクライアント側で検証することで、エラー率を大幅に低減できます。
まとめとベストプラクティス
Salesforce Bulk API 2.0は、大量のデータをシンプルかつ効率的に処理するための強力なツールです。RESTベースの直感的なインターフェースと、Salesforceによる自動的な並列処理により、開発者は複雑なバッチ管理から解放され、データ連携ロジックそのものに集中することができます。
最後に、Bulk API 2.0を最大限に活用するためのベストプラクティスをいくつか紹介します。
- データ品質を最優先する: アップロード前のデータ検証が、エラーを減らし、手戻りをなくす最も効果的な方法です。
- 適切なジョブサイズを設計する: 数レコードだけのジョブを大量に作成するのは非効率です。数千から数万レコードを1つのジョブにまとめることで、APIのオーバーヘッドを削減できます。ただし、1ファイルのサイズは150MB未満に抑えてください。
- 並列処理と直列処理を使い分ける: デフォルトではジョブは並列(
Parallel
)で処理され、スループットが最大化されます。しかし、親子関係にあるレコードを同時に更新するなど、レコードロック競合が懸念される場合は、ジョブ作成時に"concurrencyMode": "Serial"
を指定することで、直列処理に切り替え、競合を回避できます。 - 賢明なポーリング戦略を立てる: ジョブのステータスを確認するためのポーリングは、無駄に頻繁に行うべきではありません。最初は短い間隔で、徐々に間隔を広げていく「エクスポネンシャルバックオフ」などの戦略を採用し、APIコールを節約しましょう。
- 結果を必ず確認・保管する: ジョブが完了したら、成功・失敗の結果を必ず取得し、処理内容を検証してください。必要に応じて、監査証跡として結果ファイルを保管する仕組みを構築しましょう。
これらの原則を守ることで、Bulk API 2.0を用いたスケーラブルで信頼性の高いデータ連携基盤を構築できるでしょう。
コメント
コメントを投稿