MLOps AI Engineer Interview Preparation Guide
Table of Contents
- General MLOps Concepts
- AWS MLOps
- Azure MLOps
- Kubeflow
- Docker & Containerization
- CI/CD for ML
- Model Monitoring & Governance
- Infrastructure as Code
General MLOps Concepts
Q1: What is MLOps and why is it important?
Answer: MLOps (Machine Learning Operations) is a practice that combines ML, DevOps, and data engineering to deploy and maintain ML systems in production reliably and efficiently. It's important because:
- Reproducibility: Ensures consistent model training and deployment
- Scalability: Handles growing data and model complexity
- Reliability: Maintains model performance in production
- Collaboration: Bridges gap between data scientists and operations teams
- Compliance: Ensures governance and auditability
- Speed: Accelerates model deployment and iteration cycles
Q2: Explain the ML lifecycle and where MLOps fits in.
Answer: The ML lifecycle includes:
- Data Collection & Preparation: MLOps ensures data versioning, quality checks
- Model Development: Experiment tracking, version control
- Model Training: Automated pipelines, resource management
- Model Validation: Automated testing, performance metrics
- Model Deployment: CI/CD pipelines, containerization
- Model Monitoring: Performance tracking, drift detection
- Model Retraining: Automated triggers, feedback loops
MLOps provides the infrastructure, processes, and tools to automate and standardize these stages.
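Automated retraining triggers are usually a simple policy over monitoring signals; a minimal sketch (the function and threshold names are illustrative, not from any particular library):

```python
def should_retrain(drift_score: float, accuracy: float,
                   drift_threshold: float = 0.25,
                   accuracy_floor: float = 0.80) -> bool:
    """Trigger retraining when data drift is high or live accuracy drops."""
    return drift_score > drift_threshold or accuracy < accuracy_floor

# Moderate drift, healthy accuracy -> keep the current model
print(should_retrain(drift_score=0.10, accuracy=0.91))  # False
# Severe drift -> retrain
print(should_retrain(drift_score=0.40, accuracy=0.91))  # True
```

In practice the inputs would come from the monitoring stage (e.g. a PSI value and a rolling accuracy window), and a `True` result would kick off the training pipeline.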
Q3: What are the key challenges in ML model deployment?
Answer:
- Model Drift: Performance degradation over time due to data changes
- Data Drift: Changes in input data distribution
- Scalability: Handling increased load and concurrent requests
- Latency Requirements: Meeting real-time inference needs
- Dependency Management: Managing complex ML library dependencies
- Resource Management: Efficient compute and memory usage
- Version Control: Managing multiple model versions
- Rollback Capabilities: Quick recovery from failed deployments
- Security: Protecting models and data
- Compliance: Meeting regulatory requirements
Q4: Explain different deployment patterns for ML models.
Answer:
- Blue-Green Deployment: Two identical environments, switch traffic between them
- Canary Deployment: Gradual rollout to subset of users
- A/B Testing: Compare model performance with control group
- Shadow Deployment: New model runs alongside existing without affecting users
- Rolling Deployment: Gradual replacement of instances
- Multi-armed Bandit: Dynamic traffic allocation based on performance
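A canary rollout boils down to deterministic request bucketing; a minimal sketch (hash-based routing with illustrative names — production systems typically do this at the load balancer or service mesh, not in application code):

```python
import hashlib

def route_request(request_id: str, canary_percent: int = 10) -> str:
    """Map a request id to a stable 0-99 bucket; low buckets hit the canary."""
    bucket = int(hashlib.md5(request_id.encode()).hexdigest(), 16) % 100
    return "canary" if bucket < canary_percent else "stable"

# Roughly canary_percent% of traffic lands on the canary model
counts = {"canary": 0, "stable": 0}
for i in range(10_000):
    counts[route_request(f"req-{i}")] += 1
```

Hashing the request (or user) id, rather than sampling randomly, keeps each caller pinned to the same variant across requests, which matters for A/B testing as well.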
AWS MLOps
Q5: What are the key AWS services for MLOps?
Answer:
- Amazon SageMaker: End-to-end ML platform
- AWS CodePipeline: CI/CD for ML workflows
- AWS CodeBuild: Build and test ML models
- AWS CodeCommit: Source control
- Amazon ECR: Container registry for ML images
- AWS Step Functions: Orchestrate ML workflows
- Amazon CloudWatch: Monitoring and logging
- AWS Lambda: Serverless model inference
- Amazon ECS/EKS: Container orchestration
- AWS Batch: Batch processing for training jobs
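As a sketch of how Step Functions ties these services together, here is a minimal Amazon States Language definition that runs a SageMaker training job and then a placeholder registration state (all ARNs, image URIs, and S3 paths below are placeholders):

```json
{
  "Comment": "Minimal ML training workflow; ARNs, images, and paths are placeholders",
  "StartAt": "TrainModel",
  "States": {
    "TrainModel": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
      "Parameters": {
        "TrainingJobName.$": "$$.Execution.Name",
        "RoleArn": "arn:aws:iam::123456789012:role/SageMakerRole",
        "AlgorithmSpecification": {
          "TrainingImage": "123456789012.dkr.ecr.us-east-1.amazonaws.com/train:latest",
          "TrainingInputMode": "File"
        },
        "OutputDataConfig": {"S3OutputPath": "s3://my-bucket/models/"},
        "ResourceConfig": {
          "InstanceCount": 1,
          "InstanceType": "ml.m5.xlarge",
          "VolumeSizeInGB": 20
        },
        "StoppingCondition": {"MaxRuntimeInSeconds": 3600}
      },
      "Next": "RegisterModel"
    },
    "RegisterModel": {"Type": "Pass", "End": true}
  }
}
```

The `.sync` integration makes the state machine wait for the training job to finish before moving on; `RegisterModel` stands in for a real model-registration task.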
Q6: Explain SageMaker Pipelines and its components.
Answer: SageMaker Pipelines is a CI/CD service for ML workflows:
Components:
- Pipeline: DAG of steps for ML workflow
- Steps: Individual operations (processing, training, evaluation)
- Parameters: Runtime configuration values
- Properties: Step outputs that can be referenced
- Conditions: Conditional execution logic
- Model Registry: Centralized model store
Step Types:
- ProcessingStep: Data preprocessing
- TrainingStep: Model training
- TuningStep: Hyperparameter optimization
- CreateModelStep: Model creation
- RegisterModelStep: Model registration
- TransformStep: Batch inference
- ConditionStep: Conditional branching
Q7: How do you implement model monitoring in AWS?
Answer:
```python
# SageMaker Model Monitor setup
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

# Create monitor
monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

# Suggest a baseline from training data
monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
)

# Create monitoring schedule
monitor.create_monitoring_schedule(
    monitor_schedule_name=schedule_name,
    endpoint_input=endpoint_name,
    output_s3_uri=monitoring_output_uri,
    statistics=statistics_uri,
    constraints=constraints_uri,
    schedule_config=schedule_config,
)
```
Monitoring includes:
- Data quality monitoring
- Model quality monitoring
- Bias drift detection
- Feature attribution drift
Q8: Explain SageMaker Multi-Model Endpoints.
Answer: Multi-Model Endpoints allow hosting multiple models on a single endpoint:
Benefits:
- Cost optimization through resource sharing
- Reduced infrastructure management
- Dynamic model loading/unloading
- A/B testing capabilities
Implementation:
```python
from sagemaker.multidatamodel import MultiDataModel

# Create multi-model endpoint
mdm = MultiDataModel(
    name="my-multi-model",
    model_data_prefix=model_data_prefix,
    image_uri=container_image,
    role=role,
)

# Deploy endpoint
predictor = mdm.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.large'
)

# Add models dynamically
mdm.add_model(model_data_source=model1_uri, model_data_path="model1.tar.gz")
mdm.add_model(model_data_source=model2_uri, model_data_path="model2.tar.gz")

# Invoke specific model
response = predictor.predict(data, target_model="model1.tar.gz")
```
Q9: How do you implement auto-scaling for SageMaker endpoints?
Answer:
```python
import boto3

# SageMaker endpoint scaling is managed via Application Auto Scaling
autoscaling = boto3.client('application-autoscaling')

# Register scalable target
autoscaling.register_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=f'endpoint/{endpoint_name}/variant/{variant_name}',
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    MinCapacity=1,
    MaxCapacity=10,
    RoleArn=autoscaling_role_arn
)

# Create target-tracking policy on invocations per instance
autoscaling.put_scaling_policy(
    PolicyName='InvocationsPerInstanceTargetTracking',
    ServiceNamespace='sagemaker',
    ResourceId=f'endpoint/{endpoint_name}/variant/{variant_name}',
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 70.0,
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'
        },
        'ScaleOutCooldown': 300,
        'ScaleInCooldown': 300
    }
)
```
Azure MLOps
Q10: What are the key Azure services for MLOps?
Answer:
- Azure Machine Learning: End-to-end ML platform
- Azure DevOps: CI/CD pipelines
- Azure Container Registry: Container management
- Azure Kubernetes Service: Container orchestration
- Azure Functions: Serverless inference
- Azure Application Insights: Monitoring
- Azure Key Vault: Secret management
- Azure Data Factory: Data pipeline orchestration
- Azure Logic Apps: Workflow automation
Q11: Explain Azure ML Pipelines and their components.
Answer: Azure ML Pipelines automate ML workflows:
Components:
- Pipeline: Workflow definition
- Steps: Individual operations
- Datasets: Data inputs/outputs
- Compute Targets: Execution environments
- Environments: Software dependencies
- Experiments: Tracking runs
Example Pipeline:
```python
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep

# Define pipeline data
processed_data = PipelineData("processed_data", datastore=datastore)
model_output = PipelineData("model_output", datastore=datastore)

# Data preprocessing step
prep_step = PythonScriptStep(
    script_name="prepare_data.py",
    arguments=["--input_data", input_dataset.as_named_input("raw_data"),
               "--output_data", processed_data],
    outputs=[processed_data],
    compute_target=compute_target,
    source_directory="scripts"
)

# Training step
train_step = PythonScriptStep(
    script_name="train_model.py",
    arguments=["--training_data", processed_data,
               "--model_output", model_output],
    inputs=[processed_data],
    outputs=[model_output],
    compute_target=compute_target,
    source_directory="scripts"
)

# Create pipeline
pipeline = Pipeline(workspace=ws, steps=[prep_step, train_step])
```
Q12: How do you implement model deployment in Azure ML?
Answer:
```python
from azureml.core import Model
from azureml.core.webservice import AciWebservice
from azureml.core.model import InferenceConfig

# Register model
model = Model.register(workspace=ws,
                       model_path="outputs/model.pkl",
                       model_name="my_model")

# Create inference configuration
inference_config = InferenceConfig(
    entry_script="score.py",
    environment=environment
)

# Configure deployment
aci_config = AciWebservice.deploy_configuration(
    cpu_cores=1,
    memory_gb=1,
    tags={"type": "classification"},
    description="Scikit-learn model deployment"
)

# Deploy model
service = Model.deploy(workspace=ws,
                       name="my-service",
                       models=[model],
                       inference_config=inference_config,
                       deployment_config=aci_config)
service.wait_for_deployment(show_output=True)
```
Q13: Explain Azure ML model monitoring and drift detection.
Answer: Azure ML provides built-in monitoring capabilities:
Data Drift Monitoring:
```python
from datetime import datetime
from azureml.datadrift import DataDriftDetector

# Create data drift monitor
drift_detector = DataDriftDetector.create_from_datasets(
    workspace=ws,
    name="drift_detector",
    baseline_data_set=baseline_dataset,
    target_data_set=target_dataset,
    compute_target=compute_target,
    frequency="Week",
    feature_list=None,
    drift_threshold=0.3,
    latency=24
)

# Get drift results
drift_result = drift_detector.get_output(
    start_time=datetime(2021, 1, 1),
    end_time=datetime(2021, 12, 31)
)
```
Model Performance Monitoring:
- Application Insights integration
- Custom metrics logging
- Automated alerts
- Dashboard visualization
Kubeflow
Q14: What is Kubeflow and its main components?
Answer: Kubeflow is an open-source ML platform for Kubernetes:
Core Components:
- Kubeflow Pipelines: Workflow orchestration
- Katib: Hyperparameter tuning
- KFServing/KServe: Model serving
- Notebooks: Jupyter notebook management
- Training Operators: Distributed training
- Central Dashboard: Web UI
Benefits:
- Kubernetes-native ML workflows
- Scalable and portable
- Multi-cloud support
- Open-source ecosystem
Q15: Explain Kubeflow Pipelines architecture.
Answer: Kubeflow Pipelines consists of:
Components:
- Pipeline: ML workflow as DAG
- Component: Reusable task
- Step: Instance of component
- Artifact: Input/output data
- Experiment: Grouping of runs
- Run: Single execution
Example pipeline (KFP v1 `ContainerOp` style):

```python
from kfp import dsl

def preprocess_data(input_path: str, output_path: str):
    """Data preprocessing component"""
    return dsl.ContainerOp(
        name='preprocess-data',
        image='gcr.io/my-project/preprocess:latest',
        arguments=['--input', input_path, '--output', output_path],
        file_outputs={'processed_data': '/tmp/processed_data'}
    )

def train_model(data_path: str, model_path: str):
    """Model training component"""
    return dsl.ContainerOp(
        name='train-model',
        image='gcr.io/my-project/train:latest',
        arguments=['--data', data_path, '--model', model_path],
        file_outputs={'model': '/tmp/model'}
    )

@dsl.pipeline(name='ml-pipeline', description='ML training pipeline')
def ml_pipeline():
    prep_task = preprocess_data('/data/raw', '/data/processed')
    train_task = train_model(prep_task.outputs['processed_data'], '/models/output')
```
Q16: How do you implement hyperparameter tuning with Katib?
Answer:
```yaml
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: hyperparameter-tuning
spec:
  algorithm:
    algorithmName: random
  objective:
    type: maximize
    objectiveMetricName: accuracy
  parameters:
    - name: learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.001"
        max: "0.1"
    - name: batch_size
      parameterType: int
      feasibleSpace:
        min: "16"
        max: "128"
    - name: num_epochs
      parameterType: int
      feasibleSpace:
        min: "10"
        max: "50"
  trialTemplate:
    primaryContainerName: training-container
    # Map search-space parameters to the placeholders used in trialSpec
    trialParameters:
      - name: learningRate
        reference: learning_rate
      - name: batchSize
        reference: batch_size
      - name: numEpochs
        reference: num_epochs
    trialSpec:
      apiVersion: batch/v1
      kind: Job
      spec:
        template:
          spec:
            containers:
              - name: training-container
                image: gcr.io/my-project/training:latest
                command:
                  - "python"
                  - "train.py"
                  - "--learning_rate=${trialParameters.learningRate}"
                  - "--batch_size=${trialParameters.batchSize}"
                  - "--num_epochs=${trialParameters.numEpochs}"
            restartPolicy: Never
  parallelTrialCount: 3
  maxTrialCount: 12
  maxFailedTrialCount: 3
```
Q17: Explain KServe (KFServing) for model serving.
Answer: KServe provides serverless model inference:
Features:
- Multiple ML frameworks support
- Automatic scaling
- Canary deployments
- Multi-model serving
- Transformer/Predictor pattern
Example InferenceService:
```yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: sklearn-iris
spec:
  predictor:
    canaryTrafficPercent: 10
    minReplicas: 1
    maxReplicas: 10
    sklearn:
      storageUri: gs://kfserving-examples/models/sklearn/iris
  transformer:
    containers:
      - image: gcr.io/my-project/transformer:latest
        name: transformer
```
Docker & Containerization
Q18: Why is containerization important for MLOps?
Answer:
- Reproducibility: Consistent environment across development and production
- Dependency Management: Isolated package dependencies
- Portability: Run anywhere containers are supported
- Scalability: Easy horizontal scaling
- Version Control: Immutable infrastructure
- Resource Efficiency: Lightweight compared to VMs
Q19: How do you create a Docker image for ML model serving?
Answer:
```dockerfile
# Dockerfile for ML model serving
FROM python:3.8-slim

# Set working directory
WORKDIR /app

# curl is needed by the health check below (not included in slim images)
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model and application code
COPY model/ ./model/
COPY app.py .
COPY utils.py .

# Expose port
EXPOSE 8000

# Set environment variables
ENV MODEL_PATH=/app/model/model.pkl
ENV PYTHONPATH=/app

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Run the application
CMD ["python", "app.py"]
```
```python
# app.py - Flask serving application
from flask import Flask, request, jsonify
import pickle
import numpy as np
import os

app = Flask(__name__)

# Load model at startup
model_path = os.getenv('MODEL_PATH', 'model/model.pkl')
with open(model_path, 'rb') as f:
    model = pickle.load(f)

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.json
        features = np.array(data['features']).reshape(1, -1)
        prediction = model.predict(features)
        return jsonify({'prediction': prediction.tolist()})
    except Exception as e:
        return jsonify({'error': str(e)}), 400

@app.route('/health', methods=['GET'])
def health():
    return jsonify({'status': 'healthy'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)
```
Q20: How do you optimize Docker images for ML workloads?
Answer:
```dockerfile
# Multi-stage build for smaller images
FROM python:3.8-slim AS builder

# Install build dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies into an isolated prefix
COPY requirements.txt .
RUN pip install --prefix=/install --no-cache-dir -r requirements.txt

# Production stage
FROM python:3.8-slim

# Copy installed packages from builder into the system prefix,
# so they are readable by the non-root user below
COPY --from=builder /install /usr/local

# Copy application code
COPY src/ /app/
WORKDIR /app

# Use non-root user
RUN useradd --create-home --shell /bin/bash app
USER app

CMD ["python", "serve.py"]
```
Optimization strategies:
- Multi-stage builds
- Minimal base images
- Layer caching
- .dockerignore file
- Security best practices
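A `.dockerignore` file keeps large or sensitive files out of the build context; a typical starting point for an ML repo (entries are illustrative):

```
.git
__pycache__/
*.pyc
.venv/
data/
notebooks/
tests/
mlruns/
*.ipynb_checkpoints
```

Excluding raw data and experiment artifacts is especially important in ML repos, where the context can otherwise run to gigabytes and slow every build.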
CI/CD for ML
Q21: How does CI/CD differ for ML compared to traditional software?
Answer: Traditional CI/CD vs ML CI/CD:
| Aspect | Traditional | ML CI/CD |
|---|---|---|
| Artifacts | Code | Code + Data + Models |
| Testing | Unit/Integration tests | Data validation + Model performance |
| Deployment | Code deployment | Model deployment + monitoring |
| Triggers | Code changes | Code/Data/Model changes |
| Rollback | Previous code version | Previous model version |
| Monitoring | System metrics | Model performance metrics |
ML-specific considerations:
- Data versioning and validation
- Model performance testing
- A/B testing for model comparison
- Gradual rollout strategies
- Drift detection and retraining
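Rolling back to a previous model version presupposes an ordered registry of versions; a minimal in-memory sketch (not a real registry API — MLflow or SageMaker Model Registry provide this in practice, and all names here are illustrative):

```python
class ModelRegistry:
    def __init__(self):
        self._versions = []   # ordered list of (version, artifact_uri)
        self._current = None  # version currently serving traffic

    def register(self, version: str, artifact_uri: str):
        self._versions.append((version, artifact_uri))

    def promote(self, version: str):
        if version not in [v for v, _ in self._versions]:
            raise ValueError(f"unknown version {version}")
        self._current = version

    def rollback(self):
        """Fall back to the version registered immediately before current."""
        versions = [v for v, _ in self._versions]
        idx = versions.index(self._current)
        if idx == 0:
            raise RuntimeError("no earlier version to roll back to")
        self._current = versions[idx - 1]

    @property
    def current(self):
        return self._current

registry = ModelRegistry()
registry.register("v1", "s3://models/v1")
registry.register("v2", "s3://models/v2")
registry.promote("v2")
registry.rollback()      # v2 misbehaves in production: fall back
print(registry.current)  # v1
```

The key point the table makes: rollback restores a *model artifact*, not just code, so the registry must keep every promoted artifact addressable.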
Q22: Design a complete ML CI/CD pipeline.
Answer:
```yaml
# GitHub Actions workflow for ML CI/CD
name: ML CI/CD Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Setup Python
        uses: actions/setup-python@v2
        with:
          python-version: 3.8
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      - name: Validate data schema
        run: |
          python scripts/validate_data.py
      - name: Run data quality checks
        run: |
          python scripts/data_quality_checks.py

  model-training:
    needs: data-validation
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Setup Python
        uses: actions/setup-python@v2
      - name: Train model
        run: |
          python scripts/train_model.py
      - name: Evaluate model
        run: |
          python scripts/evaluate_model.py
      - name: Upload model artifacts
        uses: actions/upload-artifact@v2
        with:
          name: model-artifacts
          path: models/

  model-testing:
    needs: model-training
    runs-on: ubuntu-latest
    steps:
      - name: Download model artifacts
        uses: actions/download-artifact@v2
        with:
          name: model-artifacts
      - name: Run model tests
        run: |
          python tests/test_model.py
      - name: Performance benchmarking
        run: |
          python tests/benchmark_model.py

  deploy-staging:
    needs: model-testing
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to staging
        run: |
          docker build -t $ECR_REGISTRY/model-service:${{ github.sha }} .
          docker push $ECR_REGISTRY/model-service:${{ github.sha }}
          kubectl set image deployment/model-service \
            model-service=$ECR_REGISTRY/model-service:${{ github.sha }}

  integration-tests:
    needs: deploy-staging
    runs-on: ubuntu-latest
    steps:
      - name: Run integration tests
        run: |
          python tests/integration_tests.py --endpoint $STAGING_ENDPOINT

  deploy-production:
    needs: integration-tests
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to production
        run: |
          # Canary deployment
          kubectl patch deployment model-service -p \
            '{"spec":{"template":{"metadata":{"labels":{"version":"canary"}}}}}'
          # Monitor and promote if successful
          python scripts/canary_deployment.py
```
Q23: How do you implement automated model testing?
Answer:
```python
# tests/test_model.py
import pytest
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score
import joblib

class TestModel:
    @pytest.fixture
    def model(self):
        return joblib.load('models/model.pkl')

    @pytest.fixture
    def test_data(self):
        return pd.read_csv('data/test_data.csv')

    def test_model_exists(self, model):
        """Test that model file exists and loads properly"""
        assert model is not None
        assert hasattr(model, 'predict')

    def test_model_input_shape(self, model, test_data):
        """Test model accepts correct input shape"""
        X_test = test_data.drop('target', axis=1)
        predictions = model.predict(X_test)
        assert len(predictions) == len(X_test)

    def test_model_output_type(self, model, test_data):
        """Test model output format"""
        X_test = test_data.drop('target', axis=1).iloc[:5]
        predictions = model.predict(X_test)
        assert isinstance(predictions, np.ndarray)
        assert predictions.dtype in [np.int64, np.float64]

    def test_model_accuracy_threshold(self, model, test_data):
        """Test model meets minimum accuracy threshold"""
        X_test = test_data.drop('target', axis=1)
        y_test = test_data['target']
        predictions = model.predict(X_test)
        accuracy = accuracy_score(y_test, predictions)
        assert accuracy >= 0.8, f"Model accuracy {accuracy} below threshold"

    def test_model_bias_fairness(self, model, test_data):
        """Test model fairness across different groups"""
        X_test = test_data.drop('target', axis=1)
        y_test = test_data['target']
        predictions = model.predict(X_test)
        # Check accuracy parity across the sensitive attribute
        for group in test_data['sensitive_attr'].unique():
            group_mask = test_data['sensitive_attr'] == group
            group_accuracy = accuracy_score(
                y_test[group_mask],
                predictions[group_mask]
            )
            assert group_accuracy >= 0.7, f"Bias detected for group {group}"

    def test_model_robustness(self, model, test_data):
        """Test model robustness to input perturbations"""
        X_test = test_data.drop('target', axis=1).iloc[:100]
        original_predictions = model.predict(X_test)
        # Add small Gaussian noise to the features
        noise = np.random.normal(0, 0.01, X_test.shape)
        X_noisy = X_test + noise
        noisy_predictions = model.predict(X_noisy)
        # Check prediction stability under noise
        stability = np.mean(original_predictions == noisy_predictions)
        assert stability >= 0.9, f"Model not robust to noise: {stability}"
```
Model Monitoring & Governance
Q24: What are the key metrics to monitor for deployed ML models?
Answer: Performance Metrics:
- Accuracy, Precision, Recall, F1-score
- AUC-ROC, AUC-PR
- Mean Absolute Error, RMSE
- Business-specific metrics
Operational Metrics:
- Response time/latency
- Throughput (requests per second)
- Error rates (4xx, 5xx)
- Resource utilization (CPU, memory)
- Availability/uptime
Data Quality Metrics:
- Data drift detection
- Feature distribution changes
- Missing value rates
- Outlier detection
- Schema validation
Model Drift Metrics:
- Prediction drift
- Concept drift
- Population stability index (PSI)
- Characteristic stability index (CSI)
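Schema validation, listed above under data quality, can start as a plain type check on incoming records before scoring; a minimal pure-Python sketch (column names and types are illustrative):

```python
EXPECTED_SCHEMA = {"age": int, "income": float, "segment": str}

def validate_record(record: dict, schema: dict = EXPECTED_SCHEMA) -> list:
    """Return a list of human-readable schema violations (empty = valid)."""
    errors = []
    for column, expected_type in schema.items():
        if column not in record:
            errors.append(f"missing column: {column}")
        elif not isinstance(record[column], expected_type):
            errors.append(f"{column}: expected {expected_type.__name__}, "
                          f"got {type(record[column]).__name__}")
    return errors

print(validate_record({"age": 35, "income": 52000.0, "segment": "retail"}))  # []
print(validate_record({"age": "35", "income": 52000.0}))
```

Libraries like Great Expectations or pandera generalize this idea with richer constraints (ranges, nullability, distributions), but the failure mode caught is the same: bad inputs rejected before they reach the model.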
Q25: How do you implement drift detection?
Answer:
```python
import numpy as np
from scipy import stats
from scipy.spatial.distance import jensenshannon

class DriftDetector:
    def __init__(self, reference_data, threshold=0.1):
        self.reference_data = reference_data
        self.threshold = threshold

    def detect_statistical_drift(self, current_data, method='ks'):
        """Detect drift using statistical tests"""
        drift_results = {}
        for column in self.reference_data.columns:
            if method == 'ks':
                # Kolmogorov-Smirnov test
                statistic, p_value = stats.ks_2samp(
                    self.reference_data[column],
                    current_data[column]
                )
                drift_detected = p_value < 0.05
            elif method == 'js':
                # Jensen-Shannon distance over shared histogram bins
                bins = np.histogram_bin_edges(self.reference_data[column], bins=50)
                ref_hist = np.histogram(self.reference_data[column], bins=bins)[0]
                cur_hist = np.histogram(current_data[column], bins=bins)[0]
                statistic = jensenshannon(ref_hist, cur_hist)
                p_value = None
                drift_detected = statistic > self.threshold
            drift_results[column] = {
                'statistic': statistic,
                'p_value': p_value,
                'drift_detected': drift_detected
            }
        return drift_results

    def detect_prediction_drift(self, reference_predictions, current_predictions):
        """Detect drift in model predictions via Population Stability Index"""
        def calculate_psi(expected, actual, buckets=10):
            # Bin both distributions with the SAME edges, derived from expected
            edges = np.histogram_bin_edges(expected, bins=buckets)
            expected_perc = np.histogram(expected, bins=edges)[0] / len(expected)
            actual_perc = np.histogram(actual, bins=edges)[0] / len(actual)
            # Avoid division by zero / log(0) in empty buckets
            expected_perc = np.clip(expected_perc, 1e-6, None)
            actual_perc = np.clip(actual_perc, 1e-6, None)
            return np.sum((actual_perc - expected_perc) *
                          np.log(actual_perc / expected_perc))

        psi = calculate_psi(reference_predictions, current_predictions)

        # Common PSI interpretation thresholds
        if psi < 0.1:
            stability = "stable"
        elif psi < 0.25:
            stability = "moderate_drift"
        else:
            stability = "significant_drift"

        return {
            'psi': psi,
            'stability': stability,
            'drift_detected': psi > 0.1
        }

# Usage example
detector = DriftDetector(reference_data=training_data)
drift_results = detector.detect_statistical_drift(production_data, method='ks')

# Set up monitoring
for feature, results in drift_results.items():
    if results['drift_detected']:
        print(f"Drift detected in feature {feature}")
        # Trigger retraining pipeline
```
Q26: How do you implement model governance and compliance?
Answer:
```python
import uuid
from datetime import datetime

class ModelGovernance:
    def __init__(self, model_registry):
        self.model_registry = model_registry

    def register_model(self, model, metadata):
        """Register model with governance metadata"""
        governance_metadata = {
            'model_id': str(uuid.uuid4()),
            'timestamp': datetime.utcnow(),
            'version': metadata.get('version'),
            'author': metadata.get('author'),
            'description': metadata.get('description'),
            'training_data': metadata.get('training_data'),
            'performance_metrics': metadata.get('metrics'),
            'compliance_status': 'pending',
            'approval_status': 'pending',
            'risk_assessment': self.assess_risk(model, metadata),
            # check_bias, calculate_explainability, and calculate_complexity
            # are assumed to be implemented elsewhere in the class
            'bias_check': self.check_bias(model, metadata),
            'explainability_score': self.calculate_explainability(model)
        }
        self.model_registry.register(model, governance_metadata)
        return governance_metadata['model_id']

    def assess_risk(self, model, metadata):
        """Assess model risk level"""
        risk_factors = {
            'data_sensitivity': metadata.get('data_sensitivity', 'medium'),
            'business_impact': metadata.get('business_impact', 'medium'),
            'model_complexity': self.calculate_complexity(model),
            'deployment_scope': metadata.get('deployment_scope', 'limited')
        }
        # Risk scoring: high=3, medium=2, otherwise 1 (score range 4-12)
        risk_score = 0
        for factor, value in risk_factors.items():
            if value == 'high':
                risk_score += 3
            elif value == 'medium':
                risk_score += 2
            else:
                risk_score += 1
        if risk_score <= 4:
            return 'low'
        elif risk_score <= 8:
            return 'medium'
        return 'high'
```