Building a Caching System with FastHTML

April 11, 2025
fasthtml

TL;DR: This post shows how to build a caching system for a data dashboard in FastHTML. It combines Parquet files for persistence with in-memory storage for speed. By pre-computing metrics and storing them in a custom cache class, the system delivers fast responses while minimising API calls and server load. It is designed to handle outages and rate limits gracefully, which makes it a good fit for any dashboard that needs up-to-date data. The code was adapted by an LLM from another project (not weather related), so the caching ideas carry over but I haven't verified that the weather implementation works perfectly! Sections you can skip if you only care about the caching mechanism, and not the weather details, say so at the start.

The Example Use Case: Weather Data Dashboard

Imagine you're tasked with building a weather dashboard where users can view historical weather data for different cities (the same caching would also make an API more performant). The pattern applies to any dashboard that needs live or otherwise non-static data. The application needs to:

  1. Fetch raw weather data from a public API
  2. Process this data to calculate various metrics (averages, trends, etc.)
  3. Present interactive visualizations to users
  4. Update data periodically for current conditions

However, several challenges make this non-trivial:

  • API Limitations: APIs often have rate limits and latency
  • Computational Expense: Recomputing metrics over large datasets on every request is expensive
  • Performance Expectations: Users expect near-instant responses
  • Data Volume: Historical weather data can be substantial so we can't just store all historical data in memory

The solution? A robust caching system that balances data freshness with performance, pre-computing and storing results while allowing for periodic updates.
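
To make the idea concrete before the real components, here is a minimal sketch of the two-tier lookup: check memory first, fall back to disk, and only then do the expensive work. It uses JSON files from the standard library instead of Parquet so it runs without extra packages. `TwoTierCache`, `expensive`, and the key format are illustrative assumptions, not part of the project code.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict


class TwoTierCache:
    """Memory first, then disk, then the expensive computation."""

    def __init__(self, cache_dir: Path) -> None:
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self._memory: Dict[str, Any] = {}

    def get(self, key: str, compute: Callable[[], Any]) -> Any:
        # 1. Fastest path: already in memory
        if key in self._memory:
            return self._memory[key]

        # 2. Slower path: persisted on disk by an earlier run
        path = self.cache_dir / f"{key}.json"
        if path.exists():
            value = json.loads(path.read_text())
        else:
            # 3. Slowest path: compute (e.g. call the API) and persist
            value = compute()
            path.write_text(json.dumps(value))

        self._memory[key] = value
        return value


calls = []

def expensive() -> dict:
    calls.append(1)  # Count how often the "API" is really hit
    return {"temp_max": 21.5}

cache_dir = Path(tempfile.mkdtemp())
first = TwoTierCache(cache_dir).get("london_2025_04", expensive)
# A fresh instance simulates a server restart: memory is empty, disk is not
second = TwoTierCache(cache_dir).get("london_2025_04", expensive)
```

The second lookup is served from disk, so the expensive function runs only once even across a simulated restart.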

Project Setup and Architecture

Let's start with our project structure:

weather_dashboard/
├── app/
│   ├── __init__.py
│   ├── cache_update.py          # Cache update endpoints
│   ├── session.py               # Session management
│   └── components/              # UI components
│       ├── __init__.py
│       └── charts.py            # Weather chart components
├── data/
│   ├── __init__.py
│   ├── cache/                   # Cached parquet files
│   ├── cache_manager.py         # Cache management
│   └── cache_state.py           # Global cache state
├── weather_data/
│   ├── __init__.py
│   ├── client.py                # API client
│   └── processor.py             # Data processing
├── configuration/
│   ├── __init__.py
│   ├── constants.py             # Config constants
│   └── logging_config.py        # Logging setup
├── logs/                        # Log files
├── main.py                      # Application entry
├── update_cache.py              # Script that calls the cache update endpoint (invoked via cron job)
├── pyproject.toml               # requirements etc.
└── README.md                    # Documentation

Dependencies

First, install the required packages (I use uv for package management and find it infinitely better than pip):

uv sync

The main dependencies are:

  • FastHTML: For creating the web application
  • Polars: A high-performance dataframe library (Pandas but faster and nicer to use imo)
  • Plotly: For interactive visualizations
  • Requests: For API calls

Core Component 1: Weather Data Client

Skip this section if you're just interested in the caching mechanism

First, we need a client for fetching weather data from a public API (we'll use Open-Meteo, which is completely free and doesn't require authentication). The key features are:

  • Connects to the free Open-Meteo API
  • Handles errors gracefully with proper logging
  • Returns data in a Polars DataFrame for efficient processing
  • Converts dates to appropriate data types
# weather_data/client.py
from datetime import datetime
import requests
import polars as pl
import logging

logger = logging.getLogger(__name__)

class WeatherDataClient:
    def __init__(self):
        self.base_url = "https://archive-api.open-meteo.com/v1/archive"

    def fetch_historical_data(
        self,
        city_name: str,
        latitude: float,
        longitude: float,
        start_date: datetime,
        end_date: datetime
    ) -> pl.DataFrame:
        """Fetch historical weather data for a given location and date range"""
        try:
            # Format dates for the API
            start_date_str = start_date.strftime("%Y-%m-%d")
            end_date_str = end_date.strftime("%Y-%m-%d")

            # Make API request
            response = requests.get(
                self.base_url,
                params={
                    "latitude": latitude,
                    "longitude": longitude,
                    "start_date": start_date_str,
                    "end_date": end_date_str,
                    "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,windspeed_10m_max",
                    "timezone": "GMT"
                },
                timeout=10  # Added timeout for production reliability
            )
            response.raise_for_status()

            # Parse API response
            data = response.json()

            # Convert to polars DataFrame
            daily_data = {
                "date": data["daily"]["time"],
                "temp_max": data["daily"]["temperature_2m_max"],
                "temp_min": data["daily"]["temperature_2m_min"],
                "precipitation": data["daily"]["precipitation_sum"],
                "wind_speed": data["daily"]["windspeed_10m_max"],
            }

            df = pl.DataFrame(daily_data)

            # Add city name column for easy identification
            df = df.with_columns(pl.lit(city_name).alias("city"))

            # Convert date strings to Date values
            # (the keyword is `format` in current Polars; older releases used `fmt`)
            df = df.with_columns(
                pl.col("date").str.strptime(pl.Date, format="%Y-%m-%d")
            )

            return df

        except requests.RequestException as e:
            logger.error(f"API request failed for {city_name}: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Error processing data for {city_name}: {str(e)}")
            raise
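
The client raises on the first failed request. Given the rate limits and latency mentioned earlier, one option is to wrap the call in a retry with exponential backoff. This is a sketch under my own assumptions: `retry_with_backoff` and its parameters are hypothetical and not part of the original client, and the injectable `sleep` exists only so the demo runs instantly.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying with exponentially growing delays on failure."""
    for attempt in range(retries + 1):
        try:
            return func()
        except exceptions:
            if attempt == retries:
                raise  # Out of retries: surface the last error
            sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
    raise RuntimeError("retries must be >= 0")


# Demo with a fake fetch that fails twice, then succeeds
attempts = {"count": 0}
delays = []

def flaky_fetch() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"

result = retry_with_backoff(flaky_fetch, sleep=delays.append)
```

With the real client, the wrapped callable could be a lambda around `fetch_historical_data`, with `exceptions=(requests.RequestException,)` so that only network errors are retried.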

Core Component 2: Data Processor

Also skip this section if you're just interested in the caching mechanism

Next, we need a processor for calculating weather metrics. The key features are:

  • Calculates useful weather metrics (temperature range, moving averages, cumulative values)
  • Generates weather alerts based on threshold conditions
  • Handles empty DataFrames gracefully
  • Comprehensive error handling
# weather_data/processor.py
import polars as pl
import logging

logger = logging.getLogger(__name__)

class WeatherDataProcessor:
    @staticmethod
    def calculate_metrics(df: pl.DataFrame) -> pl.DataFrame:
        """Calculate additional weather metrics"""
        if df.is_empty():
            return df

        try:
            # Calculate temperature range (daily high - low)
            df = df.with_columns(
                (pl.col("temp_max") - pl.col("temp_min")).alias("temp_range")
            )

            # Calculate 7-day moving averages
            df = df.with_columns([
                pl.col("temp_max").rolling_mean(window_size=7).alias("temp_max_7d_avg"),
                pl.col("temp_min").rolling_mean(window_size=7).alias("temp_min_7d_avg"),
                pl.col("precipitation").rolling_mean(window_size=7).alias("precip_7d_avg")
            ])

            # Calculate cumulative precipitation
            df = df.with_columns(
                pl.col("precipitation").cum_sum().alias("cumulative_precip")
            )

            return df

        except Exception as e:
            logger.error(f"Error calculating metrics: {str(e)}")
            raise

    @staticmethod
    def generate_alerts(df: pl.DataFrame) -> pl.DataFrame:
        """Generate weather alerts based on conditions"""
        if df.is_empty():
            return df

        try:
            return df.with_columns([
                # High temperature alert (over 30°C/86°F)
                (pl.col("temp_max") > 30).alias("heat_alert"),

                # Heavy rain alert (over 20mm/~0.8in)
                (pl.col("precipitation") > 20).alias("heavy_rain_alert"),

                # Strong wind alert (over 40 km/h or ~25 mph)
                (pl.col("wind_speed") > 40).alias("high_wind_alert"),

                # Extreme temperature range (daily swing > 20°C/36°F)
                (pl.col("temp_range") > 20).alias("extreme_temp_range_alert")
            ])
        except Exception as e:
            logger.error(f"Error generating alerts: {str(e)}")
            raise
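
One thing to be aware of with the 7-day averages: with its default settings, a Polars rolling window yields null until a full window of values is available, so the first six rows of any DataFrame passed to `calculate_metrics` have no average. The pure-Python sketch below mimics that behaviour for illustration only; `rolling_mean` here is a stand-in function, not the Polars implementation.

```python
from typing import List, Optional


def rolling_mean(values: List[float], window_size: int) -> List[Optional[float]]:
    """Trailing mean that yields None until a full window is available."""
    result: List[Optional[float]] = []
    for i in range(len(values)):
        if i + 1 < window_size:
            result.append(None)  # Not enough history yet
        else:
            window = values[i + 1 - window_size : i + 1]
            result.append(sum(window) / window_size)
    return result


temps = [10.0, 12.0, 14.0, 16.0, 18.0, 20.0, 22.0, 24.0]
averages = rolling_mean(temps, window_size=7)
```

Here the first six entries are `None` and the seventh is the mean of the first seven temperatures. Charts that plot the averages should expect those leading gaps.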

Core Component 3: Cache Manager

Now we need a cache manager to store processed data efficiently. The key features are:

  • Stores processed data in Parquet format for efficiency
  • Organizes cache files by city and month for better management
  • Implements file-based locking to prevent concurrent updates
  • Automatically cleans up old cache files
  • Handles date ranges across multiple months
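
Organising files by month means walking a date range one month at a time. The cache manager does this with a small trick: jump to the 28th (which exists in every month), add four days to land somewhere in the following month, then snap back to the 1st. A standalone sketch, where `next_month` and `months_between` are illustrative helper names that do not appear in the project code:

```python
from datetime import date, timedelta
from typing import List


def next_month(d: date) -> date:
    """First day of the month after d."""
    # Day 28 exists in every month; +4 days always crosses into the next one
    return (d.replace(day=28) + timedelta(days=4)).replace(day=1)


def months_between(start: date, end: date) -> List[str]:
    """Year-month keys (e.g. '2025_01') covering start..end inclusive."""
    keys = []
    current = start.replace(day=1)
    last = end.replace(day=1)
    while current <= last:
        keys.append(current.strftime("%Y_%m"))
        current = next_month(current)
    return keys


keys = months_between(date(2024, 12, 15), date(2025, 2, 3))
leap = next_month(date(2024, 2, 10))
```

The sketch uses `date` rather than `datetime` on purpose: `replace(day=1)` on a `datetime` keeps the hours and minutes, so two values in the same month can still compare as unequal.
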
# data/cache_manager.py
from datetime import datetime, timedelta
from pathlib import Path
import polars as pl
import logging

from weather_data.client import WeatherDataClient
from weather_data.processor import WeatherDataProcessor

logger = logging.getLogger(__name__)

# City coordinates - hardcoded for simplicity
CITY_COORDINATES = {
    "london": {"lat": 51.5074, "lon": -0.1278},
    "new_york": {"lat": 40.7128, "lon": -74.0060},
    "tokyo": {"lat": 35.6762, "lon": 139.6503},
    "sydney": {"lat": -33.8688, "lon": 151.2093},
    "cairo": {"lat": 30.0444, "lon": 31.2357}
}

class CacheManager:
    def __init__(
        self,
        cache_dir: str = "data/cache",
        retention_days: int = 30  # Keep a month of data
    ):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.retention_days = retention_days
        self.weather_client = WeatherDataClient()
        self.processor = WeatherDataProcessor()

        # Create a file lock directory for concurrency control
        self.lock_dir = self.cache_dir / "locks"
        self.lock_dir.mkdir(exist_ok=True)

    def _get_cache_path(self, city: str, year_month: str) -> Path:
        """Generate cache file path for a city and year-month"""
        return self.cache_dir / f"{city}_{year_month}.parquet"

    def _get_lock_path(self, city: str, year_month: str) -> Path:
        """Generate lock file path for a city and year-month"""
        return self.lock_dir / f"{city}_{year_month}.lock"

    def cache_city_data(
        self,
        city: str,
        start_date: datetime,
        end_date: datetime,
        overwrite: bool = False
    ) -> None:
        """Cache processed weather data for a specific city and date range"""
        # Convert city name to lowercase for consistency
        city = city.lower().replace(" ", "_")

        # Get city coordinates
        if city not in CITY_COORDINATES:
            logger.error(f"Unknown city: {city}")
            raise ValueError(f"Unknown city: {city}")

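        # Group by year-month to create manageable cache files.
        # Normalise to midnight on the 1st so the time of day cannot affect the month comparison.
        current_date = start_date.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        end_month = end_date.replace(day=1, hour=0, minute=0, second=0, microsecond=0)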

        while current_date <= end_month:
            year_month = current_date.strftime("%Y_%m")
            cache_path = self._get_cache_path(city, year_month)
            lock_path = self._get_lock_path(city, year_month)

            # Check if data already exists and we're not forcing an overwrite
            if cache_path.exists() and not overwrite:
                logger.debug(f"Cache already exists for {city} for {year_month}")
                current_date = (current_date.replace(day=28) + timedelta(days=4)).replace(day=1)  # Next month
                continue

            # Simple file-based locking to prevent concurrent updates
            if lock_path.exists():
                logger.warning(f"Cache update already in progress for {city} for {year_month}")
                current_date = (current_date.replace(day=28) + timedelta(days=4)).replace(day=1)  # Next month
                continue

            try:
                # Create lock file
                lock_path.touch()

                # Calculate month end date
                next_month = (current_date.replace(day=28) + timedelta(days=4)).replace(day=1)
                month_end = next_month - timedelta(days=1)

                # Adjust if this is the last month in the range
                if month_end > end_date:
                    month_end = end_date

                # Fetch raw data
                logger.info(f"Fetching data for {city} from {current_date.date()} to {month_end.date()}")
                raw_data = self.weather_client.fetch_historical_data(
                    city_name=city,
                    latitude=CITY_COORDINATES[city]["lat"],
                    longitude=CITY_COORDINATES[city]["lon"],
                    start_date=current_date,
                    end_date=month_end
                )

                if raw_data.is_empty():
                    logger.warning(f"No data available for {city} for {year_month}")
                    current_date = next_month  # Move to next month
                    continue

                # Process data
                processed_data = self.processor.calculate_metrics(raw_data)
                processed_data = self.processor.generate_alerts(processed_data)

                # Save to cache
                processed_data.write_parquet(cache_path)
                logger.info(f"Cached data for {city} for {year_month}")

                # Move to next month
                current_date = next_month

            except Exception as e:
                logger.error(f"Failed to cache {city} data for {year_month}: {str(e)}")
                raise
            finally:
                # Remove lock file
                if lock_path.exists():
                    lock_path.unlink()

        # Clean up old cache files
        self._cleanup_old_cache_files(city)

    def _cleanup_old_cache_files(self, city: str) -> None:
        """Remove cache files older than retention_days"""
        try:
            cutoff_date = datetime.now() - timedelta(days=self.retention_days)
            cutoff_year_month = cutoff_date.strftime("%Y_%m")

            for file_path in self.cache_dir.glob(f"{city}_*.parquet"):
                # Strip the "<city>_" prefix; splitting on "_" breaks for cities like "new_york"
                file_year_month = file_path.stem[len(city) + 1:]
                if file_year_month < cutoff_year_month:
                    file_path.unlink()
                    logger.debug(f"Removed old cache file: {file_path}")
        except Exception as e:
            logger.error(f"Error cleaning up old cache files: {str(e)}")

    def load_cached_data(
        self,
        city: str,
        start_date: datetime,
        end_date: datetime
    ) -> pl.DataFrame:
        """Load cached data for a date range
        NOTE to self: I think I just used glob to get all parquet files in the cache_dir and then concatenated the dataframes and ran .unique() to get rid of any duplicate row. Much easier but less efficient.
        """
        city = city.lower().replace(" ", "_")
        dfs = []

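        # Convert dates to months to retrieve cache files.
        # Normalise to midnight on the 1st so the time of day cannot affect the month comparison.
        current_date = start_date.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        end_month = end_date.replace(day=1, hour=0, minute=0, second=0, microsecond=0)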

        while current_date <= end_month:
            year_month = current_date.strftime("%Y_%m")
            cache_path = self._get_cache_path(city, year_month)

            if cache_path.exists():
                try:
                    df = pl.read_parquet(cache_path)
                    # Filter for just the dates we want
                    df = df.filter(
                        (pl.col("date") >= start_date) &
                        (pl.col("date") <= end_date)
                    )
                    dfs.append(df)
                except Exception as e:
                    logger.error(f"Error reading cache file {cache_path}: {str(e)}")
            else:
                logger.debug(f"No cache file for {city} for {year_month}")

            # Move to next month
            current_date = (current_date.replace(day=28) + timedelta(days=4)).replace(day=1)

        if not dfs:
            logger.warning(f"No cached data found for {city} between {start_date.date()} and {end_date.date()}")
            return pl.DataFrame()

        return pl.concat(dfs)
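
The `exists()` check followed by `touch()` in `cache_city_data` leaves a small window in which two processes can both see no lock and both proceed. One way to close it, sketched here with only the standard library, is to create the lock file with `O_CREAT | O_EXCL`, which fails if the file already exists. `file_lock` is a hypothetical helper, not part of the original code.

```python
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


@contextmanager
def file_lock(lock_path: Path) -> Iterator[bool]:
    """Yield True if we acquired the lock, False if someone else holds it."""
    try:
        # O_EXCL makes creation atomic: it raises if the file already exists
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        yield False
        return
    try:
        os.close(fd)
        yield True
    finally:
        lock_path.unlink(missing_ok=True)  # Release only a lock we created


lock = Path(tempfile.mkdtemp()) / "london_2025_04.lock"
with file_lock(lock) as outer:
    # A second attempt while the lock is held must be refused
    with file_lock(lock) as inner:
        pass
released = not lock.exists()
```

A caller would skip the month when the context manager yields `False`. Like the original approach, this still leaves a stale lock file behind if the process is killed mid-update, so some form of lock expiry may be worth adding.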

Core Component 4: Cache State Management

Now let's implement a singleton cache state manager to maintain an in-memory cache. The key features are:

  • Singleton pattern ensures a single source of truth for cached data
  • Preloads data for popular cities
  • Smart refresh logic based on timestamp and cache state
  • Provides cache statistics for monitoring
  • Error handling that falls back to existing in-memory data when a refresh fails (no locks are used, so it is not strictly thread-safe)
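
The singleton works by overriding `__new__` so that every call to the class returns the same object. Stripped of the caching logic, the pattern looks roughly like this (`Registry` is a stand-in name used only for this sketch):

```python
from typing import Dict, Optional


class Registry:
    _instance: Optional["Registry"] = None
    _data: Dict[str, int]

    def __new__(cls) -> "Registry":
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # Run one-time setup here rather than in __init__, which
            # Python would call again on every Registry() invocation
            cls._instance._data = {}
        return cls._instance

    def set(self, key: str, value: int) -> None:
        self._data[key] = value

    def get(self, key: str) -> Optional[int]:
        return self._data.get(key)


a = Registry()
b = Registry()
a.set("london", 30)
```

Both names refer to the same object, so a value written through `a` is visible through `b`. Nothing here is locked: if the instance might be constructed from several threads at once, guarding `__new__` with a `threading.Lock` would be a reasonable addition.
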
# data/cache_state.py
from typing import Dict, Optional
import polars as pl
from datetime import datetime, timedelta
import logging

from data.cache_manager import CacheManager

logger = logging.getLogger(__name__)

class CacheState:
    _instance: Optional["CacheState"] = None
    _cache: Dict[str, pl.DataFrame] = {}  # city -> DataFrame mapping
    _cache_manager = None # Note the use of a class variable here!
    _last_updated: Dict[str, datetime] = {}  # Track last update for each city

    def __new__(cls) -> "CacheState":
        if cls._instance is None:
            logger.info("Initializing CacheState singleton")
            cls._instance = super(CacheState, cls).__new__(cls)
            cls._instance._initialize()
        return cls._instance

    def _initialize(self) -> None:
        """Initialize cache manager and load initial data"""
        # Assigning via self creates an instance attribute that shadows the class-level default.
        # With a singleton there is only ever one instance, so the manager is effectively shared.
        self._cache_manager = CacheManager(
            cache_dir="data/cache",
            retention_days=30
        )
        self._load_recent_data()

    def _load_recent_data(self) -> None:
        """Load last 30 days of data for popular cities"""
        popular_cities = ["london", "new_york", "tokyo", "sydney", "cairo"]
        end_date = datetime.now()
        start_date = end_date - timedelta(days=30)

        logger.info(f"Preloading data for {len(popular_cities)} popular cities")
        for city in popular_cities:
            try:
                df = self._cache_manager.load_cached_data(
                    city=city,
                    start_date=start_date,
                    end_date=end_date
                )
                if not df.is_empty():
                    self._cache[city] = df
                    self._last_updated[city] = datetime.now()
                    logger.info(f"Preloaded {len(df)} records for {city}")
                else:
                    logger.warning(f"No data available to preload for {city}")
            except Exception as e:
                logger.error(f"Failed to load cache for {city}: {str(e)}")

    def get_city_data(
        self,
        city: str,
        start_date: datetime,
        end_date: datetime,
        force_refresh: bool = False
    ) -> pl.DataFrame:
        """Get data for a specific city and date range"""
        city = city.lower().replace(" ", "_")

        # Check if we need to refresh the cache
        needs_refresh = (
            city not in self._cache or
            force_refresh or
            city not in self._last_updated or
            (datetime.now() - self._last_updated.get(city, datetime.min)).total_seconds() > 3600  # 1-hour cache
        )

        if needs_refresh:
            logger.info(f"Refreshing cache for {city}")
            try:
                df = self._cache_manager.load_cached_data(
                    city=city,
                    start_date=start_date,
                    end_date=end_date
                )
                if not df.is_empty():
                    self._cache[city] = df
                    self._last_updated[city] = datetime.now()
                    logger.info(f"Cache refreshed for {city} with {len(df)} records")
                else:
                    logger.warning(f"No data available to refresh cache for {city}")
            except Exception as e:
                logger.error(f"Failed to refresh cache for {city}: {str(e)}")
                # If refresh fails but we have cached data, use it
                if city in self._cache:
                    logger.info(f"Using existing cache for {city}")
                else:
                    logger.error(f"No cached data available for {city}")
                    raise

        # Filter cached data for requested date range
        if city in self._cache:
            filtered_df = self._cache[city].filter(
                (pl.col("date") >= start_date) &
                (pl.col("date") <= end_date)
            )
            logger.debug(f"Returning {len(filtered_df)} records for {city} from cached data")
            return filtered_df

        logger.warning(f"No cached data available for {city}")
        return pl.DataFrame()

    def update_city(self, city: str, start_date: datetime, end_date: datetime, overwrite: bool = True) -> None:
        """Update cache for a specific city and date range"""
        city = city.lower().replace(" ", "_")
        try:
            logger.info(f"Updating cache for {city} from {start_date.date()} to {end_date.date()}")
            self._cache_manager.cache_city_data(
                city=city,
                start_date=start_date,
                end_date=end_date,
                overwrite=overwrite
            )
            # Refresh in-memory cache
            self._cache[city] = self._cache_manager.load_cached_data(
                city=city,
                start_date=start_date - timedelta(days=30),  # Load a bit more data
                end_date=end_date
            )
            self._last_updated[city] = datetime.now()
            logger.info(f"Cache updated for {city}")
        except Exception as e:
            logger.error(f"Failed to update {city}: {str(e)}")
            raise

    def get_cache_stats(self) -> Dict:
        """Return statistics about the current cache state"""
        return {
            "cached_cities": list(self._cache.keys()),
            "last_updated": {k: v.isoformat() for k, v in self._last_updated.items()},
            "records_per_city": {k: len(v) for k, v in self._cache.items()},
            "memory_usage_mb": {
                k: round(v.estimated_size() / (1024 * 1024), 2)
                for k, v in self._cache.items()
            }
        }

# Global singleton instance
cache_state = CacheState()
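
A detail worth knowing when checking cache age: `timedelta.seconds` is only the seconds component of the difference (0 to 86399) and ignores whole days, whereas `total_seconds()` returns the full duration. The difference matters for an entry that is days old. In this sketch, `is_stale` and `CACHE_TTL_SECONDS` are illustrative names:

```python
from datetime import datetime, timedelta

CACHE_TTL_SECONDS = 3600  # 1-hour freshness window


def is_stale(last_updated: datetime, now: datetime) -> bool:
    """True when the entry is older than the TTL."""
    return (now - last_updated).total_seconds() > CACHE_TTL_SECONDS


now = datetime(2025, 4, 11, 12, 0, 0)
two_days_ten_min_ago = now - timedelta(days=2, minutes=10)

age = now - two_days_ten_min_ago
wrong = age.seconds > CACHE_TTL_SECONDS      # Compares only the 10-minute remainder
right = is_stale(two_days_ten_min_ago, now)  # Compares the full two days and ten minutes
```

An entry that is two days and ten minutes old looks fresh when judged by `.seconds`, but is correctly reported as stale by `total_seconds()`.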

Core Component 5: Session Management

Now let's implement session management to handle user preferences. Key features:

  • Manages user session preferences
  • Sets sensible defaults for new sessions
  • Retrieves data from the cache based on session parameters
  • Comprehensive error handling
# app/session.py
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
import logging

import polars as pl  # Needed for the pl.DataFrame return annotation below

from data.cache_state import cache_state

logger = logging.getLogger(__name__)

def init_session(
    session: Dict[str, Any],
    city: Optional[str] = None,
    start_date: Optional[datetime] = None,
    reset: bool = False
) -> None:
    """Initialize or update session state"""
    if ("initialized" not in session) or reset:
        logger.info("Initializing new session")
        session.clear()

        # Set default city
        session["city"] = city or "london"

        # Set default date range (last 30 days)
        end_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        session["end_date"] = end_date.isoformat()
        session["start_date"] = (end_date - timedelta(days=30)).isoformat()

        # Chart preferences
        session["chart_type"] = "line"
        session["metrics"] = ["temp_max", "temp_min", "precipitation"]
        session["show_alerts"] = True

        session["initialized"] = True
        logger.debug(f"Session initialized with city={session['city']}")

    # Update specific fields if provided
    if city:
        session["city"] = city.lower().replace(" ", "_")  # Match the cache key format
    if start_date:
        session["start_date"] = start_date.isoformat()

def get_data_for_session(session: Dict[str, Any]) -> pl.DataFrame:
    """Get cached data based on session parameters"""
    try:
        city = session["city"]
        start_date = datetime.fromisoformat(session["start_date"])
        end_date = datetime.fromisoformat(session["end_date"])

        logger.debug(f"Getting data for session: city={city}, start={start_date.date()}, end={end_date.date()}")

        df = cache_state.get_city_data(
            city=city,
            start_date=start_date,
            end_date=end_date
        )

        logger.debug(f"Retrieved {len(df)} records for session")
        return df
    except Exception as e:
        logger.error(f"Error getting data for session: {str(e)}")
        raise
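
The session stores dates as ISO 8601 strings rather than `datetime` objects. Session values generally need to survive serialisation (cookie-backed sessions are typically JSON-encoded), which is presumably why the strings are used. The round trip can be sketched with a plain dict standing in for the session:

```python
import json
from datetime import datetime, timedelta
from typing import Any, Dict

session: Dict[str, Any] = {}

# Store: midnight-aligned range as ISO strings
end_date = datetime(2025, 4, 11)
session["end_date"] = end_date.isoformat()
session["start_date"] = (end_date - timedelta(days=30)).isoformat()

# The session survives a JSON round trip, which a raw datetime would not
restored = json.loads(json.dumps(session))

# Load: parse the strings back into datetime objects
start = datetime.fromisoformat(restored["start_date"])
end = datetime.fromisoformat(restored["end_date"])
```

This mirrors what `init_session` writes and what `get_data_for_session` reads back.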

Core Component 6: Chart Components

Skip this section if you're just interested in the caching mechanism

Let's create the visualization component. Key features:

  • Creates interactive Plotly charts with temperature and precipitation data
  • Displays moving averages and weather alerts
  • Responsive design with proper error handling
  • Adapts to available data columns
# app/components/charts.py
from typing import Dict, Any
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from fasthtml.common import *
import polars as pl
import logging

logger = logging.getLogger(__name__)

def create_weather_chart(df: pl.DataFrame, session: Dict[str, Any]) -> Div:
    """Create an interactive weather chart with metrics and alerts"""
    try:
        if df.is_empty():
            return Div(
                "No data available for selected period",
                cls="p-4 text-center text-gray-500"
            )

        # Ensure we have all required columns
        required_cols = ["date", "temp_max", "temp_min", "precipitation"]
        if not all(col in df.columns for col in required_cols):
            missing = [col for col in required_cols if col not in df.columns]
            logger.error(f"Missing required columns in data: {missing}")
            return Div(
                f"Data is incomplete (missing: {', '.join(missing)})",
                cls="p-4 text-center text-red-500"
            )

        # Create two stacked subplots sharing an x-axis: temperature on top, precipitation below
        fig = make_subplots(
            rows=2,
            cols=1,
            row_heights=[0.7, 0.3],
            shared_xaxes=True,
            vertical_spacing=0.05,
            subplot_titles=(f"Temperature for {session['city'].title()}", "Precipitation")
        )

        # Add temperature data
        fig.add_trace(
            go.Scatter(
                x=df["date"],
                y=df["temp_max"],
                mode="lines",
                name="Max Temp (°C)",
                line=dict(color="red")
            ),
            row=1, col=1
        )

        fig.add_trace(
            go.Scatter(
                x=df["date"],
                y=df["temp_min"],
                mode="lines",
                name="Min Temp (°C)",
                line=dict(color="blue")
            ),
            row=1, col=1
        )

        # Add moving averages if they exist
        if "temp_max_7d_avg" in df.columns and "temp_max_7d_avg" in session.get("metrics", []):
            fig.add_trace(
                go.Scatter(
                    x=df["date"],
                    y=df["temp_max_7d_avg"],
                    mode="lines",
                    name="7-day Max Avg",
                    line=dict(color="orange", dash="dash")
                ),
                row=1, col=1
            )

        if "temp_min_7d_avg" in df.columns and "temp_min_7d_avg" in session.get("metrics", []):
            fig.add_trace(
                go.Scatter(
                    x=df["date"],
                    y=df["temp_min_7d_avg"],
                    mode="lines",
                    name="7-day Min Avg",
                    line=dict(color="lightblue", dash="dash")
                ),
                row=1, col=1
            )

        # Add precipitation bars
        fig.add_trace(
            go.Bar(
                x=df["date"],
                y=df["precipitation"],
                name="Precipitation (mm)",
                marker_color="blue"
            ),
            row=2, col=1
        )

        # Add weather alerts if enabled
        if session.get("show_alerts", False):
            alert_cols = ["heat_alert", "heavy_rain_alert", "high_wind_alert"]
            if all(col in df.columns for col in alert_cols):
                # Heat alerts
                heat_alerts = df.filter(pl.col("heat_alert"))
                if not heat_alerts.is_empty():
                    fig.add_trace(
                        go.Scatter(
                            x=heat_alerts["date"],
                            y=heat_alerts["temp_max"],
                            mode="markers",
                            name="Heat Alert",
                            marker=dict(
                                symbol="triangle-up",
                                size=12,
                                color="red"
                            )
                        ),
                        row=1, col=1
                    )

                # Heavy rain alerts
                rain_alerts = df.filter(pl.col("heavy_rain_alert"))
                if not rain_alerts.is_empty():
                    fig.add_trace(
                        go.Scatter(
                            x=rain_alerts["date"],
                            y=rain_alerts["precipitation"],
                            mode="markers",
                            name="Heavy Rain Alert",
                            marker=dict(
                                symbol="triangle-up",
                                size=12,
                                color="darkblue"
                            )
                        ),
                        row=2, col=1
                    )

        # Update layout
        fig.update_layout(
            height=600,
            template="plotly_white",
            showlegend=True,
            legend=dict(
                orientation="h",
                yanchor="bottom",
                y=1.02,
                xanchor="right",
                x=1
            ),
            margin=dict(l=20, r=20, t=50, b=20),
        )

        # Add y-axis titles
        fig.update_yaxes(title_text="Temperature (°C)", row=1, col=1)
        fig.update_yaxes(title_text="Precipitation (mm)", row=2, col=1)

        # Convert to FastHTML component
        return Div(
            H2(
                f"Weather Data for {session['city'].title()}",
                cls="text-xl font-bold mb-4"
            ),
            NotStr(fig.to_html(
                full_html=False,
                include_plotlyjs=False,
                config={
                    'displayModeBar': True,
                    'responsive': True,
                    'scrollZoom': True
                }
            )),
            id="weather-chart"
        )
    except Exception as e:
        logger.error(f"Error creating weather chart: {str(e)}")
        return Div(
            f"Error creating chart: {str(e)}",
            cls="p-4 text-center text-red-500"
        )

Core Component 7: Main Application

Now let's put it all together in our main application:

# main.py
from datetime import datetime
from fasthtml.common import *
from app.cache_update import cache_app
from app.session import init_session, get_data_for_session
from app.components.charts import create_weather_chart
from data.cache_state import cache_state
import logging

from configuration.logging_config import setup_logging

# Setup logging
setup_logging()
logger = logging.getLogger(__name__)

app, rt = fast_app(
    hdrs=(
        # Note: plotly-latest.min.js is frozen at plotly.js v1.58.5; consider pinning a specific version
        Script(src="https://cdn.plot.ly/plotly-latest.min.js"),
        Script(src="https://cdn.tailwindcss.com"),
        Link(
            rel="stylesheet",
            href="https://cdn.jsdelivr.net/npm/[email protected]/daisyui.css",
        ),
    ),
    routes=[
        Mount("/cache", cache_app),  # Mount cache update endpoints
    ],
    pico=False,
)

@app.get("/")
def landing(session):
    """Main landing page"""
    logger.info("Rendering landing page")
    init_session(session)

    try:
        df = get_data_for_session(session)

        return Div(
            # Header
            Div(
                H1("Weather Dashboard", cls="text-2xl font-bold"),
                P("Historical weather data with interactive visualizations", cls="text-gray-600"),
                cls="mb-6"
            ),

            # Controls
            Div(
                Form(
                    id="controls-form",
                    cls="flex flex-wrap gap-4 items-end",
                    hx_post="/update-view",
                    hx_target="#chart-container",
                    hx_indicator="#loading-indicator"
                )(
                    # City selection
                    Div(cls="form-control")(
                        Label("City", cls="label"),
                        Select(
                            name="city",
                            cls="select select-bordered"
                        )(
                            Option("London", value="london", selected=(session["city"] == "london")),
                            Option("New York", value="new_york", selected=(session["city"] == "new_york")),
                            Option("Tokyo", value="tokyo", selected=(session["city"] == "tokyo")),
                            Option("Sydney", value="sydney", selected=(session["city"] == "sydney")),
                            Option("Cairo", value="cairo", selected=(session["city"] == "cairo")),
                        ),
                    ),

                    # Date range
                    Div(cls="form-control")(
                        Label("Start Date", cls="label"),
                        Input(
                            type="date",
                            name="start_date",
                            value=datetime.fromisoformat(session["start_date"])
                                .strftime("%Y-%m-%d"),
                            cls="input input-bordered"
                        ),
                    ),

                    Div(cls="form-control")(
                        Label("End Date", cls="label"),
                        Input(
                            type="date",
                            name="end_date",
                            value=datetime.fromisoformat(session["end_date"])
                                .strftime("%Y-%m-%d"),
                            cls="input input-bordered"
                        ),
                    ),

                    # Metrics
                    Div(cls="form-control")(
                        Label("Show Moving Averages", cls="label"),
                        Div(
                            Input(
                                type="checkbox",
                                name="show_averages",
                                checked="checked" if "temp_max_7d_avg" in session.get("metrics", []) else None,
                                cls="checkbox"
                            ),
                            cls="flex items-center gap-2"
                        ),
                    ),

                    # Alerts
                    Div(cls="form-control")(
                        Label("Show Weather Alerts", cls="label"),
                        Div(
                            Input(
                                type="checkbox",
                                name="show_alerts",
                                checked="checked" if session.get("show_alerts", False) else None,
                                cls="checkbox"
                            ),
                            cls="flex items-center gap-2"
                        ),
                    ),

                    Button("Update", cls="btn btn-primary"),
                ),
                cls="mb-6"
            ),

            # Loading indicator
            Div(
                "Loading...",
                id="loading-indicator",
                cls="htmx-indicator p-4 text-center"
            ),

            # Chart container
            Div(
                create_weather_chart(df, session),
                id="chart-container",
                cls="border rounded-lg p-4 bg-white"
            ),

            # Cache status
            Div(
                P(
                    "Last updated: ",
                    datetime.fromisoformat(
                        cache_state._last_updated.get(
                            session["city"],
                            datetime.min.isoformat()
                        )
                    ).strftime("%Y-%m-%d %H:%M:%S"),
                    cls="text-sm text-gray-500"
                ),
                Button(
                    "Refresh Data",
                    cls="btn btn-sm btn-outline",
                    hx_post=f"/cache/update?city={session['city']}",
                    hx_target="#chart-container"
                ),
                cls="mt-4"
            )
        )
    except Exception as e:
        logger.error(f"Error rendering landing page: {str(e)}")
        return Div(
            H1("Weather Dashboard", cls="text-2xl font-bold"),
            P(f"Error loading dashboard: {str(e)}", cls="text-red-500"),
        )

@app.post("/update-view")
def update_view(
    session,
    city: str,
    start_date: str,
    end_date: str,
    show_averages: Optional[str] = None,
    show_alerts: Optional[str] = None
):
    """Update view based on user selections"""
    try:
        # Update session
        session["city"] = city.lower()
        session["start_date"] = datetime.strptime(start_date, "%Y-%m-%d").isoformat()
        session["end_date"] = datetime.strptime(end_date, "%Y-%m-%d").isoformat()

        # Update chart preferences
        if show_averages:
            session["metrics"] = ["temp_max", "temp_min", "precipitation", "temp_max_7d_avg", "temp_min_7d_avg"]
        else:
            session["metrics"] = ["temp_max", "temp_min", "precipitation"]

        session["show_alerts"] = show_alerts is not None

        # Get fresh data
        df = get_data_for_session(session)

        # Return updated chart
        return create_weather_chart(df, session)
    except Exception as e:
        logger.error(f"Error updating view: {str(e)}")
        return Div(
            f"Error updating chart: {str(e)}",
            cls="p-4 text-center text-red-500"
        )
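
The update_view handler above hands the form's dates straight to strptime and never checks that the range itself makes sense (for example, a start date after the end date). As a rough sketch, and not part of the original project, a small helper could validate the range before anything is written to the session. The name parse_date_range and the max_days limit are my own assumptions:

```python
from datetime import date, datetime
from typing import Tuple


def parse_date_range(start_date: str, end_date: str, max_days: int = 366) -> Tuple[date, date]:
    """Parse two YYYY-MM-DD strings and check that they form a sensible range.

    Hypothetical helper: the name and the max_days limit are illustrative only.
    """
    # strptime raises ValueError on malformed input, which we let propagate
    start = datetime.strptime(start_date, "%Y-%m-%d").date()
    end = datetime.strptime(end_date, "%Y-%m-%d").date()

    if start > end:
        raise ValueError("start_date must not be after end_date")
    if (end - start).days > max_days:
        raise ValueError(f"date range is limited to {max_days} days")
    return start, end
```

If this were called inside the try block of update_view, a ValueError would be caught by the existing except clause and shown to the user as an error message rather than producing an empty chart.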

Cache Update Endpoints

Let's also implement the cache update endpoints:

# app/cache_update.py
import logging
import traceback
from datetime import datetime, timedelta
from typing import Optional

from fasthtml.common import *
from data.cache_state import cache_state

logger = logging.getLogger(__name__)

cache_app, _ = fast_app()

@cache_app.post("/update")
def update_cache(city: Optional[str] = None):
    """Update cache for a city or multiple cities"""
    try:
        now = datetime.now()
        start_date = now - timedelta(days=30)
        end_date = now

        if city:
            # Update specific city
            city = city.lower()
            logger.info(f"Manually updating cache for {city}")
            cache_state.update_city(
                city=city,
                start_date=start_date,
                end_date=end_date,
                overwrite=True
            )
            return Div(
                P(f"Cache updated for {city}"),
                cls="p-4 text-center text-green-500"
            )
        else:
            # Update all popular cities
            cities = ["london", "new_york", "tokyo", "sydney", "cairo"]
            logger.info(f"Manually updating cache for {len(cities)} cities")

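            failed = []
            for city_name in cities:
                try:
                    cache_state.update_city(
                        city=city_name,
                        start_date=start_date,
                        end_date=end_date,
                        overwrite=True
                    )
                except Exception as e:
                    logger.error(f"Failed to update {city_name}: {str(e)}")
                    failed.append(city_name)

            # Report partial failures instead of always claiming success
            if failed:
                return Div(
                    P(f"Cache update failed for: {', '.join(failed)}"),
                    cls="p-4 text-center text-red-500"
                )

            return Div(
                P("Cache updated for all cities"),
                cls="p-4 text-center text-green-500"
            )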

    except Exception as e:
        logger.error(f"Caching failed:\n{str(e)}")
        logger.error(f"Traceback:\n{traceback.format_exc()}")
        return Div(
            P(f"Error updating cache: {str(e)}"),
            cls="p-4 text-center text-red-500"
        )

@cache_app.get("/status")
def cache_status():
    """Get cache status information"""
    try:
        stats = cache_state.get_cache_stats()

        return Div(
            H2("Cache Status", cls="text-xl font-bold mb-4"),
            Table(cls="table w-full")(
                Thead(
                    Tr(
                        Th("City"),
                        Th("Last Updated"),
                        Th("Records"),
                        Th("Size (MB)")
                    )
                ),
                Tbody(
                    *[
                        Tr(
                            Td(city.replace("_", " ").title()),
                            Td(stats["last_updated"].get(city, "Never")),
                            Td(str(stats["records_per_city"].get(city, 0))),
                            Td(str(stats["memory_usage_mb"].get(city, 0)))
                        )
                        for city in stats["cached_cities"]
                    ]
                )
            )
        )
    except Exception as e:
        logger.error(f"Error getting cache status: {str(e)}")
        return Div(
            P(f"Error getting cache status: {str(e)}"),
            cls="p-4 text-center text-red-500"
        )

Automated Cache Updates

Finally, let's create a script that can be run periodically to update the cache:

# update_cache.py
import argparse
import logging
from datetime import datetime, timedelta
import requests
import time

from configuration.logging_config import setup_logging

setup_logging()
logger = logging.getLogger(__name__)

def trigger_cache_update(city=None, base_url="http://localhost:5001"):
    """Trigger cache update via server endpoint"""
    try:
        if city:
            endpoint = f"{base_url}/cache/update?city={city}"
            logger.info(f"Triggering cache update for {city}")
        else:
            endpoint = f"{base_url}/cache/update"
            logger.info("Triggering full cache update")

        response = requests.post(endpoint, timeout=30)
        response.raise_for_status()

        logger.info("Cache update triggered successfully")
        return True
    except requests.RequestException as e:
        logger.error(f"Error triggering cache update: {str(e)}")
        return False

def run_daily_update(city=None, base_url="http://localhost:5001"):
    """Run a cache update once per day"""
    while True:
        try:
            logger.info("Running daily cache update")
            if trigger_cache_update(city, base_url):
                # Sleep for 24 hours
                logger.info("Cache update complete. Sleeping for 24 hours...")
                time.sleep(86400)  # 24 hours
            else:
                # trigger_cache_update returns False on request errors
                logger.warning("Cache update failed. Retrying in 1 hour...")
                time.sleep(3600)
        except Exception as e:
            logger.error(f"Error in daily update loop: {str(e)}")
            time.sleep(3600)  # Wait an hour before retrying after error

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Trigger weather data cache updates")
    parser.add_argument(
        "--city",
        type=str,
        help="Specific city to update (optional)",
    )
    parser.add_argument(
        "--daemon",
        action="store_true",
        help="Run as a daemon process for periodic updates",
    )
    parser.add_argument(
        "--url",
        type=str,
        default="http://localhost:5001",
        help="Base URL of the weather app",
    )

    args = parser.parse_args()

    if args.daemon:
        logger.info("Starting cache update daemon")
        # Pass the CLI options through so --city and --url also apply in daemon mode
        run_daily_update(args.city, args.url)
    else:
        trigger_cache_update(args.city, args.url)

You can run this script manually or set it up as a cron job or systemd service:

# Run manually for a specific city
python update_cache.py --city london

# Run manually for all cities
python update_cache.py

# Run as a daemon process
python update_cache.py --daemon
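
A note on the daemon mode: sleeping a fixed 24 hours means the update time drifts by however long each update takes, and a restart resets the schedule. If you want the daemon to fire at a fixed time of day instead, one option is to compute the sleep duration from the clock. This is only a sketch, and seconds_until is a hypothetical helper rather than something from the original project:

```python
from datetime import datetime, timedelta
from typing import Optional


def seconds_until(hour: int, minute: int = 0, now: Optional[datetime] = None) -> float:
    """Seconds from `now` until the next occurrence of hour:minute (local time)."""
    now = now or datetime.now()
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    # If today's slot has already passed (or is right now), aim for tomorrow's
    if target <= now:
        target += timedelta(days=1)
    return (target - now).total_seconds()
```

Inside the daemon loop, time.sleep(seconds_until(2)) would then wait until the next 2 AM rather than a fixed 24 hours. This naive version ignores daylight saving changes, which is one more reason cron is usually the better choice in production.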

Setting Up a Cron Job for Cache Updates

For production environments, cron jobs are a simpler way to schedule updates than a long-running daemon. Run crontab -e and add lines like these:

# Update all cities once per day at 2 AM
0 2 * * * cd /path/to/project/weather_app_folder && /home/username/.cargo/bin/uv run update_cache.py

# Update popular cities every 6 hours
0 */6 * * * cd /path/to/project/weather_app_folder && /home/username/.cargo/bin/uv run update_cache.py --city london
0 */6 * * * cd /path/to/project/weather_app_folder && /home/username/.cargo/bin/uv run update_cache.py --city new_york

Putting It All Together

The complete architecture we've built provides:

  1. Efficient Data Retrieval: The WeatherDataClient fetches data from a public API
  2. Data Processing: The WeatherDataProcessor calculates metrics and alerts
  3. Persistent Caching: The CacheManager stores processed data in Parquet files
  4. In-Memory Caching: The CacheState singleton provides fast in-memory access to frequently used data
  5. Session Management: Manages user preferences and state
  6. Visualization: Interactive charts display the weather data
  7. Automated Updates: Scheduled cache refreshes via cron jobs
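
To make the interplay between the two cache layers concrete, here is a stripped-down sketch of the lookup order: memory first, then disk, then the slow path. It is not the CacheState or CacheManager code from this post. TwoTierCache and its loader argument are hypothetical, and it writes JSON files purely so the example runs with the standard library alone (the real system stores Parquet):

```python
import json
from pathlib import Path


class TwoTierCache:
    """Read-through cache: memory first, then disk, then the (slow) loader."""

    def __init__(self, cache_dir, loader):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.loader = loader   # stands in for the API fetch + metric processing
        self.memory = {}       # stands in for the in-memory layer

    def _path(self, key):
        # JSON is an assumption for this sketch; the post uses Parquet files
        return self.cache_dir / f"{key}.json"

    def get(self, key):
        """Return (value, source) where source says which layer answered."""
        if key in self.memory:
            return self.memory[key], "memory"

        path = self._path(key)
        if path.exists():
            value = json.loads(path.read_text())
            self.memory[key] = value   # promote to memory for next time
            return value, "disk"

        value = self.loader(key)       # slowest path: fetch and compute
        path.write_text(json.dumps(value))
        self.memory[key] = value
        return value, "loader"
```

The first request for a city pays the full cost, later requests in the same process are served from memory, and a restarted server only has to read the file back from disk rather than calling the API again.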

Key Benefits of This Caching Architecture:

  1. Reduced API Calls: By caching data, we minimize calls to external APIs
  2. Improved Performance: Precomputed metrics mean faster page loads
  3. Reduced Computational Load: Calculations happen once, results are reused
  4. Resilience: The system can still function if external APIs are down
  5. Scalability: Cache can be expanded to more cities and longer time periods
  6. Freshness Control: Configurable update intervals balance freshness with performance
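
The resilience and freshness points can be sketched together in a few lines: serve the cached value while it is young enough, try to refresh once it is too old, and fall back to the stale value if the refresh fails. This is an illustration under my own assumptions (get_fresh_or_stale, the (value, fetched_at) tuple and the status strings are all hypothetical), not the logic used by CacheState:

```python
from datetime import datetime, timedelta


def get_fresh_or_stale(entry, fetch, max_age, now=None):
    """Return (value, fetched_at, status) for a cache entry.

    entry   -- (value, fetched_at) tuple, or None if nothing is cached
    fetch   -- zero-argument callable that returns a fresh value (may raise)
    max_age -- timedelta after which the entry should be refreshed
    """
    now = now or datetime.now()

    # Young enough: no API call at all
    if entry is not None and now - entry[1] <= max_age:
        return entry[0], entry[1], "fresh"

    try:
        return fetch(), now, "refreshed"
    except Exception:
        # API down or rate limited: serve old data if we have any
        if entry is None:
            raise
        return entry[0], entry[1], "stale"
```

Returning the status alongside the value lets the UI show a hint such as "data may be out of date" when the stale path was taken, instead of failing the whole page.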

Conclusion

We've built a robust caching system for a weather dashboard using FastHTML. Needless to say, the weather use case is just an example; the same approach applies to any dashboard that needs regularly refreshed data.

The multi-layered caching approach (file-based + in-memory) provides a good balance between persistence and speed. By precomputing metrics and storing them efficiently in Parquet files, we minimise both computation time and storage requirements.