URL: https://dev.to/datanestdigital/mlops-pipeline-architecture-from-experiment-to-production-3oih

MLOps Pipeline Architecture: From Experiment to Production


Only 22% of companies using machine learning have successfully deployed a model to production. The other 78% are stuck in what the industry calls "the last mile problem" — a Jupyter notebook that works on a laptop but will never serve a single real prediction.

The gap between experiment and production is where MLOps lives. This guide walks through building a complete pipeline — from experiment tracking to model serving and monitoring — with practical code you can adapt to your own stack.

The MLOps Maturity Model

Before building, understand where you are:

Level  Description   Characteristics
0      Manual        Notebooks, manual deployment, no versioning
1      ML Pipeline   Automated training, experiment tracking
2      CI/CD for ML  Automated testing, deployment pipelines
3      Full MLOps    Automated retraining, monitoring, feedback loops

Most teams are at Level 0 or 1. This guide gets you to Level 2 and points toward Level 3.

Architecture Overview

┌───────────────────────────────────────────────────────────┐
│                      MLOps Pipeline                       │
│                                                           │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐   │
│  │ Feature  │  │ Training │  │  Model   │  │  Model   │   │
│  │  Store   │→ │ Pipeline │→ │ Registry │→ │ Serving  │   │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘   │
│       ↑             ↑             ↑             │         │
│       │             │             │             ↓         │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐   │
│  │   Data   │  │Experiment│  │Validation│  │Monitoring│   │
│  │  Source  │  │ Tracking │  │  Gates   │  │ & Alerts │   │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘   │
└───────────────────────────────────────────────────────────┘

Step 1: Experiment Tracking with MLflow

Experiment tracking is the foundation. If you can't reproduce an experiment, you can't debug or improve it.

import mlflow
import mlflow.sklearn
import pandas as pd
from mlflow.models import infer_signature
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
)
from sklearn.model_selection import train_test_split


# Configure MLflow
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("customer-churn-prediction")


def train_model(
    data_path: str,
    hyperparams: dict,
    experiment_name: str = "customer-churn-prediction",
) -> str:
    """Train a model with full experiment tracking and return the run ID."""

    mlflow.set_experiment(experiment_name)

    with mlflow.start_run(run_name=f"gbm-{hyperparams.get('n_estimators', 100)}") as run:
        # Log parameters
        mlflow.log_params(hyperparams)
        mlflow.log_param("data_path", data_path)

        # Load and prepare data
        df = pd.read_parquet(data_path)
        X = df.drop(columns=["churn", "customer_id"])
        y = df["churn"]
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # Log dataset metadata for reproducibility
        mlflow.log_param("train_size", len(X_train))
        mlflow.log_param("test_size", len(X_test))
        mlflow.log_param("feature_count", X_train.shape[1])
        mlflow.log_param("positive_rate", float(y.mean()))

        # Train
        model = GradientBoostingClassifier(**hyperparams)
        model.fit(X_train, y_train)

        # Evaluate on the held-out split
        y_pred = model.predict(X_test)
        y_proba = model.predict_proba(X_test)[:, 1]

        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred),
            "recall": recall_score(y_test, y_pred),
            "f1": f1_score(y_test, y_pred),
            "auc_roc": roc_auc_score(y_test, y_proba),
        }
        mlflow.log_metrics(metrics)

        # Log feature importance as an artifact
        importance_df = pd.DataFrame({
            "feature": X_train.columns,
            "importance": model.feature_importances_,
        }).sort_values("importance", ascending=False)
        importance_df.to_csv("feature_importance.csv", index=False)
        mlflow.log_artifact("feature_importance.csv")

        # Log model with signature for schema enforcement
        signature = infer_signature(X_test, y_pred)
        mlflow.sklearn.log_model(
            model,
            "model",
            signature=signature,
            registered_model_name="churn-classifier",
        )

        print(f"Run ID: {run.info.run_id}")
        print(f"Metrics: {metrics}")
        return run.info.run_id
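
The run above logs `data_path`, but a path alone does not pin down the data, because the file behind it can change between runs. Below is a minimal sketch of one way to close that gap, assuming you are willing to hash the file contents and log the digest as an extra parameter. The helper name `file_fingerprint` and the parameter name `data_sha256` are hypothetical and not part of MLflow.

```python
import hashlib
from pathlib import Path


def file_fingerprint(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the SHA-256 hex digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with Path(path).open("rb") as fh:
        # Read fixed-size chunks so large Parquet files are not loaded whole.
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Hypothetical use inside train_model, next to the data_path parameter:
#     mlflow.log_param("data_sha256", file_fingerprint(data_path))
```

Two runs that log the same digest were trained on byte-identical input, which is a stronger statement than two runs that log the same path.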

Step 2: Feature Store Pattern

A feature store ensures that training and serving compute features the same way. When the two code paths diverge (training/serving skew), the result is one of the most common sources of production bugs.

import hashlib
import os
from dataclasses import dataclass

import pandas as pd


@dataclass
class FeatureDefinition:
    name: str
    description: str
    dtype: str
    transform_fn: str  # Key of the transformation function in `transforms`
    source_table: str
    version: str = "1.0"


class SimpleFeatureStore:
    """Lightweight feature store for consistent feature engineering.

    This is a simplified implementation to illustrate the pattern.
    Production systems would use Feast, Tecton, or a similar framework.
    """

    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        self.registry: dict[str, FeatureDefinition] = {}

    def register_feature_group(
        self, group_name: str, features: list[FeatureDefinition]
    ):
        """Register a group of related features."""
        for feature in features:
            key = f"{group_name}.{feature.name}"
            self.registry[key] = feature

    def compute_features(
        self,
        group_name: str,
        entity_df: pd.DataFrame,
        transforms: dict,
    ) -> pd.DataFrame:
        """Compute features for given entities using registered transforms."""
        result = entity_df.copy()

        for key, feature_def in self.registry.items():
            # Match on "group." so "customer" does not also match "customer_v2"
            if not key.startswith(f"{group_name}."):
                continue
            transform = transforms.get(feature_def.transform_fn)
            if transform:
                result[feature_def.name] = transform(result)

        # Cache computed features under a hash of the request shape
        cache_key = self._compute_hash(group_name, entity_df)
        group_dir = f"{self.storage_path}/{group_name}"
        os.makedirs(group_dir, exist_ok=True)
        result.to_parquet(f"{group_dir}/{cache_key}.parquet")

        return result

    def get_training_features(
        self, group_name: str, entity_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Get features for training (point-in-time correct).

        In production, this performs point-in-time joins
        to prevent data leakage from future events.
        This simplified version only reads back the cached frame.
        """
        cache_key = self._compute_hash(group_name, entity_df)
        cache_path = f"{self.storage_path}/{group_name}/{cache_key}.parquet"

        try:
            return pd.read_parquet(cache_path)
        except FileNotFoundError:
            raise ValueError(
                f"Features not computed for {group_name}. "
                "Run compute_features first."
            ) from None

    def get_serving_features(
        self, group_name: str, entity_ids: list
    ) -> pd.DataFrame:
        """Get latest features for real-time serving."""
        # In production, this reads from a low-latency store like Redis.
        # Nothing in this class writes latest.parquet; a separate job
        # is expected to publish it.
        latest_path = f"{self.storage_path}/{group_name}/latest.parquet"
        df = pd.read_parquet(latest_path)
        return df[df["entity_id"].isin(entity_ids)]

    def _compute_hash(
        self, group_name: str, entity_df: pd.DataFrame
    ) -> str:
        # The key covers only the row count and column names, so two
        # different entity sets of the same shape share a cache entry.
        content = f"{group_name}_{len(entity_df)}_{entity_df.columns.tolist()}"
        return hashlib.md5(content.encode()).hexdigest()[:12]


# Usage example
store = SimpleFeatureStore("/data/feature_store")

customer_features = [
    FeatureDefinition(
        name="total_purchases_30d",
        description="Total purchase amount in last 30 days",
        dtype="float64",
        transform_fn="compute_purchases_30d",
        source_table="transactions",
    ),
    FeatureDefinition(
        name="login_frequency_7d",
        description="Number of logins in last 7 days",
        dtype="int64",
        transform_fn="compute_login_freq_7d",
        source_table="user_events",
    ),
    FeatureDefinition(
        name="support_tickets_90d",
        description="Support tickets opened in last 90 days",
        dtype="int64",
        transform_fn="compute_support_tickets",
        source_table="support_tickets",
    ),
]

store.register_feature_group("customer", customer_features)
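
The docstring of `get_training_features` mentions point-in-time joins without showing one. Here is a minimal sketch of the idea in plain Python, assuming each feature value is stored with the timestamp at which it became known: for a label observed at time `t`, only the most recent value at or before `t` may be used. The function name `point_in_time_value` and the sample history are hypothetical; with pandas, the same kind of join is usually expressed with `pandas.merge_asof`.

```python
from bisect import bisect_right
from datetime import datetime


def point_in_time_value(history, as_of):
    """Return the latest feature value known at `as_of`, or None.

    `history` is a list of (timestamp, value) pairs sorted by timestamp.
    Values recorded after `as_of` are ignored, which is what keeps
    future information out of a training row.
    """
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, as_of)  # number of entries with ts <= as_of
    if idx == 0:
        return None  # nothing was known yet at that time
    return history[idx - 1][1]


# Hypothetical history of total_purchases_30d for one customer.
history = [
    (datetime(2024, 1, 1), 120.0),
    (datetime(2024, 2, 1), 80.0),
    (datetime(2024, 3, 1), 0.0),
]

# A churn label observed on 15 Feb must see the 1 Feb value, not the March one.
value_at_label_time = point_in_time_value(history, datetime(2024, 2, 15))
```

Joining on "latest value" instead would hand the model the March figure for a February label, which is exactly the leakage the docstring warns about.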

Step 3: Model Validation Gates

Never deploy a model without automated validation. This framework checks performance, regression, latency, and data drift before any model reaches production.

from dataclasses import dataclass
from enum import Enum


class ValidationResult(Enum):
    PASS = "pass"
    WARN = "warn"
    FAIL = "fail"


@dataclass
class ValidationCheck:
    name: str
    result: ValidationResult
    actual_value: float
    threshold: float
    message: str


class ModelValidator:
    """Automated model validation before deployment.

    Runs a battery of checks against configurable thresholds.
    Any FAIL result blocks promotion to production.
    """

    def __init__(
        self,
        min_accuracy: float = 0.80,
        min_auc: float = 0.75,
        max_latency_ms: float = 100,
        min_coverage: float = 0.95,
    ):
        self.min_accuracy = min_accuracy
        self.min_auc = min_auc
        self.max_latency_ms = max_latency_ms
        self.min_coverage = min_coverage
        self.checks: list[ValidationCheck] = []

    def validate_performance(self, metrics: dict) -> bool:
        """Check model performance against minimum thresholds."""
        checks = [
            ("accuracy", metrics.get("accuracy", 0), self.min_accuracy),
            ("auc_roc", metrics.get("auc_roc", 0), self.min_auc),
        ]

        all_pass = True
        for name, actual, threshold in checks:
            if actual >= threshold:
                result = ValidationResult.PASS
            elif actual >= threshold * 0.95:  # Within 5%: warn but don't block
                result = ValidationResult.WARN
            else:
                result = ValidationResult.FAIL
                all_pass = False

            self.checks.append(ValidationCheck(
                name=f"performance_{name}",
                result=result,
                actual_value=actual,
                threshold=threshold,
                message=f"{name}: {actual:.4f} (threshold: {threshold})",
            ))

        return all_pass

    def validate_regression(
        self, current_metrics: dict, baseline_metrics: dict,
        max_degradation: float = 0.02,
    ) -> bool:
        """Ensure new model doesn't regress vs. the current production baseline."""
        all_pass = True

        for metric_name in ["accuracy", "f1", "auc_roc"]:
            current = current_metrics.get(metric_name, 0)
            baseline = baseline_metrics.get(metric_name, 0)
            degradation = baseline - current

            if degradation <= 0:
                result = ValidationResult.PASS
            elif degradation <= max_degradation:
                result = ValidationResult.WARN
            else:
                result = ValidationResult.FAIL
                all_pass = False

            self.checks.append(ValidationCheck(
                name=f"regression_{metric_name}",
                result=result,
                actual_value=current,
                threshold=baseline - max_degradation,
                message=(
                    f"{metric_name}: current={current:.4f}, "
                    f"baseline={baseline:.4f}, "
                    f"degradation={degradation:.4f}"
                ),
            ))

        return all_pass

    def validate_latency(self, latency_p50: float, latency_p99: float) -> bool:
        """Check inference latency against SLA requirements.

        Only p99 is gated; latency_p50 is accepted but not checked.
        """
        p99_pass = latency_p99 <= self.max_latency_ms

        self.checks.append(ValidationCheck(
            name="latency_p99",
            result=ValidationResult.PASS if p99_pass else ValidationResult.FAIL,
            actual_value=latency_p99,
            threshold=self.max_latency_ms,
            message=f"p99 latency: {latency_p99:.1f}ms (max: {self.max_latency_ms}ms)",
        ))

        return p99_pass

    def validate_data_drift(
        self,
        training_stats: dict,
        serving_stats: dict,
        max_psi: float = 0.2,
    ) -> bool:
        """Check for drift between training and serving feature statistics.

        Note: despite the `max_psi` name, this compares the relative shift
        of each feature's mean. It is a simple stand-in for the Population
        Stability Index (PSI), not a PSI computation.
        """
        all_pass = True
        for feature in training_stats:
            if feature in serving_stats:
                train_mean = training_stats[feature]["mean"]
                serve_mean = serving_stats[feature]["mean"]
                # Relative mean shift; the epsilon avoids division by zero
                drift = abs(train_mean - serve_mean) / (abs(train_mean) + 1e-8)

                if drift > max_psi:
                    all_pass = False

                self.checks.append(ValidationCheck(
                    name=f"drift_{feature}",
                    result=(
                        ValidationResult.PASS if drift <= max_psi * 0.5
                        else ValidationResult.WARN if drift <= max_psi
                        else ValidationResult.FAIL
                    ),
                    actual_value=drift,
                    threshold=max_psi,
                    message=f"{feature} drift: {drift:.4f} (max: {max_psi})",
                ))

        return all_pass

    def generate_report(self) -> str:
        """Generate a human-readable validation report."""
        lines = ["=" * 60, "MODEL VALIDATION REPORT", "=" * 60]

        for check in self.checks:
            icon = {
                ValidationResult.PASS: "PASS",
                ValidationResult.WARN: "WARN",
                ValidationResult.FAIL: "FAIL",
            }[check.result]
            lines.append(f" [{icon}] {check.message}")

        passed = sum(1 for c in self.checks if c.result == ValidationResult.PASS)
        warned = sum(1 for c in self.checks if c.result == ValidationResult.WARN)
        failed = sum(1 for c in self.checks if c.result == ValidationResult.FAIL)

        lines.append("=" * 60)
        lines.append(
            f"Results: {passed} passed, {warned} warnings, {failed} failed"
        )
        lines.append(
            f"Overall: {'APPROVED' if failed == 0 else 'REJECTED'}"
        )
        lines.append("=" * 60)

        return "\n".join(lines)
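
`validate_data_drift` compares feature means, which is only a rough proxy for distribution drift. The Population Stability Index compares binned distributions instead: PSI = Σ (a_i − e_i) · ln(a_i / e_i), where e_i and a_i are the fractions of the training and serving samples that fall in bin i. The following is a minimal sketch in plain Python, assuming equal-width bins over the training range and a small epsilon to avoid taking the log of zero. The function name `population_stability_index` and the default of 10 bins are choices made for this illustration.

```python
import math


def population_stability_index(expected, actual, bins=10, eps=1e-6):
    """PSI between two numeric samples, using equal-width bins.

    Bin edges come from the range of `expected` (the training sample);
    `actual` values outside that range are clamped into the end bins.
    """
    lo, hi = min(expected), max(expected)
    width = (hi - lo) / bins or 1.0  # guard against a constant feature

    def fractions(values):
        counts = [0] * bins
        for v in values:
            idx = int((v - lo) / width)
            counts[min(max(idx, 0), bins - 1)] += 1
        # eps keeps empty bins from producing log(0) or division by zero
        return [max(c / len(values), eps) for c in counts]

    e, a = fractions(expected), fractions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


train = [i / 100 for i in range(100)]          # roughly uniform on [0, 1)
same = list(train)                             # identical distribution
shifted = [0.5 + i / 200 for i in range(100)]  # mass moved to the upper half

psi_same = population_stability_index(train, same)
psi_shifted = population_stability_index(train, shifted)
```

Identical samples give a PSI of zero, and the shifted sample gives a value well above the 0.2 default used for `max_psi`. A commonly quoted rule of thumb treats values under 0.1 as stable and values above roughly 0.2 to 0.25 as a significant shift, but the right cut-off depends on the feature and the binning.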

Step 4: Model Serving with FastAPI

import time
from contextlib import asynccontextmanager

import mlflow
import mlflow.sklearn
import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel


class PredictionRequest(BaseModel):
    customer_id: str
    features: dict[str, float]


class PredictionResponse(BaseModel):
    customer_id: str
    prediction: int
    probability: float
    model_version: str
    latency_ms: float


# Global model storage
model_store = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load the production model at startup, clean up on shutdown."""
    # Stage-based URIs are deprecated in recent MLflow releases in favor
    # of model aliases; adjust the URI to match your registry setup.
    model_uri = "models:/churn-classifier/Production"
    model_store["model"] = mlflow.sklearn.load_model(model_uri)
    model_store["version"] = "production"
    yield
    model_store.clear()


app = FastAPI(title="Churn Prediction API", lifespan=lifespan)


# Declared with plain `def` so FastAPI runs the blocking scikit-learn
# calls in its thread pool instead of on the event loop.
@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    start_time = time.perf_counter()

    model = model_store.get("model")
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    # Column order follows the request dict; it must match training order
    features_df = pd.DataFrame([request.features])

    prediction = int(model.predict(features_df)[0])
    probability = float(model.predict_proba(features_df)[0][1])

    latency_ms = (time.perf_counter() - start_time) * 1000

    return PredictionResponse(
        customer_id=request.customer_id,
        prediction=prediction,
        probability=probability,
        model_version=model_store["version"],
        latency_ms=round(latency_ms, 2),
    )


@app.get("/health")
async def health():
    return {
        "status": "healthy",
        "model_loaded": "model" in model_store,
        "model_version": model_store.get("version", "none"),
    }
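
The `/predict` handler builds a DataFrame straight from the request dictionary, so the set of columns and their order are whatever the client sent. Below is a minimal sketch of a guard that could run before the DataFrame is built, assuming the expected feature names are known at startup (for example, from the training columns). `EXPECTED_FEATURES` and `align_features` are hypothetical names, and the three features reuse the ones registered in Step 2.

```python
EXPECTED_FEATURES = [
    "total_purchases_30d",
    "login_frequency_7d",
    "support_tickets_90d",
]


def align_features(features: dict[str, float], expected: list[str]) -> dict[str, float]:
    """Return the features in training order, or raise ValueError."""
    missing = [name for name in expected if name not in features]
    unexpected = [name for name in features if name not in expected]
    if missing or unexpected:
        raise ValueError(
            f"feature mismatch: missing={missing}, unexpected={unexpected}"
        )
    # Dicts preserve insertion order, so a DataFrame built from this
    # result has its columns in the same order as `expected`.
    return {name: features[name] for name in expected}


request_features = {
    "support_tickets_90d": 2.0,
    "total_purchases_30d": 310.5,
    "login_frequency_7d": 4.0,
}
ordered = align_features(request_features, EXPECTED_FEATURES)
```

In the handler, a `ValueError` from this helper would typically be turned into an `HTTPException` with a 4xx status so that a malformed request is rejected rather than scored.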

Step 5: Model Monitoring

The model is deployed — now you need to know when it degrades.

from datetime import datetime, timedelta, timezone

import numpy as np


class ModelMonitor:
    """Monitor model performance and data drift in production.

    Tracks predictions, computes rolling metrics, and fires alerts
    when thresholds are breached.
    """

    def __init__(self, model_name: str, alert_callback=None):
        self.model_name = model_name
        # Kept in memory and never pruned; fine for a demo only
        self.predictions: list[dict] = []
        self.alert_callback = alert_callback

    def log_prediction(
        self,
        features: dict,
        prediction: int,
        probability: float,
        actual: int | None = None,
        latency_ms: float = 0,
    ):
        """Log a prediction for monitoring."""
        self.predictions.append({
            # Timezone-aware UTC; datetime.utcnow() is deprecated in 3.12
            "timestamp": datetime.now(timezone.utc),
            "features": features,
            "prediction": prediction,
            "probability": probability,
            "actual": actual,
            "latency_ms": latency_ms,
        })

    def compute_metrics(self, window_hours: int = 24) -> dict:
        """Compute monitoring metrics over a rolling time window."""
        cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours)
        recent = [
            p for p in self.predictions
            if p["timestamp"] > cutoff
        ]

        if not recent:
            return {"error": "No predictions in window"}

        predictions = [p["prediction"] for p in recent]
        probabilities = [p["probability"] for p in recent]
        latencies = [p["latency_ms"] for p in recent]

        metrics = {
            "prediction_count": len(recent),
            "positive_rate": float(np.mean(predictions)),
            "avg_probability": float(np.mean(probabilities)),
            "probability_std": float(np.std(probabilities)),
            "avg_latency_ms": float(np.mean(latencies)),
            "p99_latency_ms": float(np.percentile(latencies, 99)),
        }

        # If ground truth labels are available, compute accuracy
        labeled = [p for p in recent if p["actual"] is not None]
        if labeled:
            correct = sum(
                1 for p in labeled if p["prediction"] == p["actual"]
            )
            metrics["accuracy"] = correct / len(labeled)
            metrics["labeled_count"] = len(labeled)

        return metrics

    def check_alerts(self, metrics: dict):
        """Check if any metrics breach alert thresholds."""
        alerts = []

        if metrics.get("positive_rate", 0) > 0.5:
            alerts.append(
                f"High positive prediction rate: "
                f"{metrics['positive_rate']:.2%}"
            )

        if metrics.get("p99_latency_ms", 0) > 200:
            alerts.append(
                f"High p99 latency: "
                f"{metrics['p99_latency_ms']:.0f}ms"
            )

        # Missing accuracy (no labels yet) defaults to 1.0 and never alerts
        if metrics.get("accuracy", 1.0) < 0.75:
            alerts.append(
                f"Low accuracy: {metrics['accuracy']:.2%}"
            )

        if alerts and self.alert_callback:
            for alert in alerts:
                self.alert_callback(self.model_name, alert)

        return alerts

CI/CD Pipeline for ML

# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    paths:
      - 'ml/**'
      - 'features/**'
  schedule:
    - cron: '0 6 * * 1'  # Weekly retraining, Mondays at 06:00 UTC

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install -r requirements.txt
      - run: pytest ml/tests/ -v

  train:
    needs: test
    runs-on: ubuntu-latest
    # Expose the MLflow run ID so later jobs can reference it
    outputs:
      run_id: ${{ steps.train.outputs.run_id }}
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install -r requirements.txt
      - name: Train Model
        id: train
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
        # Assumes ml/train.py appends "run_id=<id>" to $GITHUB_OUTPUT
        run: python ml/train.py --config ml/configs/production.yaml

  validate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install -r requirements.txt
      - name: Run Validation Gates
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
        run: python ml/validate.py --run-id ${{ needs.train.outputs.run_id }}
      - name: Check Results
        run: |
          if [ "$(cat validation_result.txt)" != "APPROVED" ]; then
            echo "Model validation failed: blocking deployment"
            exit 1
          fi

  deploy:
    # `train` must be a direct dependency for needs.train.outputs to resolve
    needs: [train, validate]
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install -r requirements.txt
      - name: Promote Model to Production
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
        run: |
          python ml/promote.py \
            --run-id ${{ needs.train.outputs.run_id }} \
            --stage Production
      - name: Deploy to Kubernetes
        # Assumes cluster credentials were configured in an earlier step
        run: |
          kubectl set image deployment/churn-model \
            model=ghcr.io/myorg/churn-model:${{ github.sha }}
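
The workflow relies on two small contracts with the Python scripts: the validate job reads `validation_result.txt` and expects the literal string `APPROVED`, and later jobs reference a `run_id` output from training. Below is a minimal sketch of the script side of both, assuming the scripts run under GitHub Actions, which provides the path of a step-output file in the `GITHUB_OUTPUT` environment variable. The function names are hypothetical; the article does not show `ml/train.py` or `ml/validate.py`.

```python
import os
from pathlib import Path


def write_validation_result(
    check_results: list[str], path: str = "validation_result.txt"
) -> str:
    """Write APPROVED unless any check failed, and return the verdict.

    `check_results` holds one of "pass", "warn" or "fail" per check,
    matching the ValidationResult values from Step 3.
    """
    verdict = "REJECTED" if "fail" in check_results else "APPROVED"
    # The shell's $(cat ...) strips the trailing newline before comparing.
    Path(path).write_text(verdict + "\n", encoding="utf-8")
    return verdict


def export_step_output(name: str, value: str) -> None:
    """Append `name=value` to the file GitHub Actions reads step outputs from."""
    output_file = os.environ.get("GITHUB_OUTPUT")
    if output_file is None:
        return  # not running under GitHub Actions, e.g. a local run
    with open(output_file, "a", encoding="utf-8") as fh:
        fh.write(f"{name}={value}\n")
```

For a reference like `needs.train.outputs.run_id` to resolve, the training step also needs an `id`, the `train` job needs an `outputs:` mapping that points at that step, and any job reading the value must list `train` under `needs`.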

Summary

A production MLOps pipeline has these components:

Component            Purpose              Example Tools
Experiment Tracking  Reproducibility      MLflow, W&B
Feature Store        Consistent features  Feast, custom
Training Pipeline    Automated training   Airflow, GitHub Actions
Model Registry       Version management   MLflow, SageMaker
Validation Gates     Quality assurance    Custom checks
Model Serving        Inference API        FastAPI, Triton
Monitoring           Drift detection      Custom, Evidently

Start with experiment tracking and model serving: those two alone address many of the most common production failures. Add feature stores, validation gates, and monitoring iteratively as your models mature.



