VOOZH about

URL: https://dev.to/jeferson0993/building-a-biomedical-data-lake-with-fastapi-minio-and-postgresql-50pk

⇱ Building a Biomedical Data Lake with FastAPI, MinIO, and PostgreSQL - DEV Community


How to implement a dataset catalog with 4 immutable layers, full provenance tracking, and automated backup for bioinformatics environments

The Problem

Biomedical computational research deals with data from dozens of public sources: NCBI GEO (gene expression), UniProt (proteins), PubMed (literature), PDB (structures), DrugBank, and more. Each with different formats, update frequencies, and quality levels.

Without governance, the typical scenario is:

  • Data downloaded manually, scattered across folders
  • Nobody knows which version was used in which analysis
  • Reproducibility? Good luck
  • Collaboration? Everyone has their own copy

We built the Biomedical Data Lake to solve this — a centralized catalog with 4 immutable layers, graph-based provenance, and automated backup.


Architecture

The Data Lake organizes data into 4 layers, each corresponding to a MinIO bucket (S3-compatible):

raw/ → Original collected data (immutable, 90-day object-lock)
processed/ → Filtered, normalized, or aligned data
curated/ → Curated and annotated data ready for consumption
archive/ → Historical snapshots for audit (180-day lifecycle)

Promotion between layers is always adjacent (raw → processed → curated → archive) and performs a copy — no data is ever altered in-place.

Stack

Layer Technology
Backend Python 3.12+ / FastAPI (async)
ORM SQLAlchemy 2.0 (async) + Alembic
Validation Pydantic v2
Object storage MinIO (S3-compatible)
Database PostgreSQL 16
Frontend Vanilla TypeScript + Vite + Plotly + Tailwind 4
Testing pytest + httpx + respx
Lint/Type ruff + mypy
Observability prometheus-fastapi-instrumentator

Data Model

The core of the system is the datasets table in PostgreSQL:

# app/models/dataset.py
class Dataset(Base):
 __tablename__ = "datasets"

 id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
 name: Mapped[str]
 source: Mapped[SourceType] # geo, ncbi_gene, pubmed, uniprot, upload
 external_id: Mapped[str] # GSE2034, P04637, 32581362...
 layer: Mapped[DatasetLayer] # raw, processed, curated, archive
 minio_bucket: Mapped[str]
 minio_prefix: Mapped[str]
 file_count: Mapped[int]
 total_size: Mapped[int | None]
 metadata_: Mapped[dict] = mapped_column(JSONB) # {"source": "geo", "date": "2026-06-17", "version": "v1"}
 tags: Mapped[list[str]] = mapped_column(ARRAY(Text))
 created_at: Mapped[datetime]
 updated_at: Mapped[datetime]

Each dataset carries mandatory JSONB metadata (source, date, version) — validated in the schema and auto-filled if omitted:

# app/schemas/catalog.py
class DatasetCreate(BaseModel):
 name: str = Field(..., min_length=1, max_length=256)
 source: SourceType
 external_id: str = Field(..., min_length=1, max_length=128)
 layer: DatasetLayer = DatasetLayer.raw
 minio_bucket: str
 minio_prefix: str
 metadata_: dict[str, object] | None = None
 tags: list[str] | None = None

 @model_validator(mode="after")
 def _ensure_metadata(self) -> DatasetCreate:
 if self.metadata_ is None:
 self.metadata_ = {}
 if "source" not in self.metadata_:
 self.metadata_["source"] = str(self.source)
 if "date" not in self.metadata_:
 self.metadata_["date"] = datetime.now(UTC).date().isoformat()
 if "version" not in self.metadata_:
 self.metadata_["version"] = "v1"
 return self

Layer Promotion

The golden rule: raw data is immutable. The API blocks DELETE on the raw layer, and MinIO has active object-lock.

Promoting a dataset performs a copy of the objects in MinIO plus a provenance record:

# app/services/layer_service.py
_LAYER_ORDER = [
 DatasetLayer.raw,
 DatasetLayer.processed,
 DatasetLayer.curated,
 DatasetLayer.archive,
]

async def promote_dataset(session: AsyncSession, data: PromoteRequest) -> dict[str, object]:
 dataset = await session.get(Dataset, data.dataset_id)
 if not dataset:
 raise NotFoundError("Dataset not found")

 current_idx = _LAYER_ORDER.index(dataset.layer)
 target_idx = _LAYER_ORDER.index(data.target_layer)

 if target_idx != current_idx + 1:
 raise ServiceError(
 f"Cannot promote from {dataset.layer} to {data.target_layer}. "
 f"Only adjacent promotion is allowed."
 )

 # Copy objects in MinIO (never move)
 mc = MinioClient()
 objects = await mc.list_objects(dataset.minio_bucket, prefix=dataset.minio_prefix)
 for obj in objects:
 await mc.copy_object(
 source_bucket=dataset.minio_bucket,
 source_object=obj["object_name"],
 dest_bucket=data.target_layer,
 dest_object=f"{dataset.minio_prefix}/{obj['object_name'].split('/')[-1]}",
 )

 old_layer = dataset.layer
 dataset.layer = data.target_layer
 dataset.minio_bucket = data.target_layer

 provenance = Provenance(
 dataset_id=dataset.id,
 action=ProvenanceAction.promotion,
 layer_from=old_layer,
 layer_to=data.target_layer,
 )
 session.add(provenance)
 await session.commit()

 return {"status": "completed", "source_layer": old_layer, "target_layer": data.target_layer}

Graph Provenance

Every transformation records its ancestry. The /provenance/{id}/graph endpoint returns the full graph — useful for audit and lineage tracking:

# app/models/provenance.py
class Provenance(Base):
 __tablename__ = "provenance"

 id: Mapped[uuid.UUID]
 dataset_id: Mapped[uuid.UUID] # FK → datasets
 source_dataset_id: Mapped[uuid.UUID | None] # FK → datasets (optional)
 action: Mapped[ProvenanceAction] # collection, promotion, pipeline, manual
 layer_from: Mapped[DatasetLayer | None]
 layer_to: Mapped[DatasetLayer | None]
 parameters: Mapped[dict | None] = mapped_column(JSONB)
 created_at: Mapped[datetime]

API Endpoints

Catalog

Method Route Description
GET /catalog List/search datasets (?layer=&source=&q=&tags=)
GET /catalog/stats Statistics (datasets per layer, storage per source)
GET /catalog/{id} Dataset details
GET /catalog/{id}/files List files in MinIO
GET /catalog/{id}/download/{filename} Presigned URL for download
POST /catalog Create dataset
DELETE /catalog/{id} Delete (blocked if layer = raw)

Provenance

Method Route Description
GET /provenance/{dataset_id} Linear lineage
GET /provenance/{dataset_id}/graph Full graph (ancestors + descendants)

Backup

Method Route Description
POST /backup/trigger Trigger manual backup
GET /backup/jobs List jobs
GET /backup/jobs/{id} Job details

Frontend: Scientific Dashboard

The frontend (Vanilla TypeScript + Vite + Plotly) has 5 pages:

  • Dashboard — Plotly charts: datasets per layer, storage per source
  • Catalog — table with search/filters/pagination
  • Dataset Detail — metadata, files, provenance timeline
  • Layers — cards with counts, promote button, history
  • Backup — job status, manual trigger

Practical Usage

Catalog a dataset

curl -X POST http://localhost:8002/catalog \
 -H "Content-Type: application/json" \
 -d '{
 "name": "GSE2034 - Breast Cancer Metastasis",
 "source": "geo",
 "external_id": "GSE2034",
 "layer": "raw",
 "minio_bucket": "raw",
 "minio_prefix": "geo/GSE2034"
 }'

Promote to processed

curl -X POST http://localhost:8002/layers/promote \
 -H "Content-Type: application/json" \
 -d '{
 "dataset_id": "<uuid>",
 "target_layer": "processed",
 "notes": "After FastQC + MultiQC"
 }'

View provenance

curl http://localhost:8002/provenance/<uuid>/graph

Trigger backup

curl -X POST http://localhost:8002/backup/trigger

Automated Backup

The scheduler runs on asyncio with hourly checks:

  • Daily at 2 AM: copies data to archive/backups/daily/
  • Weekly Sunday at 3 AM: archive/backups/weekly/
# app/tasks/backup_scheduler.py
async def run(self) -> None:
 while True:
 now = datetime.now(UTC)
 if await self._needs_backup(now):
 await self._run_backup()
 await asyncio.sleep(3600)

Testing: 29 Tests with Mocks

The testing strategy combines:

  • Unit tests — mock SQLAlchemy session and MinIO via fixtures in conftest.py
  • Integration tests — requires real PostgreSQL + MinIO on localhost, with @pytest.mark.skipif
# Unit tests
uv run pytest tests/ -v

# Integration
uv run pytest tests/test_integration/ -v

Lessons Learned

  1. Async is great for I/O, terrible for debugging. Async tests with pytest-asyncio + mocks require careful fixture scoping.

  2. Immutability pays dividends. Having raw data protected by object-lock + API enforcement prevents accidents. Every transformation creates a new artifact — nothing is lost.

  3. JSONB with schema validation. Mandatory metadata (source, date, version) is validated in Pydantic before reaching the database. The rest of the JSONB is free-form for each source.

  4. MinIO + async is not trivial. The minio-py client is not natively async. We built a wrapper with run_in_executor to avoid blocking the event loop.

  5. Promotion as copy. Copying objects between buckets may seem inefficient, but it's the only way to guarantee the raw layer stays intact. For production with large datasets, an async job would be better.

  6. Frameworkless frontend. Vanilla TS + Vite + Plotly handled a scientific dashboard well. Fewer dependencies, more control.


Repository

This project is part of a larger monorepo (16 bioinformatics projects). The full code is at: GitHub: jeferson0993/02-data-lake

02-data-lake/
├── app/ → FastAPI backend
├── frontend/ → TypeScript SPA
├── tests/ → 29 tests
└── docker-compose.yml

If you work with scientific data or need a governed dataset catalog, I hope this project serves as inspiration. The FastAPI + MinIO + PostgreSQL stack is light enough for an academic lab and robust enough for production.

Comments and questions are welcome!

GET IN TOUCHE: