Orchestration with Prefect

The Business Problem: Automating and Scaling Analytics Workflows

Utilities rely on multiple machine learning models for forecasting, maintenance, outage prediction, and more. Running these models by hand does not scale: analysts repeat the same steps daily or weekly, manually refreshing datasets, retraining models, and exporting results into reports or dashboards, and every manual step is another chance for human error.

Moreover, utility analytics often involve dependencies between models. For example, outage prediction workflows may depend on weather forecasts, while voltage optimization might require the latest load forecasts. Running these workflows in isolation risks misalignment and duplicated effort. Without orchestration, insights arrive late or inconsistently, limiting their value for operations that require real-time or near-real-time decision-making.

The Analytics Solution: Workflow Automation with Orchestration Tools

Orchestration platforms like Prefect automate and coordinate complex analytics pipelines. They manage task scheduling, dependencies, and error handling, ensuring that models run reliably and on time. Instead of manually executing scripts, utilities can define workflows where each task—data extraction, feature engineering, model training, prediction generation—runs automatically in sequence.
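
To make this concrete, here is a minimal sketch of the pattern, assuming Prefect 2.x; the task names and placeholder bodies are illustrative and not part of this chapter's demo:

```python
from prefect import flow, task

@task
def extract_data():
    # Placeholder extraction step; a real task would pull from AMI or SCADA stores
    return [905.0, 940.0, 1010.0]

@task
def engineer_features(loads):
    # Placeholder feature step: scale loads by their peak
    peak = max(loads)
    return [x / peak for x in loads]

@task
def train_and_predict(features):
    # Placeholder model step: a trivial "prediction" from the features
    return sum(features) / len(features)

@flow
def forecast_pipeline():
    loads = extract_data()                     # extraction
    features = engineer_features(loads)        # waits on extract_data
    prediction = train_and_predict(features)   # waits on engineer_features
    print(f"Prediction: {prediction:.3f}")

if __name__ == "__main__":
    forecast_pipeline()
```

Because each task receives the previous task's output, Prefect infers the execution order from the data flow itself; there is no separate dependency file to maintain.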

Prefect also provides monitoring and alerting, so teams are notified of failures or delays. Workflows can be configured to run at specific intervals (such as hourly forecasts) or in response to triggers (like severe weather alerts). This automation frees analysts to focus on improving models rather than rerunning them, while also providing operational consistency.
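
As one way to attach a schedule, assuming Prefect 2.x's `.serve()` API (the deployment name and cron expression below are illustrative):

```python
from prefect import flow

@flow(log_prints=True)
def hourly_forecast():
    print("Refreshing the load forecast...")

if __name__ == "__main__":
    # Serve the flow on an hourly cron schedule; runs, failures, and delays
    # then show up in the Prefect UI, where notifications can be configured.
    hourly_forecast.serve(name="hourly-load-forecast", cron="0 * * * *")
```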

Integration with cloud and enterprise platforms allows Prefect to orchestrate workloads across different environments, including SCADA-linked systems, asset databases, and MLOps pipelines. This creates a unified backbone for all analytics operations.

Benefits for Utility Operations

Automating analytics workflows ensures that models run reliably and produce consistent outputs for decision-makers. Operators receive the latest forecasts without waiting for manual updates. Maintenance teams get updated asset risk scores automatically populated in dashboards. Outage planners can trigger risk models whenever storm warnings arise.

This reduces manual intervention, speeds insight delivery, and supports real-time operational use cases. Automated orchestration also improves governance by maintaining logs of every run, including data sources and model versions, simplifying compliance audits.
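
As a small sketch of these reliability and audit hooks, using Prefect's get_run_logger helper and task-level retries (the retry settings and model-version tag below are illustrative):

```python
from prefect import flow, task, get_run_logger

@task(retries=2, retry_delay_seconds=60)
def score_assets(model_version: str):
    logger = get_run_logger()
    # Record the inputs behind this run's outputs so audits can trace them
    logger.info("Scoring assets with model version %s", model_version)
    return {"model_version": model_version, "assets_scored": 1200}

@flow
def governed_scoring():
    result = score_assets("asset-risk-v3")  # illustrative version tag
    get_run_logger().info("Run summary: %s", result)

if __name__ == "__main__":
    governed_scoring()
```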

Transition to the Demo

In this chapter’s demo, we will build an orchestrated pipeline using Prefect. We will:

- Define a predictive maintenance task that trains a random forest classifier to flag asset failures from synthetic sensor data
- Define a load forecasting task that fits an ARIMA model to two weeks of synthetic hourly load
- Define an outage prediction task that trains a gradient boosting classifier on synthetic storm-exposure data
- Include a cybersecurity placeholder task that stands in for Chapter 13's intrusion detection pipeline
- Compose all four tasks into a single Prefect flow and run it end to end

This demonstration shows how orchestration transforms machine learning from isolated scripts into coordinated, reliable workflows that align with the pace and complexity of utility operations.


Code

"""
Chapter 16: Integrated Orchestration of ML Pipelines
Pipeline scheduling and orchestration using Prefect for utilities.
"""

import pandas as pd
import numpy as np
from prefect import flow, task
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import classification_report
from statsmodels.tsa.arima.model import ARIMA

# --- TASKS ---

@task
def predictive_maintenance_task(samples=500):
    # Synthetic transformer-style sensor data: temperature, vibration, oil quality, age
    np.random.seed(42)
    temp = np.random.normal(60, 5, samples)
    vibration = np.random.normal(0.2, 0.05, samples)
    oil_quality = np.random.normal(70, 10, samples)
    age = np.random.randint(1, 30, samples)
    # Failure probability rises with temperature and vibration via a logistic link
    failure_prob = 1 / (1 + np.exp(-(0.05*(temp-65) + 8*(vibration-0.25))))
    failure = np.random.binomial(1, failure_prob)

    df = pd.DataFrame({"Temperature": temp, "Vibration": vibration, "OilQuality": oil_quality, "Age": age, "Failure": failure})
    X, y = df[["Temperature", "Vibration", "OilQuality", "Age"]], df["Failure"]
    # Demo-only: the model is evaluated in-sample on its own training data
    model = RandomForestClassifier(n_estimators=100).fit(X, y)
    preds = model.predict(X)
    print("Predictive Maintenance Report:")
    print(classification_report(y, preds))
    return df

@task
def load_forecasting_task():
    # Two weeks of synthetic hourly load with a daily sinusoidal cycle
    date_rng = pd.date_range(start="2023-01-01", periods=24*14, freq="h")
    load = 900 + 100*np.sin(2*np.pi*date_rng.hour/24) + np.random.normal(0, 30, len(date_rng))
    ts = pd.Series(load, index=date_rng)
    # Fit a simple ARIMA(2, 1, 2) and forecast the next 24 hours
    model = ARIMA(ts, order=(2, 1, 2)).fit()
    forecast = model.forecast(steps=24)
    print("\nLoad Forecast (Next 24 hours):")
    print(forecast.tail())
    return forecast

@task
def outage_prediction_task(samples=1000):
    # Synthetic storm-exposure data; seeded for reproducibility like the task above
    np.random.seed(42)
    wind = np.random.normal(20, 7, samples)
    trees = np.random.uniform(0, 1, samples)
    rainfall = np.random.normal(50, 15, samples)
    # Outage probability rises with wind speed and tree density via a logistic link
    outage_prob = 1 / (1 + np.exp(-(0.15*(wind-25) + 2*(trees-0.5))))
    outage = np.random.binomial(1, outage_prob)
    df = pd.DataFrame({"WindSpeed": wind, "TreeDensity": trees, "Rainfall": rainfall, "Outage": outage})
    X, y = df[["WindSpeed", "TreeDensity", "Rainfall"]], df["Outage"]
    # Demo-only in-sample evaluation, as in the maintenance task
    model = GradientBoostingClassifier().fit(X, y)
    preds = model.predict(X)
    print("\nOutage Prediction Report:")
    print(classification_report(y, preds))
    return df

@task
def cybersecurity_task():
    print("\nCybersecurity task simulated (CICIDS2017 streaming detection placeholder).")
    # In production, link to Chapter 13’s intrusion detection pipeline here.

# --- ORCHESTRATED FLOW ---
@flow
def utility_ml_pipeline():
    print("\n--- Running Utility ML Pipeline ---")
    # Tasks called directly run in order; each finishes before the next starts
    predictive_maintenance_task()
    load_forecasting_task()
    outage_prediction_task()
    cybersecurity_task()
    print("\n--- Pipeline Run Complete ---")

if __name__ == "__main__":
    utility_ml_pipeline()
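
Running the script executes the flow once on the local machine. To put the pipeline on a schedule, the direct call could be swapped for `utility_ml_pipeline.serve(...)` as sketched earlier, at which point Prefect records every run, its logs, and any retries for later review.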