- Overview
- API Resources
Supply Chain & Retail Solutions API guide
This page is a hands-on walkthrough for customer IT teams, solution engineers, and support engineers integrating their systems with the Data Ingestion API. Each step shows the relevant API calls inline so you can read or copy as you go, with links to the reference pages where each topic is covered in full detail. For a tick-as-you-go progress tracker, pair this page with the Data onboarding checklist.
Overview
A typical integration moves data through four stages:
- Extract data from your source systems.
- Transform it into the data model the solution expects.
- Align field names with the API's canonical schema.
- Batch and send the data to the API.
We use the Commercial Pricing customer table as the running example so the same data flows through every step. The same patterns apply to any other table in any other solution.
Before you begin
Three things must be in place before sending any data:
-
A Personal Access Token (PAT). Generate one in the Peak platform — see Getting Started → Creating an authorization token. Export it once so you can reuse it across steps:
export PEAK_AUTH_TOKEN=<your-PAT>export PEAK_AUTH_TOKEN=<your-PAT> -
A rolled-out solution. The warehouse tables must exist before you can ingest. Rollout is usually done by Peak during onboarding (
POST /api/v2/schema/rollout— see Schema lifecycle → Roll out a schema). To confirm what's rolled out for your tenant:curl -X GET \ 'https://ingestion.peak.ai/api/v2/schema/solutions' \ -H "Authorization: $PEAK_AUTH_TOKEN"curl -X GET \ 'https://ingestion.peak.ai/api/v2/schema/solutions' \ -H "Authorization: $PEAK_AUTH_TOKEN"The response lists every solution with its
solutionName,prefix,suffix,targetSchemaName, andtableCount. Note yoursolutionName(e.g.,QP_OOTB) — every subsequent call needs it. -
The full warehouse table names you'll ingest into. Pull the schema for your solution to see them:
curl -X GET \ 'https://ingestion.peak.ai/api/v2/schema?solutionName=QP_OOTB' \ -H "Authorization: $PEAK_AUTH_TOKEN"curl -X GET \ 'https://ingestion.peak.ai/api/v2/schema?solutionName=QP_OOTB' \ -H "Authorization: $PEAK_AUTH_TOKEN"Each entry in
schema[]has anobjectName(e.g.,QP_CUSTOMER_OOTB) and acolumns[]array — those names are your target schema for the rest of the steps.
Step 1 — Extract data from your source systems
The Ingestion API does not pull data. Your team is responsible for getting it out of your source systems into a place from which you can send it. The right extraction approach depends on where the data lives.
From a data warehouse (Snowflake or Redshift)
Both warehouses can export tables to CSV directly. Tune the export so columns match by name and NULLs are written in a form the ingest layer recognises.
Snowflake — COPY INTO @stage defaults to no headers, gzip, and \N for nulls. Add HEADER = TRUE so columns match by name, and NULL_IF = ('') so nulls are written as empty strings:
COPY INTO @your_stage/customers/
FROM (SELECT customer_id, customer_name, customer_category, customer_subcategory,
customer_price_list_id, source, updated_at
FROM customers)
FILE_FORMAT = (TYPE = CSV HEADER = TRUE NULL_IF = (''));
COPY INTO @your_stage/customers/
FROM (SELECT customer_id, customer_name, customer_category, customer_subcategory,
customer_price_list_id, source, updated_at
FROM customers)
FILE_FORMAT = (TYPE = CSV HEADER = TRUE NULL_IF = (''));
Redshift — UNLOAD defaults to no headers, no compression, and no file extension. Add HEADER and EXTENSION 'csv' so files are easy to glob:
UNLOAD ('SELECT customer_id, customer_name, customer_category, customer_subcategory,
customer_price_list_id, source, updated_at
FROM customers')
TO 's3://your-bucket/customers/'
HEADER
FORMAT CSV
EXTENSION 'csv';
UNLOAD ('SELECT customer_id, customer_name, customer_category, customer_subcategory,
customer_price_list_id, source, updated_at
FROM customers')
TO 's3://your-bucket/customers/'
HEADER
FORMAT CSV
EXTENSION 'csv';
Both produce one folder per table with sortable part-files — the shape the historical data ingestion reference script consumes directly.
From other sources (ERPs, retail platforms, files, REST APIs)
Peak's Data Sources feature can pull data from enterprise resource planning (ERP) systems, retail platforms, file feeds, and REST APIs on a schedule or on demand. Configure a connector in the User Guide, then either let Peak land the data in the warehouse for an export step like above, or transform and send directly from the connector's output:
| Source kind | Configuration page |
|---|---|
| PostgreSQL, MSSQL, MySQL, Oracle, Snowflake, Redshift | Each has a dedicated page — for example configuring the PostgreSQL connector |
| Amazon S3, Google Ads, REST API, webhooks | Application connectors |
| FTP/SFTP, files | File storage ingestion and FTP overview |
Quick rule of thumb: connectors are for steady-state, scheduled flows; the Ingestion API (this guide) is for direct programmatic submission with fine-grained control over batching, retries, and timing.
Step 2 — Transform your data into the required data model
Your transformation step takes a row from your source system and reshapes it into the shape the API expects for the target table. The reshape covers four things:
- Column names must match the schema's column names (case-insensitive — Step 3 covers this).
- Data types must match —
string,integer,float,numeric,boolean,date,timestamp, orjson. See Data Types for the full catalog and per-type rules (for example,numericrequiresprecision+scale;date/timestampneed atimestampFormatvalidation). - Required keys must be present in every row payload — the API rejects a row with
DI_E_23N01if a key is missing. - Nullable values can be sent as
nullfor that row, but the key must still be present in the JSON.
Each per-table page lists the canonical schema with Required, Nullable, and Data Type per column — for our example, see Customer. Here's a worked source-to-target reshape:
| Source field (your system) | Target field (API) | Type | Notes |
|---|---|---|---|
CustomerID | customer_id | string | rename + lowercase |
Customer Name | customer_name | string | rename, drop the space |
Customer Category | customer_category | string | rename, nullable — send null if missing |
Subcategory | customer_subcategory | string | nullable |
PriceListID | customer_price_list_id | string | nullable |
| (n/a) | source | string | nullable — supply if you have it, otherwise null |
LastUpdated | updated_at | timestamp | reformat to YYYY-MM-DD HH:MI:SS ±hh:mm |
A row that was { "CustomerID": "C-001", "Customer Name": "Acme Ltd", "PriceListID": null, "LastUpdated": "2026-06-01T10:00:00Z" } becomes:
{
"customer_id": "C-001",
"customer_name": "Acme Ltd",
"customer_category": null,
"customer_subcategory": null,
"customer_price_list_id": null,
"source": null,
"updated_at": "2026-06-01 10:00:00 +00:00"
}
{
"customer_id": "C-001",
"customer_name": "Acme Ltd",
"customer_category": null,
"customer_subcategory": null,
"customer_price_list_id": null,
"source": null,
"updated_at": "2026-06-01 10:00:00 +00:00"
}
Every key is present; values you don't have are null rather than absent.
Step 3 — Align field names with the API's canonical schema
The API matches columns by name against the schema (case-insensitive). Two common situations come up:
Renaming source columns to the canonical names
The per-table reference pages are the source of truth — your transformation step should rename to match. If you load via the historical data ingestion script, the script reads CSV headers and matches them case-insensitively, so you can keep your CSV headers in your preferred casing as long as the names match.
Adding columns the schema doesn't have
If a column you need is genuinely tenant-specific (not part of the standard schema), add it post-rollout via POST /api/v2/schema/{objectName}/add-attribute:
curl -X POST \
'https://ingestion.peak.ai/api/v2/schema/QP_CUSTOMER_OOTB/add-attribute' \
-H "Authorization: $PEAK_AUTH_TOKEN" \
-H 'Content-Type: application/json' \
-d '{
"solutionName": "QP_OOTB",
"columnName": "loyalty_tier",
"dataType": "string",
"defaultValue": null,
"validations": [
{ "type": "required" }
]
}'
curl -X POST \
'https://ingestion.peak.ai/api/v2/schema/QP_CUSTOMER_OOTB/add-attribute' \
-H "Authorization: $PEAK_AUTH_TOKEN" \
-H 'Content-Type: application/json' \
-d '{
"solutionName": "QP_OOTB",
"columnName": "loyalty_tier",
"dataType": "string",
"defaultValue": null,
"validations": [
{ "type": "required" }
]
}'
Two things worth knowing before you call this:
- Automatic column-name prefix on standard tables. The API prefixes the new column name with
C_(Snowflake) orc_(Redshift) — soloyalty_tierbecomesC_loyalty_tierin the warehouse, and the response returns that prefixed name. Customer queries (SELECT C_loyalty_tier FROM ...) must use the prefixed name. Columns added to custom tables are not prefixed. See Schema lifecycle → Automatic column-name prefix for the full mechanics. requiredandnonNullare independent. Include{type: "required"}if the key must be present in every payload; include{type: "nonNull"}if the value cannot benull; both, either, or neither — pick what fits the column.defaultValueandnonNulltogether is rejected (the API enforces this; see Schema lifecycle → Controlling required-ness and nullability).
Errors you'll see if names don't line up
| Error code | When |
|---|---|
DI_E_42703 | A column in your data isn't in the schema (typo, stale source) |
DI_E_23N01 | A column the schema requires is missing from your row |
DI_E_23502 | A non-nullable column has a null value |
These all surface in the response's failed[] array and in the Data Quality Dashboard.
Step 4 — Batch and send your data to the API
The ingest endpoint accepts up to 2000 rows per request and a 1 MB payload size. Tenant rate limit is 50 requests per second. Pick the right approach for the data volume.
For ongoing ingestion (low-to-moderate volume)
Send batches directly to POST /api/v2/objects/{objectName} with a JSON body containing solutionName, data, and operationType.
curl -X POST \
'https://ingestion.peak.ai/api/v2/objects/QP_CUSTOMER_OOTB' \
-H "Authorization: $PEAK_AUTH_TOKEN" \
-H 'Content-Type: application/json' \
-d '{
"solutionName": "QP_OOTB",
"operationType": "UPSERT",
"data": [
{
"customer_id": "C-001",
"customer_name": "Acme Ltd",
"customer_category": "B2B",
"customer_subcategory": null,
"customer_price_list_id": "PL-001",
"source": "ERP",
"updated_at": "2026-06-01 10:00:00 +00:00"
},
{
"customer_id": "C-002",
"customer_name": "Globex Inc",
"customer_category": null,
"customer_subcategory": null,
"customer_price_list_id": null,
"source": null,
"updated_at": "2026-06-01 10:05:00 +00:00"
}
]
}'
curl -X POST \
'https://ingestion.peak.ai/api/v2/objects/QP_CUSTOMER_OOTB' \
-H "Authorization: $PEAK_AUTH_TOKEN" \
-H 'Content-Type: application/json' \
-d '{
"solutionName": "QP_OOTB",
"operationType": "UPSERT",
"data": [
{
"customer_id": "C-001",
"customer_name": "Acme Ltd",
"customer_category": "B2B",
"customer_subcategory": null,
"customer_price_list_id": "PL-001",
"source": "ERP",
"updated_at": "2026-06-01 10:00:00 +00:00"
},
{
"customer_id": "C-002",
"customer_name": "Globex Inc",
"customer_category": null,
"customer_subcategory": null,
"customer_price_list_id": null,
"source": null,
"updated_at": "2026-06-01 10:05:00 +00:00"
}
]
}'
Choose the operation type per table, not per batch:
UPSERT— insert or update by primary key. Use when downstream only needs the current state of each record.APPEND— insert only. Use when downstream needs every version (typically when the primary key includes a temporal column). See Operation types for the decision tree.
Validate before persisting using dryRun — same payload, same response shape, but no rows are written:
curl -X POST \
'https://ingestion.peak.ai/api/v2/objects/QP_CUSTOMER_OOTB' \
-H "Authorization: $PEAK_AUTH_TOKEN" \
-H 'Content-Type: application/json' \
-d '{ "solutionName": "QP_OOTB", "operationType": "UPSERT", "dryRun": true,
"data": [ { "customer_id": "C-001", "customer_name": "Acme Ltd", ... } ] }'
curl -X POST \
'https://ingestion.peak.ai/api/v2/objects/QP_CUSTOMER_OOTB' \
-H "Authorization: $PEAK_AUTH_TOKEN" \
-H 'Content-Type: application/json' \
-d '{ "solutionName": "QP_OOTB", "operationType": "UPSERT", "dryRun": true,
"data": [ { "customer_id": "C-001", "customer_name": "Acme Ltd", ... } ] }'
Read the response:
| Status | Meaning | Action |
|---|---|---|
200 OK | Every row passed inline validation and was accepted | Continue |
207 Multi-Status | Some rows passed, some failed | Inspect failed[] in the response body, fix or quarantine those rows |
400 Bad Request | The whole batch failed (every row failed validation, or the payload itself is malformed) | Fix the payload, retry |
429 Too Many Requests | Tenant rate limit hit | Back off, retry |
5xx | Transient server-side issue | Back off, retry |
See Validation behavior for what runs synchronously (returned in the response) versus asynchronously (surfaces only in _failed_rows + Data Quality Dashboard), and Error codes for every code's meaning and resolution.
For one-shot historical loads (millions to billions of rows)
Use the Historical data ingestion guide. It ships with a self-contained Python reference script that handles the work you'd otherwise have to build — streaming CSV from disk so memory stays flat, dynamic batching to fit the 2000-row / 1 MB limits, a shared rate-limiter across worker threads, exponential backoff on 429 / 5xx, checkpoint-based resume so a crash doesn't restart from row zero, and a failed-row capture file for triage.
Configure the script's CONFIG block — csv_path, object_name, solution_name, operation_type, parallel — and run it. One run loads one table; run it once per table.
Inspecting outcomes
The two places to look after a load:
- The HTTP response on each call — for synchronous failures (schema checks, format/type validation, intra-payload PK/UK duplicates). Use the
failed[]array for per-row detail. - The Data Quality Dashboard in your Peak tenant — aggregates outcomes for every load, plus picks up asynchronous failures (foreign-key violations, PK collisions against rows already persisted). It is the recommended place for ongoing monitoring; the
<table_name>_failed_rowstable behind it has every failed-row payload + the error codes that fired.
Next
- Tick through the Data onboarding checklist as you work through a real integration.
- Bookmark the API Guide — endpoint reference, error-code catalog, response status codes.
- See Schema lifecycle when you need to upgrade a schema, add a column, or create a custom table.
- See Scheduled ingestion for how the warehouse-write timing works (every 30 minutes by default; configurable per table).
- Overview
- Before you begin
- Step 1 — Extract data from your source systems
- From a data warehouse (Snowflake or Redshift)
- From other sources (ERPs, retail platforms, files, REST APIs)
- Step 2 — Transform your data into the required data model
- Step 3 — Align field names with the API's canonical schema
- Renaming source columns to the canonical names
- Adding columns the schema doesn't have
- Errors you'll see if names don't line up
- Step 4 — Batch and send your data to the API
- For ongoing ingestion (low-to-moderate volume)
- For one-shot historical loads (millions to billions of rows)
- Inspecting outcomes
- Next