DE in the Wild #1 — Lean Software Concepts

Today I’m starting a series of articles about Data Engineering in the real world. I want to share what it really looks like to build and maintain data systems inside a company—with all the problems, trade-offs, and complexities that don’t show up in technical textbooks.
My goals with this series are to:
- Share real situations (without disclosing any corporate info).
- Explain how I make decisions in tricky contexts.
- Open the door for discussion so I can also hear other approaches.
Each post will follow the same structure: present a problem, walk through a concrete example, and show a practical solution. At the end, you’ll also find a TL;DR with key takeaways you can quickly share.
A Bit of Context: Not Knowing What Lean Was
For the last three years at Clarity AI, I’ve worked on data ingestion, specifically on a collection tool that integrates ML and gathers data used across multiple product lines. Collection requirements change often, so we tried to optimize for fast, safe iteration through tight collaboration between backend and data science. The challenge was that the original DE codebase wasn’t designed to evolve at that speed. This post covers the practices that helped us close that gap.
When I started, I wasn’t “doing Lean” on purpose. I just needed the system to be safer and easier to change. I added validations at the edges, wrote tests to lock behavior, and split big functions into smaller ones. Only later—through reading and conversations with peers—did I realize these steps mapped directly to Lean principles: build integrity in, amplify learning, decide as late as possible, and see the whole.
Let me walk you through the situation I encountered when I first took charge of DE tasks.
Part 1. The Starting Point
Onboarding to a new project can take many forms, but the main goal is always the same: understand the scope of your role and the behavior of the system you’re going to support. Most of the time, this means learning how another developer built things and working out what the different modules that make up the software can do.
Let’s look at the code example I’ll be using throughout this post. It’s a function that processes data coming from a vendor. The intention of this module is to capture vendor data, transform it as needed, and return the result:
```python
import pandas as pd
import requests
from datetime import datetime

def process_vendor_feed(vendor_id: str, start_date: str, end_date: str) -> pd.DataFrame:
    """
    Fetch vendor feed, process balances, and return as DataFrame.
    Everything is handled inline, making it hard to test or extend.
    """
    provider_code = "CLA4"
    url = f"https://api.vendor.com/feeds/{vendor_id}?start={start_date}&end={end_date}"
    resp = requests.get(url, timeout=10)
    if resp.status_code != 200:
        raise RuntimeError(f"Failed to fetch data: {resp.status_code}")
    raw = resp.json()

    records = []
    for r in raw:
        amount = float(r.get("amount", 0))
        currency = r.get("currency", "USD")
        # fake normalization
        if currency != "USD":
            amount = amount * 1.1
        records.append({
            "id": r.get("id"),
            "amount": amount,
            "currency": "USD",
            "timestamp": datetime.fromisoformat(r.get("date"))
        })

    df = pd.DataFrame(records)
    df["provider_code"] = provider_code
    df["ingested_at"] = datetime.utcnow()
    return df
```
This module makes business sense: it performs a task that needs to be done during the ingestion process. But the way it is structured introduces several problems:
- Hard to test in isolation (fetch + transform are tightly coupled).
- The function signature can easily grow to take too many parameters.
- No explicit contract for the data coming out of the fetch or for the final output.
The real challenge shows up when you need to modify the transformation logic. For example, what if your product manager asks you to change the balance calculation process to normalize currencies using data from an exchange rate table? With everything inline, that change becomes painful.
Part 2. Amplify Learning with a Golden Master
Let’s assume the function above is part of a bigger pipeline, and we only have integration tests at the pipeline level. In this situation, the only way to understand how `process_vendor_feed` behaves is by looking at those integration tests. That makes it risky to change: we can’t be sure whether small modifications will break existing behavior.
The first step I usually take is to characterize the current behavior using the Golden Master technique:
```python
# test_process_vendor_feed.py
import pandas as pd
import pandas.testing as pdt

from pipeline import process_vendor_feed  # module name matches the patch targets used later

def test_process_vendor_feed_golden_master():
    df = process_vendor_feed("V123", "2023-01-01", "2023-01-31")
    # Normalize for stability (ordering, index reset)
    df_sorted = df.sort_values(by="id").reset_index(drop=True)
    # Drop volatile columns (ingested_at is a fresh timestamp on every run)
    df_sorted = df_sorted.drop(columns=["ingested_at"])
    # Expected golden master (frozen output)
    expected = pd.read_json("tests/golden_master_vendor_feed.json")
    # Compare actual vs expected
    pdt.assert_frame_equal(df_sorted, expected, check_dtype=False)
```
The idea of the Golden Master is to test the function with representative inputs, capture the output, and save it as the “master version.” Once this is in place, I can safely refactor the function and compare results against the Golden Master to ensure nothing changes unexpectedly.
Whenever the function changes, you rerun the test:
- If the output matches → behavior is stable.
- If it fails but the change is intentional → regenerate the JSON with the new “golden master.”
Doing this provides the first layer of stability. Now it becomes possible to build a new feature with confidence, knowing the old behavior is locked.
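When a change is intentional, regenerating the frozen output can be a small standalone script you run by hand and review like any other diff. Here is a minimal sketch, assuming the function lives in a module called `pipeline` and reusing the same inputs, normalization, and file path as the test above:

```python
# A small helper to (re)generate the golden master. Run it manually and review the diff.
from pipeline import process_vendor_feed

def regenerate_golden_master() -> None:
    # Same representative inputs and normalization as the characterization test
    df = process_vendor_feed("V123", "2023-01-01", "2023-01-31")
    df_sorted = df.sort_values(by="id").reset_index(drop=True)
    # Exclude volatile columns (ingested_at changes on every run)
    df_sorted = df_sorted.drop(columns=["ingested_at"])
    # Freeze the output as the new "master version"
    df_sorted.to_json("tests/golden_master_vendor_feed.json")

if __name__ == "__main__":
    regenerate_golden_master()
```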
For example, in our case the product manager asked us to change the balance calculation: instead of multiplying amounts by a constant factor, we should normalize currencies using an exchange rate table.
```python
EXCHANGE_RATES = {"EUR": 1.08, "USD": 1.0, "GBP": 1.25}

def process_vendor_feed(vendor_id: str, start_date: str, end_date: str) -> pd.DataFrame:
    provider_code = "CLA4"

    # Fetch
    url = f"https://api.vendor.com/feeds/{vendor_id}?start={start_date}&end={end_date}"
    resp = requests.get(url, timeout=10)
    if resp.status_code != 200:
        raise RuntimeError(f"Failed to fetch data: {resp.status_code}")
    raw = resp.json()

    # Transform with exchange rates
    records = []
    for r in raw:
        amount = float(r.get("amount", 0))
        currency = r.get("currency", "USD")
        rate = EXCHANGE_RATES.get(currency, 1.0)
        amount = amount * rate
        records.append({
            "id": r.get("id"),
            "amount": amount,
            "currency": "USD",
            "timestamp": datetime.fromisoformat(r.get("date"))
        })

    # Add default columns
    df = pd.DataFrame(records)
    df["provider_code"] = provider_code
    df["ingested_at"] = datetime.utcnow()
    return df
```
Part 3. Find First Confirmation by Delivering Fast
With the first safety net (Golden Master) in place, I delivered the requested feature quickly and validated it with stakeholders. In this case, that meant changing the balance calculation to normalize amounts using an exchange rate table instead of a constant factor. Early feedback ensured the solution was solving the real problem, and any adjustments could be made before hardening the code.
Once the behavior was confirmed, I moved on to improving the code quality so the changes would be sustainable.
Part 4. Build Integrity In with Refactoring and Testing
Once the new feature is working and protected by a Golden Master, the next step is to improve the structure of the function. This is a good moment to refactor: make the code easier to understand, test, and extend—without changing its behavior.
In Lean terms, this is building integrity in. We don’t wait until later to add quality; instead, we reshape the code so that correctness and maintainability are part of the system itself.
Let’s refactor `process_vendor_feed` into three smaller functions:
```python
import pandas as pd
import requests
from datetime import datetime

def fetch_vendor_feed(vendor_id: str, start_date: str, end_date: str):
    url = f"https://api.vendor.com/feeds/{vendor_id}?start={start_date}&end={end_date}"
    resp = requests.get(url, timeout=10)
    if resp.status_code != 200:
        raise RuntimeError(f"Failed to fetch data: {resp.status_code}")
    return resp.json()

def apply_exchange_rate(raw):
    EXCHANGE_RATES = {"EUR": 1.08, "USD": 1.0, "GBP": 1.25}
    records = []
    for r in raw:
        amount = float(r.get("amount", 0))
        currency = r.get("currency", "USD")
        rate = EXCHANGE_RATES.get(currency, 1.0)
        records.append({
            "id": r.get("id"),
            "amount": amount * rate,
            "currency": "USD",
            "timestamp": datetime.fromisoformat(r.get("date"))
        })
    return records

def format_output(records):
    provider_code = "CLA4"
    df = pd.DataFrame(records)
    df["provider_code"] = provider_code
    df["ingested_at"] = datetime.utcnow()
    return df

def process_vendor_feed(vendor_id: str, start_date: str, end_date: str) -> pd.DataFrame:
    raw = fetch_vendor_feed(vendor_id, start_date, end_date)
    records = apply_exchange_rate(raw)
    return format_output(records)
```
Now each part can be tested independently:
- `fetch_vendor_feed` → mock the API response.
- `apply_exchange_rate` → pure transformation test, no network.
- `format_output` → check output columns and defaults.
- The outer `process_vendor_feed` simply orchestrates these steps.
With that in mind, you can write focused tests for each function:
```python
from unittest.mock import patch
from types import SimpleNamespace
from datetime import datetime

import pandas as pd
import pytest

from pipeline import (
    fetch_vendor_feed,
    apply_exchange_rate,
    format_output,
    process_vendor_feed,
)

@patch("pipeline.requests.get")
def test_fetch_vendor_feed_success(mock_get):
    fake_data = [
        {"id": "1", "amount": "10", "currency": "EUR", "date": "2025-01-01T00:00:00"}
    ]
    mock_get.return_value = SimpleNamespace(status_code=200, json=lambda: fake_data)

    out = fetch_vendor_feed("vendor-1", "2025-01-01", "2025-01-31")

    assert isinstance(out, list)
    assert len(out) == 1
    assert {"id", "amount", "currency", "date"}.issubset(out[0].keys())

@patch("pipeline.requests.get")
def test_fetch_vendor_feed_failure(mock_get):
    mock_get.return_value = SimpleNamespace(status_code=500, json=lambda: {})

    with pytest.raises(RuntimeError):
        fetch_vendor_feed("vendor-1", "2025-01-01", "2025-01-31")

def test_apply_exchange_rate_simple_mapping():
    raw = [
        {"id": "A", "amount": "10", "currency": "EUR", "date": "2025-01-01T12:00:00"},
        {"id": "B", "amount": 5, "currency": "USD", "date": "2025-01-01T12:00:00"},
        {"id": "C", "amount": 8, "currency": "GBP", "date": "2025-01-01T12:00:00"},
    ]

    out = apply_exchange_rate(raw)

    # Shape
    assert isinstance(out, list)
    assert len(out) == 3

    # Values
    by_id = {r["id"]: r for r in out}
    assert round(by_id["A"]["amount"], 2) == round(10 * 1.08, 2)
    assert round(by_id["B"]["amount"], 2) == round(5 * 1.0, 2)
    assert round(by_id["C"]["amount"], 2) == round(8 * 1.25, 2)
    assert all(r["currency"] == "USD" for r in out)

    # Types
    assert all(isinstance(r["timestamp"], datetime) for r in out)

def test_format_output_columns_exact():
    records = [
        {"id": "A", "amount": 10.8, "currency": "USD", "timestamp": datetime(2025, 1, 1, 12, 0, 0)}
    ]

    df = format_output(records)

    # Exact list of columns, in order
    expected_cols = ["id", "amount", "currency", "timestamp", "provider_code", "ingested_at"]
    assert list(df.columns) == expected_cols

@patch("pipeline.fetch_vendor_feed", return_value=[
    {"id": "A", "amount": "10", "currency": "EUR", "date": "2025-01-01T12:00:00"},
    {"id": "B", "amount": "5", "currency": "USD", "date": "2025-01-02T09:30:00"},
])
def test_process_vendor_feed_integration_shape(mock_fetch):
    df = process_vendor_feed("vendor-1", "2025-01-01", "2025-01-31")

    # Shape and columns
    assert isinstance(df, pd.DataFrame)
    assert df.shape[0] == 2
    assert {"id", "amount", "currency"}.issubset(df.columns)

    # Normalization expectation
    assert set(df["currency"]) == {"USD"}

    # Simple value checks (rounded)
    a_amount = float(df.loc[df["id"] == "A", "amount"].iloc[0])
    b_amount = float(df.loc[df["id"] == "B", "amount"].iloc[0])
    assert round(a_amount, 2) == round(10 * 1.08, 2)
    assert round(b_amount, 2) == round(5.0, 2)
```
These two principles helped us confirm the behavior and write better code. But they don’t guarantee that things can’t break. Sometimes systems fail in ways you don’t expect.
Part 5. See the Whole with Validation
We’ve made the code cleaner and safer—but what happens when upstream starts sending nulls or malformed fields? If `fetch_vendor_feed` returns missing amounts or bad dates and we quietly coerce them to zeros, the pipeline “works,” but downstream metrics are wrong. It happened to me: users, not monitors, spotted the issue. The fix is validation at the boundaries of each big step.
In practice I learned this:
- Validate at ingress strictly: reject malformed inputs early.
- Sanitize after transformations: remove nulls and log how many were dropped.
- Validate again before output: enforce the contract that downstream depends on.
Example validation layer:
```python
# validation_layer.py
from typing import List
from datetime import datetime
import logging

from pydantic import BaseModel, Field, field_validator, ValidationError

logger = logging.getLogger("pipeline.validation")

class RawItem(BaseModel):
    id: str
    amount: float  # "10" -> 10.0 allowed; must be >= 0
    currency: str = Field(min_length=1)
    date: str  # ISO 8601 string

    @field_validator("amount")
    @classmethod
    def non_negative_amount(cls, v: float) -> float:
        if v is None:
            raise ValueError("amount is required")
        if v < 0:
            raise ValueError("amount must be >= 0")
        return v

    @field_validator("date")
    @classmethod
    def check_date_is_iso(cls, v: str) -> str:
        datetime.fromisoformat(v)
        return v

class TransformedRecord(BaseModel):
    id: str
    amount: float
    currency: str
    timestamp: datetime

    @field_validator("amount")
    @classmethod
    def non_negative_amount(cls, v: float) -> float:
        if v < 0:
            raise ValueError("amount must be >= 0")
        return v

    @field_validator("currency")
    @classmethod
    def must_be_usd(cls, v: str) -> str:
        if v != "USD":
            raise ValueError("currency must be 'USD' after normalization")
        return v

def validate_raw_strict(rows: List[dict]) -> List[dict]:
    # Ingress: one bad row fails the whole batch
    cleaned: List[dict] = []
    errors = []
    for i, r in enumerate(rows):
        try:
            item = RawItem(**r)
            cleaned.append(item.model_dump())
        except ValidationError as e:
            errors.append((i, e))
    if errors:
        raise ValueError(f"Raw validation failed for {len(errors)} rows out of {len(rows)}")
    return cleaned

def drop_nulls_after_transform(records: List[dict]) -> List[dict]:
    # Sanitize: drop records with null required fields and log how many were removed
    required = ("id", "amount", "timestamp")
    before = len(records)
    filtered = [
        r for r in records
        if all(r.get(k) is not None for k in required)
    ]
    removed = before - len(filtered)
    if removed:
        logger.warning("Sanitize: dropped %d/%d records with null %s after transform",
                       removed, before, list(required))
    return filtered

def validate_transformed_strict(records: List[dict]) -> List[dict]:
    # Pre-output: enforce the contract downstream depends on
    cleaned: List[dict] = []
    errors = []
    for i, r in enumerate(records):
        try:
            item = TransformedRecord(**r)
            cleaned.append(item.model_dump())
        except ValidationError as e:
            errors.append((i, e))
    if errors:
        raise ValueError(f"Transformed validation failed for {len(errors)} rows out of {len(records)}")
    return cleaned
```
With the validation layer in place, you can wire it into the pipeline:
```python
from validation_layer import (
    validate_raw_strict,
    drop_nulls_after_transform,
    validate_transformed_strict,
)

def process_vendor_feed(vendor_id: str, start_date: str, end_date: str):
    # Ingress: strict
    raw = fetch_vendor_feed(vendor_id, start_date, end_date)
    raw = validate_raw_strict(raw)

    # Transform
    records = apply_exchange_rate(raw)

    # Sanitize: drop nulls and log how many we removed
    records = drop_nulls_after_transform(records)

    # Pre-output: strict
    records = validate_transformed_strict(records)

    # Egress
    return format_output(records)
```
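To see the boundaries doing their job, a couple of small tests can confirm that malformed vendor rows are rejected at ingress and that null fields are dropped after the transform instead of being silently coerced. This is only a sketch against the `validation_layer` module shown above; the test file name is illustrative:

```python
# test_validation_layer.py (illustrative name) — boundary checks for the validation layer above.
from datetime import datetime

import pytest

from validation_layer import validate_raw_strict, drop_nulls_after_transform

def test_validate_raw_strict_rejects_malformed_rows():
    rows = [
        {"id": "A", "amount": "10", "currency": "EUR", "date": "2025-01-01T12:00:00"},  # valid
        {"id": "B", "amount": "-5", "currency": "EUR", "date": "2025-01-01T12:00:00"},  # negative amount
        {"id": "C", "amount": "10", "currency": "EUR", "date": "not-a-date"},           # bad date
    ]
    # Strict ingress: one bad row is enough to fail the whole batch
    with pytest.raises(ValueError, match="Raw validation failed for 2 rows"):
        validate_raw_strict(rows)

def test_drop_nulls_after_transform_filters_null_fields():
    records = [
        {"id": "A", "amount": 10.8, "currency": "USD", "timestamp": None},  # null timestamp -> dropped
        {"id": "B", "amount": 5.0, "currency": "USD", "timestamp": datetime(2025, 1, 1, 12, 0)},
    ]
    filtered = drop_nulls_after_transform(records)
    assert [r["id"] for r in filtered] == ["B"]
```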
For me, this was an upsetting moment, but also a huge learning experience. Mapping it back to Lean Software principles:
- Build integrity in: Quality is enforced where it matters—at step boundaries—so correctness isn’t deferred.
- See the whole: The pipeline’s end‑to‑end contract is explicit; upstream quirks can’t silently corrupt downstream outputs.
- Decide as late as possible: You choose strictness by where you place strict validators, without changing the core logic.
- Amplify learning: Logs quantify data quality issues, making them visible so teams can fix causes, not just symptoms.
Part 6. Final Conclusions and Acknowledgments
One key takeaway is that the learning process is not linear. When I started, I didn’t know about Lean Software Development. I simply noticed code smells and tried to fix them. Over time, I realized that many of these practices align with Lean principles.
I also want to acknowledge the valuable help of Eduardo Ferro, Data Platform leader and one of the strongest voices for Lean Software Development in Spain. His guidance has been essential in shaping my thinking. A recommended read from him: Introduction to Lean Software.
TL;DR
- Don’t build on shaky ground: Shipping features without solid tests is a recipe for disaster.
- Deliver fast: Validate with stakeholders as early as possible to allow quick adjustments.
- Refactor with safety nets: Use the Golden Master technique to capture current behavior before splitting code into smaller, cleaner parts.
- Spot patterns early: When adding new features, watch for repetitive tasks. If abstraction is hard, consider using AI as a helper.
- Design for change: Use modular, composable structures (like pipes in data pipelines) so the system can evolve without major rewrites.
Next Chapter
Thank you for reading this article. I hope you found it insightful. I already have a lot of information I want to write about in the next article. My plan is to publish it next month. Some ideas I want to cover:
- Common data engineering design patterns, and which ones are used most often in real-world pipelines
- The testing pyramid and other "shapes" used to structure tests
- How to integrate Soda checks (and other SQL-based validations) so business experts can validate data
- How a mindset shift helps when you work with dbt for your transformations
If you’re interested in one of these topics more than the others, let me know in the comments and I’ll try to start with that one. Also, don't forget to visit my page jlrcolmenares.com.