UiPath Documentation
industry-department-solutions
latest
false
  • Overview
    • Introduction
    • Getting Started
    • Integration walkthrough
    • API Guide
    • Schema lifecycle
    • Scheduled ingestion
    • Historical data ingestion
    • Data Quality Dashboard
    • Customizations
    • Data Onboarding Checklist
  • API Resources

Supply Chain & Retail Solutions API guide

Historical data ingestion

This page describes how to load large volumes of historical data (millions to billions of rows) into the v2 Data Ingestion API. It provides a reference Python script that handles streaming, batching, rate limiting, retries, and resume — along with the warehouse-specific configuration you'll need to give it.

Scope. This script is for one-shot historical loads — a single, possibly multi-hour load performed once during onboarding. For ongoing incremental data after that initial load, post new rows directly to POST /api/v2/objects/{objectName} using the contracts in the API Guide. The script's checkpoint is a crash-recovery aid within a single load, not a state machine across separate runs over time.

For everyday low-volume ingestion, the API contracts described in Getting Started and the API Guide are sufficient. Use this page when you have a one-time bulk load to perform.

Before you start

  • Your solution's tables have been rolled out (see Schema lifecycle).
  • You have a Personal Access Token (see Getting Started).
  • The table you're loading is available as CSV — either a single file, or a folder of CSV part-files (as produced by chunked warehouse exports).
  • Every CSV file includes a header row, and the column names match the rolled-out schema (case-insensitive). When the source is a folder, every file in it must share the same header.

Source file format

The script reads CSV files. Headers are required — every CSV file must include a header row with the column names from the rolled-out schema (case-insensitive). The script matches columns by name, so the order of columns in your SELECT doesn't matter and there's no risk of accidentally writing quoted_price values into the discount_pct column. If csv_path is a folder of part-files, every file in it must share the same header.

Beyond headers, two things about your unload can vary, and the script handles both:

  • File naming pattern — set file_glob in CONFIG when csv_path is a folder (default *.csv).
  • Gzip compression — auto-detected from the .gz extension; no flag needed.

File or folder?

csv_path accepts either form:

  • A file path — e.g. ./customers.csv. file_glob is ignored.
  • A folder path — e.g. ./customers/. Every file in the folder matching file_glob is loaded as if it were one logical CSV. Use this for chunked warehouse exports.

Folder-load file order. When csv_path is a folder, files are loaded in lexicographic (byte-by-byte ASCII) order of their filenames. This is the right order when the names already encode order as zero-padded numeric or ISO-style date prefixes — the form that warehouse exports produce by default.

FilenamesLexicographic orderRight?
part-0001.csv, part-0002.csv, …, part-9999.csvpart-0001, part-0002, …, part-9999✓ matches numeric order
2024-01-15.csv, 2024-02-10.csv, 2024-06-01.csv2024-01-15, 2024-02-10, 2024-06-01✓ matches chronological order
part-1.csv, part-2.csv, …, part-10.csvpart-1, part-10, part-2, …10 sorts before 2 (not zero-padded)
15-01-2023.csv, 10-02-2024.csv, 01-06-2024.csv (DD-MM-YYYY)01-06-2024, 10-02-2024, 15-01-2023✗ day-first puts June 2024 before Jan 2023
01-15-2023.csv, 02-10-2024.csv, 06-01-2024.csv (MM-DD-YYYY)01-15-2023, 02-10-2024, 06-01-2024✗ year is last, so 2023 and 2024 interleave
January_report.csv, February_report.csv, March_report.csvFebruary, January, March✗ alphabetical, not chronological

If file order doesn't matter (every file is an independent batch and rows have no cross-file dependency), this isn't a concern. If file order does matter — for example, you're using APPEND and the row sequence is meaningful, or UPSERT and later files supersede earlier ones — make sure your filenames sort lexicographically into the order you intend. The safe pattern is a zero-padded numeric or ISO-date prefix (e.g., part-0001.csv, 2024-06-01.csv); rename or re-export if your source uses something else.

Fully qualified object_name

object_name must be the exact table name as rolled out — including any prefix or suffix that was applied at rollout. For example, if your rollout applied a prefix acme_ and suffix _v1, the configured object_name is acme_customers_v1, not customers. If your rollout supplied neither a prefix nor a suffix, object_name is just the table name itself (e.g., customers).

To look up the exact value for any table, call GET /api/v2/schema?solutionName=<your-solution> and read the objectName field for each entry — that's the exact string to use in object_name.

From Snowflake

Snowflake COPY INTO @stage defaults to no headers, gzip compression, and \N as the null marker. Add HEADER = TRUE so columns match by name, and NULL_IF = ('') so nulls are written as empty strings instead of \N. (The script handles either marker, but empty strings give a cleaner CSV.)

COPY INTO @your_stage/orders/
FROM (SELECT custcode, addcode, postcode, streetcode, basketvalue FROM orders)
FILE_FORMAT = (TYPE = CSV HEADER = TRUE NULL_IF = (''));
COPY INTO @your_stage/orders/
FROM (SELECT custcode, addcode, postcode, streetcode, basketvalue FROM orders)
FILE_FORMAT = (TYPE = CSV HEADER = TRUE NULL_IF = (''));

Point CONFIG at the folder:

"csv_path":    "./orders/",
"object_name": "acme_orders_v1",
"file_glob":   "*.csv.gz",     # use "*.csv" if you disabled gzip with COMPRESSION = NONE
"csv_path":    "./orders/",
"object_name": "acme_orders_v1",
"file_glob":   "*.csv.gz",     # use "*.csv" if you disabled gzip with COMPRESSION = NONE

From Redshift

Redshift UNLOAD defaults to no headers, no compression, and no file extension on the part-files. Add HEADER so columns match by name, and EXTENSION 'csv' so files are easy to glob:

UNLOAD ('SELECT custcode, addcode, postcode, streetcode, basketvalue FROM orders')
TO 's3://your-bucket/orders/'
HEADER
FORMAT CSV
EXTENSION 'csv';
UNLOAD ('SELECT custcode, addcode, postcode, streetcode, basketvalue FROM orders')
TO 's3://your-bucket/orders/'
HEADER
FORMAT CSV
EXTENSION 'csv';

Point CONFIG at the folder:

"csv_path":    "./orders/",
"object_name": "acme_orders_v1",
"file_glob":   "*.csv",        # default; use "*" if you kept Redshift's extensionless parts, or "*.gz" if you enabled GZIP
"csv_path":    "./orders/",
"object_name": "acme_orders_v1",
"file_glob":   "*.csv",        # default; use "*" if you kept Redshift's extensionless parts, or "*.gz" if you enabled GZIP

Null values and bad cells

The script treats empty strings and \N as null — covering the Redshift UNLOAD and Snowflake COPY INTO defaults. Whitespace-only cells are treated as null too. Anything else (a literal "NULL" or "NA", for example) is sent as that exact string, so convert it in your unload query if needed.

Uncastable cells are passed through as raw strings rather than crashing the load. If "abc" lands in an integer column, the API returns that single row as a failed row with the appropriate error code (see Validation behavior for the failed-row response shape) — one bad cell becomes one rejected row, not a stopped load.

Operation type — APPEND or UPSERT

APPEND

For a one-time bulk load like this, APPEND is the right choice — it's significantly faster than UPSERT when you don't need update-by-primary-key semantics.

One thing to note about APPEND: if a batch partially succeeds and you re-send the same batch, the rows that already landed will fail with a duplicate-primary-key error. The script's checkpointing handles this for you — a successful batch is never re-sent on a subsequent run.

UPSERT

Use UPSERT for ongoing incremental data, where retries and out-of-order arrivals are expected.

If you are using UPSERT and the same primary key might appear in more than one row of your source, set parallel: false in the script's CONFIG. Otherwise concurrent workers can apply those rows in unpredictable order, and you can't tell which version of the row will end up in the warehouse. For APPEND, or for UPSERT where each key is unique across the source, leave parallel: true for speed.

Loading multiple tables

The script loads one table per run. If your solution has several tables to load, run the script once per table — and run them in foreign-key dependency order, parent tables first. For example, if order_items references orders and orders references products, load products, then orders, then order_items. Loading them out of order causes foreign-key violations in the asynchronous validation stage.

Reference script

The script is self-contained — paste it into a single Python file, edit the CONFIG block at the top, and run it. It requires only the requests library (pip install requests).

#!/usr/bin/env python3
"""
Bulk-ingest historical CSV data into the Peak Data Ingestion v2 API.

- One run loads one table from a CSV file or a folder of CSV part-files.
- Streams the source row-by-row (no full in-memory load).
- Dynamically sizes batches to <=2000 rows AND <=1 MB serialized.
- Concurrent workers share a token-bucket rate limiter.
- Exponential backoff on 5xx / 429; never retries 4xx.
- Checkpoint lets multi-hour runs resume cleanly after a crash.
- Failed rows from 207 responses written to a side file for triage.
"""

from __future__ import annotations

import csv
import gzip
import json
import logging
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Iterable

import requests

# ============================================================================
# Configuration — edit before running.
# ============================================================================

CONFIG: dict[str, Any] = {
    "base_url":      "https://ingestion.peak.ai",     # spoke tenants: https://ingestion.<cluster-identifier>.peak.ai
    "auth_token":    os.environ["PEAK_AUTH_TOKEN"],   # export PEAK_AUTH_TOKEN=...
    "solution_name": "your-solution-name",

    # csv_path    — required. CSV file OR folder of CSV part-files (e.g.
    #               "./orders/" containing part-0001.csv, part-0002.csv, ...).
    # object_name — required. Fully qualified table name as rolled out —
    #               include any prefix/suffix from your rollout (for example
    #               "acme_customers_v1", not just "customers").
    # file_glob   — optional, default "*.csv". Only used when csv_path is a
    #               folder. Set to "*.csv.gz" for gzipped Snowflake unloads, or
    #               "*" for Redshift's extensionless part-files.
    "csv_path":    "./customers.csv",
    "object_name": "acme_customers_v1",
    "file_glob":   "*.csv",

    "operation_type": "APPEND",                       # APPEND is fastest for historical

    # Set False if operation_type is UPSERT and the SAME primary key can appear
    # in more than one row of your source. Concurrent workers finish in an
    # unpredictable order, so for duplicated keys you can't tell which version
    # ends up in the warehouse. See the note below.
    "parallel": True,
}

# Tuning — fixed for predictable behaviour against the API's limits.
MAX_ROWS_PER_BATCH      = 2000
MAX_BYTES_PER_BATCH     = 1_048_576                   # 1 MB
RATE_LIMIT_RPS          = 30                          # well under the 50 RPS cap
WORKERS                 = 6                           # threads used only when parallel=True
MAX_RETRIES             = 5
INITIAL_BACKOFF_SECONDS = 1.0
REQUEST_TIMEOUT_SECONDS = 60
CHECKPOINT_DIR          = "./checkpoints"
FAILED_ROWS_DIR         = "./failed_rows"

# ============================================================================
# NOTE — parallelism and operation type
# ----------------------------------------------------------------------------
# Keep parallel=True for speed if:
#   - operation_type is APPEND, OR
#   - operation_type is UPSERT and every primary key appears in only one row.
#
# Set parallel=False if operation_type is UPSERT and the SAME primary key can
# appear in more than one row of your source. Concurrent workers finish in
# unpredictable order, so you can't tell which version of a duplicated key
# will end up in the warehouse — the same load run twice can produce different
# results. Sending one batch at a time fixes that.
#
# Alternatively, dedupe the source before running (keep only the latest record
# per key) and leave parallel=True.
# ============================================================================

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("bulk_ingest")


# ----------------------------------------------------------------------------
# Token bucket — thread-safe, shared across all workers.
# ----------------------------------------------------------------------------
class RateLimiter:
    def __init__(self, rps: int) -> None:
        self.rps = rps
        self.tokens = float(rps)
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                self.tokens = min(self.rps, self.tokens + (now - self.last_refill) * self.rps)
                self.last_refill = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                wait = (1 - self.tokens) / self.rps
            time.sleep(wait)


# ----------------------------------------------------------------------------
# Schema fetcher — auto-discovers column types for casting CSV strings.
# ----------------------------------------------------------------------------
def fetch_schema_types(base_url: str, auth_token: str, solution_name: str) -> dict[str, dict[str, str]]:
    url = f"{base_url}/api/v2/schema"
    headers = {"Authorization": auth_token}
    resp = requests.get(url, params={"solutionName": solution_name}, headers=headers, timeout=30)
    resp.raise_for_status()
    schema = resp.json()
    type_map: dict[str, dict[str, str]] = {}
    for table in schema.get("schema", []):
        name = table["objectName"].upper()
        type_map[name] = {
            col["columnName"]: col.get("dataType") or col.get("peakDataType") or "string"
            for col in table.get("columns", [])
        }
    return type_map


# ----------------------------------------------------------------------------
# Type casting — CSV string -> JSON value of the right type.
# ----------------------------------------------------------------------------
# Per-type casters. string/date/timestamp pass through unchanged (API validates format).
_CASTERS = {
    "integer": int,
    "numeric": float,
    "float":   float,
    "boolean": lambda s: s.lower() in ("true", "1", "t", "yes", "y"),
    "json":    json.loads,
}


def cast_value(value: str, data_type: str) -> Any:
    if value is None:
        return None
    # Treat standard warehouse null markers as None: empty string,
    # "\N" (Snowflake/Postgres default), and whitespace-only cells.
    stripped = value.strip()
    if stripped in ("", "\\N"):
        return None
    caster = _CASTERS.get((data_type or "string").lower())
    return caster(stripped) if caster else value


def cast_row(row: dict[str, str], column_types: dict[str, str]) -> dict[str, Any]:
    """Cast each cell to its schema type.

    If a cell cannot be cast (for example, "abc" in an integer column), the raw
    string is passed through unchanged. The API will then surface it as a
    structured failed-row error with the right code, rather than the worker
    thread crashing.
    """
    out: dict[str, Any] = {}
    for col, raw in row.items():
        try:
            out[col] = cast_value(raw, column_types.get(col, "string"))
        except (ValueError, TypeError, json.JSONDecodeError):
            out[col] = raw
    return out


# ----------------------------------------------------------------------------
# Source resolution — csv_path can be a single CSV file OR a folder of part-files
# (e.g. ./orders/ containing part-0001.csv, part-0002.csv). Folders are read in
# sorted filename order so the load is deterministic.
# ----------------------------------------------------------------------------
def resolve_csv_paths(csv_path: str, file_glob: str = "*.csv") -> list[str]:
    p = Path(csv_path)
    if p.is_dir():
        files = sorted(str(f) for f in p.glob(file_glob))
        if not files:
            raise FileNotFoundError(f"no files matching '{file_glob}' in folder: {csv_path}")
        return files
    return [csv_path]


# ----------------------------------------------------------------------------
# Streaming batch builder — reads every source file row-by-row and yields
# lists respecting both row count and size. Batches span file boundaries, so
# part-files don't produce undersized batches at their edges. Every file must
# carry its own header row.
# ----------------------------------------------------------------------------
def iter_batches(
    csv_paths: list[str],
    column_types: dict[str, str],
    max_rows: int,
    max_bytes: int,
) -> Iterable[list[dict[str, Any]]]:
    batch: list[dict[str, Any]] = []
    batch_bytes = 0
    for csv_path in csv_paths:
        opener = gzip.open if csv_path.endswith(".gz") else open
        with opener(csv_path, "rt", newline="", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for raw_row in reader:
                # DictReader puts cells beyond the header width under the None key.
                # That means the row is wider than the header — downstream values
                # would silently shift into the wrong column, so fail fast instead.
                if None in raw_row:
                    extras = raw_row[None]
                    raise RuntimeError(
                        f"{csv_path}: row has more columns than the header. "
                        f"Extra cells: {extras!r}. "
                        f"Check that every row matches the header width."
                    )
                row = cast_row(raw_row, column_types)
                row_bytes = len(json.dumps(row, default=str).encode("utf-8"))
                if batch and (len(batch) >= max_rows or batch_bytes + row_bytes >= max_bytes):
                    yield batch
                    batch, batch_bytes = [], 0
                batch.append(row)
                batch_bytes += row_bytes
    if batch:
        yield batch


# ----------------------------------------------------------------------------
# Per-batch ingest — retries 5xx and 429 with exponential backoff.
# ----------------------------------------------------------------------------
def ingest_batch(
    base_url: str,
    auth_token: str,
    object_name: str,
    solution_name: str,
    operation_type: str,
    batch: list[dict[str, Any]],
    *,
    rate_limiter: RateLimiter,
) -> dict[str, Any]:
    url = f"{base_url}/api/v2/objects/{object_name}"
    headers = {
        "Authorization": auth_token,
        "Content-Type": "application/json",
        "accept": "application/json",
    }
    body = {"solutionName": solution_name, "data": batch, "operationType": operation_type}

    backoff = INITIAL_BACKOFF_SECONDS
    for attempt in range(1, MAX_RETRIES + 1):
        rate_limiter.acquire()
        try:
            resp = requests.post(url, headers=headers, json=body, timeout=REQUEST_TIMEOUT_SECONDS)
        except requests.RequestException as exc:
            if attempt == MAX_RETRIES:
                raise
            log.warning("network error: %s; retry in %.1fs", exc, backoff)
            time.sleep(backoff)
            backoff *= 2
            continue

        # 200 = all rows ok; 207 = partial; 400 = all failed validation (terminal, do not retry)
        if resp.status_code in (200, 207, 400):
            return resp.json()

        # 429 / 5xx — back off and retry
        if resp.status_code == 429 or 500 <= resp.status_code < 600:
            if attempt == MAX_RETRIES:
                raise RuntimeError(f"max retries reached ({resp.status_code}): {resp.text[:200]}")
            log.warning("status %s; retry in %.1fs (attempt %d/%d)", resp.status_code, backoff, attempt, MAX_RETRIES)
            time.sleep(backoff)
            backoff *= 2
            continue

        # 401, 403, 404, 409 — terminal
        raise RuntimeError(f"{resp.status_code}: {resp.text[:200]}")

    raise RuntimeError("unreachable")


# ----------------------------------------------------------------------------
# Single-table runner — resume from checkpoint, parallelise batches.
# ----------------------------------------------------------------------------
def load_table(cfg: dict[str, Any], type_map: dict[str, dict[str, str]], rate_limiter: RateLimiter, workers: int) -> None:
    object_name = cfg["object_name"]
    file_glob = cfg.get("file_glob", "*.csv")
    csv_paths = resolve_csv_paths(cfg["csv_path"], file_glob)
    log.info("[%s] %d source file(s), glob=%s", object_name, len(csv_paths), file_glob)
    column_types = type_map.get(object_name.upper(), {})
    if not column_types:
        log.warning("[%s] no schema columns found; values pass through as strings", object_name)

    Path(CHECKPOINT_DIR).mkdir(parents=True, exist_ok=True)
    Path(FAILED_ROWS_DIR).mkdir(parents=True, exist_ok=True)
    ckpt_path = Path(CHECKPOINT_DIR) / f"{object_name}.json"
    failed_path = Path(FAILED_ROWS_DIR) / f"{object_name}.jsonl"

    next_batch = 0
    if ckpt_path.exists():
        next_batch = json.loads(ckpt_path.read_text()).get("next_batch", 0)
        if next_batch > 0:
            log.info("[%s] resuming from batch #%d", object_name, next_batch)

    # Skip already-completed batches by index; still must stream past them.
    pending = [
        (i, b) for i, b in enumerate(
            iter_batches(csv_paths, column_types, MAX_ROWS_PER_BATCH, MAX_BYTES_PER_BATCH)
        ) if i >= next_batch
    ]
    if not pending:
        log.info("[%s] already complete", object_name)
        return
    log.info("[%s] %d batches remaining", object_name, len(pending))

    total_ok = total_failed = 0
    ckpt_lock = threading.Lock()
    completed: set[int] = set()
    to_persist = next_batch

    def submit(idx_batch):
        idx, batch = idx_batch
        return idx, ingest_batch(
            cfg["base_url"], cfg["auth_token"], object_name, cfg["solution_name"],
            cfg["operation_type"], batch, rate_limiter=rate_limiter,
        )

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(submit, ib): ib for ib in pending}
        for fut in as_completed(futures):
            idx, result = fut.result()
            ok = result.get("successRows", 0)
            failed = result.get("failedRows", 0)
            total_ok += ok
            total_failed += failed

            if result.get("failed"):
                with failed_path.open("a") as f:
                    for row in result["failed"]:
                        f.write(json.dumps({"batch": idx, **row}) + "\n")

            with ckpt_lock:
                completed.add(idx)
                while to_persist in completed:
                    completed.remove(to_persist)
                    to_persist += 1
                ckpt_path.write_text(json.dumps({"next_batch": to_persist}))

            log.info(
                "[%s] batch #%d: %d ok / %d failed (running: %d ok / %d failed)",
                object_name, idx, ok, failed, total_ok, total_failed,
            )

    log.info("[%s] complete: %d rows ingested, %d failed", object_name, total_ok, total_failed)


def main() -> None:
    cfg = CONFIG
    log.info("Fetching schema column types for solution %s", cfg["solution_name"])
    type_map = fetch_schema_types(cfg["base_url"], cfg["auth_token"], cfg["solution_name"])
    log.info("Schema fetched: %d tables", len(type_map))
    rate_limiter = RateLimiter(RATE_LIMIT_RPS)

    # parallel=False forces a single worker so batches are applied in submission
    # order — needed for UPSERT loads where a primary key can repeat across batches.
    workers = WORKERS if cfg.get("parallel", True) else 1
    log.info("Concurrency: %s", f"{workers} workers" if workers > 1 else "serial (batches applied in order)")

    log.info("=" * 70)
    log.info("Loading %s from %s", cfg["object_name"], cfg["csv_path"])
    log.info("=" * 70)
    load_table(cfg, type_map, rate_limiter, workers)
    log.info("Done.")


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""
Bulk-ingest historical CSV data into the Peak Data Ingestion v2 API.

- One run loads one table from a CSV file or a folder of CSV part-files.
- Streams the source row-by-row (no full in-memory load).
- Dynamically sizes batches to <=2000 rows AND <=1 MB serialized.
- Concurrent workers share a token-bucket rate limiter.
- Exponential backoff on 5xx / 429; never retries 4xx.
- Checkpoint lets multi-hour runs resume cleanly after a crash.
- Failed rows from 207 responses written to a side file for triage.
"""

from __future__ import annotations

import csv
import gzip
import json
import logging
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Iterable

import requests

# ============================================================================
# Configuration — edit before running.
# ============================================================================

CONFIG: dict[str, Any] = {
    "base_url":      "https://ingestion.peak.ai",     # spoke tenants: https://ingestion.<cluster-identifier>.peak.ai
    "auth_token":    os.environ["PEAK_AUTH_TOKEN"],   # export PEAK_AUTH_TOKEN=...
    "solution_name": "your-solution-name",

    # csv_path    — required. CSV file OR folder of CSV part-files (e.g.
    #               "./orders/" containing part-0001.csv, part-0002.csv, ...).
    # object_name — required. Fully qualified table name as rolled out —
    #               include any prefix/suffix from your rollout (for example
    #               "acme_customers_v1", not just "customers").
    # file_glob   — optional, default "*.csv". Only used when csv_path is a
    #               folder. Set to "*.csv.gz" for gzipped Snowflake unloads, or
    #               "*" for Redshift's extensionless part-files.
    "csv_path":    "./customers.csv",
    "object_name": "acme_customers_v1",
    "file_glob":   "*.csv",

    "operation_type": "APPEND",                       # APPEND is fastest for historical

    # Set False if operation_type is UPSERT and the SAME primary key can appear
    # in more than one row of your source. Concurrent workers finish in an
    # unpredictable order, so for duplicated keys you can't tell which version
    # ends up in the warehouse. See the note below.
    "parallel": True,
}

# Tuning — fixed for predictable behaviour against the API's limits.
MAX_ROWS_PER_BATCH      = 2000
MAX_BYTES_PER_BATCH     = 1_048_576                   # 1 MB
RATE_LIMIT_RPS          = 30                          # well under the 50 RPS cap
WORKERS                 = 6                           # threads used only when parallel=True
MAX_RETRIES             = 5
INITIAL_BACKOFF_SECONDS = 1.0
REQUEST_TIMEOUT_SECONDS = 60
CHECKPOINT_DIR          = "./checkpoints"
FAILED_ROWS_DIR         = "./failed_rows"

# ============================================================================
# NOTE — parallelism and operation type
# ----------------------------------------------------------------------------
# Keep parallel=True for speed if:
#   - operation_type is APPEND, OR
#   - operation_type is UPSERT and every primary key appears in only one row.
#
# Set parallel=False if operation_type is UPSERT and the SAME primary key can
# appear in more than one row of your source. Concurrent workers finish in
# unpredictable order, so you can't tell which version of a duplicated key
# will end up in the warehouse — the same load run twice can produce different
# results. Sending one batch at a time fixes that.
#
# Alternatively, dedupe the source before running (keep only the latest record
# per key) and leave parallel=True.
# ============================================================================

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("bulk_ingest")


# ----------------------------------------------------------------------------
# Token bucket — thread-safe, shared across all workers.
# ----------------------------------------------------------------------------
class RateLimiter:
    def __init__(self, rps: int) -> None:
        self.rps = rps
        self.tokens = float(rps)
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                self.tokens = min(self.rps, self.tokens + (now - self.last_refill) * self.rps)
                self.last_refill = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                wait = (1 - self.tokens) / self.rps
            time.sleep(wait)


# ----------------------------------------------------------------------------
# Schema fetcher — auto-discovers column types for casting CSV strings.
# ----------------------------------------------------------------------------
def fetch_schema_types(base_url: str, auth_token: str, solution_name: str) -> dict[str, dict[str, str]]:
    url = f"{base_url}/api/v2/schema"
    headers = {"Authorization": auth_token}
    resp = requests.get(url, params={"solutionName": solution_name}, headers=headers, timeout=30)
    resp.raise_for_status()
    schema = resp.json()
    type_map: dict[str, dict[str, str]] = {}
    for table in schema.get("schema", []):
        name = table["objectName"].upper()
        type_map[name] = {
            col["columnName"]: col.get("dataType") or col.get("peakDataType") or "string"
            for col in table.get("columns", [])
        }
    return type_map


# ----------------------------------------------------------------------------
# Type casting — CSV string -> JSON value of the right type.
# ----------------------------------------------------------------------------
# Per-type casters. string/date/timestamp pass through unchanged (API validates format).
_CASTERS = {
    "integer": int,
    "numeric": float,
    "float":   float,
    "boolean": lambda s: s.lower() in ("true", "1", "t", "yes", "y"),
    "json":    json.loads,
}


def cast_value(value: str, data_type: str) -> Any:
    if value is None:
        return None
    # Treat standard warehouse null markers as None: empty string,
    # "\N" (Snowflake/Postgres default), and whitespace-only cells.
    stripped = value.strip()
    if stripped in ("", "\\N"):
        return None
    caster = _CASTERS.get((data_type or "string").lower())
    return caster(stripped) if caster else value


def cast_row(row: dict[str, str], column_types: dict[str, str]) -> dict[str, Any]:
    """Cast each cell to its schema type.

    If a cell cannot be cast (for example, "abc" in an integer column), the raw
    string is passed through unchanged. The API will then surface it as a
    structured failed-row error with the right code, rather than the worker
    thread crashing.
    """
    out: dict[str, Any] = {}
    for col, raw in row.items():
        try:
            out[col] = cast_value(raw, column_types.get(col, "string"))
        except (ValueError, TypeError, json.JSONDecodeError):
            out[col] = raw
    return out


# ----------------------------------------------------------------------------
# Source resolution — csv_path can be a single CSV file OR a folder of part-files
# (e.g. ./orders/ containing part-0001.csv, part-0002.csv). Folders are read in
# sorted filename order so the load is deterministic.
# ----------------------------------------------------------------------------
def resolve_csv_paths(csv_path: str, file_glob: str = "*.csv") -> list[str]:
    p = Path(csv_path)
    if p.is_dir():
        files = sorted(str(f) for f in p.glob(file_glob))
        if not files:
            raise FileNotFoundError(f"no files matching '{file_glob}' in folder: {csv_path}")
        return files
    return [csv_path]


# ----------------------------------------------------------------------------
# Streaming batch builder — reads every source file row-by-row and yields
# lists respecting both row count and size. Batches span file boundaries, so
# part-files don't produce undersized batches at their edges. Every file must
# carry its own header row.
# ----------------------------------------------------------------------------
def iter_batches(
    csv_paths: list[str],
    column_types: dict[str, str],
    max_rows: int,
    max_bytes: int,
) -> Iterable[list[dict[str, Any]]]:
    batch: list[dict[str, Any]] = []
    batch_bytes = 0
    for csv_path in csv_paths:
        opener = gzip.open if csv_path.endswith(".gz") else open
        with opener(csv_path, "rt", newline="", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for raw_row in reader:
                # DictReader puts cells beyond the header width under the None key.
                # That means the row is wider than the header — downstream values
                # would silently shift into the wrong column, so fail fast instead.
                if None in raw_row:
                    extras = raw_row[None]
                    raise RuntimeError(
                        f"{csv_path}: row has more columns than the header. "
                        f"Extra cells: {extras!r}. "
                        f"Check that every row matches the header width."
                    )
                row = cast_row(raw_row, column_types)
                row_bytes = len(json.dumps(row, default=str).encode("utf-8"))
                if batch and (len(batch) >= max_rows or batch_bytes + row_bytes >= max_bytes):
                    yield batch
                    batch, batch_bytes = [], 0
                batch.append(row)
                batch_bytes += row_bytes
    if batch:
        yield batch


# ----------------------------------------------------------------------------
# Per-batch ingest — retries 5xx and 429 with exponential backoff.
# ----------------------------------------------------------------------------
def ingest_batch(
    base_url: str,
    auth_token: str,
    object_name: str,
    solution_name: str,
    operation_type: str,
    batch: list[dict[str, Any]],
    *,
    rate_limiter: RateLimiter,
) -> dict[str, Any]:
    url = f"{base_url}/api/v2/objects/{object_name}"
    headers = {
        "Authorization": auth_token,
        "Content-Type": "application/json",
        "accept": "application/json",
    }
    body = {"solutionName": solution_name, "data": batch, "operationType": operation_type}

    backoff = INITIAL_BACKOFF_SECONDS
    for attempt in range(1, MAX_RETRIES + 1):
        rate_limiter.acquire()
        try:
            resp = requests.post(url, headers=headers, json=body, timeout=REQUEST_TIMEOUT_SECONDS)
        except requests.RequestException as exc:
            if attempt == MAX_RETRIES:
                raise
            log.warning("network error: %s; retry in %.1fs", exc, backoff)
            time.sleep(backoff)
            backoff *= 2
            continue

        # 200 = all rows ok; 207 = partial; 400 = all failed validation (terminal, do not retry)
        if resp.status_code in (200, 207, 400):
            return resp.json()

        # 429 / 5xx — back off and retry
        if resp.status_code == 429 or 500 <= resp.status_code < 600:
            if attempt == MAX_RETRIES:
                raise RuntimeError(f"max retries reached ({resp.status_code}): {resp.text[:200]}")
            log.warning("status %s; retry in %.1fs (attempt %d/%d)", resp.status_code, backoff, attempt, MAX_RETRIES)
            time.sleep(backoff)
            backoff *= 2
            continue

        # 401, 403, 404, 409 — terminal
        raise RuntimeError(f"{resp.status_code}: {resp.text[:200]}")

    raise RuntimeError("unreachable")


# ----------------------------------------------------------------------------
# Single-table runner — resume from checkpoint, parallelise batches.
# ----------------------------------------------------------------------------
def load_table(cfg: dict[str, Any], type_map: dict[str, dict[str, str]], rate_limiter: RateLimiter, workers: int) -> None:
    object_name = cfg["object_name"]
    file_glob = cfg.get("file_glob", "*.csv")
    csv_paths = resolve_csv_paths(cfg["csv_path"], file_glob)
    log.info("[%s] %d source file(s), glob=%s", object_name, len(csv_paths), file_glob)
    column_types = type_map.get(object_name.upper(), {})
    if not column_types:
        log.warning("[%s] no schema columns found; values pass through as strings", object_name)

    Path(CHECKPOINT_DIR).mkdir(parents=True, exist_ok=True)
    Path(FAILED_ROWS_DIR).mkdir(parents=True, exist_ok=True)
    ckpt_path = Path(CHECKPOINT_DIR) / f"{object_name}.json"
    failed_path = Path(FAILED_ROWS_DIR) / f"{object_name}.jsonl"

    next_batch = 0
    if ckpt_path.exists():
        next_batch = json.loads(ckpt_path.read_text()).get("next_batch", 0)
        if next_batch > 0:
            log.info("[%s] resuming from batch #%d", object_name, next_batch)

    # Skip already-completed batches by index; still must stream past them.
    pending = [
        (i, b) for i, b in enumerate(
            iter_batches(csv_paths, column_types, MAX_ROWS_PER_BATCH, MAX_BYTES_PER_BATCH)
        ) if i >= next_batch
    ]
    if not pending:
        log.info("[%s] already complete", object_name)
        return
    log.info("[%s] %d batches remaining", object_name, len(pending))

    total_ok = total_failed = 0
    ckpt_lock = threading.Lock()
    completed: set[int] = set()
    to_persist = next_batch

    def submit(idx_batch):
        idx, batch = idx_batch
        return idx, ingest_batch(
            cfg["base_url"], cfg["auth_token"], object_name, cfg["solution_name"],
            cfg["operation_type"], batch, rate_limiter=rate_limiter,
        )

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(submit, ib): ib for ib in pending}
        for fut in as_completed(futures):
            idx, result = fut.result()
            ok = result.get("successRows", 0)
            failed = result.get("failedRows", 0)
            total_ok += ok
            total_failed += failed

            if result.get("failed"):
                with failed_path.open("a") as f:
                    for row in result["failed"]:
                        f.write(json.dumps({"batch": idx, **row}) + "\n")

            with ckpt_lock:
                completed.add(idx)
                while to_persist in completed:
                    completed.remove(to_persist)
                    to_persist += 1
                ckpt_path.write_text(json.dumps({"next_batch": to_persist}))

            log.info(
                "[%s] batch #%d: %d ok / %d failed (running: %d ok / %d failed)",
                object_name, idx, ok, failed, total_ok, total_failed,
            )

    log.info("[%s] complete: %d rows ingested, %d failed", object_name, total_ok, total_failed)


def main() -> None:
    cfg = CONFIG
    log.info("Fetching schema column types for solution %s", cfg["solution_name"])
    type_map = fetch_schema_types(cfg["base_url"], cfg["auth_token"], cfg["solution_name"])
    log.info("Schema fetched: %d tables", len(type_map))
    rate_limiter = RateLimiter(RATE_LIMIT_RPS)

    # parallel=False forces a single worker so batches are applied in submission
    # order — needed for UPSERT loads where a primary key can repeat across batches.
    workers = WORKERS if cfg.get("parallel", True) else 1
    log.info("Concurrency: %s", f"{workers} workers" if workers > 1 else "serial (batches applied in order)")

    log.info("=" * 70)
    log.info("Loading %s from %s", cfg["object_name"], cfg["csv_path"])
    log.info("=" * 70)
    load_table(cfg, type_map, rate_limiter, workers)
    log.info("Done.")


if __name__ == "__main__":
    main()

Running the script

  1. Save the script as bulk_ingest.py.
  2. Install the dependency: pip install requests.
  3. Export your Personal Access Token: export PEAK_AUTH_TOKEN=....
  4. Edit the CONFIG block — set solution_name, csv_path, and object_name. Set file_glob too if your source is a folder with non-default file names, and set base_url to https://ingestion.<cluster-identifier>.peak.ai if your tenant is on a spoke cluster (your account manager has the cluster identifier).
  5. Run it: python bulk_ingest.py.

Result

The script logs progress per batch. You can stop it (Ctrl-C) and resume it later — it picks up from the last successfully-persisted batch index.

Any rows the API rejects during the load are written to failed_rows/<table>.jsonl for triage. Asynchronous validation failures (foreign-key violations, primary-key collisions against pre-existing data) don't appear there — they show up in the Data Quality Dashboard instead.

Resuming after a crash

If the script stops mid-load — a network blip, a laptop reboot, anything — just run it again and it picks up from where it left off. No rows are re-sent and none are dropped.

Two files persist between runs to make that work:

  • checkpoints/<table>.json — tracks how far the load got, so the script knows where to resume.
  • failed_rows/<table>.jsonl — appended to each time the API rejects a row.

On resume, the script reads the checkpoint, skips the batches it already completed, and continues from there. You'll see [<table>] resuming from batch #N in the logs at startup.

If you want a clean restart — for example, you've truncated the warehouse table and want to load from scratch — delete the state files first. Replace <table> with the object_name from your CONFIG (e.g., acme_customers_v1):

rm checkpoints/<table>.json failed_rows/<table>.jsonl
python bulk_ingest.py
rm checkpoints/<table>.json failed_rows/<table>.jsonl
python bulk_ingest.py

Or wipe all tables' state at once if you're re-loading several:

rm -rf checkpoints failed_rows
rm -rf checkpoints failed_rows

Editing the source CSV between runs is not supported. The checkpoint tracks batch indices, not row content — so if you add or modify rows in the source file and re-run, the changes can be silently skipped. For example, adding one row to a CSV that previously fit in a single batch leaves the new row in batch #0, which the checkpoint marks "skip." If you need to add or modify rows after a load has completed, post them directly to POST /api/v2/objects/{objectName} rather than re-running the script with the edited file.

Tips for multi-day runs

  • Run from a stable host. A laptop that sleeps or moves between networks will interrupt the run; the script will resume cleanly, but you'll lose elapsed time. If you're running on a remote host over SSH, use nohup or a terminal multiplexer (tmux, screen) so the script survives the session dropping.
  • Monitor disk space. Failed-row files grow with the volume of validation failures. For a dataset with 1% failure rate across 100M rows, that's 1M JSONL entries.
  • Investigate sustained errors early. If you see persistent 5xx responses or sudden throughput drops, stop the run and submit a support ticket before exhausting retries. A dataset of this size is easier to debug at hour 2 than at hour 20.
  • Treat the failed-row files as a second-pass workload. After the bulk load completes, fix the data issues identified in the failed_rows/ directory and re-submit those rows in a smaller load. Each JSONL entry captures the row as the API saw it — extract the row payload, fix the offending field(s), and submit a small batch via POST /api/v2/objects/{objectName} directly (see the API Guide for the request shape).

Was this page helpful?

Connect

Need help? Support

Want to learn? UiPath Academy

Have questions? UiPath Forum

Stay updated