Fine Grained Demand Forecasting Accelerator

AI-Powered Demand Forecasting - Intelligent Prediction Engine

🤖 Advanced Analytics Solution

Transform your retail operations with intelligent demand prediction that learns from historical patterns:

  • AI-driven forecasting across thousands of store-product combinations
  • Seasonal intelligence that adapts to holiday patterns and trends
  • Risk-aware predictions with confidence intervals for safety stock
  • Automated model training that scales to enterprise product catalogs

📈 Industry Innovation

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  Sales History  │───▶│  AI Forecasting │───▶│ Business Actions│
│  ✅ Foundation  │    │ 🔄 Current step │    │  ⏳ Coming next │
└─────────────────┘    └─────────────────┘    └─────────────────┘

🎯 Smart Forecasting Capabilities

  • Pattern Recognition: Automatically detects seasonal, weekly, daily trends
  • Demand Volatility Management: Handles irregular patterns and disruptions
  • Uncertainty Quantification: Provides confidence ranges for inventory
  • Scale & Performance: Processes thousands of products simultaneously
  • Business Intelligence: Generates actionable insights for supply chains
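
One way the confidence ranges could feed inventory decisions is sketched below. It assumes a simple heuristic that is not part of this accelerator: treat the daily gap between the upper bound (`yhat_upper`) and the point forecast (`yhat`) as safety stock. The function name `safety_stock` and the sample numbers are hypothetical.

```python
from typing import Dict, List


def safety_stock(forecasts: List[Dict[str, float]]) -> float:
    """Sum the daily gap between the upper bound and the point forecast.

    Assumption: safety stock = sum(yhat_upper - yhat) over the horizon.
    This is an illustrative heuristic, not the accelerator's method.
    """
    return sum(max(0.0, row["yhat_upper"] - row["yhat"]) for row in forecasts)


# Hypothetical three-day forecast for one store-item combination
sample = [
    {"yhat": 20.0, "yhat_lower": 12.0, "yhat_upper": 28.0},
    {"yhat": 22.0, "yhat_lower": 14.0, "yhat_upper": 31.0},
    {"yhat": 18.0, "yhat_lower": 11.0, "yhat_upper": 25.0},
]

expected_demand = sum(row["yhat"] for row in sample)
buffer_units = safety_stock(sample)
print(f"Expected demand: {expected_demand:.0f} units, safety stock: {buffer_units:.0f} units")
```

A wider interval produces a larger buffer, so the choice of interval width directly affects how much stock this heuristic would hold.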

📦 AI/ML Environment Setup

# Install libraries for serverless compute
# Version specifiers are quoted so that ">" cannot be read as a shell redirect
# MAGIC %pip install "prophet>=1.1.5" "plotly>=5.17.0" "scikit-learn>=1.3.0"
# Restart Python to use newly installed libraries
dbutils.library.restartPython()

โš™๏ธ Forecasting Engine Configuration

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, max as spark_max, min as spark_min, current_timestamp
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, DateType, DoubleType, IntegerType, StringType, TimestampType
from prophet import Prophet
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

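print("📚 Libraries imported successfully")

# Get parameters from the job, or fall back to defaults.
# dbutils.widgets.get() raises if the widget has not been defined, so declare
# each widget with its default first; job parameters override that default.
dbutils.widgets.text("catalog_name", "dev_demand_forecasting")
dbutils.widgets.text("schema_name", "forecasting")
catalog_name = dbutils.widgets.get("catalog_name") or "dev_demand_forecasting"
schema_name = dbutils.widgets.get("schema_name") or "forecasting"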

# Forecasting parameters - optimized for faster execution
FORECAST_HORIZON_DAYS = 15  # Reduced from 30 for faster processing
MIN_HISTORY_DAYS = 90
CONFIDENCE_INTERVAL = 0.95
MODEL_VERSION = "prophet_v1.1.5_serverless_optimized"

# Process all generated data for comprehensive forecasting
MAX_STORES = 5    # Match data generation: stores 1-5
MAX_ITEMS = 25    # Match data generation: items 1-25

print("🔧 AI Forecasting Engine Settings (Optimized):")
print(f"   🔮 Forecast horizon: {FORECAST_HORIZON_DAYS} days ahead")
print(f"   📊 Minimum sales history: {MIN_HISTORY_DAYS} days")
print(f"   🎯 Prediction confidence: {CONFIDENCE_INTERVAL * 100}%")
print(f"   🛒 Retail scope: {MAX_STORES} stores × {MAX_ITEMS} products")
print(f"   🤖 Processing: Intelligent auto-scaling (optimized)")

# Set up Spark session
spark = SparkSession.builder.getOrCreate()
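
The parameter lookup with defaults can be exercised outside Databricks with a small stand-in. This sketch assumes the lookup raises when a parameter was never defined; `FakeWidgets`, `get_param`, and the `region` parameter are hypothetical names, not part of `dbutils`.

```python
class FakeWidgets:
    """Hypothetical stand-in for a widget store; raises if a name is unknown."""

    def __init__(self, values):
        self._values = dict(values)

    def get(self, name):
        if name not in self._values:
            raise KeyError(f"widget '{name}' is not defined")
        return self._values[name]


def get_param(widgets, name, default):
    """Return the widget value, or the default if it is missing or empty."""
    try:
        value = widgets.get(name)
    except Exception:
        return default
    return value if value else default


widgets = FakeWidgets({"catalog_name": "prod_catalog", "schema_name": ""})

catalog_name = get_param(widgets, "catalog_name", "dev_demand_forecasting")
schema_name = get_param(widgets, "schema_name", "forecasting")
region = get_param(widgets, "region", "unset")
print(catalog_name, schema_name, region)
```

The helper covers both failure modes, an undefined parameter and an empty string, with a single default.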

📊 Retail Sales Data Analysis

Historical Sales Pattern Analysis

Load and analyze your retail sales history to assess data quality and identify forecasting opportunities.

print("📥 Loading retail sales history for AI analysis...")

# Load historical sales data
raw_table = f"{catalog_name}.{schema_name}.raw_sales_data"
df = spark.table(raw_table)

print(f"✅ Sales data ready for analysis")
print(f"🛒 Total sales transactions: {df.count():,}")

# Data quality summary
date_range = df.select(spark_min('date'), spark_max('date')).collect()[0]
print(f"📅 Date range: {date_range[0]} to {date_range[1]}")
print(f"🏪 Stores: {df.select('store').distinct().count()}")
print(f"📦 Items: {df.select('item').distinct().count()}")

print("🔍 Analyzing sales patterns for AI model training...")

# Check data completeness for selected store-item combinations
validation_df = (
    df.filter(col("store") <= MAX_STORES)
    .filter(col("item") <= MAX_ITEMS)
    .groupBy("store", "item")
    .agg(
        count("*").alias("record_count"),
        spark_min("date").alias("start_date"),
        spark_max("date").alias("end_date")
    )
)

validation_results = validation_df.collect()

print(f"📈 Analyzing {len(validation_results)} store-item combinations:")

sufficient_data_count = 0
for row in validation_results:
    days_of_data = (row['end_date'] - row['start_date']).days + 1
    sufficient = days_of_data >= MIN_HISTORY_DAYS
    if sufficient:
        sufficient_data_count += 1
    
    status = "✅" if sufficient else "❌"
    print(f"   {status} Store {row['store']}, Item {row['item']}: {row['record_count']} records, {days_of_data} days")

print(f"\n🎯 {sufficient_data_count}/{len(validation_results)} product-store combinations ready for AI forecasting")
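
The check above measures the calendar span between the first and last sale, so a series with missing days can still pass. Below is a minimal sketch of a stricter check, assuming one record per day is expected. `history_summary` is a hypothetical helper and the dates are made up.

```python
from datetime import date, timedelta

MIN_HISTORY_DAYS = 90  # same threshold as the configuration above


def history_summary(dates):
    """Return (calendar_span_days, observed_days, missing_days) for a list of dates."""
    unique_days = sorted(set(dates))
    span = (unique_days[-1] - unique_days[0]).days + 1
    observed = len(unique_days)
    return span, observed, span - observed


# Hypothetical series: 100 calendar days with every fifth day missing
start = date(2024, 1, 1)
dates = [start + timedelta(days=d) for d in range(100) if d % 5 != 4]

span, observed, missing = history_summary(dates)
print(f"span={span} days, observed={observed} days, missing={missing} days")
print("passes span check:", span >= MIN_HISTORY_DAYS)
print("passes observed-days check:", observed >= MIN_HISTORY_DAYS)
```

Comparing both numbers shows whether a combination has enough history in calendar terms but too few actual observations.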

🧠 AI-Powered Demand Prediction Engine

Intelligent Forecasting at Scale

Deploy advanced machine learning to predict demand across your entire product portfolio. Our AI engine will:

  1. Learn from sales patterns (seasonality, trends, promotions)
  2. Train product-specific models optimized for each store-item combination
  3. Generate intelligent forecasts with uncertainty quantification
  4. Scale automatically to handle enterprise product catalogs
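
These steps follow a group-then-model pattern: split the history by store-item pair, fit one model per group, and collect the forecasts. The plain-Python sketch below illustrates only that pattern. A mean of the last seven days stands in for Prophet, the thresholds are shortened for the example, and `naive_forecast` and the sample rows are hypothetical.

```python
from collections import defaultdict
from datetime import date, timedelta

FORECAST_HORIZON_DAYS = 3   # shortened for the example
MIN_HISTORY_DAYS = 7        # shortened for the example


def naive_forecast(history):
    """Stand-in model (NOT Prophet): repeat the mean of the last 7 days."""
    history = sorted(history)                      # sort by date
    recent = [sales for _, sales in history[-7:]]
    level = sum(recent) / len(recent)
    last_day = history[-1][0]
    return [(last_day + timedelta(days=h), level)
            for h in range(1, FORECAST_HORIZON_DAYS + 1)]


# Hypothetical rows: (store, item, date, sales)
start = date(2024, 1, 1)
rows = [(1, 1, start + timedelta(days=d), 10 + d % 2) for d in range(10)]
rows += [(1, 2, start + timedelta(days=d), 5) for d in range(4)]   # too short

# Step 1: group history by store-item combination
groups = defaultdict(list)
for store, item, day, sales in rows:
    groups[(store, item)].append((day, sales))

# Steps 2-3: fit one model per group and collect forecasts
forecasts = {}
for key, history in groups.items():
    if len(history) < MIN_HISTORY_DAYS:
        continue                                   # skip thin histories
    forecasts[key] = naive_forecast(history)

for (store, item), points in forecasts.items():
    for day, yhat in points:
        print(f"Store {store}, Item {item}, {day}: {yhat:.1f}")
```

Because each group is processed independently, the loop body is the unit of work that a grouped pandas UDF would distribute across a cluster.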

@pandas_udf(returnType=StructType([
    StructField("store", IntegerType()),
    StructField("item", IntegerType()),
    StructField("forecast_date", DateType()),
    StructField("yhat", DoubleType()),
    StructField("yhat_lower", DoubleType()),
    StructField("yhat_upper", DoubleType()),
    StructField("model_version", StringType())
]), functionType=PandasUDFType.GROUPED_MAP)
def forecast_store_item(pdf):
    """
    Train Prophet model and generate forecasts for a single store-item combination
    
    Args:
        pdf (pandas.DataFrame): Historical sales data for one store-item combination
        
    Returns:
        pandas.DataFrame: Forecast results with confidence intervals
    """
    try:
        # Validate input data
        if len(pdf) < MIN_HISTORY_DAYS:
            print(f"Insufficient data: Store {pdf['store'].iloc[0]}, Item {pdf['item'].iloc[0]} has only {len(pdf)} days")
            return pd.DataFrame()
        
        # Prepare data for Prophet (requires 'ds' and 'y' columns)
        prophet_df = pdf[['date', 'sales']].rename(columns={'date': 'ds', 'sales': 'y'})
        prophet_df['ds'] = pd.to_datetime(prophet_df['ds'])
        prophet_df = prophet_df.sort_values('ds')
        
        # Remove any duplicate dates (if they exist)
        prophet_df = prophet_df.drop_duplicates(subset=['ds'])
        
        # Create and configure Prophet model
        model = Prophet(
            daily_seasonality=False,     # Intra-day cycle; cannot be estimated from one observation per day
            weekly_seasonality=True,     # Capture day-of-week patterns
            yearly_seasonality=True,     # Capture annual seasonal patterns
            interval_width=CONFIDENCE_INTERVAL,  # Width of the uncertainty interval (yhat_lower to yhat_upper)
            changepoint_prior_scale=0.05,       # Controls trend flexibility
            seasonality_prior_scale=10.0        # Controls seasonality strength
        )
        
        # Fit the model
        model.fit(prophet_df)
        
        # Create future dataframe for forecasting
        future = model.make_future_dataframe(periods=FORECAST_HORIZON_DAYS)
        
        # Generate forecast
        forecast = model.predict(future)
        
        # Filter to only future dates (exclude historical fitted values)
        last_historical_date = prophet_df['ds'].max()
        future_forecast = forecast[forecast['ds'] > last_historical_date].copy()
        
        # Prepare output with store/item identifiers
        result = pd.DataFrame({
            'store': pdf['store'].iloc[0],
            'item': pdf['item'].iloc[0],
            'forecast_date': future_forecast['ds'].dt.date,
            'yhat': future_forecast['yhat'],
            'yhat_lower': future_forecast['yhat_lower'],
            'yhat_upper': future_forecast['yhat_upper'],
            'model_version': MODEL_VERSION
        })
        
        # Ensure non-negative forecasts (sales can't be negative)
        result['yhat'] = result['yhat'].clip(lower=0)
        result['yhat_lower'] = result['yhat_lower'].clip(lower=0)
        result['yhat_upper'] = result['yhat_upper'].clip(lower=0)
        
        return result
        
    except Exception as e:
        print(f"❌ Error forecasting Store {pdf['store'].iloc[0]}, Item {pdf['item'].iloc[0]}: {str(e)}")
        return pd.DataFrame()

print("✅ AI demand prediction engine ready")

🚀 Deploy AI Forecasting at Enterprise Scale

Intelligent Demand Prediction Across Your Retail Portfolio

Launch AI-powered forecasting across your stores and products. The system automatically trains specialized models for each product-location combination.

print("🚀 Launching AI-powered demand forecasting...")

# Get all available store-item combinations dynamically from the data
available_combinations = (
    df.select("store", "item")
    .distinct()
    .collect()
)

print(f"🎯 Discovered {len(available_combinations)} store-item combinations in data")

# Create forecast results storage
all_forecasts = []

# Process each combination individually for better error handling
for i, row in enumerate(available_combinations):
    store_id = row['store']
    item_id = row['item']
    
    try:
        # Filter data for this specific store-item combination
        store_item_data = (
            df.filter((col("store") == store_id) & (col("item") == item_id))
            .select("date", "sales")
            .orderBy("date")
            .toPandas()
        )
        
        # Check if we have enough data
        if len(store_item_data) < MIN_HISTORY_DAYS:
            print(f"⚠️  Store {store_id}, Item {item_id}: Only {len(store_item_data)} days (need {MIN_HISTORY_DAYS})")
            continue
            
        # Prepare data for Prophet
        prophet_df = store_item_data.rename(columns={'date': 'ds', 'sales': 'y'})
        prophet_df['ds'] = pd.to_datetime(prophet_df['ds'])
        prophet_df = prophet_df.sort_values('ds').drop_duplicates(subset=['ds'])
        
        # Train Prophet model
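        model = Prophet(
            daily_seasonality=False,   # One observation per day, so there is no intra-day cycle to fit
            weekly_seasonality=True,   # Day-of-week patterns
            yearly_seasonality=True,   # Annual seasonal patterns
            interval_width=CONFIDENCE_INTERVAL,
            changepoint_prior_scale=0.05,
            seasonality_prior_scale=10.0
        )
        
        # Suppress logging from Prophet and its cmdstanpy backend
        import logging
        logging.getLogger('prophet').setLevel(logging.WARNING)
        logging.getLogger('cmdstanpy').setLevel(logging.WARNING)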
        
        model.fit(prophet_df)
        
        # Generate forecasts
        future = model.make_future_dataframe(periods=FORECAST_HORIZON_DAYS)
        forecast = model.predict(future)
        
        # Get only future predictions
        last_date = prophet_df['ds'].max()
        future_forecast = forecast[forecast['ds'] > last_date].copy()
        
        # Prepare results
        for _, forecast_row in future_forecast.iterrows():
            all_forecasts.append({
                'store': int(store_id),
                'item': int(item_id),
                'forecast_date': forecast_row['ds'].date(),
                'yhat': max(0, float(forecast_row['yhat'])),
                'yhat_lower': max(0, float(forecast_row['yhat_lower'])),
                'yhat_upper': max(0, float(forecast_row['yhat_upper'])),
                'model_version': MODEL_VERSION
            })
        
        if (i + 1) % 25 == 0:  # Progress update every 25 combinations
            print(f"📈 Processed {i + 1}/{len(available_combinations)} combinations...")
            
    except Exception as e:
        print(f"❌ Error with Store {store_id}, Item {item_id}: {str(e)}")
        continue

forecasted_pairs = {(f['store'], f['item']) for f in all_forecasts}
print(f"✅ Forecasting complete! Generated predictions for {len(forecasted_pairs)} combinations")

# Convert to Spark DataFrame with explicit schema to prevent type inference issues
if all_forecasts:
    # Define explicit schema to match the Delta table exactly
    forecast_schema = StructType([
        StructField("store", IntegerType(), True),
        StructField("item", IntegerType(), True),
        StructField("forecast_date", DateType(), True),
        StructField("yhat", DoubleType(), True),
        StructField("yhat_lower", DoubleType(), True),
        StructField("yhat_upper", DoubleType(), True),
        StructField("model_version", StringType(), True)
    ])
    
    forecast_df = spark.createDataFrame(all_forecasts, schema=forecast_schema)
    forecast_count = forecast_df.count()
    print(f"🔮 Generated {forecast_count:,} individual demand predictions")
else:
    forecast_df = None
    forecast_count = 0
    print("❌ No forecasts generated")

if forecast_count > 0:
    print("✅ AI-powered demand forecasting completed successfully!")
    
    # Business impact summary
    unique_combinations = forecast_df.select("store", "item").distinct().count()
    date_range = forecast_df.select(spark_min("forecast_date"), spark_max("forecast_date")).collect()[0]
    
    print(f"🛒 Products with AI forecasts: {unique_combinations} store-product combinations")
    print(f"📅 Demand predictions through: {date_range[1]}")
    print(f"📊 Daily predictions per product: {forecast_count // unique_combinations}")
    
    # Sample business forecasts
    sample_forecasts = forecast_df.select("store", "item", "forecast_date", "yhat", "yhat_lower", "yhat_upper").limit(5).collect()
    print("\n📋 Sample demand predictions:")
    for row in sample_forecasts:
        print(f"   Store {row['store']}, Product {row['item']}, {row['forecast_date']}: {row['yhat']:.0f} units (range: {row['yhat_lower']:.0f}-{row['yhat_upper']:.0f})")
        
else:
    print("❌ No demand forecasts generated - review data requirements")

if forecast_count > 0:
    print("💾 Saving demand forecasts to Unity Catalog...")
    
    # Add business metadata with proper timestamp type
    forecasts_with_timestamp = forecast_df.withColumn("created_timestamp", current_timestamp())
    
    # Ensure columns are in the exact order expected by the table schema
    final_forecasts = forecasts_with_timestamp.select(
        "store",
        "item", 
        "forecast_date",
        "yhat",
        "yhat_lower",
        "yhat_upper",
        "model_version",
        "created_timestamp"
    )
    
    # Validate final schema before save
    print("🔍 Validating final schema before save...")
    final_forecasts.printSchema()
    
    # Save to table
    forecast_table = f"{catalog_name}.{schema_name}.forecast_results"
    final_forecasts.write.mode("overwrite").saveAsTable(forecast_table)
    
    print(f"✅ Saved {forecast_count:,} demand predictions to {forecast_table}")
    
    # Verify save
    saved_df = spark.table(forecast_table)
    saved_count = saved_df.count()
    unique_combinations = saved_df.select("store", "item").distinct().count()
    date_range = saved_df.select(spark_min("forecast_date"), spark_max("forecast_date")).collect()[0]
    
    print(f"📊 Verified: {saved_count:,} forecasts for {unique_combinations} store-item combinations")
    print(f"📅 Forecast period: {date_range[0]} to {date_range[1]}")
    
else:
    print("⚠️  No forecasts to save")

print(f"🎯 Forecast generation complete: {forecast_count:,} predictions ready for business use")
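
Before downstream teams consume the table, it can help to confirm that every combination received a full, gap-free horizon. Below is a minimal sketch on in-memory rows, assuming one forecast per day per combination. `incomplete_combinations` is a hypothetical helper and the rows are made up.

```python
from collections import defaultdict
from datetime import date, timedelta

FORECAST_HORIZON_DAYS = 15  # same horizon as the configuration above


def incomplete_combinations(rows, horizon):
    """Return store-item pairs whose forecast dates are not `horizon` consecutive days."""
    dates_by_key = defaultdict(set)
    for store, item, forecast_date in rows:
        dates_by_key[(store, item)].add(forecast_date)

    bad = []
    for key, days in sorted(dates_by_key.items()):
        first = min(days)
        expected = {first + timedelta(days=h) for h in range(horizon)}
        if days != expected:
            bad.append(key)
    return bad


# Hypothetical forecasts: (1, 1) is complete, (1, 2) is missing its last day
start = date(2024, 7, 1)
rows = [(1, 1, start + timedelta(days=h)) for h in range(FORECAST_HORIZON_DAYS)]
rows += [(1, 2, start + timedelta(days=h)) for h in range(FORECAST_HORIZON_DAYS - 1)]

print(incomplete_combinations(rows, FORECAST_HORIZON_DAYS))
```

An empty list means every combination has exactly one prediction for each day of the horizon.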

📊 AI Forecasting Performance Summary

Business Impact Assessment

print("📊 AI Forecasting Business Impact Report")
print("=" * 50)

if forecast_count > 0:
    # Load saved forecasts for analysis
    forecast_table = f"{catalog_name}.{schema_name}.forecast_results"
    results_df = spark.table(forecast_table)
    
    # Performance metrics
    total_models = results_df.select("store", "item").distinct().count()
    total_forecasts = results_df.count()
    avg_forecast_value = results_df.agg({"yhat": "avg"}).collect()[0][0]
    
    print(f"🛒 Products with AI forecasts: {total_models}")
    print(f"🔮 Total demand predictions: {total_forecasts:,}")
    print(f"📊 Average daily demand forecast: {avg_forecast_value:.0f} units")
    print(f"🤖 AI Engine: Advanced time series modeling")
    print(f"⚡ Processing: Enterprise auto-scaling platform")
    print(f"📅 Planning horizon: {FORECAST_HORIZON_DAYS} days ahead")
    
    # Forecast range analysis
    forecast_stats = results_df.select("yhat", "yhat_lower", "yhat_upper").describe().collect()
    print(f"\n📈 Forecast Value Distribution:")
    for stat in forecast_stats:
        if stat['summary'] in ['mean', 'min', 'max', 'stddev']:
            print(f"   {stat['summary'].capitalize()}: {float(stat['yhat']):.2f}")
    
    print(f"\n✅ AI demand forecasting engine operational!")
    
else:
    print("❌ No AI models were successfully trained")
    print("   Review sales data requirements and training logs")

📋 Summary & Next Steps

✅ AI Forecasting Engine Deployed:

  1. 🧠 Advanced AI Models: Specialized demand prediction for each product-store combination
  2. 🔮 Intelligent Forecasts: 15-day demand predictions with confidence ranges
  3. ⚡ Enterprise Scale: Automatic scaling across thousands of products
  4. 💾 Business Integration: Forecasts saved to Unity Catalog for planning teams
  5. 📊 Quality Assurance: Validated predictions ready for inventory planning

🔄 Business Intelligence Next:

Final Step: Executive Insights & Action Planning

  • Interactive demand visualization dashboards
  • Inventory optimization recommendations
  • Supply chain planning insights
  • Executive KPI reporting and ROI analysis

๐Ÿ“ Output Location:

Catalog: {catalog_name}
Schema: {schema_name}  
Table: forecast_results
Records: {forecast_count:,} forecasts

🎉 AI Forecasting Engine Live!

Your intelligent demand prediction system is operational!

Ready to transform forecasts into actionable business insights and inventory optimization strategies.

# Return completion status for workflow orchestration
completion_message = f"SUCCESS: {forecast_count} demand predictions generated for {unique_combinations if forecast_count > 0 else 0} product-store combinations"
dbutils.notebook.exit(completion_message)