MLOps for Utilities
The Business Problem: Scaling Machine Learning Beyond Pilots
Utilities increasingly run pilot projects to apply machine learning to forecasting, maintenance, or outage prediction. While these pilots often show promise, few make it into production. Models remain in notebooks or are run manually by analysts. There is no automated way to retrain them, deploy them reliably, or monitor their performance over time.
This gap between experimentation and production limits impact. Without a clear path to operational deployment, even accurate models fail to inform real-time decision-making. Meanwhile, regulatory requirements for auditability and reliability add complexity to deploying analytics in critical infrastructure environments.
Utilities need a structured approach to operationalize machine learning, ensuring models are reproducible, monitored, and integrated with existing IT and OT systems.
The Analytics Solution: Machine Learning Operations (MLOps)
MLOps brings the discipline of software operations to machine learning workflows. It provides frameworks and processes for model versioning, automated training pipelines, deployment, and monitoring.
In practice, this means using tools like MLflow to track model experiments and register the best-performing versions in a centralized repository. Deployment frameworks then package models as APIs, allowing integration with control room dashboards, SCADA feeds, or enterprise systems. Continuous monitoring tracks model drift, retraining models automatically when performance declines.
By formalizing these workflows, utilities move from ad hoc analytics to repeatable, scalable machine learning. MLOps also supports governance and compliance by recording exactly which data, code, and parameters produced each model, ensuring transparency and auditability.
Operational Benefits
Implementing MLOps accelerates the time from proof-of-concept to production, allowing utilities to realize value faster. Predictive maintenance models can be deployed as APIs that score assets continuously from incoming SCADA or IoT data. Load forecasting models can feed directly into market scheduling systems. Outage prediction models can be run automatically when storm warnings are issued, producing real-time crew staging plans.
MLOps also improves reliability. Automated retraining ensures models remain accurate as system conditions evolve, such as load growth or changes in DER penetration. Alerting and monitoring help detect anomalies, preventing models from silently degrading. This approach aligns machine learning with the rigor expected of operational tools in critical infrastructure.
Transition to the Demo
In this chapter’s demo, we will implement a complete MLOps workflow. We will:
- Train a transformer failure prediction model and log it in MLflow with full versioning.
- Register the model for production use and expose it through an API endpoint.
- Simulate real-time scoring of streaming data, illustrating how the model integrates into operational contexts.
This demonstration bridges the gap between analytics and operations, showing how machine learning can be embedded into utility workflows with the reliability, traceability, and scalability required for real-world impact.
Code
"""
Chapter 12: MLOps for Utilities
Automatic MLflow model serving with FastAPI and Kafka for real-time transformer failure prediction.
"""
import pandas as pd
import numpy as np
import mlflow
import mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn
from kafka import KafkaConsumer
import json
import threading
# ---------- MODEL TRAINING & REGISTRY ----------
def generate_asset_data(samples=500):
"""
Synthetic transformer asset health dataset.
"""
np.random.seed(42)
temp = np.random.normal(60, 5, samples)
vibration = np.random.normal(0.2, 0.05, samples)
oil_quality = np.random.normal(70, 10, samples)
age = np.random.randint(1, 30, samples)
failure_prob = 1 / (1 + np.exp(-(0.05*(temp-65) + 8*(vibration-0.25))))
failure = np.random.binomial(1, failure_prob)
return pd.DataFrame({
"Temperature_C": temp,
"Vibration_g": vibration,
"OilQuality_Index": oil_quality,
"Age_Years": age,
"Failure": failure
})
def train_and_register_model(df, experiment_name="Utilities_MLOps", model_name="TransformerFailureModel"):
"""
Train a model, log to MLflow, and register it to the MLflow model registry.
"""
X = df[["Temperature_C", "Vibration_g", "OilQuality_Index", "Age_Years"]]
y = df["Failure"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
mlflow.set_experiment(experiment_name)
with mlflow.start_run():
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
preds = model.predict(X_test)
print("Transformer Failure Prediction Report:")
print(classification_report(y_test, preds))
mlflow.sklearn.log_model(model, "rf_model", registered_model_name=model_name)
mlflow.log_param("n_estimators", 100)
mlflow.log_metric("accuracy", (preds == y_test).mean())
print(f"Model '{model_name}' registered in MLflow.")
return model
def load_production_model(model_name="TransformerFailureModel"):
"""
Load the latest production model from MLflow.
"""
print(f"Loading latest production model '{model_name}' from MLflow registry...")
return mlflow.sklearn.load_model(f"models:/{model_name}/Production")
# ---------- FASTAPI ENDPOINT ----------
app = FastAPI()
class TransformerData(BaseModel):
Temperature_C: float
Vibration_g: float
OilQuality_Index: float
Age_Years: int
model = None # Will be set dynamically
@app.post("/predict")
def predict(data: TransformerData):
"""
Predict transformer failure risk.
"""
X = np.array([[data.Temperature_C, data.Vibration_g, data.OilQuality_Index, data.Age_Years]])
pred = model.predict(X)[0]
return {"failure_risk": int(pred)}
# ---------- KAFKA STREAMING ----------
def start_kafka_consumer(model, topic="scada_stream", bootstrap_servers="localhost:9092"):
"""
Start Kafka consumer to ingest SCADA data and run live predictions.
"""
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
print(f"Kafka consumer connected to topic: {topic}")
for msg in consumer:
data = msg.value
X = np.array([[data["Temperature_C"], data["Vibration_g"], data["OilQuality_Index"], data["Age_Years"]]])
pred = model.predict(X)[0]
print(f"[Kafka Prediction] Transformer={data.get('TransformerID', 'Unknown')} | Failure Risk={pred}")
# ---------- MAIN ENTRY POINT ----------
if __name__ == "__main__":
# Train and register a model (only needed once)
df = generate_asset_data()
train_and_register_model(df)
# Load the latest Production model from MLflow
model = load_production_model()
# Start Kafka consumer in a background thread
threading.Thread(target=start_kafka_consumer, args=(model,), daemon=True).start()
# Start FastAPI server for REST predictions
print("Starting FastAPI at http://127.0.0.1:8000")
uvicorn.run(app, host="0.0.0.0", port=8000)