MLOps AI Engineer Interview Preparation Guide

Table of Contents

  1. General MLOps Concepts
  2. AWS MLOps
  3. Azure MLOps
  4. Kubeflow
  5. Docker & Containerization
  6. CI/CD for ML
  7. Model Monitoring & Governance
  8. Infrastructure as Code

General MLOps Concepts

Q1: What is MLOps and why is it important?

Answer: MLOps (Machine Learning Operations) is a practice that combines ML, DevOps, and data engineering to deploy and maintain ML systems in production reliably and efficiently. It's important because:

  • Reproducibility: Ensures consistent model training and deployment
  • Scalability: Handles growing data and model complexity
  • Reliability: Maintains model performance in production
  • Collaboration: Bridges the gap between data scientists and operations teams
  • Compliance: Ensures governance and auditability
  • Speed: Accelerates model deployment and iteration cycles

Q2: Explain the ML lifecycle and where MLOps fits in.

Answer: The ML lifecycle includes:

  1. Data Collection & Preparation: MLOps ensures data versioning, quality checks
  2. Model Development: Experiment tracking, version control
  3. Model Training: Automated pipelines, resource management
  4. Model Validation: Automated testing, performance metrics
  5. Model Deployment: CI/CD pipelines, containerization
  6. Model Monitoring: Performance tracking, drift detection
  7. Model Retraining: Automated triggers, feedback loops

MLOps provides the infrastructure, processes, and tools to automate and standardize these stages.
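
The monitoring-to-retraining feedback loop in stages 6–7 can be sketched as a simple trigger. A minimal illustration (function and threshold names are my own, not tied to any platform):

```python
def should_retrain(current_accuracy, baseline_accuracy,
                   drift_detected, max_degradation=0.05):
    """Signal retraining when monitored accuracy degrades past a
    tolerance, or when upstream drift detection has fired."""
    degraded = (baseline_accuracy - current_accuracy) > max_degradation
    return degraded or drift_detected
```

In practice this decision consumes the monitoring stage's output and automatically kicks off the training pipeline.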

Q3: What are the key challenges in ML model deployment?

Answer:

  • Model Drift: Performance degradation over time due to data changes
  • Data Drift: Changes in input data distribution
  • Scalability: Handling increased load and concurrent requests
  • Latency Requirements: Meeting real-time inference needs
  • Dependency Management: Managing complex ML library dependencies
  • Resource Management: Efficient compute and memory usage
  • Version Control: Managing multiple model versions
  • Rollback Capabilities: Quick recovery from failed deployments
  • Security: Protecting models and data
  • Compliance: Meeting regulatory requirements

Q4: Explain different deployment patterns for ML models.

Answer:

  • Blue-Green Deployment: Two identical environments, switch traffic between them
  • Canary Deployment: Gradual rollout to subset of users
  • A/B Testing: Compare model performance with control group
  • Shadow Deployment: New model runs alongside existing without affecting users
  • Rolling Deployment: Gradual replacement of instances
  • Multi-armed Bandit: Dynamic traffic allocation based on performance
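
The multi-armed-bandit pattern above can be sketched with epsilon-greedy routing. A minimal illustration (variant names and the reward signal are assumptions, not a specific serving stack):

```python
import random

def choose_variant(stats, epsilon=0.1, rng=random.random):
    """stats maps variant name -> (reward_sum, request_count).
    With probability epsilon explore a random variant; otherwise
    route to the variant with the best observed mean reward."""
    if rng() < epsilon:
        return random.choice(list(stats))
    return max(stats, key=lambda v: stats[v][0] / max(stats[v][1], 1))
```

For example, with `{'model_a': (42, 500), 'model_b': (61, 500)}` the exploit branch routes to `model_b`, while the epsilon fraction of traffic keeps gathering evidence on the weaker variant.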

AWS MLOps

Q5: What are the key AWS services for MLOps?

Answer:

  • Amazon SageMaker: End-to-end ML platform
  • AWS CodePipeline: CI/CD for ML workflows
  • AWS CodeBuild: Build and test ML models
  • AWS CodeCommit: Source control
  • Amazon ECR: Container registry for ML images
  • AWS Step Functions: Orchestrate ML workflows
  • Amazon CloudWatch: Monitoring and logging
  • AWS Lambda: Serverless model inference
  • Amazon ECS/EKS: Container orchestration
  • AWS Batch: Batch processing for training jobs

Q6: Explain SageMaker Pipelines and its components.

Answer: SageMaker Pipelines is a CI/CD service for ML workflows:

Components:

  • Pipeline: DAG of steps for ML workflow
  • Steps: Individual operations (processing, training, evaluation)
  • Parameters: Runtime configuration values
  • Properties: Step outputs that can be referenced
  • Conditions: Conditional execution logic
  • Model Registry: Centralized model store

Step Types:

  • ProcessingStep: Data preprocessing
  • TrainingStep: Model training
  • TuningStep: Hyperparameter optimization
  • CreateModelStep: Model creation
  • RegisterModelStep: Model registration
  • TransformStep: Batch inference
  • ConditionStep: Conditional branching

Q7: How do you implement model monitoring in AWS?

Answer:

# SageMaker Model Monitor setup
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

# Create monitor
monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

# Create baseline
monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
)

# Create monitoring schedule
monitor.create_monitoring_schedule(
    monitor_schedule_name=schedule_name,
    endpoint_input=endpoint_name,
    output_s3_uri=monitoring_output_uri,
    statistics=statistics_uri,
    constraints=constraints_uri,
    schedule_config=schedule_config,
)

Monitoring includes:

  • Data quality monitoring
  • Model quality monitoring
  • Bias drift detection
  • Feature attribution drift
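
Under the hood, a data-quality check of this kind boils down to comparing live statistics against baseline constraints. A hand-rolled sketch (not the Model Monitor API; the constraint shape here is my own):

```python
def check_constraints(batch, constraints):
    """batch: feature -> list of observed values (None = missing).
    constraints: feature -> (min_allowed, max_allowed, max_missing_frac)."""
    violations = []
    for feature, (lo, hi, max_missing) in constraints.items():
        values = batch.get(feature, [])
        present = [v for v in values if v is not None]
        missing_frac = 1 - len(present) / max(len(values), 1)
        if missing_frac > max_missing:
            violations.append((feature, 'missing_rate'))
        if present and (min(present) < lo or max(present) > hi):
            violations.append((feature, 'out_of_range'))
    return violations
```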

Q8: Explain SageMaker Multi-Model Endpoints.

Answer: Multi-Model Endpoints allow hosting multiple models on a single endpoint:

Benefits:

  • Cost optimization through resource sharing
  • Reduced infrastructure management
  • Dynamic model loading/unloading
  • A/B testing capabilities

Implementation:

# Create multi-model endpoint
from sagemaker.multidatamodel import MultiDataModel

mdm = MultiDataModel(
    name="my-multi-model",
    model_data_prefix=model_data_prefix,
    image_uri=container_image,
    role=role,
)

# Deploy endpoint
predictor = mdm.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.large'
)

# Add models dynamically
mdm.add_model(model_data_source=model1_uri, model_data_path="model1.tar.gz")
mdm.add_model(model_data_source=model2_uri, model_data_path="model2.tar.gz")

# Invoke specific model
response = predictor.predict(data, target_model="model1.tar.gz")

Q9: How do you implement auto-scaling for SageMaker endpoints?

Answer:

import boto3

# Configure auto-scaling
autoscaling = boto3.client('application-autoscaling')

# Register scalable target
autoscaling.register_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=f'endpoint/{endpoint_name}/variant/{variant_name}',
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    MinCapacity=1,
    MaxCapacity=10,
    RoleArn=autoscaling_role_arn
)

# Create scaling policy
autoscaling.put_scaling_policy(
    PolicyName='CPUTargetTracking',
    ServiceNamespace='sagemaker',
    ResourceId=f'endpoint/{endpoint_name}/variant/{variant_name}',
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 70.0,
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'
        },
        'ScaleOutCooldown': 300,
        'ScaleInCooldown': 300
    }
)
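
Conceptually, target tracking adjusts capacity so the metric returns to its target value. A back-of-the-envelope sketch of the arithmetic (an approximation, not the Application Auto Scaling internals):

```python
import math

def desired_instances(current_instances, invocations_per_instance,
                      target=70.0, min_cap=1, max_cap=10):
    """Scale capacity proportionally so per-instance load ~= target,
    clamped to the registered min/max capacity."""
    desired = math.ceil(current_instances * invocations_per_instance / target)
    return max(min_cap, min(max_cap, desired))
```

At 140 invocations per instance on 2 instances with a target of 70, the policy scales out to 4 instances.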

Azure MLOps

Q10: What are the key Azure services for MLOps?

Answer:

  • Azure Machine Learning: End-to-end ML platform
  • Azure DevOps: CI/CD pipelines
  • Azure Container Registry: Container management
  • Azure Kubernetes Service: Container orchestration
  • Azure Functions: Serverless inference
  • Azure Application Insights: Monitoring
  • Azure Key Vault: Secret management
  • Azure Data Factory: Data pipeline orchestration
  • Azure Logic Apps: Workflow automation

Q11: Explain Azure ML Pipelines and their components.

Answer: Azure ML Pipelines automate ML workflows:

Components:

  • Pipeline: Workflow definition
  • Steps: Individual operations
  • Datasets: Data inputs/outputs
  • Compute Targets: Execution environments
  • Environments: Software dependencies
  • Experiments: Tracking runs

Example Pipeline:

from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep

# Define pipeline data
processed_data = PipelineData("processed_data", datastore=datastore)
model_output = PipelineData("model_output", datastore=datastore)

# Data preprocessing step
prep_step = PythonScriptStep(
    script_name="prepare_data.py",
    arguments=["--input_data", input_dataset.as_named_input("raw_data"),
               "--output_data", processed_data],
    outputs=[processed_data],
    compute_target=compute_target,
    source_directory="scripts"
)

# Training step
train_step = PythonScriptStep(
    script_name="train_model.py",
    arguments=["--training_data", processed_data,
               "--model_output", model_output],
    inputs=[processed_data],
    outputs=[model_output],
    compute_target=compute_target,
    source_directory="scripts"
)

# Create pipeline
pipeline = Pipeline(workspace=ws, steps=[prep_step, train_step])

Q12: How do you implement model deployment in Azure ML?

Answer:

from azureml.core import Model
from azureml.core.webservice import AciWebservice, Webservice
from azureml.core.model import InferenceConfig

# Register model
model = Model.register(workspace=ws,
                      model_path="outputs/model.pkl",
                      model_name="my_model")

# Create inference configuration
inference_config = InferenceConfig(
    entry_script="score.py",
    environment=environment
)

# Configure deployment
aci_config = AciWebservice.deploy_configuration(
    cpu_cores=1,
    memory_gb=1,
    tags={"type": "classification"},
    description="Scikit-learn model deployment"
)

# Deploy model
service = Model.deploy(workspace=ws,
                      name="my-service",
                      models=[model],
                      inference_config=inference_config,
                      deployment_config=aci_config)

service.wait_for_deployment(show_output=True)

Q13: Explain Azure ML model monitoring and drift detection.

Answer: Azure ML provides built-in monitoring capabilities:

Data Drift Monitoring:

from azureml.datadrift import DataDriftDetector

# Create data drift monitor
drift_detector = DataDriftDetector.create_from_datasets(
    workspace=ws,
    name="drift_detector",
    baseline_data_set=baseline_dataset,
    target_data_set=target_dataset,
    compute_target=compute_target,
    frequency="Week",
    feature_list=None,
    drift_threshold=0.3,
    latency=24
)

# Get drift results
drift_result = drift_detector.get_output(
    start_time=datetime(2021, 1, 1),
    end_time=datetime(2021, 12, 31)
)

Model Performance Monitoring:

  • Application Insights integration
  • Custom metrics logging
  • Automated alerts
  • Dashboard visualization

Kubeflow

Q14: What is Kubeflow and its main components?

Answer: Kubeflow is an open-source ML platform for Kubernetes:

Core Components:

  • Kubeflow Pipelines: Workflow orchestration
  • Katib: Hyperparameter tuning
  • KFServing/KServe: Model serving
  • Notebooks: Jupyter notebook management
  • Training Operators: Distributed training
  • Central Dashboard: Web UI

Benefits:

  • Kubernetes-native ML workflows
  • Scalable and portable
  • Multi-cloud support
  • Open-source ecosystem

Q15: Explain Kubeflow Pipelines architecture.

Answer: Kubeflow Pipelines consists of:

Components:

  • Pipeline: ML workflow as DAG
  • Component: Reusable task
  • Step: Instance of component
  • Artifact: Input/output data
  • Experiment: Grouping of runs
  • Run: Single execution

Architecture:

# Example pipeline components (KFP v1 SDK)
from kfp import dsl

def preprocess_data(input_path: str, output_path: str):
    """Data preprocessing component"""
    return dsl.ContainerOp(
        name='preprocess-data',
        image='gcr.io/my-project/preprocess:latest',
        arguments=['--input', input_path, '--output', output_path],
        file_outputs={'processed_data': '/tmp/processed_data'}
    )

def train_model(data_path: str, model_path: str):
    """Model training component"""
    return dsl.ContainerOp(
        name='train-model',
        image='gcr.io/my-project/train:latest',
        arguments=['--data', data_path, '--model', model_path],
        file_outputs={'model': '/tmp/model'}
    )

@dsl.pipeline(name='ml-pipeline', description='ML training pipeline')
def ml_pipeline():
    prep_task = preprocess_data('/data/raw', '/data/processed')
    train_task = train_model(prep_task.outputs['processed_data'], '/models/output')

Q16: How do you implement hyperparameter tuning with Katib?

Answer:

apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: hyperparameter-tuning
spec:
  algorithm:
    algorithmName: random
  objective:
    type: maximize
    objectiveMetricName: accuracy
  parameters:
  - name: learning_rate
    parameterType: double
    feasibleSpace:
      min: "0.001"
      max: "0.1"
  - name: batch_size
    parameterType: int
    feasibleSpace:
      min: "16"
      max: "128"
  - name: num_epochs
    parameterType: int
    feasibleSpace:
      min: "10"
      max: "50"
  trialTemplate:
    primaryContainerName: training-container
    trialParameters:
    - name: learningRate
      reference: learning_rate
    - name: batchSize
      reference: batch_size
    - name: numEpochs
      reference: num_epochs
    trialSpec:
      apiVersion: batch/v1
      kind: Job
      spec:
        template:
          spec:
            containers:
            - name: training-container
              image: gcr.io/my-project/training:latest
              command:
              - "python"
              - "train.py"
              - "--learning_rate=${trialParameters.learningRate}"
              - "--batch_size=${trialParameters.batchSize}"
              - "--num_epochs=${trialParameters.numEpochs}"
            restartPolicy: Never
  parallelTrialCount: 3
  maxTrialCount: 12
  maxFailedTrialCount: 3
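
In essence, the `random` algorithm above samples each trial uniformly from the feasible space. A plain-Python sketch of that loop (illustrative, not Katib's implementation):

```python
import random

def sample_trial(rng):
    """One random trial drawn from the feasible space above."""
    return {
        'learning_rate': rng.uniform(0.001, 0.1),
        'batch_size': rng.randint(16, 128),
        'num_epochs': rng.randint(10, 50),
    }

def random_search(objective, max_trials=12, seed=0):
    """Run max_trials random trials and keep the best (maximize)."""
    rng = random.Random(seed)
    trials = [sample_trial(rng) for _ in range(max_trials)]
    return max(trials, key=objective)
```

Katib adds what this sketch omits: parallel trial execution, early stopping of failed trials, and metric collection from the training containers.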

Q17: Explain KServe (KFServing) for model serving.

Answer: KServe provides serverless model inference:

Features:

  • Multiple ML frameworks support
  • Automatic scaling
  • Canary deployments
  • Multi-model serving
  • Transformer/Predictor pattern

Example InferenceService:

apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: sklearn-iris
spec:
  predictor:
    canaryTrafficPercent: 10
    minReplicas: 1
    maxReplicas: 10
    sklearn:
      storageUri: gs://kfserving-examples/models/sklearn/iris
  transformer:
    containers:
    - image: gcr.io/my-project/transformer:latest
      name: transformer

Docker & Containerization

Q18: Why is containerization important for MLOps?

Answer:

  • Reproducibility: Consistent environment across development and production
  • Dependency Management: Isolated package dependencies
  • Portability: Run anywhere containers are supported
  • Scalability: Easy horizontal scaling
  • Version Control: Immutable infrastructure
  • Resource Efficiency: Lightweight compared to VMs

Q19: How do you create a Docker image for ML model serving?

Answer:

# Dockerfile for ML model serving
FROM python:3.8-slim

# Set working directory
WORKDIR /app

# Install curl (needed by the HEALTHCHECK below), then Python dependencies
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model and application code
COPY model/ ./model/
COPY app.py .
COPY utils.py .

# Expose port
EXPOSE 8000

# Set environment variables
ENV MODEL_PATH=/app/model/model.pkl
ENV PYTHONPATH=/app

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8000/health || exit 1

# Run the application
CMD ["python", "app.py"]

# app.py - Flask serving application
from flask import Flask, request, jsonify
import pickle
import numpy as np
import os

app = Flask(__name__)

# Load model at startup
model_path = os.getenv('MODEL_PATH', 'model/model.pkl')
with open(model_path, 'rb') as f:
    model = pickle.load(f)

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.json
        features = np.array(data['features']).reshape(1, -1)
        prediction = model.predict(features)
        return jsonify({'prediction': prediction.tolist()})
    except Exception as e:
        return jsonify({'error': str(e)}), 400

@app.route('/health', methods=['GET'])
def health():
    return jsonify({'status': 'healthy'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)

Q20: How do you optimize Docker images for ML workloads?

Answer:

# Multi-stage build for smaller images
FROM python:3.8-slim as builder

# Install build dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Copy and install Python dependencies
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt

# Production stage
FROM python:3.8-slim

# Copy installed packages from builder
COPY --from=builder /root/.local /root/.local

# Add local bin to PATH
ENV PATH=/root/.local/bin:$PATH

# Copy application code
COPY src/ /app/
WORKDIR /app

# Use non-root user
RUN useradd --create-home --shell /bin/bash app
USER app

CMD ["python", "serve.py"]

Optimization strategies:

  • Multi-stage builds
  • Minimal base images
  • Layer caching
  • .dockerignore file
  • Security best practices
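
A typical `.dockerignore` supporting the strategies above (entries are illustrative; adapt to the repository layout):

```
.git
__pycache__/
*.pyc
.venv/
data/
notebooks/
tests/
*.ipynb
```

Keeping large datasets and notebooks out of the build context both speeds up builds and prevents accidental cache invalidation.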

CI/CD for ML

Q21: How does CI/CD differ for ML compared to traditional software?

Answer: Traditional CI/CD vs ML CI/CD:

  Aspect       Traditional CI/CD          ML CI/CD
  Artifacts    Code                       Code + data + models
  Testing      Unit/integration tests     Data validation + model performance
  Deployment   Code deployment            Model deployment + monitoring
  Triggers     Code changes               Code, data, or model changes
  Rollback     Previous code version      Previous model version
  Monitoring   System metrics             Model performance metrics

ML-specific considerations:

  • Data versioning and validation
  • Model performance testing
  • A/B testing for model comparison
  • Gradual rollout strategies
  • Drift detection and retraining
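
The data-validation consideration above can be made concrete with a schema gate in the pipeline. A minimal sketch (the schema format is my own, not a specific tool such as Great Expectations):

```python
def validate_schema(rows, schema):
    """rows: list of record dicts; schema: column -> expected type.
    Returns human-readable errors; an empty list means the gate passes."""
    errors = []
    for i, row in enumerate(rows):
        for column, expected in schema.items():
            if column not in row:
                errors.append(f'row {i}: missing column {column}')
            elif not isinstance(row[column], expected):
                errors.append(f'row {i}: {column} is not {expected.__name__}')
    return errors
```

A CI job would fail fast on a non-empty error list, before any training compute is spent.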

Q22: Design a complete ML CI/CD pipeline.

Answer:

# GitHub Actions workflow for ML CI/CD
name: ML CI/CD Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Setup Python
      uses: actions/setup-python@v2
      with:
        python-version: 3.8
    - name: Install dependencies
      run: |
        pip install -r requirements.txt
    - name: Validate data schema
      run: |
        python scripts/validate_data.py
    - name: Run data quality checks
      run: |
        python scripts/data_quality_checks.py

  model-training:
    needs: data-validation
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Setup Python
      uses: actions/setup-python@v2
    - name: Train model
      run: |
        python scripts/train_model.py
    - name: Evaluate model
      run: |
        python scripts/evaluate_model.py
    - name: Upload model artifacts
      uses: actions/upload-artifact@v2
      with:
        name: model-artifacts
        path: models/

  model-testing:
    needs: model-training
    runs-on: ubuntu-latest
    steps:
    - name: Download model artifacts
      uses: actions/download-artifact@v2
      with:
        name: model-artifacts
    - name: Run model tests
      run: |
        python tests/test_model.py
    - name: Performance benchmarking
      run: |
        python tests/benchmark_model.py

  deploy-staging:
    needs: model-testing
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
    - name: Deploy to staging
      run: |
        docker build -t model-service:${{ github.sha }} .
        docker push $ECR_REGISTRY/model-service:${{ github.sha }}
        kubectl set image deployment/model-service \
          model-service=$ECR_REGISTRY/model-service:${{ github.sha }}

  integration-tests:
    needs: deploy-staging
    runs-on: ubuntu-latest
    steps:
    - name: Run integration tests
      run: |
        python tests/integration_tests.py --endpoint $STAGING_ENDPOINT

  deploy-production:
    needs: integration-tests
    runs-on: ubuntu-latest
    steps:
    - name: Deploy to production
      run: |
        # Canary deployment
        kubectl patch deployment model-service -p \
          '{"spec":{"template":{"metadata":{"labels":{"version":"canary"}}}}}'
        # Monitor and promote if successful
        python scripts/canary_deployment.py

Q23: How do you implement automated model testing?

Answer:

# tests/test_model.py
import pytest
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score
import joblib

class TestModel:
    @pytest.fixture
    def model(self):
        return joblib.load('models/model.pkl')
    
    @pytest.fixture
    def test_data(self):
        return pd.read_csv('data/test_data.csv')
    
    def test_model_exists(self, model):
        """Test that model file exists and loads properly"""
        assert model is not None
        assert hasattr(model, 'predict')
    
    def test_model_input_shape(self, model, test_data):
        """Test model accepts correct input shape"""
        X_test = test_data.drop('target', axis=1)
        predictions = model.predict(X_test)
        assert len(predictions) == len(X_test)
    
    def test_model_output_type(self, model, test_data):
        """Test model output format"""
        X_test = test_data.drop('target', axis=1).iloc[:5]
        predictions = model.predict(X_test)
        assert isinstance(predictions, np.ndarray)
        assert predictions.dtype in [np.int64, np.float64]
    
    def test_model_accuracy_threshold(self, model, test_data):
        """Test model meets minimum accuracy threshold"""
        X_test = test_data.drop('target', axis=1)
        y_test = test_data['target']
        predictions = model.predict(X_test)
        
        accuracy = accuracy_score(y_test, predictions)
        assert accuracy >= 0.8, f"Model accuracy {accuracy} below threshold"
    
    def test_model_bias_fairness(self, model, test_data):
        """Test model fairness across different groups"""
        X_test = test_data.drop('target', axis=1)
        y_test = test_data['target']
        predictions = model.predict(X_test)
        
        # Test across sensitive attribute
        for group in test_data['sensitive_attr'].unique():
            group_mask = test_data['sensitive_attr'] == group
            group_accuracy = accuracy_score(
                y_test[group_mask], 
                predictions[group_mask]
            )
            assert group_accuracy >= 0.7, f"Bias detected for group {group}"
    
    def test_model_robustness(self, model, test_data):
        """Test model robustness to input perturbations"""
        X_test = test_data.drop('target', axis=1).iloc[:100]
        original_predictions = model.predict(X_test)
        
        # Add small noise
        noise = np.random.normal(0, 0.01, X_test.shape)
        X_noisy = X_test + noise
        noisy_predictions = model.predict(X_noisy)
        
        # Check prediction stability
        stability = np.mean(original_predictions == noisy_predictions)
        assert stability >= 0.9, f"Model not robust to noise: {stability}"

Model Monitoring & Governance

Q24: What are the key metrics to monitor for deployed ML models?

Answer: Performance Metrics:

  • Accuracy, Precision, Recall, F1-score
  • AUC-ROC, AUC-PR
  • Mean Absolute Error, RMSE
  • Business-specific metrics

Operational Metrics:

  • Response time/latency
  • Throughput (requests per second)
  • Error rates (4xx, 5xx)
  • Resource utilization (CPU, memory)
  • Availability/uptime

Data Quality Metrics:

  • Data drift detection
  • Feature distribution changes
  • Missing value rates
  • Outlier detection
  • Schema validation

Model Drift Metrics:

  • Prediction drift
  • Concept drift
  • Population stability index (PSI)
  • Characteristic stability index (CSI)

Q25: How do you implement drift detection?

Answer:

import numpy as np
import pandas as pd
from scipy import stats
from scipy.spatial.distance import jensenshannon

class DriftDetector:
    def __init__(self, reference_data, threshold=0.1):
        self.reference_data = reference_data
        self.threshold = threshold
        
    def detect_statistical_drift(self, current_data, method='ks'):
        """Detect drift using statistical tests"""
        drift_results = {}
        
        for column in self.reference_data.columns:
            if method == 'ks':
                # Kolmogorov-Smirnov test
                statistic, p_value = stats.ks_2samp(
                    self.reference_data[column],
                    current_data[column]
                )
                drift_detected = p_value < 0.05
                
            elif method == 'js':
                # Jensen-Shannon distance over shared histogram bins
                combined = np.concatenate([self.reference_data[column],
                                           current_data[column]])
                bins = np.histogram_bin_edges(combined, bins=50)
                js_distance = jensenshannon(
                    np.histogram(self.reference_data[column], bins=bins)[0],
                    np.histogram(current_data[column], bins=bins)[0]
                )
                drift_detected = js_distance > self.threshold
                statistic, p_value = js_distance, None
                
            drift_results[column] = {
                'statistic': statistic,
                'p_value': p_value,
                'drift_detected': drift_detected
            }
            
        return drift_results
    
    def detect_prediction_drift(self, reference_predictions, current_predictions):
        """Detect drift in model predictions"""
        # Population Stability Index (PSI)
        def calculate_psi(expected, actual, buckets=10):
            # Bin both distributions with the SAME edges (derived from
            # the reference) and clip to avoid log(0)/division by zero
            edges = np.histogram_bin_edges(expected, bins=buckets)
            expected_perc = np.histogram(expected, bins=edges)[0] / len(expected)
            actual_perc = np.histogram(actual, bins=edges)[0] / len(actual)
            expected_perc = np.clip(expected_perc, 1e-6, None)
            actual_perc = np.clip(actual_perc, 1e-6, None)
            return np.sum((actual_perc - expected_perc) * np.log(actual_perc / expected_perc))
        
        psi = calculate_psi(reference_predictions, current_predictions)
        
        # PSI interpretation
        if psi < 0.1:
            stability = "stable"
        elif psi < 0.25:
            stability = "moderate_drift"
        else:
            stability = "significant_drift"
            
        return {
            'psi': psi,
            'stability': stability,
            'drift_detected': psi > 0.1
        }

# Usage example
detector = DriftDetector(reference_data=training_data)
drift_results = detector.detect_statistical_drift(production_data, method='ks')

# Set up monitoring
for feature, results in drift_results.items():
    if results['drift_detected']:
        print(f"Drift detected in feature {feature}")
        # Trigger retraining pipeline

Q26: How do you implement model governance and compliance?

Answer:

import uuid
from datetime import datetime

class ModelGovernance:
    def __init__(self, model_registry):
        self.model_registry = model_registry
        
    def register_model(self, model, metadata):
        """Register model with governance metadata"""
        governance_metadata = {
            'model_id': str(uuid.uuid4()),
            'timestamp': datetime.utcnow(),
            'version': metadata.get('version'),
            'author': metadata.get('author'),
            'description': metadata.get('description'),
            'training_data': metadata.get('training_data'),
            'performance_metrics': metadata.get('metrics'),
            'compliance_status': 'pending',
            'approval_status': 'pending',
            'risk_assessment': self.assess_risk(model, metadata),
            # check_bias, calculate_explainability, and calculate_complexity
            # are assumed helper methods, omitted here for brevity
            'bias_check': self.check_bias(model, metadata),
            'explainability_score': self.calculate_explainability(model)
        }
        
        self.model_registry.register(model, governance_metadata)
        return governance_metadata['model_id']
    
    def assess_risk(self, model, metadata):
        """Assess model risk level"""
        risk_factors = {
            'data_sensitivity': metadata.get('data_sensitivity', 'medium'),
            'business_impact': metadata.get('business_impact', 'medium'),
            'model_complexity': self.calculate_complexity(model),
            'deployment_scope': metadata.get('deployment_scope', 'limited')
        }
        
        # Risk scoring logic
        risk_score = 0
        for factor, value in risk_factors.items():
            if value == 'high':
                risk_score += 3
            elif value == 'medium':
                risk_score += 2
            else:
                risk_score += 1
                
        if risk_score <= 4:
            return 'low'
        elif risk_score <= 8:
            return 'medium'
        return 'high'