How to implement a dataset catalog with 4 immutable layers, full provenance tracking, and automated backup for bioinformatics environments
The Problem
Biomedical computational research deals with data from dozens of public sources: NCBI GEO (gene expression), UniProt (proteins), PubMed (literature), PDB (structures), DrugBank, and more. Each with different formats, update frequencies, and quality levels.
Without governance, the typical scenario is:
- Data downloaded manually, scattered across folders
- Nobody knows which version was used in which analysis
- Reproducibility? Good luck
- Collaboration? Everyone has their own copy
We built the Biomedical Data Lake to solve this — a centralized catalog with 4 immutable layers, graph-based provenance, and automated backup.
Architecture
The Data Lake organizes data into 4 layers, each corresponding to a MinIO bucket (S3-compatible):
raw/ → Original collected data (immutable, 90-day object-lock)
processed/ → Filtered, normalized, or aligned data
curated/ → Curated and annotated data ready for consumption
archive/ → Historical snapshots for audit (180-day lifecycle)
Promotion between layers is always adjacent (raw → processed → curated → archive) and performs a copy — no data is ever altered in-place.
Stack
| Layer | Technology |
|---|---|
| Backend | Python 3.12+ / FastAPI (async) |
| ORM | SQLAlchemy 2.0 (async) + Alembic |
| Validation | Pydantic v2 |
| Object storage | MinIO (S3-compatible) |
| Database | PostgreSQL 16 |
| Frontend | Vanilla TypeScript + Vite + Plotly + Tailwind 4 |
| Testing | pytest + httpx + respx |
| Lint/Type | ruff + mypy |
| Observability | prometheus-fastapi-instrumentator |
Data Model
The core of the system is the datasets table in PostgreSQL:
# app/models/dataset.py
class Dataset(Base):
__tablename__ = "datasets"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name: Mapped[str]
source: Mapped[SourceType] # geo, ncbi_gene, pubmed, uniprot, upload
external_id: Mapped[str] # GSE2034, P04637, 32581362...
layer: Mapped[DatasetLayer] # raw, processed, curated, archive
minio_bucket: Mapped[str]
minio_prefix: Mapped[str]
file_count: Mapped[int]
total_size: Mapped[int | None]
metadata_: Mapped[dict] = mapped_column(JSONB) # {"source": "geo", "date": "2026-06-17", "version": "v1"}
tags: Mapped[list[str]] = mapped_column(ARRAY(Text))
created_at: Mapped[datetime]
updated_at: Mapped[datetime]
Each dataset carries mandatory JSONB metadata (source, date, version) — validated in the schema and auto-filled if omitted:
# app/schemas/catalog.py
class DatasetCreate(BaseModel):
name: str = Field(..., min_length=1, max_length=256)
source: SourceType
external_id: str = Field(..., min_length=1, max_length=128)
layer: DatasetLayer = DatasetLayer.raw
minio_bucket: str
minio_prefix: str
metadata_: dict[str, object] | None = None
tags: list[str] | None = None
@model_validator(mode="after")
def _ensure_metadata(self) -> DatasetCreate:
if self.metadata_ is None:
self.metadata_ = {}
if "source" not in self.metadata_:
self.metadata_["source"] = str(self.source)
if "date" not in self.metadata_:
self.metadata_["date"] = datetime.now(UTC).date().isoformat()
if "version" not in self.metadata_:
self.metadata_["version"] = "v1"
return self
Layer Promotion
The golden rule: raw data is immutable. The API blocks DELETE on the raw layer, and MinIO has active object-lock.
Promoting a dataset performs a copy of the objects in MinIO plus a provenance record:
# app/services/layer_service.py
_LAYER_ORDER = [
DatasetLayer.raw,
DatasetLayer.processed,
DatasetLayer.curated,
DatasetLayer.archive,
]
async def promote_dataset(session: AsyncSession, data: PromoteRequest) -> dict[str, object]:
dataset = await session.get(Dataset, data.dataset_id)
if not dataset:
raise NotFoundError("Dataset not found")
current_idx = _LAYER_ORDER.index(dataset.layer)
target_idx = _LAYER_ORDER.index(data.target_layer)
if target_idx != current_idx + 1:
raise ServiceError(
f"Cannot promote from {dataset.layer} to {data.target_layer}. "
f"Only adjacent promotion is allowed."
)
# Copy objects in MinIO (never move)
mc = MinioClient()
objects = await mc.list_objects(dataset.minio_bucket, prefix=dataset.minio_prefix)
for obj in objects:
await mc.copy_object(
source_bucket=dataset.minio_bucket,
source_object=obj["object_name"],
dest_bucket=data.target_layer,
dest_object=f"{dataset.minio_prefix}/{obj['object_name'].split('/')[-1]}",
)
old_layer = dataset.layer
dataset.layer = data.target_layer
dataset.minio_bucket = data.target_layer
provenance = Provenance(
dataset_id=dataset.id,
action=ProvenanceAction.promotion,
layer_from=old_layer,
layer_to=data.target_layer,
)
session.add(provenance)
await session.commit()
return {"status": "completed", "source_layer": old_layer, "target_layer": data.target_layer}
Graph Provenance
Every transformation records its ancestry. The /provenance/{id}/graph endpoint returns the full graph — useful for audit and lineage tracking:
# app/models/provenance.py
class Provenance(Base):
__tablename__ = "provenance"
id: Mapped[uuid.UUID]
dataset_id: Mapped[uuid.UUID] # FK → datasets
source_dataset_id: Mapped[uuid.UUID | None] # FK → datasets (optional)
action: Mapped[ProvenanceAction] # collection, promotion, pipeline, manual
layer_from: Mapped[DatasetLayer | None]
layer_to: Mapped[DatasetLayer | None]
parameters: Mapped[dict | None] = mapped_column(JSONB)
created_at: Mapped[datetime]
API Endpoints
Catalog
| Method | Route | Description |
|---|---|---|
GET |
/catalog |
List/search datasets (?layer=&source=&q=&tags=) |
GET |
/catalog/stats |
Statistics (datasets per layer, storage per source) |
GET |
/catalog/{id} |
Dataset details |
GET |
/catalog/{id}/files |
List files in MinIO |
GET |
/catalog/{id}/download/{filename} |
Presigned URL for download |
POST |
/catalog |
Create dataset |
DELETE |
/catalog/{id} |
Delete (blocked if layer = raw) |
Provenance
| Method | Route | Description |
|---|---|---|
GET |
/provenance/{dataset_id} |
Linear lineage |
GET |
/provenance/{dataset_id}/graph |
Full graph (ancestors + descendants) |
Backup
| Method | Route | Description |
|---|---|---|
POST |
/backup/trigger |
Trigger manual backup |
GET |
/backup/jobs |
List jobs |
GET |
/backup/jobs/{id} |
Job details |
Frontend: Scientific Dashboard
The frontend (Vanilla TypeScript + Vite + Plotly) has 5 pages:
- Dashboard — Plotly charts: datasets per layer, storage per source
- Catalog — table with search/filters/pagination
- Dataset Detail — metadata, files, provenance timeline
- Layers — cards with counts, promote button, history
- Backup — job status, manual trigger
Practical Usage
Catalog a dataset
curl -X POST http://localhost:8002/catalog \
-H "Content-Type: application/json" \
-d '{
"name": "GSE2034 - Breast Cancer Metastasis",
"source": "geo",
"external_id": "GSE2034",
"layer": "raw",
"minio_bucket": "raw",
"minio_prefix": "geo/GSE2034"
}'
Promote to processed
curl -X POST http://localhost:8002/layers/promote \
-H "Content-Type: application/json" \
-d '{
"dataset_id": "<uuid>",
"target_layer": "processed",
"notes": "After FastQC + MultiQC"
}'
View provenance
curl http://localhost:8002/provenance/<uuid>/graph
Trigger backup
curl -X POST http://localhost:8002/backup/trigger
Automated Backup
The scheduler runs on asyncio with hourly checks:
-
Daily at 2 AM: copies data to
archive/backups/daily/ -
Weekly Sunday at 3 AM:
archive/backups/weekly/
# app/tasks/backup_scheduler.py
async def run(self) -> None:
while True:
now = datetime.now(UTC)
if await self._needs_backup(now):
await self._run_backup()
await asyncio.sleep(3600)
Testing: 29 Tests with Mocks
The testing strategy combines:
-
Unit tests — mock SQLAlchemy session and MinIO via fixtures in
conftest.py -
Integration tests — requires real PostgreSQL + MinIO on
localhost, with@pytest.mark.skipif
# Unit tests
uv run pytest tests/ -v
# Integration
uv run pytest tests/test_integration/ -v
Lessons Learned
Async is great for I/O, terrible for debugging. Async tests with
pytest-asyncio+ mocks require careful fixture scoping.Immutability pays dividends. Having raw data protected by object-lock + API enforcement prevents accidents. Every transformation creates a new artifact — nothing is lost.
JSONB with schema validation. Mandatory metadata (
source,date,version) is validated in Pydantic before reaching the database. The rest of the JSONB is free-form for each source.MinIO + async is not trivial. The
minio-pyclient is not natively async. We built a wrapper withrun_in_executorto avoid blocking the event loop.Promotion as copy. Copying objects between buckets may seem inefficient, but it's the only way to guarantee the raw layer stays intact. For production with large datasets, an async job would be better.
Frameworkless frontend. Vanilla TS + Vite + Plotly handled a scientific dashboard well. Fewer dependencies, more control.
Repository
This project is part of a larger monorepo (16 bioinformatics projects). The full code is at: GitHub: jeferson0993/02-data-lake
02-data-lake/
├── app/ → FastAPI backend
├── frontend/ → TypeScript SPA
├── tests/ → 29 tests
└── docker-compose.yml
If you work with scientific data or need a governed dataset catalog, I hope this project serves as inspiration. The FastAPI + MinIO + PostgreSQL stack is light enough for an academic lab and robust enough for production.
Comments and questions are welcome!
GET IN TOUCHE:
For further actions, you may consider blocking this person and/or reporting abuse
