
MLOps: Machine Learning Operations Complete Guide

Master MLOps for production ML systems. Learn model versioning, experiment tracking, CI/CD for ML, model serving, and monitoring best practices.


Moshiour Rahman


What is MLOps?

MLOps (Machine Learning Operations) applies DevOps principles to machine learning systems. It bridges the gap between model development and production deployment, ensuring reliable, scalable ML systems.

MLOps vs Traditional ML

Traditional ML        | MLOps
----------------------|--------------------------
Manual experiments    | Automated tracking
Jupyter notebooks     | Version-controlled code
Local models          | Deployed services
Ad-hoc testing        | Automated testing
Manual monitoring     | Continuous monitoring

MLOps Lifecycle

┌─────────────────────────────────────────────────────────┐
│                    MLOps Lifecycle                      │
├─────────────────────────────────────────────────────────┤
│  Data     →  Training  →  Evaluation  →  Deployment    │
│    ↑                                          ↓        │
│    └──────────── Monitoring & Feedback ←──────┘        │
└─────────────────────────────────────────────────────────┘

Experiment Tracking with MLflow

Installation

pip install mlflow

Basic Tracking

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# Split your features (X) and churn labels (y) into train/test sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Set experiment
mlflow.set_experiment("customer-churn-prediction")

# Start run
with mlflow.start_run(run_name="random-forest-v1"):
    # Log parameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5
    }
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Predict and evaluate
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    f1 = f1_score(y_test, predictions)

    # Log metrics
    mlflow.log_metrics({
        "accuracy": accuracy,
        "f1_score": f1
    })

    # Log model
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="ChurnPredictor"
    )

    # Log artifacts (assumes feature_importance.png was saved earlier in the run)
    mlflow.log_artifact("feature_importance.png")

    print(f"Run ID: {mlflow.active_run().info.run_id}")

MLflow Projects

# MLproject file
name: churn_prediction

conda_env: conda.yaml

entry_points:
  main:
    parameters:
      n_estimators: {type: int, default: 100}
      max_depth: {type: int, default: 10}
      data_path: {type: str, default: "data/train.csv"}
    command: "python train.py --n_estimators {n_estimators} --max_depth {max_depth} --data_path {data_path}"

  evaluate:
    parameters:
      model_uri: {type: string}
      test_data: {type: string}
    command: "python evaluate.py --model_uri {model_uri} --test_data {test_data}"

# Run project
mlflow run . -P n_estimators=200 -P max_depth=15

# Run from git
mlflow run git@github.com:user/ml-project.git -P n_estimators=200
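The MLproject file above references a conda.yaml environment file; a minimal sketch (package names and versions are illustrative) could look like this:

name: churn_prediction
channels:
  - conda-forge
dependencies:
  - python=3.10
  - scikit-learn
  - pandas
  - pip
  - pip:
      - mlflow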

Model Registry

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register model (run_id is the ID printed by the tracking run earlier)
model_uri = f"runs:/{run_id}/model"
mlflow.register_model(model_uri, "ChurnPredictor")

# Transition model stage
client.transition_model_version_stage(
    name="ChurnPredictor",
    version=1,
    stage="Production"
)

# Load production model
model = mlflow.pyfunc.load_model(
    model_uri="models:/ChurnPredictor/Production"
)

# Make predictions
predictions = model.predict(new_data)
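For quick testing, a registered model can also be served directly from the registry with the MLflow CLI instead of writing a custom server:

# Serve the Production model as a local REST endpoint on port 5001
mlflow models serve -m "models:/ChurnPredictor/Production" -p 5001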

Data Versioning with DVC

Setup

pip install dvc
dvc init
dvc remote add -d myremote s3://my-bucket/dvc-store

Track Data

# Add data to DVC
dvc add data/training_data.csv

# Track changes in git
git add data/training_data.csv.dvc data/.gitignore
git commit -m "Add training data"

# Push data to remote
dvc push
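Because the small .dvc pointer file is committed to git, checking out an earlier commit and running dvc checkout restores the matching data version:

# Restore the data version recorded in an earlier commit
git checkout <commit-hash>
dvc checkout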

DVC Pipelines

# dvc.yaml
stages:
  prepare:
    cmd: python src/prepare.py
    deps:
      - src/prepare.py
      - data/raw/
    params:
      - prepare.split_ratio
    outs:
      - data/processed/

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/processed/
    params:
      - train.n_estimators
      - train.max_depth
    outs:
      - models/model.pkl
    metrics:
      - metrics.json:
          cache: false

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pkl
      - data/processed/test.csv
    metrics:
      - evaluation.json:
          cache: false

# Run pipeline
dvc repro

# Compare experiments
dvc metrics diff

# Show pipeline
dvc dag
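The params entries in dvc.yaml point at keys in a params.yaml file at the repository root; a minimal sketch matching the stages above:

# params.yaml
prepare:
  split_ratio: 0.2

train:
  n_estimators: 100
  max_depth: 10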

CI/CD for ML

GitHub Actions

# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov

      - name: Run tests
        run: pytest tests/ --cov=src/

      - name: Upload coverage
        uses: codecov/codecov-action@v3

  train:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install dependencies
        run: pip install -r requirements.txt "dvc[s3]"

      - name: Configure DVC
        run: |
          dvc remote modify myremote access_key_id ${{ secrets.AWS_ACCESS_KEY }}
          dvc remote modify myremote secret_access_key ${{ secrets.AWS_SECRET_KEY }}

      - name: Pull data
        run: dvc pull

      - name: Train model
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
        run: |
          dvc repro

      - name: Push artifacts
        run: dvc push

  deploy:
    needs: train
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy model
        run: |
          # Deploy to production
          python scripts/deploy_model.py --stage production
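The deploy job calls scripts/deploy_model.py, which is not shown in this guide. A minimal sketch, assuming "deploying" here means promoting the latest registered version to the requested stage in the MLflow Model Registry:

# scripts/deploy_model.py (illustrative sketch)
import argparse

from mlflow.tracking import MlflowClient


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--stage", default="staging")
    args = parser.parse_args()

    client = MlflowClient()

    # Take the most recently registered version that has not been promoted yet
    latest = client.get_latest_versions("ChurnPredictor", stages=["None"])[0]

    client.transition_model_version_stage(
        name="ChurnPredictor",
        version=latest.version,
        stage=args.stage.capitalize(),
    )
    print(f"Promoted ChurnPredictor v{latest.version} to {args.stage}")


if __name__ == "__main__":
    main()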

Model Serving

FastAPI Server

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow
import mlflow.sklearn
import pandas as pd

app = FastAPI()

# Load model on startup
@app.on_event("startup")
def load_model():
    global model
    # Load the native sklearn flavor: pyfunc models expose predict() but not predict_proba()
    model = mlflow.sklearn.load_model("models:/ChurnPredictor/Production")

class PredictionRequest(BaseModel):
    features: dict

class PredictionResponse(BaseModel):
    prediction: int
    probability: float

@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    try:
        df = pd.DataFrame([request.features])
        prediction = model.predict(df)[0]
        probability = model.predict_proba(df)[0].max()

        return PredictionResponse(
            prediction=int(prediction),
            probability=float(probability)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
def health():
    return {"status": "healthy"}

Docker Deployment

FROM python:3.10-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ ./src/
COPY models/ ./models/

EXPOSE 8000

CMD ["uvicorn", "src.server:app", "--host", "0.0.0.0", "--port", "8000"]

Kubernetes Deployment

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model-server
  template:
    metadata:
      labels:
        app: ml-model-server
    spec:
      containers:
        - name: model-server
          image: myregistry/ml-model:latest
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model-server
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer
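Apply the manifest and check that the rollout succeeded:

kubectl apply -f deployment.yaml
kubectl rollout status deployment/ml-model-server
kubectl get service ml-model-service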

Model Monitoring

Performance Monitoring

import pandas as pd
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

def monitor_model(
    reference_data: pd.DataFrame,
    current_data: pd.DataFrame,
    target_column: str
):
    column_mapping = ColumnMapping(
        target=target_column,
        prediction='prediction'
    )

    # Data drift report
    data_drift_report = Report(metrics=[
        DataDriftPreset()
    ])
    data_drift_report.run(
        reference_data=reference_data,
        current_data=current_data,
        column_mapping=column_mapping
    )

    # Target drift report
    target_drift_report = Report(metrics=[
        TargetDriftPreset()
    ])
    target_drift_report.run(
        reference_data=reference_data,
        current_data=current_data,
        column_mapping=column_mapping
    )

    return {
        "data_drift": data_drift_report.as_dict(),
        "target_drift": target_drift_report.as_dict()
    }
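Besides the dictionary output, Evidently reports can also be saved as standalone HTML dashboards for manual review, e.g. inside monitor_model:

# Persist the drift report as an interactive HTML page (illustrative file name)
data_drift_report.save_html("data_drift_report.html")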

Metrics Dashboard

from prometheus_client import Counter, Histogram, start_http_server
import time

# Define metrics
prediction_counter = Counter(
    'model_predictions_total',
    'Total predictions made',
    ['model_version', 'prediction_class']
)

prediction_latency = Histogram(
    'model_prediction_latency_seconds',
    'Prediction latency in seconds'
)

def predict_with_metrics(model, features):
    start_time = time.time()

    prediction = model.predict(features)

    # Record metrics
    latency = time.time() - start_time
    prediction_latency.observe(latency)
    prediction_counter.labels(
        model_version="v1",
        prediction_class=str(prediction[0])  # label with the first row's predicted class
    ).inc()

    return prediction

# Start metrics server
start_http_server(8001)

Feature Store

# Legacy Feast API (pre-0.26); newer releases use Field/schema in place of Feature/features
from feast import FeatureStore, Entity, Feature, FeatureView, FileSource, ValueType
from datetime import timedelta

# Define entity
customer = Entity(
    name="customer",
    value_type=ValueType.INT64,
    description="Customer ID"
)

# Define feature source
customer_features_source = FileSource(
    path="data/customer_features.parquet",
    timestamp_field="event_timestamp"
)

# Define feature view
customer_features = FeatureView(
    name="customer_features",
    entities=["customer"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="age", dtype=ValueType.INT64),
        Feature(name="total_purchases", dtype=ValueType.FLOAT),
        Feature(name="account_age_days", dtype=ValueType.INT64)
    ],
    online=True,
    source=customer_features_source
)

# Initialize store
store = FeatureStore(repo_path=".")

# Get features for training
# entity_df: DataFrame with a "customer" column and an "event_timestamp" column
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["customer_features:age", "customer_features:total_purchases"]
).to_df()

# Get features for inference (online)
online_features = store.get_online_features(
    features=["customer_features:age", "customer_features:total_purchases"],
    entity_rows=[{"customer": 12345}]
).to_dict()
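FeatureStore(repo_path=".") expects a feature_store.yaml configuration in the repository root; a minimal local-mode sketch (paths and project name are illustrative):

# feature_store.yaml
project: customer_churn
registry: data/registry.db
provider: local
online_store:
  type: sqlite
  path: data/online_store.db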

Summary

Component            | Tools
---------------------|---------------------------
Experiment Tracking  | MLflow, Weights & Biases
Data Versioning      | DVC, LakeFS
Model Registry       | MLflow, SageMaker
Feature Store        | Feast, Tecton
Model Serving        | Seldon, KServe
Monitoring           | Evidently, Prometheus

MLOps enables reliable, reproducible, and scalable machine learning in production.
