TL;DR: This post shows how to build a caching system for a data dashboard in FastHTML. It combines Parquet files for persistence with in-memory storage for speed. By pre-computing metrics and storing them in a custom cache class, the system delivers fast responses while minimising API calls and server load. It's designed to handle outages and rate limits gracefully, making it well suited to any dashboard that needs up-to-date data. The code was adapted from another (non-weather) project by an LLM, so the caching idea is the same but I haven't verified that the weather implementation works perfectly. At the start of each section I note whether you can skip it if you're only interested in the caching mechanism and not the weather-specific parts.
Imagine you're tasked with building a weather dashboard (or an API; the same caching would make an API more performant too) where users can view historical weather data for different cities. Think of any dashboard that needs live or otherwise non-static data. The application needs to:
However, several challenges make this non-trivial:
The solution? A robust caching system that balances data freshness with performance, pre-computing and storing results while allowing for periodic updates.
Let's start with our project structure:
weather_dashboard/
├── app/
│ ├── __init__.py
│ ├── cache_update.py # Cache update endpoints
│ ├── session.py # Session state management
│ └── components/ # UI components
│ ├── __init__.py
│ └── charts.py # Weather chart components
├── data/
│ ├── __init__.py
│ ├── cache/ # Cached parquet files
│ ├── cache_manager.py # Cache management
│ └── cache_state.py # Global cache state
├── weather_data/
│ ├── __init__.py
│ ├── client.py # API client
│ └── processor.py # Data processing
├── configuration/
│ ├── __init__.py
│ ├── constants.py # Config constants
│ └── logging_config.py # Logging setup
├── logs/ # Log files
├── main.py # Application entry
├── update_cache.py # Script that calls the cache update endpoint (invoked via cron job)
├── pyproject.toml # requirements etc.
└── README.md # Documentation
First, install the required packages (I use uv for package management and find it infinitely better than pip):
uv sync
Skip this section if you're just interested in the caching mechanism
First, we need a client for fetching weather data from a public API (we'll use Open-Meteo, which is completely free and doesn't require authentication). The key features are:
# weather_data/client.py
from datetime import datetime
import requests
import polars as pl
import logging
logger = logging.getLogger(__name__)
class WeatherDataClient:
def __init__(self):
self.base_url = "https://archive-api.open-meteo.com/v1/archive"
def fetch_historical_data(
self,
city_name: str,
latitude: float,
longitude: float,
start_date: datetime,
end_date: datetime
) -> pl.DataFrame:
"""Fetch historical weather data for a given location and date range"""
try:
# Format dates for the API
start_date_str = start_date.strftime("%Y-%m-%d")
end_date_str = end_date.strftime("%Y-%m-%d")
# Make API request
response = requests.get(
self.base_url,
params={
"latitude": latitude,
"longitude": longitude,
"start_date": start_date_str,
"end_date": end_date_str,
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,windspeed_10m_max",
"timezone": "GMT"
},
timeout=10 # Added timeout for production reliability
)
response.raise_for_status()
# Parse API response
data = response.json()
# Convert to polars DataFrame
daily_data = {
"date": data["daily"]["time"],
"temp_max": data["daily"]["temperature_2m_max"],
"temp_min": data["daily"]["temperature_2m_min"],
"precipitation": data["daily"]["precipitation_sum"],
"wind_speed": data["daily"]["windspeed_10m_max"],
}
df = pl.DataFrame(daily_data)
# Add city name column for easy identification
df = df.with_columns(pl.lit(city_name).alias("city"))
# Convert date strings to datetime objects
df = df.with_columns(
pl.col("date").str.strptime(pl.Date, fmt="%Y-%m-%d")
)
return df
except requests.RequestException as e:
logger.error(f"API request failed for {city_name}: {str(e)}")
raise
except Exception as e:
logger.error(f"Error processing data for {city_name}: {str(e)}")
raise
Also skip this section if you're just interested in the caching mechanism
Next, we need a processor for calculating weather metrics. The key features are:
# weather_data/processor.py
import polars as pl
import logging
logger = logging.getLogger(__name__)
class WeatherDataProcessor:
@staticmethod
def calculate_metrics(df: pl.DataFrame) -> pl.DataFrame:
"""Calculate additional weather metrics"""
if df.is_empty():
return df
try:
# Calculate temperature range (daily high - low)
df = df.with_columns(
(pl.col("temp_max") - pl.col("temp_min")).alias("temp_range")
)
# Calculate 7-day moving averages
df = df.with_columns([
pl.col("temp_max").rolling_mean(window_size=7).alias("temp_max_7d_avg"),
pl.col("temp_min").rolling_mean(window_size=7).alias("temp_min_7d_avg"),
pl.col("precipitation").rolling_mean(window_size=7).alias("precip_7d_avg")
])
# Calculate cumulative precipitation
df = df.with_columns(
pl.col("precipitation").cum_sum().alias("cumulative_precip")
)
return df
except Exception as e:
logger.error(f"Error calculating metrics: {str(e)}")
raise
@staticmethod
def generate_alerts(df: pl.DataFrame) -> pl.DataFrame:
"""Generate weather alerts based on conditions"""
if df.is_empty():
return df
try:
return df.with_columns([
# High temperature alert (over 30°C/86°F)
(pl.col("temp_max") > 30).alias("heat_alert"),
# Heavy rain alert (over 20mm/~0.8in)
(pl.col("precipitation") > 20).alias("heavy_rain_alert"),
# Strong wind alert (over 40 km/h or ~25 mph)
(pl.col("wind_speed") > 40).alias("high_wind_alert"),
# Extreme temperature range (daily swing > 20°C/36°F)
(pl.col("temp_range") > 20).alias("extreme_temp_range_alert")
])
except Exception as e:
logger.error(f"Error generating alerts: {str(e)}")
raise
Now we need a cache manager to store processed data efficiently. The key features are:
# data/cache_manager.py
from datetime import datetime, timedelta
from pathlib import Path
import polars as pl
import logging
from weather_data.client import WeatherDataClient
from weather_data.processor import WeatherDataProcessor
logger = logging.getLogger(__name__)
# City coordinates - hardcoded for simplicity
CITY_COORDINATES = {
"london": {"lat": 51.5074, "lon": -0.1278},
"new_york": {"lat": 40.7128, "lon": -74.0060},
"tokyo": {"lat": 35.6762, "lon": 139.6503},
"sydney": {"lat": -33.8688, "lon": 151.2093},
"cairo": {"lat": 30.0444, "lon": 31.2357}
}
class CacheManager:
def __init__(
self,
cache_dir: str = "data/cache",
retention_days: int = 30 # Keep a month of data
):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.retention_days = retention_days
self.weather_client = WeatherDataClient()
self.processor = WeatherDataProcessor()
# Create a file lock directory for concurrency control
self.lock_dir = self.cache_dir / "locks"
self.lock_dir.mkdir(exist_ok=True)
def _get_cache_path(self, city: str, year_month: str) -> Path:
"""Generate cache file path for a city and year-month"""
return self.cache_dir / f"{city}_{year_month}.parquet"
def _get_lock_path(self, city: str, year_month: str) -> Path:
"""Generate lock file path for a city and year-month"""
return self.lock_dir / f"{city}_{year_month}.lock"
def cache_city_data(
self,
city: str,
start_date: datetime,
end_date: datetime,
overwrite: bool = False
) -> None:
"""Cache processed weather data for a specific city and date range"""
# Convert city name to lowercase for consistency
city = city.lower().replace(" ", "_")
# Get city coordinates
if city not in CITY_COORDINATES:
logger.error(f"Unknown city: {city}")
raise ValueError(f"Unknown city: {city}")
# Group by year-month to create manageable cache files
current_date = start_date.replace(day=1) # Start at beginning of month
end_month = end_date.replace(day=1)
while current_date <= end_month:
year_month = current_date.strftime("%Y_%m")
cache_path = self._get_cache_path(city, year_month)
lock_path = self._get_lock_path(city, year_month)
# Check if data already exists and we're not forcing an overwrite
if cache_path.exists() and not overwrite:
logger.debug(f"Cache already exists for {city} for {year_month}")
current_date = (current_date.replace(day=28) + timedelta(days=4)).replace(day=1) # Next month
continue
# Simple file-based locking to prevent concurrent updates
if lock_path.exists():
logger.warning(f"Cache update already in progress for {city} for {year_month}")
current_date = (current_date.replace(day=28) + timedelta(days=4)).replace(day=1) # Next month
continue
try:
# Create lock file
lock_path.touch()
# Calculate month end date
next_month = (current_date.replace(day=28) + timedelta(days=4)).replace(day=1)
month_end = next_month - timedelta(days=1)
# Adjust if this is the last month in the range
if month_end > end_date:
month_end = end_date
# Fetch raw data
logger.info(f"Fetching data for {city} from {current_date.date()} to {month_end.date()}")
raw_data = self.weather_client.fetch_historical_data(
city_name=city,
latitude=CITY_COORDINATES[city]["lat"],
longitude=CITY_COORDINATES[city]["lon"],
start_date=current_date,
end_date=month_end
)
if raw_data.is_empty():
logger.warning(f"No data available for {city} for {year_month}")
current_date = next_month # Move to next month
continue
# Process data
processed_data = self.processor.calculate_metrics(raw_data)
processed_data = self.processor.generate_alerts(processed_data)
# Save to cache
processed_data.write_parquet(cache_path)
logger.info(f"Cached data for {city} for {year_month}")
# Move to next month
current_date = next_month
except Exception as e:
logger.error(f"Failed to cache {city} data for {year_month}: {str(e)}")
raise
finally:
# Remove lock file
if lock_path.exists():
lock_path.unlink()
# Clean up old cache files
self._cleanup_old_cache_files(city)
def _cleanup_old_cache_files(self, city: str) -> None:
"""Remove cache files older than retention_days"""
try:
cutoff_date = datetime.now() - timedelta(days=self.retention_days)
cutoff_year_month = cutoff_date.strftime("%Y_%m")
for file_path in self.cache_dir.glob(f"{city}_*.parquet"):
file_year_month = "_".join(file_path.stem.split("_")[-2:])  # last two parts are YYYY_MM; city names may contain underscores
if file_year_month < cutoff_year_month:
file_path.unlink()
logger.debug(f"Removed old cache file: {file_path}")
except Exception as e:
logger.error(f"Error cleaning up old cache files: {str(e)}")
def load_cached_data(
self,
city: str,
start_date: datetime,
end_date: datetime
) -> pl.DataFrame:
"""Load cached data for a date range
NOTE to self: I think I just used glob to get all parquet files in the cache_dir and then concatenated the dataframes and ran .unique() to get rid of any duplicate row. Much easier but less efficient.
"""
city = city.lower().replace(" ", "_")
dfs = []
# Convert dates to months to retrieve cache files
current_date = start_date.replace(day=1) # Start at beginning of month
end_month = end_date.replace(day=1)
while current_date <= end_month:
year_month = current_date.strftime("%Y_%m")
cache_path = self._get_cache_path(city, year_month)
if cache_path.exists():
try:
df = pl.read_parquet(cache_path)
# Filter for just the dates we want
df = df.filter(
(pl.col("date") >= start_date) &
(pl.col("date") <= end_date)
)
dfs.append(df)
except Exception as e:
logger.error(f"Error reading cache file {cache_path}: {str(e)}")
else:
logger.debug(f"No cache file for {city} for {year_month}")
# Move to next month
current_date = (current_date.replace(day=28) + timedelta(days=4)).replace(day=1)
if not dfs:
logger.warning(f"No cached data found for {city} between {start_date.date()} and {end_date.date()}")
return pl.DataFrame()
return pl.concat(dfs)
Now let's implement a singleton cache state manager to maintain an in-memory cache. The key features are:
# data/cache_state.py
from typing import Dict, Optional
import polars as pl
from datetime import datetime, timedelta
import logging
from data.cache_manager import CacheManager
logger = logging.getLogger(__name__)
class CacheState:
_instance: Optional["CacheState"] = None
_cache: Dict[str, pl.DataFrame] = {} # city -> DataFrame mapping
_cache_manager = None # Note the use of a class variable here!
_last_updated: Dict[str, datetime] = {} # Track last update for each city
def __new__(cls) -> "CacheState":
if cls._instance is None:
logger.info("Initializing CacheState singleton")
cls._instance = super(CacheState, cls).__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self) -> None:
"""Initialize cache manager and load initial data"""
# Note that self._cache_manager is a class variable not an instance variable. Shared across instances.
self._cache_manager = CacheManager(
cache_dir="data/cache",
retention_days=30
)
self._load_recent_data()
def _load_recent_data(self) -> None:
"""Load last 30 days of data for popular cities"""
popular_cities = ["london", "new_york", "tokyo", "sydney", "cairo"]
end_date = datetime.now()
start_date = end_date - timedelta(days=30)
logger.info(f"Preloading data for {len(popular_cities)} popular cities")
for city in popular_cities:
try:
df = self._cache_manager.load_cached_data(
city=city,
start_date=start_date,
end_date=end_date
)
if not df.is_empty():
self._cache[city] = df
self._last_updated[city] = datetime.now()
logger.info(f"Preloaded {len(df)} records for {city}")
else:
logger.warning(f"No data available to preload for {city}")
except Exception as e:
logger.error(f"Failed to load cache for {city}: {str(e)}")
def get_city_data(
self,
city: str,
start_date: datetime,
end_date: datetime,
force_refresh: bool = False
) -> pl.DataFrame:
"""Get data for a specific city and date range"""
city = city.lower().replace(" ", "_")
# Check if we need to refresh the cache
needs_refresh = (
city not in self._cache or
force_refresh or
city not in self._last_updated or
(datetime.now() - self._last_updated.get(city, datetime.min)).total_seconds() > 3600  # 1-hour cache (use total_seconds, not .seconds)
)
if needs_refresh:
logger.info(f"Refreshing cache for {city}")
try:
df = self._cache_manager.load_cached_data(
city=city,
start_date=start_date,
end_date=end_date
)
if not df.is_empty():
self._cache[city] = df
self._last_updated[city] = datetime.now()
logger.info(f"Cache refreshed for {city} with {len(df)} records")
else:
logger.warning(f"No data available to refresh cache for {city}")
except Exception as e:
logger.error(f"Failed to refresh cache for {city}: {str(e)}")
# If refresh fails but we have cached data, use it
if city in self._cache:
logger.info(f"Using existing cache for {city}")
else:
logger.error(f"No cached data available for {city}")
raise
# Filter cached data for requested date range
if city in self._cache:
filtered_df = self._cache[city].filter(
(pl.col("date") >= start_date) &
(pl.col("date") <= end_date)
)
logger.debug(f"Returning {len(filtered_df)} records for {city} from cached data")
return filtered_df
logger.warning(f"No cached data available for {city}")
return pl.DataFrame()
def update_city(self, city: str, start_date: datetime, end_date: datetime, overwrite: bool = True) -> None:
"""Update cache for a specific city and date range"""
city = city.lower().replace(" ", "_")
try:
logger.info(f"Updating cache for {city} from {start_date.date()} to {end_date.date()}")
self._cache_manager.cache_city_data(
city=city,
start_date=start_date,
end_date=end_date,
overwrite=overwrite
)
# Refresh in-memory cache
self._cache[city] = self._cache_manager.load_cached_data(
city=city,
start_date=start_date - timedelta(days=30), # Load a bit more data
end_date=end_date
)
self._last_updated[city] = datetime.now()
logger.info(f"Cache updated for {city}")
except Exception as e:
logger.error(f"Failed to update {city}: {str(e)}")
raise
def get_cache_stats(self) -> Dict:
"""Return statistics about the current cache state"""
return {
"cached_cities": list(self._cache.keys()),
"last_updated": {k: v.isoformat() for k, v in self._last_updated.items()},
"records_per_city": {k: len(v) for k, v in self._cache.items()},
"memory_usage_mb": {
k: round(v.estimated_size() / (1024 * 1024), 2)
for k, v in self._cache.items()
}
}
# Global singleton instance
cache_state = CacheState()
Now let's implement session management to handle user preferences. Key features:
# app/session.py
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
import logging
import polars as pl
from data.cache_state import cache_state
logger = logging.getLogger(__name__)
def init_session(
session: Dict[str, Any],
city: Optional[str] = None,
start_date: Optional[datetime] = None,
reset: bool = False
) -> None:
"""Initialize or update session state"""
if ("initialized" not in session) or reset:
logger.info("Initializing new session")
session.clear()
# Set default city
session["city"] = city or "london"
# Set default date range (last 30 days)
end_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
session["end_date"] = end_date.isoformat()
session["start_date"] = (end_date - timedelta(days=30)).isoformat()
# Chart preferences
session["chart_type"] = "line"
session["metrics"] = ["temp_max", "temp_min", "precipitation"]
session["show_alerts"] = True
session["initialized"] = True
logger.debug(f"Session initialized with city={session['city']}")
# Update specific fields if provided
if city:
session["city"] = city.lower()
if start_date:
session["start_date"] = start_date.isoformat()
def get_data_for_session(session: Dict[str, Any]) -> pl.DataFrame:
"""Get cached data based on session parameters"""
try:
city = session["city"]
start_date = datetime.fromisoformat(session["start_date"])
end_date = datetime.fromisoformat(session["end_date"])
logger.debug(f"Getting data for session: city={city}, start={start_date.date()}, end={end_date.date()}")
df = cache_state.get_city_data(
city=city,
start_date=start_date,
end_date=end_date
)
logger.debug(f"Retrieved {len(df)} records for session")
return df
except Exception as e:
logger.error(f"Error getting data for session: {str(e)}")
raise
Skip this section if you're just interested in the caching mechanism
Let's create a powerful visualization component. Key features:
# app/components/charts.py
from typing import Dict, Any
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from fasthtml.common import *
import polars as pl
import logging
logger = logging.getLogger(__name__)
def create_weather_chart(df: pl.DataFrame, session: Dict[str, Any]) -> Div:
"""Create an interactive weather chart with metrics and alerts"""
try:
if df.is_empty():
return Div(
"No data available for selected period",
cls="p-4 text-center text-gray-500"
)
# Ensure we have all required columns
required_cols = ["date", "temp_max", "temp_min", "precipitation"]
if not all(col in df.columns for col in required_cols):
missing = [col for col in required_cols if col not in df.columns]
logger.error(f"Missing required columns in data: {missing}")
return Div(
f"Data is incomplete (missing: {', '.join(missing)})",
cls="p-4 text-center text-red-500"
)
# Create figure with secondary y-axis for precipitation
fig = make_subplots(
rows=2,
cols=1,
row_heights=[0.7, 0.3],
shared_xaxes=True,
vertical_spacing=0.05,
subplot_titles=(f"Temperature for {session['city'].title()}", "Precipitation")
)
# Add temperature data
fig.add_trace(
go.Scatter(
x=df["date"],
y=df["temp_max"],
mode="lines",
name="Max Temp (°C)",
line=dict(color="red")
),
row=1, col=1
)
fig.add_trace(
go.Scatter(
x=df["date"],
y=df["temp_min"],
mode="lines",
name="Min Temp (°C)",
line=dict(color="blue")
),
row=1, col=1
)
# Add moving averages if they exist
if "temp_max_7d_avg" in df.columns and "temp_max_7d_avg" in session.get("metrics", []):
fig.add_trace(
go.Scatter(
x=df["date"],
y=df["temp_max_7d_avg"],
mode="lines",
name="7-day Max Avg",
line=dict(color="orange", dash="dash")
),
row=1, col=1
)
if "temp_min_7d_avg" in df.columns and "temp_min_7d_avg" in session.get("metrics", []):
fig.add_trace(
go.Scatter(
x=df["date"],
y=df["temp_min_7d_avg"],
mode="lines",
name="7-day Min Avg",
line=dict(color="lightblue", dash="dash")
),
row=1, col=1
)
# Add precipitation bars
fig.add_trace(
go.Bar(
x=df["date"],
y=df["precipitation"],
name="Precipitation (mm)",
marker_color="blue"
),
row=2, col=1
)
# Add weather alerts if enabled
if session.get("show_alerts", False):
alert_cols = ["heat_alert", "heavy_rain_alert", "high_wind_alert"]
if all(col in df.columns for col in alert_cols):
# Heat alerts
heat_alerts = df.filter(pl.col("heat_alert"))
if not heat_alerts.is_empty():
fig.add_trace(
go.Scatter(
x=heat_alerts["date"],
y=heat_alerts["temp_max"],
mode="markers",
name="Heat Alert",
marker=dict(
symbol="triangle-up",
size=12,
color="red"
)
),
row=1, col=1
)
# Heavy rain alerts
rain_alerts = df.filter(pl.col("heavy_rain_alert"))
if not rain_alerts.is_empty():
fig.add_trace(
go.Scatter(
x=rain_alerts["date"],
y=rain_alerts["precipitation"],
mode="markers",
name="Heavy Rain Alert",
marker=dict(
symbol="triangle-up",
size=12,
color="darkblue"
)
),
row=2, col=1
)
# Update layout
fig.update_layout(
height=600,
template="plotly_white",
showlegend=True,
legend=dict(
orientation="h",
yanchor="bottom",
y=1.02,
xanchor="right",
x=1
),
margin=dict(l=20, r=20, t=50, b=20),
)
# Add y-axis titles
fig.update_yaxes(title_text="Temperature (°C)", row=1, col=1)
fig.update_yaxes(title_text="Precipitation (mm)", row=2, col=1)
# Convert to FastHTML component
return Div(
H2(
f"Weather Data for {session['city'].title()}",
cls="text-xl font-bold mb-4"
),
NotStr(fig.to_html(
full_html=False,
include_plotlyjs=False,
config={
'displayModeBar': True,
'responsive': True,
'scrollZoom': True
}
)),
id="weather-chart"
)
except Exception as e:
logger.error(f"Error creating weather chart: {str(e)}")
return Div(
f"Error creating chart: {str(e)}",
cls="p-4 text-center text-red-500"
)
Now let's put it all together in our main application:
# main.py
from datetime import datetime
from typing import Optional
from fasthtml.common import *
from app.cache_update import cache_app
from app.session import init_session, get_data_for_session
from app.components.charts import create_weather_chart
from data.cache_state import cache_state
import logging
from configuration.logging_config import setup_logging
# Setup logging
setup_logging()
logger = logging.getLogger(__name__)
app, rt = fast_app(
hdrs=(
Script(src="https://cdn.plot.ly/plotly-latest.min.js"),
Script(src="https://cdn.tailwindcss.com"),
Link(
rel="stylesheet",
href="https://cdn.jsdelivr.net/npm/[email protected]/daisyui.css",
),
),
routes=[
Mount("/cache", cache_app), # Mount cache update endpoints
],
pico=False,
)
@app.get("/")
def landing(session):
"""Main landing page"""
logger.info("Rendering landing page")
init_session(session)
try:
df = get_data_for_session(session)
return Div(
# Header
Div(
H1("Weather Dashboard", cls="text-2xl font-bold"),
P("Historical weather data with interactive visualizations", cls="text-gray-600"),
cls="mb-6"
),
# Controls
Div(
Form(
id="controls-form",
cls="flex flex-wrap gap-4 items-end",
hx_post="/update-view",
hx_target="#chart-container",
hx_indicator="#loading-indicator"
)(
# City selection
Div(cls="form-control")(
Label("City", cls="label"),
Select(
name="city",
cls="select select-bordered"
)(
Option("London", value="london", selected=(session["city"] == "london")),
Option("New York", value="new_york", selected=(session["city"] == "new_york")),
Option("Tokyo", value="tokyo", selected=(session["city"] == "tokyo")),
Option("Sydney", value="sydney", selected=(session["city"] == "sydney")),
Option("Cairo", value="cairo", selected=(session["city"] == "cairo")),
),
),
# Date range
Div(cls="form-control")(
Label("Start Date", cls="label"),
Input(
type="date",
name="start_date",
value=datetime.fromisoformat(session["start_date"])
.strftime("%Y-%m-%d"),
cls="input input-bordered"
),
),
Div(cls="form-control")(
Label("End Date", cls="label"),
Input(
type="date",
name="end_date",
value=datetime.fromisoformat(session["end_date"])
.strftime("%Y-%m-%d"),
cls="input input-bordered"
),
),
# Metrics
Div(cls="form-control")(
Label("Show Moving Averages", cls="label"),
Div(
Input(
type="checkbox",
name="show_averages",
checked="checked" if "temp_max_7d_avg" in session.get("metrics", []) else None,
cls="checkbox"
),
cls="flex items-center gap-2"
),
),
# Alerts
Div(cls="form-control")(
Label("Show Weather Alerts", cls="label"),
Div(
Input(
type="checkbox",
name="show_alerts",
checked="checked" if session.get("show_alerts", False) else None,
cls="checkbox"
),
cls="flex items-center gap-2"
),
),
Button("Update", cls="btn btn-primary"),
),
cls="mb-6"
),
# Loading indicator
Div(
"Loading...",
id="loading-indicator",
cls="htmx-indicator p-4 text-center"
),
# Chart container
Div(
create_weather_chart(df, session),
id="chart-container",
cls="border rounded-lg p-4 bg-white"
),
# Cache status
Div(
P(
"Last updated: ",
# _last_updated holds datetime objects, so no fromisoformat round-trip is needed
cache_state._last_updated.get(
session["city"],
datetime.min
).strftime("%Y-%m-%d %H:%M:%S"),
cls="text-sm text-gray-500"
),
Button(
"Refresh Data",
cls="btn btn-sm btn-outline",
hx_post=f"/cache/update?city={session['city']}",
hx_target="#chart-container"
),
cls="mt-4"
)
)
except Exception as e:
logger.error(f"Error rendering landing page: {str(e)}")
return Div(
H1("Weather Dashboard", cls="text-2xl font-bold"),
P(f"Error loading dashboard: {str(e)}", cls="text-red-500"),
)
@app.post("/update-view")
def update_view(
session,
city: str,
start_date: str,
end_date: str,
show_averages: Optional[str] = None,
show_alerts: Optional[str] = None
):
"""Update view based on user selections"""
try:
# Update session
session["city"] = city.lower()
session["start_date"] = datetime.strptime(start_date, "%Y-%m-%d").isoformat()
session["end_date"] = datetime.strptime(end_date, "%Y-%m-%d").isoformat()
# Update chart preferences
if show_averages:
session["metrics"] = ["temp_max", "temp_min", "precipitation", "temp_max_7d_avg", "temp_min_7d_avg"]
else:
session["metrics"] = ["temp_max", "temp_min", "precipitation"]
session["show_alerts"] = show_alerts is not None
# Get fresh data
df = get_data_for_session(session)
# Return updated chart
return create_weather_chart(df, session)
except Exception as e:
logger.error(f"Error updating view: {str(e)}")
return Div(
f"Error updating chart: {str(e)}",
cls="p-4 text-center text-red-500"
)
Let's also implement the cache update endpoints:
# app/cache_update.py
import logging
import traceback
from datetime import datetime, timedelta
from typing import Optional
from fasthtml.common import *
from data.cache_state import cache_state
logger = logging.getLogger(__name__)
cache_app, _ = fast_app()
@cache_app.post("/update")
def update_cache(city: Optional[str] = None):
"""Update cache for a city or multiple cities"""
try:
now = datetime.now()
start_date = now - timedelta(days=30)
end_date = now
if city:
# Update specific city
city = city.lower()
logger.info(f"Manually updating cache for {city}")
cache_state.update_city(
city=city,
start_date=start_date,
end_date=end_date,
overwrite=True
)
return Div(
P(f"Cache updated for {city}"),
cls="p-4 text-center text-green-500"
)
else:
# Update all popular cities
cities = ["london", "new_york", "tokyo", "sydney", "cairo"]
logger.info(f"Manually updating cache for {len(cities)} cities")
for city in cities:
try:
cache_state.update_city(
city=city,
start_date=start_date,
end_date=end_date,
overwrite=True
)
except Exception as e:
logger.error(f"Failed to update {city}: {str(e)}")
return Div(
P("Cache updated for all cities"),
cls="p-4 text-center text-green-500"
)
except Exception as e:
logger.error(f"Caching failed:\n{str(e)}")
logger.error(f"Traceback:\n{traceback.format_exc()}")
return Div(
P(f"Error updating cache: {str(e)}"),
cls="p-4 text-center text-red-500"
)
@cache_app.get("/status")
def cache_status():
"""Get cache status information"""
try:
stats = cache_state.get_cache_stats()
return Div(
H2("Cache Status", cls="text-xl font-bold mb-4"),
Table(cls="table w-full")(
Thead(
Tr(
Th("City"),
Th("Last Updated"),
Th("Records"),
Th("Size (MB)")
)
),
Tbody(
*[
Tr(
Td(city.title()),
Td(stats["last_updated"].get(city, "Never")),
Td(str(stats["records_per_city"].get(city, 0))),
Td(str(stats["memory_usage_mb"].get(city, 0)))
)
for city in stats["cached_cities"]
]
)
)
)
except Exception as e:
logger.error(f"Error getting cache status: {str(e)}")
return Div(
P(f"Error getting cache status: {str(e)}"),
cls="p-4 text-center text-red-500"
)
Finally, let's create a script that can be run periodically to update the cache:
# update_cache.py
import argparse
import logging
from datetime import datetime, timedelta
import requests
import time
from configuration.logging_config import setup_logging
setup_logging()
logger = logging.getLogger(__name__)
def trigger_cache_update(city=None, base_url="http://localhost:5001"):
"""Trigger cache update via server endpoint"""
try:
if city:
endpoint = f"{base_url}/cache/update?city={city}"
logger.info(f"Triggering cache update for {city}")
else:
endpoint = f"{base_url}/cache/update"
logger.info("Triggering full cache update")
response = requests.post(endpoint, timeout=30)
response.raise_for_status()
logger.info("Cache update triggered successfully")
return True
except requests.RequestException as e:
logger.error(f"Error triggering cache update: {str(e)}")
return False
def run_daily_update():
"""Run a full cache update once per day"""
while True:
try:
logger.info("Running daily cache update")
trigger_cache_update()
# Sleep for 24 hours
logger.info("Cache update complete. Sleeping for 24 hours...")
time.sleep(86400) # 24 hours
except Exception as e:
logger.error(f"Error in daily update loop: {str(e)}")
time.sleep(3600) # Wait an hour before retrying after error
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Trigger weather data cache updates")
parser.add_argument(
"--city",
type=str,
help="Specific city to update (optional)",
)
parser.add_argument(
"--daemon",
action="store_true",
help="Run as a daemon process for periodic updates",
)
parser.add_argument(
"--url",
type=str,
default="http://localhost:5001",
help="Base URL of the weather app",
)
args = parser.parse_args()
if args.daemon:
logger.info("Starting cache update daemon")
run_daily_update()
else:
trigger_cache_update(args.city, args.url)
You can run this script manually or set it up as a cron job or systemd service:
# Run manually for a specific city
python update_cache.py --city london
# Run manually for all cities
python update_cache.py
# Run as a daemon process
python update_cache.py --daemon
For production environments, we can use cron jobs for scheduled updates. Run crontab -e and add lines like these:
# Update all cities once per day at 2 AM
0 2 * * * cd /path/to/project/weather_app_folder && /home/username/.cargo/bin/uv run update_cache.py
# Update popular cities every 6 hours
0 */6 * * * cd /path/to/project/weather_app_folder && /home/username/.cargo/bin/uv run update_cache.py --city london
0 */6 * * * cd /path/to/project/weather_app_folder && /home/username/.cargo/bin/uv run update_cache.py --city new_york
The complete architecture we've built provides:
- WeatherDataClient fetches data from a public API
- WeatherDataProcessor calculates metrics and alerts
- CacheManager stores processed data in Parquet files
- CacheState singleton provides fast in-memory access to frequently used data

We've built a robust caching system for a weather dashboard using FastHTML. The weather use case is just an example, needless to say.
The multi-layered caching approach (file-based + in-memory) provides a good balance between persistence and speed. By precomputing metrics and storing them efficiently in Parquet files, we minimise both computation time and storage requirements.