Supply Chain & Retail Solutions API guide

Integrating with the Data Ingestion API

This page is a hands-on walkthrough for customer IT teams, solution engineers, and support engineers integrating their systems with the Data Ingestion API. Each step shows the relevant API calls inline so you can read or copy as you go, with links to the reference pages where each topic is covered in full detail. For a tick-as-you-go progress tracker, pair this page with the Data onboarding checklist.

Overview

A typical integration moves data through four stages:

  1. Extract data from your source systems.
  2. Transform it into the data model the solution expects.
  3. Align field names with the API's canonical schema.
  4. Batch and send the data to the API.

We use the Commercial Pricing customer table as the running example so the same data flows through every step. The same patterns apply to any other table in any other solution.

Before you begin

Three things must be in place before sending any data:

  • A Personal Access Token (PAT). Generate one in the Peak platform — see Getting Started → Creating an authorization token. Export it once so you can reuse it across steps:

    export PEAK_AUTH_TOKEN=<your-PAT>
    
  • A rolled-out solution. The warehouse tables must exist before you can ingest. Rollout is usually done by Peak during onboarding (POST /api/v2/schema/rollout — see Schema lifecycle → Roll out a schema). To confirm what's rolled out for your tenant:

    curl -X GET \
      'https://ingestion.peak.ai/api/v2/schema/solutions' \
      -H "Authorization: $PEAK_AUTH_TOKEN"
    

    The response lists every solution with its solutionName, prefix, suffix, targetSchemaName, and tableCount. Note your solutionName (e.g., QP_OOTB) — every subsequent call needs it.

  • The full warehouse table names you'll ingest into. Pull the schema for your solution to see them:

    curl -X GET \
      'https://ingestion.peak.ai/api/v2/schema?solutionName=QP_OOTB' \
      -H "Authorization: $PEAK_AUTH_TOKEN"
    

    Each entry in schema[] has an objectName (e.g., QP_CUSTOMER_OOTB) and a columns[] array — those names are your target schema for the rest of the steps.

Step 1 — Extract data from your source systems

The Ingestion API does not pull data. Your team is responsible for getting it out of your source systems into a place from which you can send it. The right extraction approach depends on where the data lives.

From a data warehouse (Snowflake or Redshift)

Both warehouses can export tables to CSV directly. Tune the export so columns match by name and NULLs are written in a form the ingest layer recognises.

Snowflake: COPY INTO @stage defaults to no headers, gzip compression, and \N for nulls. Add HEADER = TRUE so columns match by name, and NULL_IF = ('') so nulls are written as empty strings:

COPY INTO @your_stage/customers/
FROM (SELECT customer_id, customer_name, customer_category, customer_subcategory,
             customer_price_list_id, source, updated_at
      FROM customers)
FILE_FORMAT = (TYPE = CSV NULL_IF = (''))
HEADER = TRUE;  -- HEADER is a COPY option, so it sits outside FILE_FORMAT

Redshift: UNLOAD defaults to no headers, no compression, and no file extension. Add HEADER so columns match by name, and EXTENSION 'csv' so files are easy to glob:

UNLOAD ('SELECT customer_id, customer_name, customer_category, customer_subcategory,
                customer_price_list_id, source, updated_at
         FROM customers')
TO 's3://your-bucket/customers/'
IAM_ROLE '<your-iam-role-arn>'  -- placeholder: UNLOAD requires an authorization clause
HEADER
FORMAT CSV
EXTENSION 'csv';

Both produce one folder per table with sortable part-files — the shape the historical data ingestion reference script consumes directly.
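Later steps need to read those part-files back in a stable order. The following is a minimal Python sketch, not part of the reference script: the function name iter_export_rows is hypothetical, and treating an empty CSV field as null is an assumption that matches the NULL_IF = ('') export setting shown above.

```python
import csv
import gzip
from pathlib import Path


def iter_export_rows(folder):
    """Yield one dict per CSV row from every part-file in `folder`, in sorted order.

    Assumptions (not confirmed by the API docs): every part-file has a header
    row, and an empty field stands for NULL.
    """
    for path in sorted(Path(folder).iterdir()):
        if not path.is_file():
            continue
        # Snowflake exports are gzip-compressed by default; Redshift exports are not.
        opener = gzip.open if path.suffix == ".gz" else open
        with opener(path, "rt", encoding="utf-8", newline="") as handle:
            for record in csv.DictReader(handle):
                yield {
                    key: (value if value != "" else None)
                    for key, value in record.items()
                }
```

Because the function is a generator, only one row is held in memory at a time, so it can feed a transform-and-send loop without loading a whole table.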

From other sources (ERPs, retail platforms, files, REST APIs)

Peak's Data Sources feature can pull data from enterprise resource planning (ERP) systems, retail platforms, file feeds, and REST APIs on a schedule or on demand. Configure a connector in the User Guide, then either let Peak land the data in the warehouse for an export step like above, or transform and send directly from the connector's output:

Source kind | Configuration page
----------- | ------------------
PostgreSQL, MSSQL, MySQL, Oracle, Snowflake, Redshift | Each has a dedicated page, for example configuring the PostgreSQL connector
Amazon S3, Google Ads, REST API, webhooks | Application connectors
FTP/SFTP, files | File storage ingestion and FTP overview

Quick rule of thumb: connectors are for steady-state, scheduled flows; the Ingestion API (this guide) is for direct programmatic submission with fine-grained control over batching, retries, and timing.

Step 2 — Transform your data into the required data model

Your transformation step takes a row from your source system and reshapes it into the shape the API expects for the target table. The reshape covers four things:

  • Column names must match the schema's column names (case-insensitive — Step 3 covers this).
  • Data types must match — string, integer, float, numeric, boolean, date, timestamp, or json. See Data Types for the full catalog and per-type rules (for example, numeric requires precision + scale; date/timestamp need a timestampFormat validation).
  • Required keys must be present in every row payload — the API rejects a row with DI_E_23N01 if a key is missing.
  • Nullable values can be sent as null for that row, but the key must still be present in the JSON.

Each per-table page lists the canonical schema with Required, Nullable, and Data Type per column — for our example, see Customer. Here's a worked source-to-target reshape:

Source field (your system) | Target field (API) | Type | Notes
-------------------------- | ------------------ | ---- | -----
CustomerID | customer_id | string | rename + lowercase
Customer Name | customer_name | string | rename, drop the space
Customer Category | customer_category | string | rename; nullable (send null if missing)
Subcategory | customer_subcategory | string | nullable
PriceListID | customer_price_list_id | string | nullable
(n/a) | source | string | nullable; supply if you have it, otherwise null
LastUpdated | updated_at | timestamp | reformat to YYYY-MM-DD HH:MI:SS ±hh:mm

A row that was { "CustomerID": "C-001", "Customer Name": "Acme Ltd", "PriceListID": null, "LastUpdated": "2026-06-01T10:00:00Z" } becomes:

{
  "customer_id": "C-001",
  "customer_name": "Acme Ltd",
  "customer_category": null,
  "customer_subcategory": null,
  "customer_price_list_id": null,
  "source": null,
  "updated_at": "2026-06-01 10:00:00 +00:00"
}

Every key is present; values you don't have are null rather than absent.
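The reshape above can be sketched in Python as follows. This is an illustrative sketch rather than an official transformer: FIELD_MAP, TARGET_COLUMNS, format_timestamp, and to_target_row are hypothetical names, the mapping is copied from the table above, and treating a timestamp without an offset as UTC is an assumption you should check against your source system.

```python
from datetime import datetime, timezone

# Source-to-target renames, copied from the mapping table above.
FIELD_MAP = {
    "CustomerID": "customer_id",
    "Customer Name": "customer_name",
    "Customer Category": "customer_category",
    "Subcategory": "customer_subcategory",
    "PriceListID": "customer_price_list_id",
    "LastUpdated": "updated_at",
}

# Every key the target row must carry, including ones the source lacks.
TARGET_COLUMNS = [
    "customer_id", "customer_name", "customer_category", "customer_subcategory",
    "customer_price_list_id", "source", "updated_at",
]


def format_timestamp(value):
    """Convert an ISO 8601 string to 'YYYY-MM-DD HH:MI:SS ±hh:mm'."""
    if value is None:
        return None
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Assumption: timestamps without an offset are UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    offset = parsed.strftime("%z")  # e.g. "+0000"
    return f"{parsed.strftime('%Y-%m-%d %H:%M:%S')} {offset[:3]}:{offset[3:]}"


def to_target_row(source_row):
    """Rename keys, fill missing values with None, and reformat the timestamp."""
    row = {column: None for column in TARGET_COLUMNS}
    for source_key, target_key in FIELD_MAP.items():
        if source_key in source_row:
            row[target_key] = source_row[source_key]
    row["updated_at"] = format_timestamp(row["updated_at"])
    return row
```

Starting from a dict with every target key set to None is what guarantees that missing values are sent as null rather than omitted.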

Step 3 — Align field names with the API's canonical schema

The API matches columns by name against the schema (case-insensitive). Two common situations come up:

Renaming source columns to the canonical names

The per-table reference pages are the source of truth — your transformation step should rename to match. If you load via the historical data ingestion script, the script reads CSV headers and matches them case-insensitively, so you can keep your CSV headers in your preferred casing as long as the names match.
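If you want to check headers before loading, case-insensitive matching can be approximated locally. The sketch below is a hypothetical helper (align_headers is not part of the API or the reference script); it assumes you already hold the canonical column names as a list of strings.

```python
def align_headers(csv_headers, canonical_columns):
    """Map each CSV header to its canonical column name, ignoring case.

    Returns (mapping, unknown): `mapping` is header -> canonical name, and
    `unknown` lists headers with no match in the schema.
    """
    by_lower = {name.lower(): name for name in canonical_columns}
    mapping, unknown = {}, []
    for header in csv_headers:
        canonical = by_lower.get(header.lower())
        if canonical is None:
            unknown.append(header)
        else:
            mapping[header] = canonical
    return mapping, unknown
```

Any header that ends up in the unknown list is a candidate for a rename in your transformation step, or for a tenant-specific column.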

Adding columns the schema doesn't have

If a column you need is genuinely tenant-specific (not part of the standard schema), add it post-rollout via POST /api/v2/schema/{objectName}/add-attribute:

curl -X POST \
  'https://ingestion.peak.ai/api/v2/schema/QP_CUSTOMER_OOTB/add-attribute' \
  -H "Authorization: $PEAK_AUTH_TOKEN" \
  -H 'Content-Type: application/json' \
  -d '{
    "solutionName": "QP_OOTB",
    "columnName": "loyalty_tier",
    "dataType": "string",
    "defaultValue": null,
    "validations": [
      { "type": "required" }
    ]
  }'

Two things worth knowing before you call this:

  • Automatic column-name prefix on standard tables. The API prefixes the new column name with C_ (Snowflake) or c_ (Redshift) — so loyalty_tier becomes C_loyalty_tier in the warehouse, and the response returns that prefixed name. Customer queries (SELECT C_loyalty_tier FROM ...) must use the prefixed name. Columns added to custom tables are not prefixed. See Schema lifecycle → Automatic column-name prefix for the full mechanics.
  • required and nonNull are independent. Include {type: "required"} if the key must be present in every payload, and {type: "nonNull"} if the value cannot be null. Use both, either, or neither, whichever fits the column. The API rejects a request that combines defaultValue with nonNull (see Schema lifecycle → Controlling required-ness and nullability).
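Both rules can be captured in a small Python sketch for building the request body client-side. The helper names are hypothetical, and one point is an assumption: the sketch treats only a non-null defaultValue as conflicting with nonNull, because the example above sends "defaultValue": null. Confirm the exact rule in Schema lifecycle.

```python
def warehouse_column_name(column_name, warehouse, custom_table=False):
    """Return the column name as it will appear in the warehouse."""
    if custom_table:
        return column_name  # columns on custom tables are not prefixed
    prefixes = {"snowflake": "C_", "redshift": "c_"}
    return prefixes[warehouse.lower()] + column_name


def build_add_attribute_payload(solution_name, column_name, data_type,
                                default_value=None, required=False, non_null=False):
    """Build the JSON body for POST /api/v2/schema/{objectName}/add-attribute."""
    if non_null and default_value is not None:
        # Assumption: only a non-null default conflicts with nonNull.
        raise ValueError("defaultValue and nonNull cannot be combined")
    validations = []
    if required:
        validations.append({"type": "required"})
    if non_null:
        validations.append({"type": "nonNull"})
    return {
        "solutionName": solution_name,
        "columnName": column_name,
        "dataType": data_type,
        "defaultValue": default_value,
        "validations": validations,
    }
```

For example, warehouse_column_name("loyalty_tier", "snowflake") gives the name to use in later SELECT statements against a standard table.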

Errors you'll see if names don't line up

Error code | When
---------- | ----
DI_E_42703 | A column in your data isn't in the schema (typo, stale source)
DI_E_23N01 | A column the schema requires is missing from your row
DI_E_23502 | A non-nullable column has a null value

These all surface in the response's failed[] array and in the Data Quality Dashboard.
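A local pre-flight check can catch these three cases before a request is sent. The Python sketch below only approximates the API's checks and is not its validator: precheck_row is a hypothetical helper, and the lists of required and non-nullable columns are inputs you would take from the per-table reference page.

```python
def precheck_row(row, all_columns, required=(), non_null=()):
    """Return the error codes this row would likely trigger, in table order.

    Column names are compared case-insensitively, mirroring the API's matching.
    """
    values = {key.lower(): value for key, value in row.items()}
    known = {column.lower() for column in all_columns}
    codes = []
    if set(values) - known:
        codes.append("DI_E_42703")  # column not in the schema
    if {column.lower() for column in required} - set(values):
        codes.append("DI_E_23N01")  # required key missing from the row
    if any(
        column.lower() in values and values[column.lower()] is None
        for column in non_null
    ):
        codes.append("DI_E_23502")  # null in a non-nullable column
    return codes
```

An empty list means the row passed these three checks only; type, format, and key constraints are still validated by the API.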

Step 4 — Batch and send your data to the API

The ingest endpoint accepts up to 2000 rows and 1 MB of payload per request. The tenant rate limit is 50 requests per second. Pick the approach that fits your data volume.
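Staying under both limits means closing a batch on whichever limit is reached first. The Python sketch below shows one way to do that. The names are hypothetical, and two values are assumptions: 1 MB is read conservatively as 1,000,000 bytes, and 1,000 bytes are held back for the request envelope (solutionName, operationType, and so on).

```python
import json

MAX_ROWS = 2000
MAX_BYTES = 1_000_000       # assumption: conservative reading of "1 MB"
ENVELOPE_HEADROOM = 1_000   # assumption: room for the fields around "data"


def batch_rows(rows, max_rows=MAX_ROWS, max_bytes=MAX_BYTES - ENVELOPE_HEADROOM):
    """Yield lists of rows that respect both the row-count and byte-size limits."""
    batch, batch_bytes = [], 0
    for row in rows:
        # +2 covers the ", " separator json.dumps writes between list items.
        row_bytes = len(json.dumps(row).encode("utf-8")) + 2
        if batch and (len(batch) >= max_rows or batch_bytes + row_bytes > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(row)
        batch_bytes += row_bytes
    if batch:
        yield batch
```

A single row larger than max_bytes is still yielded on its own, so oversized rows surface as an API error instead of being dropped silently.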

For ongoing ingestion (low-to-moderate volume)

Send batches directly to POST /api/v2/objects/{objectName} with a JSON body containing solutionName, data, and operationType.

curl -X POST \
  'https://ingestion.peak.ai/api/v2/objects/QP_CUSTOMER_OOTB' \
  -H "Authorization: $PEAK_AUTH_TOKEN" \
  -H 'Content-Type: application/json' \
  -d '{
    "solutionName": "QP_OOTB",
    "operationType": "UPSERT",
    "data": [
      {
        "customer_id": "C-001",
        "customer_name": "Acme Ltd",
        "customer_category": "B2B",
        "customer_subcategory": null,
        "customer_price_list_id": "PL-001",
        "source": "ERP",
        "updated_at": "2026-06-01 10:00:00 +00:00"
      },
      {
        "customer_id": "C-002",
        "customer_name": "Globex Inc",
        "customer_category": null,
        "customer_subcategory": null,
        "customer_price_list_id": null,
        "source": null,
        "updated_at": "2026-06-01 10:05:00 +00:00"
      }
    ]
  }'

Choose the operation type per table, not per batch:

  • UPSERT — insert or update by primary key. Use when downstream only needs the current state of each record.
  • APPEND — insert only. Use when downstream needs every version (typically when the primary key includes a temporal column). See Operation types for the decision tree.

Validate before persisting using dryRun — same payload, same response shape, but no rows are written:

curl -X POST \
  'https://ingestion.peak.ai/api/v2/objects/QP_CUSTOMER_OOTB' \
  -H "Authorization: $PEAK_AUTH_TOKEN" \
  -H 'Content-Type: application/json' \
  -d '{ "solutionName": "QP_OOTB", "operationType": "UPSERT", "dryRun": true,
        "data": [ { "customer_id": "C-001", "customer_name": "Acme Ltd", ... } ] }'

Read the response:

Status | Meaning | Action
------ | ------- | ------
200 OK | Every row passed inline validation and was accepted | Continue
207 Multi-Status | Some rows passed, some failed | Inspect failed[] in the response body, fix or quarantine those rows
400 Bad Request | The whole batch failed (every row failed validation, or the payload itself is malformed) | Fix the payload, retry
429 Too Many Requests | Tenant rate limit hit | Back off, retry
5xx | Transient server-side issue | Back off, retry

See Validation behavior for what runs synchronously (returned in the response) versus asynchronously (surfaces only in _failed_rows + Data Quality Dashboard), and Error codes for every code's meaning and resolution.
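The "back off, retry" rows of the status table can be sketched as a small retry wrapper in Python. Everything here is illustrative: send_with_retry is a hypothetical helper, send is any callable you supply that returns a (status, body) pair, and the delay schedule (doubling from one second, with random jitter) is an assumed policy, not a documented one.

```python
import random
import time


def is_retryable(status):
    """429 and 5xx are the statuses the table marks as 'back off, retry'."""
    return status == 429 or 500 <= status <= 599


def send_with_retry(send, payload, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call send(payload) until it returns a non-retryable status or attempts run out.

    Returns the last (status, body) pair, so callers can still inspect
    failed[] on a 207 or the error body on a 400.
    """
    for attempt in range(max_attempts):
        status, body = send(payload)
        if not is_retryable(status):
            return status, body
        if attempt < max_attempts - 1:
            delay = base_delay * (2 ** attempt)
            # Jitter spreads retries from parallel workers apart.
            sleep(delay + random.uniform(0, delay / 2))
    return status, body
```

A 400 is returned immediately without retrying, because resending an unchanged payload would fail the same way.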

For one-shot historical loads (millions to billions of rows)

Use the Historical data ingestion guide. It ships with a self-contained Python reference script that handles the work you'd otherwise have to build — streaming CSV from disk so memory stays flat, dynamic batching to fit the 2000-row / 1 MB limits, a shared rate-limiter across worker threads, exponential backoff on 429 / 5xx, checkpoint-based resume so a crash doesn't restart from row zero, and a failed-row capture file for triage.

Configure the script's CONFIG block — csv_path, object_name, solution_name, operation_type, parallel — and run it. One run loads one table; run it once per table.

Inspecting outcomes

The two places to look after a load:

  • The HTTP response on each call — for synchronous failures (schema checks, format/type validation, intra-payload PK/UK duplicates). Use the failed[] array for per-row detail.
  • The Data Quality Dashboard in your Peak tenant — aggregates outcomes for every load, plus picks up asynchronous failures (foreign-key violations, PK collisions against rows already persisted). It is the recommended place for ongoing monitoring; the <table_name>_failed_rows table behind it has every failed-row payload + the error codes that fired.

Next steps

  • Tick through the Data onboarding checklist as you work through a real integration.
  • Bookmark the API Guide — endpoint reference, error-code catalog, response status codes.
  • See Schema lifecycle when you need to upgrade a schema, add a column, or create a custom table.
  • See Scheduled ingestion for how the warehouse-write timing works (every 30 minutes by default; configurable per table).
