In Issue 2, we built a clean schema with a field registry. Now comes the unglamorous truth: getting vendor data into that schema is 80% of the work.
We spent 2 weeks designing the perfect database. We spent 6 weeks fighting with vendor data. Every price in a different format. Every null represented differently. Every product with a slightly different SKU.
In this issue:
- Why vendor data is chaos (and always will be)
- The 4-step ingestion pipeline (fetch -> transform -> validate -> upsert)
- Normalization strategies for units, nulls, and duplicates
- Why we chose scripts over Airflow
History Anchor: Batch ETL -> Modern ELT
Data ingestion has evolved from batch ETL jobs (Informatica, SSIS) through Hadoop MapReduce to modern ELT with dbt and Airflow. For AI agents, the ingestion layer has an additional constraint: the data must be normalized precisely enough for an LLM to reason about it. Messy data doesn’t just cause query errors — it causes hallucinations.
The Chaos
Nightmare 1: The Unit Chaos
SAME LAPTOP, 4 VENDORS, 4 DIFFERENT PRICE FORMATS:
Vendor A (Europe):
"price": "EUR 1.299,00"
Vendor B (US):
"price": "$1,499.00 USD"
Vendor C (API):
"price_cents": 149900
Vendor D (CSV):
"list_price": "1499"
OUR SCHEMA EXPECTS:
price: NUMERIC (1499.00)
unit: USD
PROBLEM: 4 different formats, 2 currencies (and one vendor quoting in cents)
The same laptop. Four vendors. Four completely different representations of the price. Our schema expects one consistent format.
Nightmare 2: The Null Chaos
MISSING WEIGHT DATA, 4 DIFFERENT REPRESENTATIONS:
Vendor A: "weight": null
Vendor B: "weight": "N/A"
Vendor C: "weight": 0
Vendor D: "weight": ""
QUESTION: What do we do?
- Reject the product? (lose data)
- Use default value? (inaccurate)
- Flag as missing? (track it)
PROBLEM: No consistent null handling
“Missing” doesn’t mean the same thing to every vendor. Some send null. Some send "N/A". Some send 0 (is that missing or actual zero weight?). Some send empty strings.
Nightmare 3: The Duplicate Chaos
SAME LAPTOP, 3 VENDORS, 3 DIFFERENT IDENTIFIERS:
Vendor A:
"sku": "XPS-15-2024"
"name": "Dell XPS 15"
"price": 1899
Vendor B:
"sku": "XPS15"
"name": "XPS 15 Laptop"
"price": 1899.00
Vendor C:
"sku": "DELL-XPS-15"
"name": "Dell XPS 15 (2024)"
"price": 1899
QUESTION: Are these the same product?
- Different SKUs
- Slightly different names
- Same price (but different formats)
PROBLEM: Need deduplication strategy
Three vendors, three different SKUs, three slightly different names. Is “XPS-15-2024” the same as “XPS15”? Your database needs one canonical entry.
The Realization
We spent 2 weeks building the perfect schema. We spent 6 weeks getting vendor data to fit it. Ingestion is 80% of the work.
The Diagnosis: Why Vendor Data Is Chaos
What Vendors Send vs. What You Need
WHAT VENDORS SEND (The Reality):
- Different field names ("price" vs "list_price")
- Different units (EUR, USD, cents)
- Different formats ("$1,499" vs 1499 vs "1.499,00")
- Different null representations (null, "N/A", 0, "")
- Different schemas (some have weight, some don't)
- Typos and errors ("weightt": 2.5)
WHAT YOUR SCHEMA NEEDS (The Standard):
- Consistent field names (price, weight_kg, brand)
- One unit (USD, kg)
- One format (NUMERIC: 1499.00)
- One null strategy (null = missing, not 0 or "N/A")
- Required fields always present
- No typos, no errors
The pattern: Vendors don’t care about your schema. You have to transform their data into your format, every time, for every vendor.
The Gap (The Ingestion Layer Bridges It)
VENDOR DATA (Chaos)
|
v
INGESTION LAYER
1. FETCH: Get data from vendor (API, CSV, scrape)
2. TRANSFORM: Normalize to your schema
3. VALIDATE: Check if it meets rules
4. UPSERT: Update if exists, insert if new
|
v
YOUR DATABASE (Clean, Standardized)
The Ingestion Pipeline
The 4-Step Flow
Step 1: FETCH — Get raw data from vendor
Sources: API calls (JSON), CSV files (download), web scraping (HTML).
Output: Raw vendor data (as-is).
Key Principle: Don’t transform during fetch. Get the data exactly as the vendor sends it.
Step 2: TRANSFORM — Convert to your schema
- Map fields: "list_price" -> "price", "product_name" -> "name", "weightt" -> "weight_kg" (fix typos)
- Normalize units: "EUR 1,299" -> 1428.90 (USD), "5.5 lbs" -> 2.49 (kg)
- Handle nulls: "N/A" -> null, "" -> null, 0 -> null (if 0 means missing)
- Preserve original: store raw data in full_data JSONB
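Field mapping works best as a small per-vendor registry instead of scattered if/else logic. A minimal sketch (the vendor keys and mappings are illustrative, not our real config):

```python
# Hypothetical per-vendor field maps: our field name -> vendor's field name
FIELD_MAPS = {
    "vendor_b": {"price": "price", "name": "name", "weight_kg": "weight"},
    "vendor_d": {"price": "list_price", "name": "product_name", "weight_kg": "weightt"},  # vendor's typo absorbed here
}

def map_fields(raw, vendor):
    # Unknown vendors map to an empty dict rather than crashing
    mapping = FIELD_MAPS.get(vendor, {})
    return {ours: raw.get(theirs) for ours, theirs in mapping.items()}
```

Fixing the `weightt` typo in the map means the fix lives in one place, not in every transform function.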
Step 3: VALIDATE — Check if data meets schema rules
- Required fields: sku present? name present? price present?
- Types: price is numeric? weight is numeric?
- Ranges: price > 0? weight > 0?
- If validation fails: log error with details, skip product, continue with next
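The validation step can be a plain function that returns a list of human-readable errors (the field names follow our schema; the rules here are a sketch, not an exhaustive set):

```python
REQUIRED_FIELDS = ("sku", "name", "price")

def validate(product):
    errors = []
    # Required fields must be present and non-null
    for field in REQUIRED_FIELDS:
        if product.get(field) is None:
            errors.append(f"missing required field: {field}")
    # Type and range checks (only when the value is present)
    for field in ("price", "weight_kg"):
        value = product.get(field)
        if value is not None:
            if not isinstance(value, (int, float)):
                errors.append(f"{field} is not numeric: {value!r}")
            elif value <= 0:
                errors.append(f"{field} out of range: {value}")
    return errors
```

Returning a list (instead of raising on the first problem) means the error log shows everything wrong with a product at once.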
Step 4: UPSERT — Insert or update in database
- Check: Does SKU already exist?
- Yes -> UPDATE (price, specs, updated_at)
- No -> INSERT (new product)
- Conflict resolution: if price changed, update; if specs changed, update; if vendor data is older, skip
INSERT INTO product (...) VALUES (...)
ON CONFLICT (sku) DO UPDATE SET ...
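As a runnable sketch of the same upsert pattern, here is the ON CONFLICT clause against SQLite (3.24+, which supports the Postgres-style syntax); the column list is illustrative, not our full schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE product (
        sku        TEXT PRIMARY KEY,
        name       TEXT,
        price      NUMERIC,
        updated_at TEXT
    )
""")

def upsert(conn, sku, name, price, updated_at):
    # Insert a new row, or update price/updated_at if the SKU already exists
    conn.execute(
        """
        INSERT INTO product (sku, name, price, updated_at)
        VALUES (?, ?, ?, ?)
        ON CONFLICT (sku) DO UPDATE SET
            price = excluded.price,
            updated_at = excluded.updated_at
        """,
        (sku, name, price, updated_at),
    )

upsert(conn, "XPS-15-2024", "Dell XPS 15", 1899.00, "2024-01-01")
upsert(conn, "XPS-15-2024", "Dell XPS 15", 1849.00, "2024-02-01")  # price drop -> update, not a duplicate row
row = conn.execute("SELECT price FROM product WHERE sku = 'XPS-15-2024'").fetchone()
```

`excluded` refers to the row that would have been inserted, so the update always takes the incoming vendor values.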
Normalization Strategies
Strategy 1: Price Normalization
Input: "EUR 1.299,00"
|
v
1. Detect currency: EUR
2. Handle European format: 1.299,00 -> 1299.00
3. Remove symbols: 1299.00
4. Convert EUR -> USD: 1299 x 1.10 = 1428.90
5. Round to 2 decimals: 1428.90
|
v
Output: 1428.90 (NUMERIC, USD)
```python
# The problem: "EUR 1.299,00" needs to become 1428.90 (USD)
# The solution: Normalize in steps
import re

def normalize_price(raw, source_currency="USD"):
    # 1. Strip currency symbols and codes, keep digits and separators
    #    (a plain .replace("EUR", "") misses trailing codes like "USD")
    cleaned = re.sub(r"[^0-9.,]", "", str(raw))
    # 2. Handle European format (1.299,00 -> 1299.00)
    if "," in cleaned and "." in cleaned:
        if cleaned.rindex(",") > cleaned.rindex("."):
            # European: 1.299,00 -> remove dots, swap comma
            cleaned = cleaned.replace(".", "").replace(",", ".")
        else:
            # US: 1,499.00 -> just remove commas
            cleaned = cleaned.replace(",", "")
    elif "," in cleaned:
        # Lone comma: treat as decimal separator (e.g. "1299,00")
        cleaned = cleaned.replace(",", ".")
    # 3. Convert to float
    value = float(cleaned)
    # 4. Convert currency to USD (static rates here; use a live feed in production)
    rates = {"EUR": 1.10, "GBP": 1.27, "USD": 1.0}
    value = value * rates.get(source_currency, 1.0)
    # 5. Round to 2 decimals
    return round(value, 2)
```
Strategy 2: Weight Normalization
Input: "5.5 lbs"
|
v
1. Extract number: 5.5
2. Extract unit: "lbs"
3. Convert to kg: 5.5 x 0.453592 = 2.49
4. Round to 2 decimals: 2.49
|
v
Output: 2.49 (NUMERIC, kg)
```python
# The problem: "5.5 lbs" needs to become 2.49 (kg)
# The solution: Extract unit, convert
import re

def normalize_weight(raw):
    if not raw:
        return None
    # Extract number and unit
    match = re.match(r"([\d.]+)\s*(kg|lbs|lb|g|oz)?", str(raw).lower())
    if not match:
        return None
    value = float(match.group(1))
    unit = match.group(2) or "kg"  # Default to kg if no unit
    # Convert to kg
    conversions = {
        "lbs": 0.453592,
        "lb": 0.453592,
        "g": 0.001,
        "oz": 0.0283495,
        "kg": 1.0,
    }
    value = value * conversions.get(unit, 1.0)
    return round(value, 2)
```
Strategy 3: Null Handling
NULL HANDLING DECISION TREE:
Is field required (sku, name, price)?
YES -> Reject product (log error)
NO -> Continue
Is value a null-equivalent ("N/A", "", 0)?
YES -> Convert to NULL
NO -> Keep value
Should we flag missing data?
YES -> Set flag: missing_weight = true
NO -> Just store null
```python
# The problem: "N/A", "", 0, null all mean "missing"
# The solution: Normalize to Python None
NULL_EQUIVALENTS = {None, "", "N/A", "n/a", "NA", "null", "NULL", "-"}

def normalize_null(value, treat_zero_as_null=False):
    if value in NULL_EQUIVALENTS:
        return None
    if treat_zero_as_null and value == 0:
        return None
    return value
```
Strategy 4: Deduplication
DEDUPLICATION STRATEGY:
Step 1: Normalize SKU
- Remove hyphens, spaces
- Lowercase
- "XPS-15-2024" -> "xps152024"
- "XPS 15" -> "xps15"
Step 2: Check for match
- Exact normalized SKU match? -> Update existing
- Fuzzy name match (>90%)? -> Flag for review
- No match? -> Insert new
Step 3: Conflict resolution
- If prices differ: Use most recent vendor data
- If specs differ: Merge (keep most complete)
- Always track: which vendor, when updated
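Steps 1 and 2 of the strategy above can be sketched with the stdlib, using difflib for the fuzzy name match (the 90% threshold is the one from the strategy; `match_action` and its return values are illustrative names):

```python
import re
from difflib import SequenceMatcher

def normalize_sku(sku):
    # "XPS-15-2024" -> "xps152024": strip separators, lowercase
    if not sku:
        return None
    return re.sub(r"[\s\-_]", "", str(sku)).lower()

def name_similarity(a, b):
    # Ratio in [0, 1]; above 0.90 counts as a probable match
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()

def match_action(incoming, existing):
    # existing: {normalized_sku: product_name} for products already in the DB
    sku = normalize_sku(incoming["sku"])
    if sku in existing:
        return "update"
    if any(name_similarity(incoming["name"], name) > 0.90 for name in existing.values()):
        return "review"  # probable duplicate: flag for manual review
    return "insert"
```

SequenceMatcher is a rough heuristic; it catches "Dell XPS 15" vs "Dell XPS 15 (2024)" but a production system would also compare on brand plus model number.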
Scripts vs. Pipelines
When to Use Each
Use scripts when:
- Schema is evolving (frequent changes)
- Vendors are inconsistent (different formats)
- Volume is manageable (< 10M products)
- Need manual review/validation
- Team is small (< 5 engineers)
- Benefits: easy to debug, fast to iterate, flexible, low overhead
Use pipelines (Airflow/dbt) when:
- Schema is stable (rarely changes)
- Vendors are consistent (same format)
- Volume is large (> 10M products)
- Need scheduling/orchestration
- Team is large (> 5 engineers)
- Benefits: automated scheduling, dependency management (DAGs), monitoring/alerting built-in, scales to large volumes
Our choice: Scripts. Schema evolving, vendors inconsistent, volume under 1M. We can always migrate to Airflow later — but starting there would have slowed us down.
Error Handling
The Error Handling Pattern
```python
# The problem: Some products fail validation
# The solution: Log errors, continue with valid products
def ingest_vendor(vendor_name, vendor_data):
    results = {"success": [], "errors": []}
    for raw_product in vendor_data:
        try:
            # 1. Transform
            product = transform(raw_product, vendor_name)
            # 2. Validate
            errors = validate(product)
            if errors:
                raise ValueError(f"Validation failed: {errors}")
            # 3. Upsert
            db.upsert(product)
            results["success"].append(product["sku"])
        except Exception as e:
            results["errors"].append({
                "sku": raw_product.get("sku", "UNKNOWN"),
                "vendor": vendor_name,
                "error": str(e),
                "raw_data": raw_product  # Preserve for debugging
            })
    # Report
    print(f"Ingested {len(results['success'])} products")
    print(f"Errors: {len(results['errors'])}")
    return results
```
The full_data Safety Net (Your Data Lake in Postgres)
Always preserve the original vendor data. Think of full_data as a mini Data Lake — raw, untransformed vendor data sitting right next to your normalized columns:
```python
from datetime import datetime

def transform(raw_product, vendor_name):
    return {
        "sku": normalize_sku(raw_product.get("sku")),  # SKU normalization from Strategy 4
        "name": raw_product.get("name") or raw_product.get("product_name"),
        "price": normalize_price(raw_product.get("price")),
        "weight_kg": normalize_weight(raw_product.get("weight")),
        "brand": raw_product.get("brand"),
        # ALWAYS preserve original data
        "full_data": {
            "vendor": vendor_name,
            "fetched_at": datetime.now().isoformat(),
            "raw": raw_product  # Original, untouched
        }
    }
```
The Proof: Before/After
| Before | After |
|---|---|
| 30% of products rejected (validation failures) | 95% of products ingested successfully |
| 15% duplicate products (same laptop, different SKUs) | 2% duplicates (normalized SKUs catch most) |
| 40% missing weight data (nulls handled inconsistently) | 15% missing weight (but flagged, not broken) |
| 0% vendor data preserved (lost for debugging) | 100% vendor data preserved (in full_data JSONB) |
| Manual fixes: 20 hours/week | Manual fixes: 2 hours/week |
What changed: We stopped hoping vendor data was clean and started normalizing it systematically.
The Checklist: Ingestion Layer Readiness
| Item | Question | Score |
|---|---|---|
| Units | Are all units normalized to one standard? | |
| Formats | Are all formats converted to schema types? | |
| Nulls | Do you handle nulls consistently? | |
| Duplicates | Do you deduplicate across vendors? | |
| Raw Data | Do you preserve original vendor data? | |
| Validation | Do you validate before inserting? | |
| Errors | Do you log failed products with details? | |
| Upsert | Do you update existing products? | |
Score Interpretation:
- 0-3: Ingestion not ready. Fix this before building agents.
- 4-6: Prototype-ready. Proceed with caution.
- 7-8: Production-ready. Move to Issue 4.
What’s Next
Issue 4: The Intent Layer
Now that you have clean data in your database, how does the agent understand what users want?
“User says ‘Find me something good.’ Is that a search? A question? A comparison? We had one giant prompt doing everything. It broke constantly.”
What You’ll Learn:
- Intent classification design (search, refine, compare, ask)
- Multi-intent handling (“Find laptops and compare the top two”)
- Entity extraction (resolving “the first one” to an actual product)
Key Takeaways
1. The Problem: Vendor data is chaos (different units, formats, nulls, duplicates).
2. The Solution: A 4-step pipeline (Fetch -> Transform -> Validate -> Upsert), aggressive normalization (units, formats, nulls), raw data preserved in full_data JSONB, and scripts over pipelines (for flexibility).
3. Key Takeaway: Ingestion is 80% of the work. Get it right, and everything else becomes easier.
Glossary
- ELT: Extract-Load-Transform — load raw data first, transform in database (vs ETL which transforms before loading)
- Normalization: Converting data to standard format (EUR -> USD, lbs -> kg)
- Upsert: Update if exists, insert if new (INSERT … ON CONFLICT DO UPDATE)
- Null Handling: Strategy for missing data (reject, default, or flag)
- Deduplication: Removing duplicate entries across vendors
- Raw Data: Original vendor data before transformation
- Validation: Checking if transformed data meets schema rules
- full_data: JSONB column storing original vendor data — your “Data Lake in Postgres”
- The Airflow Tax: Overhead of running orchestration infrastructure before you need it
Until next issue,
Sentient Zero Labs