Write a decorator rate_limit(calls_per_second) that limits how often a function can be called. If the limit is exceeded, raise a RateLimitExceeded exception (or block, depending on the use case). Assume single-threaded for now.
Track the timestamps of recent calls. A sliding window over the last second is cleaner than a fixed counter that resets every second. Python's collections.deque is a good fit — append new timestamps on the right and pop timestamps that have aged out of the window from the left.
import time
import functools
from collections import deque
class RateLimitExceeded(Exception):
    pass


def rate_limit(calls_per_second: int):
    def decorator(func):
        # Store timestamps of recent calls
        call_times: deque = deque()

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            # Remove timestamps outside the 1-second window
            while call_times and now - call_times[0] >= 1.0:
                call_times.popleft()
            if len(call_times) >= calls_per_second:
                raise RateLimitExceeded(
                    f"Rate limit exceeded: max {calls_per_second} calls/sec"
                )
            call_times.append(now)
            return func(*args, **kwargs)
        return wrapper
    return decorator
# Usage
@rate_limit(calls_per_second=3)
def fetch_exchange_rate(currency: str) -> float:
    return 5.0  # pretend API call
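As a sketch of the thread-safety point: guarding the deque with a threading.Lock makes the limiter usable from multiple threads (the variant name and structure here are assumptions, not part of the original solution):

```python
import functools
import threading
import time
from collections import deque


class RateLimitExceeded(Exception):
    pass


def rate_limit_threadsafe(calls_per_second: int):
    """Sketch of a thread-safe variant: a Lock serializes check-and-append."""
    def decorator(func):
        call_times: deque = deque()
        lock = threading.Lock()

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            with lock:  # the window check and the append must be atomic
                while call_times and now - call_times[0] >= 1.0:
                    call_times.popleft()
                if len(call_times) >= calls_per_second:
                    raise RateLimitExceeded(
                        f"Rate limit exceeded: max {calls_per_second} calls/sec"
                    )
                call_times.append(now)
            return func(*args, **kwargs)
        return wrapper
    return decorator


@rate_limit_threadsafe(calls_per_second=2)
def ping() -> str:
    return "ok"
```

Holding the lock only around the bookkeeping (not around `func` itself) keeps slow calls from serializing each other.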
Worth highlighting: functools.wraps preserves the wrapped function's metadata; time.monotonic() beats time.time() because it is unaffected by system clock changes. Bonus points: discuss thread-safety (this version isn't thread-safe; a Lock would be needed for concurrent use).

Implement an async context manager transaction(session) that: begins a transaction on enter, commits on success, and rolls back automatically if an exception is raised. The caller should not need to call commit/rollback explicitly.
Python's contextlib.asynccontextmanager turns an async generator function into a context manager. The yield is where the body of the with block runs. Wrap the yield and the commit in a try/except so any exception triggers a rollback.
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession
@asynccontextmanager
async def transaction(session: AsyncSession):
    """
    Async context manager for atomic database operations.
    Commits on success; rolls back on any exception.
    """
    try:
        yield session           # caller does work here
        await session.commit()  # success: commit
    except Exception:
        await session.rollback()  # failure: undo all changes
        raise                     # re-raise so the caller sees the error
# Usage in a FastAPI route:
async def create_transaction(data: TransactionCreate, db: AsyncSession):
    async with transaction(db):
        tx = Transaction(**data.dict())
        db.add(tx)
        # If anything raises here, rollback happens automatically
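For comparison, the same commit/rollback behavior can be written as an explicit class with __aenter__/__aexit__ (a sketch; FakeSession is a made-up stand-in for testing, not SQLAlchemy's API — the session only needs async commit()/rollback() methods):

```python
import asyncio


class TransactionContext:
    """Class-based equivalent of the @asynccontextmanager version."""

    def __init__(self, session):
        self.session = session

    async def __aenter__(self):
        return self.session  # caller does work on the session

    async def __aexit__(self, exc_type, exc, tb):
        if exc_type is None:
            await self.session.commit()
        else:
            await self.session.rollback()
        return False  # False = re-raise the exception, if any


class FakeSession:
    """Minimal stand-in so the context manager can be exercised in isolation."""
    def __init__(self):
        self.committed = False
        self.rolled_back = False

    async def commit(self):
        self.committed = True

    async def rollback(self):
        self.rolled_back = True


async def demo():
    ok = FakeSession()
    async with TransactionContext(ok):
        pass  # success path: commit fires
    bad = FakeSession()
    try:
        async with TransactionContext(bad):
            raise ValueError("boom")  # failure path: rollback, then re-raise
    except ValueError:
        pass
    return ok, bad

ok, bad = asyncio.run(demo())
```

Returning False from __aexit__ is what makes the original exception propagate to the caller.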
asynccontextmanager is the idiomatic approach over writing a class with __aenter__/__aexit__. SQLAlchemy's own begin() context manager does the same — knowing the pattern behind it matters more than the tool.

Write a function parse_transactions_csv(content: str) that parses a CSV with columns date,description,amount,category. Return a tuple: (valid_rows: list[Transaction], errors: list[RowError]). Malformed rows should not stop processing — collect all errors and return them together with valid rows.
Use the stdlib csv module (never manual string splitting). Validate each row individually inside a try/except and collect errors with row numbers. The return type of (valid, errors) is the key design decision — it lets callers show the user exactly which rows failed.
import csv
import io
from dataclasses import dataclass
from decimal import Decimal, InvalidOperation
from datetime import date
@dataclass
class Transaction:
    row: int
    date: date
    description: str
    amount: Decimal
    category: str


@dataclass
class RowError:
    row: int
    reason: str


def parse_transactions_csv(
    content: str,
) -> tuple[list[Transaction], list[RowError]]:
    valid: list[Transaction] = []
    errors: list[RowError] = []
    reader = csv.DictReader(io.StringIO(content))
    for i, row in enumerate(reader, start=2):  # row 1 is the header
        try:
            # Validate date
            parsed_date = date.fromisoformat(row["date"].strip())
            # Validate amount — accept "1234.56" or "-50.00"
            amount = Decimal(row["amount"].strip())
            # Validate required fields
            description = row["description"].strip()
            if not description:
                raise ValueError("description is empty")
            category = row["category"].strip()
            valid.append(Transaction(
                row=i,
                date=parsed_date,
                description=description,
                amount=amount,
                category=category,
            ))
        except (KeyError, ValueError, InvalidOperation, AttributeError) as e:
            # AttributeError covers short rows, where DictReader fills None
            errors.append(RowError(row=i, reason=str(e)))
    return valid, errors
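The Decimal-over-float choice for money is easy to demonstrate concretely:

```python
from decimal import Decimal

# float accumulates binary rounding error on decimal amounts
print(0.10 + 0.20)          # 0.30000000000000004
print(0.10 + 0.20 == 0.30)  # False

# Decimal does exact decimal arithmetic, which money handling requires
print(Decimal("0.10") + Decimal("0.20"))                     # 0.30
print(Decimal("0.10") + Decimal("0.20") == Decimal("0.30"))  # True
```

Note the Decimal values are constructed from strings: Decimal(0.10) would bake the float error in.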
Worth highlighting: Decimal over float for financial amounts (float precision loss is a real bug in money handling); csv.DictReader over manual parsing; io.StringIO to wrap a string as a file-like object. The (valid, errors) return type is a deliberate API design choice — discuss it.

You have a Transaction table with millions of rows and need to export all of them to a CSV file. Write a function stream_transactions(session, batch_size=1000) as a generator that yields one transaction at a time without loading all rows into memory at once.
Fetch rows in batches using LIMIT + OFFSET or cursor-based pagination. Yield each row individually from within the batch loop. The caller iterates over the generator without knowing about batching — the memory footprint stays at one batch at a time.
from collections.abc import Generator
from sqlalchemy import select
from sqlalchemy.orm import Session
def stream_transactions(
    session: Session,
    batch_size: int = 1000,
) -> Generator[Transaction, None, None]:
    """
    Yields transactions one at a time using offset pagination.
    Memory usage: one batch (batch_size rows) at a time.
    """
    offset = 0
    while True:
        batch = session.execute(
            select(Transaction)
            .order_by(Transaction.id)  # stable ordering is required for correct pagination
            .limit(batch_size)
            .offset(offset)
        ).scalars().all()
        if not batch:
            return  # no more rows
        for transaction in batch:
            yield transaction
        offset += len(batch)
# Usage — caller never loads all rows at once:
with open("export.csv", "w", newline="") as f:  # newline="" is required by the csv module
    writer = csv.writer(f)
    for tx in stream_transactions(session):
        writer.writerow([tx.date, tx.description, tx.amount])
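The keyset (cursor-based) alternative can be sketched without committing to an ORM; fetch_page below is a hypothetical callable standing in for a WHERE id > :last_id ORDER BY id LIMIT :n query:

```python
from collections.abc import Callable, Generator


def stream_keyset(
    fetch_page: Callable[[int, int], list[dict]],
    batch_size: int = 1000,
) -> Generator[dict, None, None]:
    """Keyset pagination: resume from the last seen id instead of OFFSET.

    OFFSET scans and discards all skipped rows on every page; keyset
    pagination starts each page with an index range scan at last_id,
    so the cost per page stays constant as you go deeper.
    """
    last_id = 0
    while True:
        batch = fetch_page(last_id, batch_size)
        if not batch:
            return
        for row in batch:
            yield row
        last_id = batch[-1]["id"]  # the cursor advances to the last seen key


# Demo against an in-memory "table" (an assumption, for illustration only)
rows = [{"id": i, "amount": i * 10} for i in range(1, 8)]

def fake_fetch_page(last_id: int, limit: int) -> list[dict]:
    return [r for r in rows if r["id"] > last_id][:limit]

streamed = [r["id"] for r in stream_keyset(fake_fetch_page, batch_size=3)]
print(streamed)  # [1, 2, 3, 4, 5, 6, 7]
```

The caller's loop is identical to the offset version; only the cursor bookkeeping changes.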
Worth highlighting: order_by is mandatory for correct offset pagination, and this pattern keeps memory usage at O(batch_size) instead of O(total_rows). For very large tables in production, cursor-based pagination (WHERE id > last_seen_id) is more efficient than offset — mention this as a follow-up improvement.

Write a Container class that allows registering factories (callables) by type, and resolving them by type — including resolving their dependencies recursively. When you call container.resolve(ServiceA), the container should inspect ServiceA.__init__'s type annotations, resolve each dependency, and instantiate ServiceA with them.
Use Python's inspect.signature and get_type_hints to introspect the constructor's parameter types at runtime. Recursive resolution handles transitive dependencies. Cache resolved instances if they're singletons.
import inspect
from typing import Any, Callable, TypeVar, get_type_hints
T = TypeVar("T")
class Container:
    def __init__(self):
        self._factories: dict[type, tuple[Callable, bool]] = {}
        self._singletons: dict[type, Any] = {}

    def register(self, cls: type, factory: Callable | None = None, singleton: bool = False):
        """Register a type with an optional factory. Defaults to using cls itself."""
        self._factories[cls] = (factory or cls, singleton)

    def resolve(self, cls: type[T]) -> T:
        # Return the cached singleton if available
        if cls in self._singletons:
            return self._singletons[cls]
        factory, is_singleton = self._factories.get(cls, (cls, False))
        # Introspect constructor parameter types
        hints = get_type_hints(factory.__init__ if inspect.isclass(factory) else factory)
        hints.pop("return", None)
        # Recursively resolve each dependency
        kwargs = {name: self.resolve(dep_type) for name, dep_type in hints.items()}
        instance = factory(**kwargs)
        if is_singleton:
            self._singletons[cls] = instance
        return instance
# Example
class DatabaseSession:
    def query(self, q):
        return []


class TransactionService:
    def __init__(self, db: DatabaseSession):
        self.db = db


container = Container()
container.register(DatabaseSession, singleton=True)
container.register(TransactionService)
service = container.resolve(TransactionService)
# service.db is the same DatabaseSession instance across resolves (singleton)
Worth highlighting: get_type_hints; recursive dependency resolution; the singleton vs transient lifetime distinction. FastAPI's Depends() does this, but lazily per-request and with generator support. The key insight: DI is fundamentally about inverting who creates dependencies — the container decides, not the class.

Given a transactions table with columns id, user_id, category, amount, created_at, write a query that returns the top 3 categories by total spend for each user in the current calendar month. Output: user_id, category, total_spend, rank.
Use a window function (RANK() or ROW_NUMBER()) partitioned by user_id and ordered by SUM(amount) DESC. Aggregate first in a subquery or CTE, then rank. Filter for rank ≤ 3 in the outer query.
WITH monthly_spend AS (
    -- Step 1: aggregate per user + category for this month
    SELECT
        user_id,
        category,
        SUM(amount) AS total_spend
    FROM transactions
    WHERE
        created_at >= date_trunc('month', CURRENT_DATE)
        AND created_at < date_trunc('month', CURRENT_DATE) + INTERVAL '1 month'
    GROUP BY user_id, category
),
ranked AS (
    -- Step 2: rank categories per user by spend
    SELECT
        user_id,
        category,
        total_spend,
        RANK() OVER (PARTITION BY user_id ORDER BY total_spend DESC) AS rnk
    FROM monthly_spend
)
-- Step 3: keep only the top 3
SELECT user_id, category, total_spend, rnk
FROM ranked
WHERE rnk <= 3
ORDER BY user_id, rnk;
-- Index that makes this fast:
-- CREATE INDEX ON transactions (user_id, created_at) INCLUDE (category, amount);
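The RANK() vs ROW_NUMBER() tie behavior is easy to check empirically with sqlite3 (window functions need SQLite >= 3.25; the sample data is made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE spend (user_id TEXT, category TEXT, total REAL)")
conn.executemany(
    "INSERT INTO spend VALUES (?, ?, ?)",
    [("u1", "food", 100.0), ("u1", "rent", 100.0), ("u1", "fun", 50.0)],
)
rows = conn.execute("""
    SELECT category,
           RANK()       OVER (ORDER BY total DESC) AS rnk,
           ROW_NUMBER() OVER (ORDER BY total DESC) AS rn
    FROM spend
""").fetchall()
# food and rent tie at 100: RANK() gives both 1 and skips to 3 for fun,
# while ROW_NUMBER() breaks the tie arbitrarily into 1 and 2.
for category, rnk, rn in rows:
    print(category, rnk, rn)
```

The practical consequence for "top 3": RANK() can return more than three rows per user when spends tie, which may or may not be what the product wants.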
Worth highlighting: date_trunc for correct month boundaries (not hand-rolled string comparisons); the RANK() vs ROW_NUMBER() distinction (RANK allows ties; ROW_NUMBER doesn't — discuss which you'd choose); the index comment shows production thinking. Run EXPLAIN ANALYZE if asked — look for Index Scan vs Seq Scan on the transactions table.

The following Python code produces N+1 queries. Identify the problem and rewrite it as a single efficient SQL query (or a correct ORM query).
Before (N+1):
users = session.query(User).all()
for user in users:
    print(user.transactions)  # triggers a SELECT per user
The problem: user.transactions is a lazy-loaded relationship. Each access fires a new SELECT ... WHERE user_id = ?. Fix: eager-load the relationship in the initial query, or use a JOIN.
# Option 1: SQLAlchemy eager loading (selectinload)
# Fires 2 queries total: one for users, one for all their transactions in bulk
from sqlalchemy import select
from sqlalchemy.orm import selectinload

users = session.execute(
    select(User).options(selectinload(User.transactions))
).scalars().all()

# Now user.transactions is already loaded — no additional queries
for user in users:
    print(user.transactions)
# Option 2: explicit JOIN (single query, returns denormalized rows)
result = session.execute(
    select(User, Transaction)
    .join(Transaction, Transaction.user_id == User.id, isouter=True)
).all()
# Option 3: raw SQL with a JOIN — clearest for reporting
SELECT
    u.id AS user_id,
    u.email,
    t.id AS transaction_id,
    t.amount,
    t.category
FROM users u
LEFT JOIN transactions t ON t.user_id = u.id
ORDER BY u.id, t.created_at DESC;
Worth highlighting: selectinload (2 queries, avoids a Cartesian product), joinedload (1 query with a JOIN, Cartesian-product risk for one-to-many), and a raw SQL JOIN. The key trade-off: selectinload is usually the right choice for one-to-many to avoid row multiplication.

Design a PostgreSQL schema for recurring expenses in the Expense Tracker. Requirements: a recurring expense has a frequency (weekly, monthly, yearly), a start date, an optional end date, an expected amount (which may vary — e.g., a utility bill), and generates actual transaction records when it fires.
Separate the definition of the recurrence from the instances it generates. The recurring rule is one table; each fired transaction is a row in the existing transactions table with a foreign key back to the rule. This keeps query paths clean.
-- Recurring expense definition
CREATE TABLE recurring_expenses (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    description TEXT NOT NULL,
    category TEXT NOT NULL,
    -- Recurrence rule
    frequency TEXT NOT NULL CHECK (frequency IN ('weekly', 'monthly', 'yearly')),
    start_date DATE NOT NULL,
    end_date DATE,  -- NULL = indefinite
    -- Expected amount (NULL = variable; user enters amount when the transaction fires)
    expected_amount NUMERIC(12, 2),
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Add a column to transactions to link back to the rule
ALTER TABLE transactions
    ADD COLUMN recurring_expense_id UUID REFERENCES recurring_expenses(id);

-- When the cron fires a recurring expense:
-- 1. Query active rules (end_date IS NULL OR end_date >= today) whose next
--    due date, derived from start_date and frequency, is <= today
-- 2. For a fixed amount: INSERT INTO transactions with expected_amount
-- 3. For a variable amount: create a draft transaction and notify the user to confirm the amount

-- Index for the cron query. Note: CURRENT_DATE is not allowed in a partial
-- index predicate (predicates must be immutable), so index the open-ended
-- rules; dated rules can use a plain index if needed:
CREATE INDEX ON recurring_expenses (user_id, start_date)
    WHERE end_date IS NULL;
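Since the schema stores only start_date and frequency, the cron has to derive the next due date. A minimal sketch (the helper name and the clamp-to-month-end rule for monthly bills are assumptions; a production version should pin down the end-of-month policy explicitly):

```python
import calendar
from datetime import date, timedelta


def next_due(start: date, frequency: str, today: date) -> date:
    """First occurrence of the recurrence on or after `today`."""
    if frequency == "weekly":
        if today <= start:
            return start
        periods = -(-(today - start).days // 7)  # ceiling division
        return start + timedelta(weeks=periods)
    if frequency == "monthly":
        d = start
        while d < today:
            year = d.year + (d.month // 12)
            month = d.month % 12 + 1
            # clamp the day for short months (Jan 31 -> Feb 28/29)
            day = min(start.day, calendar.monthrange(year, month)[1])
            d = date(year, month, day)
        return d
    if frequency == "yearly":
        d = start
        while d < today:
            day = min(start.day, calendar.monthrange(d.year + 1, d.month)[1])
            d = date(d.year + 1, d.month, day)
        return d
    raise ValueError(f"unknown frequency: {frequency}")


print(next_due(date(2024, 1, 31), "monthly", date(2024, 2, 15)))  # 2024-02-29
print(next_due(date(2024, 1, 1), "weekly", date(2024, 1, 10)))    # 2024-01-15
```

Storing a next_fire_date column (as the note below the schema suggests) would replace this loop with a single indexed comparison.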
Worth highlighting: the NULL-means-variable-amount design decision (discuss the trade-off vs a separate is_fixed boolean); the ON DELETE CASCADE consideration (if a user is deleted, their rules go too); the partial index on active recurring expenses. A more complex version would store next_fire_date on the rule to make the cron query O(1) instead of recalculating it each run.

Given a transactions table with columns id, account_id, amount, created_at (amounts can be positive for income, negative for expenses), write a query that returns each transaction with its running balance — the cumulative sum of amounts up to and including that transaction, ordered by created_at, per account.
Use SUM() OVER (PARTITION BY account_id ORDER BY created_at ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW). The frame clause makes the window cumulative. Handle ties in created_at by adding id as a tiebreaker to the ORDER BY.
SELECT
    id,
    account_id,
    created_at,
    amount,
    SUM(amount) OVER (
        PARTITION BY account_id            -- reset per account
        ORDER BY created_at, id            -- stable ordering; id breaks ties
        ROWS BETWEEN UNBOUNDED PRECEDING   -- all previous rows
             AND CURRENT ROW               -- up to and including this row
    ) AS running_balance
FROM transactions
ORDER BY account_id, created_at, id;
-- Example output:
-- id | account_id | created_at | amount | running_balance
-- 1 | acct_A | 2024-01-01 | 1000 | 1000
-- 2 | acct_A | 2024-01-05 | -200 | 800
-- 3 | acct_A | 2024-01-10 | -50 | 750
-- 4 | acct_B | 2024-01-02 | 500 | 500 (reset for new account)
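The window function's result can be cross-checked in Python with itertools.accumulate, which is handy when unit-testing the query (the sample rows mirror the example output above):

```python
from itertools import accumulate, groupby
from operator import itemgetter

rows = [  # (id, account_id, amount), already ordered by (account_id, created_at, id)
    (1, "acct_A", 1000),
    (2, "acct_A", -200),
    (3, "acct_A", -50),
    (4, "acct_B", 500),
]

balances = []
for _, group in groupby(rows, key=itemgetter(1)):  # PARTITION BY account_id
    group = list(group)
    running = accumulate(amount for _, _, amount in group)  # cumulative SUM
    balances.extend(zip((r[0] for r in group), running))

print(balances)  # [(1, 1000), (2, 800), (3, 750), (4, 500)]
```

groupby requires the input to be pre-sorted by the grouping key, mirroring the ORDER BY requirement in the SQL.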
Worth highlighting: PARTITION BY to reset per account; the ROWS BETWEEN ... AND CURRENT ROW frame clause (the default is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which handles ties differently); why id is added as a tiebreaker (two transactions at the same timestamp would have non-deterministic order without it). Window functions don't filter rows — the full dataset is always present, just with an added computed column.

The following query is slow on a transactions table with 10 million rows. Identify the best index to add and justify why:
SELECT * FROM transactions WHERE user_id = $1 AND created_at BETWEEN $2 AND $3 ORDER BY created_at DESC LIMIT 20;
Think about which columns are in the WHERE clause (filter), which are in ORDER BY (sort), and whether a composite index can eliminate both the filter scan and the sort. Column order in a composite index matters — equality-filter columns come first, then the range/sort column.
-- Best index for this query:
CREATE INDEX idx_transactions_user_date
ON transactions (user_id, created_at DESC);
-- Why this works:
-- 1. user_id is the equality filter — PostgreSQL uses it to jump directly
-- to rows for this user (high selectivity if many users)
-- 2. created_at DESC matches the ORDER BY — no separate sort step needed
-- 3. The BETWEEN range on created_at is handled by index range scan
-- within the user_id partition of the index
-- EXPLAIN ANALYZE before the index:
-- Seq Scan on transactions (cost=... rows=10M ...)
-- Filter: (user_id = $1 AND created_at BETWEEN $2 AND $3)
-- EXPLAIN ANALYZE after the index:
-- Index Scan using idx_transactions_user_date on transactions
-- Index Cond: (user_id = $1 AND created_at BETWEEN $2 AND $3)
-- Limit: 20 rows
-- Alternative: INCLUDE clause to make it a covering index
-- (avoids heap fetch if all needed columns are in the index)
CREATE INDEX idx_transactions_user_date_covering
ON transactions (user_id, created_at DESC)
INCLUDE (amount, category, description);
-- Partial index if you only query recent transactions:
CREATE INDEX idx_transactions_recent
ON transactions (user_id, created_at DESC)
WHERE created_at > '2024-01-01';
Worth highlighting: the covering-index (INCLUDE) concept to avoid heap lookups; partial indexes for time-bounded queries. The ability to predict what EXPLAIN ANALYZE will show before running it is the senior signal.

Implement a TransactionForm component that: submits a new transaction to the API, shows a loading state while the request is in-flight, optimistically adds the transaction to a list before the server responds, and rolls back the optimistic update if the request fails.
Add the transaction to local state immediately on submit (optimistic). On success, you can either keep it (possibly updating with the server's canonical ID) or refetch. On error, remove the optimistic entry and show an error message. React Query's useMutation with onMutate/onError/onSettled is the production pattern for this.
// Using React Query (TanStack Query) — the production standard
import { useMutation, useQueryClient } from '@tanstack/react-query'
interface Transaction {
  id: string
  description: string
  amount: number
  category: string
}

function useCreateTransaction() {
  const queryClient = useQueryClient()
  return useMutation({
    mutationFn: (data: Omit<Transaction, 'id'>) =>
      fetch('/api/transactions', {
        method: 'POST',
        body: JSON.stringify(data),
        headers: { 'Content-Type': 'application/json' },
      }).then(r => r.json()),
    onMutate: async (newTx) => {
      // Cancel any in-flight refetches to avoid race conditions
      await queryClient.cancelQueries({ queryKey: ['transactions'] })
      // Save current state for rollback
      const previousTxs = queryClient.getQueryData<Transaction[]>(['transactions'])
      // Optimistically add the new transaction with a temp ID
      queryClient.setQueryData<Transaction[]>(['transactions'], old => [
        ...(old ?? []),
        { ...newTx, id: `temp-${Date.now()}` },
      ])
      return { previousTxs } // context passed to onError
    },
    onError: (_err, _newTx, context) => {
      // Roll back to the saved state
      queryClient.setQueryData(['transactions'], context?.previousTxs)
    },
    onSettled: () => {
      // Always refetch after success or error to sync with the server
      queryClient.invalidateQueries({ queryKey: ['transactions'] })
    },
  })
}

function TransactionForm() {
  const { mutate, isPending, isError } = useCreateTransaction()

  const handleSubmit = (e: React.FormEvent<HTMLFormElement>) => {
    e.preventDefault()
    const data = new FormData(e.currentTarget)
    mutate({
      description: String(data.get('description')),
      amount: Number(data.get('amount')),
      category: String(data.get('category')),
    })
  }

  return (
    <form onSubmit={handleSubmit}>
      <input name="description" required />
      <input name="amount" type="number" step="0.01" required />
      <input name="category" required />
      <button type="submit" disabled={isPending}>
        {isPending ? 'Saving...' : 'Add Transaction'}
      </button>
      {isError && <p>Failed to save. Please try again.</p>}
    </form>
  )
}
Worth highlighting: the three-callback pattern (onMutate → optimistic, onError → rollback, onSettled → sync); cancelQueries to prevent race conditions; returning context from onMutate to enable rollback. This is the canonical React Query pattern — knowing it signals real production React experience.

Build a search input that fires an API request 300ms after the user stops typing. If the user types again before the previous request completes, the stale response should be ignored (requests can arrive out of order).
Debouncing delays the API call; AbortController cancels the in-flight request when a new one starts. Both are needed: debouncing reduces calls, AbortController handles the race condition when requests arrive out of order.
import { useState, useEffect } from 'react'

function useDebounce<T>(value: T, delay: number): T {
  const [debouncedValue, setDebouncedValue] = useState(value)
  useEffect(() => {
    const timer = setTimeout(() => setDebouncedValue(value), delay)
    return () => clearTimeout(timer) // cleanup cancels the timer on re-render
  }, [value, delay])
  return debouncedValue
}

interface SearchResult {
  id: string
  description: string
}

function TransactionSearch() {
  const [query, setQuery] = useState('')
  const [results, setResults] = useState<SearchResult[]>([])
  const [isLoading, setIsLoading] = useState(false)
  const debouncedQuery = useDebounce(query, 300)

  useEffect(() => {
    if (!debouncedQuery) {
      setResults([])
      return
    }
    // AbortController cancels the previous request when debouncedQuery changes
    const controller = new AbortController()
    setIsLoading(true)
    fetch(`/api/transactions/search?q=${encodeURIComponent(debouncedQuery)}`, {
      signal: controller.signal,
    })
      .then(r => r.json())
      .then(data => setResults(data))
      .catch(err => {
        if (err.name !== 'AbortError') console.error(err) // ignore intentional cancellations
      })
      .finally(() => {
        // Guard: an aborted request must not clear the newer request's spinner
        if (!controller.signal.aborted) setIsLoading(false)
      })
    return () => controller.abort() // cleanup: cancel if debouncedQuery changes
  }, [debouncedQuery])

  return (
    <div>
      <input
        value={query}
        onChange={e => setQuery(e.target.value)}
        placeholder="Search transactions..."
      />
      {isLoading && <span>Searching...</span>}
      <ul>{results.map(r => <li key={r.id}>{r.description}</li>)}</ul>
    </div>
  )
}
Worth highlighting: catching AbortError specifically rather than swallowing all errors. In production, React Query's useQuery with enabled: !!debouncedQuery handles cancellation automatically.

Implement a generic useQuery<T> hook that accepts a URL, fetches it, and returns { data: T | null, isLoading: boolean, error: Error | null }. Use TypeScript generics so callers get correct typing on the data field.
The generic type parameter flows from the hook's type parameter to the state type and the return type. Use a discriminated union for the return type to enforce that when isLoading is true, data is necessarily null — TypeScript can narrow this for callers.
import { useState, useEffect } from 'react'
// Discriminated union: TypeScript narrows these correctly in if/switch
type QueryResult<T> =
  | { isLoading: true; data: null; error: null }
  | { isLoading: false; data: T; error: null }
  | { isLoading: false; data: null; error: Error }

function useQuery<T>(url: string): QueryResult<T> {
  const [state, setState] = useState<QueryResult<T>>({
    isLoading: true,
    data: null,
    error: null,
  })

  useEffect(() => {
    let cancelled = false
    setState({ isLoading: true, data: null, error: null })
    fetch(url)
      .then(async r => {
        if (!r.ok) throw new Error(`HTTP ${r.status}`)
        return r.json() as Promise<T>
      })
      .then(data => {
        if (!cancelled) setState({ isLoading: false, data, error: null })
      })
      .catch(error => {
        if (!cancelled) setState({ isLoading: false, data: null, error })
      })
    return () => { cancelled = true } // prevent state updates after unmount
  }, [url])

  return state
}

// Usage — TypeScript infers the correct type:
interface Transaction { id: string; description: string; amount: number }

function TransactionList() {
  const { isLoading, data, error } = useQuery<Transaction[]>('/api/transactions')
  if (isLoading) return <p>Loading...</p>
  if (error) return <p>Error: {error.message}</p>
  // Here TypeScript knows data is Transaction[] (not null)
  return <ul>{data.map(t => <li key={t.id}>{t.description}</li>)}</ul>
}
Worth highlighting: the cancelled flag to prevent setState after unmount; correctly typing r.json() as Promise<T>. The interviewer may ask "why not use React Query?" — answer: for production, React Query adds caching, deduplication, and background refetching. This exercise shows you understand what React Query does under the hood.

Given a Client Component that fetches the transaction list with useEffect + useState, convert it to a Next.js Server Component that fetches data server-side. Identify what must stay as a Client Component and what can move to the server.
Server Components can await async calls directly — no hooks needed. But they cannot use browser APIs, event handlers, or stateful hooks (useState, useEffect). The interactive parts (forms, click handlers) stay as Client Components, composed inside the Server Component.
// BEFORE: Client Component (runs in the browser)
'use client'
import { useState, useEffect } from 'react'

export function TransactionList() {
  const [transactions, setTransactions] = useState([])
  useEffect(() => {
    fetch('/api/transactions').then(r => r.json()).then(setTransactions)
  }, [])
  return <ul>{transactions.map(t => <li key={t.id}>{t.description}</li>)}</ul>
}

// ─────────────────────────────────────────────────────
// AFTER: Server Component (runs on the server, no 'use client')
// No useState, no useEffect — async/await directly

async function getTransactions() {
  // In a real app, query the DB or an internal service directly here;
  // an HTTP fetch to the app's own API is shown only for brevity
  const res = await fetch('http://localhost:3000/api/transactions', {
    cache: 'no-store', // or 'force-cache' with revalidation
  })
  return res.json()
}

export default async function TransactionList() {
  const transactions = await getTransactions()
  return (
    <ul>
      {transactions.map(t => (
        <li key={t.id}>
          {t.description}
          {/* Interactive child stays a Client Component */}
          <DeleteButton transactionId={t.id} />
        </li>
      ))}
    </ul>
  )
}

// DeleteButton must be 'use client' because it has onClick:
// 'use client'
// export function DeleteButton({ transactionId }) { ... }
Worth highlighting: the cache option controlling how Next.js caches the fetch. Key rule: "use client" propagates downward — a Client Component's children are also client-side unless they're passed as children props from a Server Component.

Implement a useLocalStorage<T>(key: string, initialValue: T) hook that persists state to localStorage. It should: read the initial value from localStorage if available, update localStorage on every state change, and handle JSON parse errors gracefully.
The hook should behave like useState from the caller's perspective — same return shape. The complexity is in the initializer (read from localStorage) and the setter (write to localStorage). Handle SSR: localStorage doesn't exist on the server in Next.js.
import { useState, useCallback } from 'react'
function useLocalStorage<T>(
  key: string,
  initialValue: T,
): [T, (value: T | ((prev: T) => T)) => void] {
  // Initialize from localStorage (or initialValue if missing/unparseable)
  const [storedValue, setStoredValue] = useState<T>(() => {
    // Guard against SSR (Next.js server-side rendering)
    if (typeof window === 'undefined') return initialValue
    try {
      const item = window.localStorage.getItem(key)
      return item !== null ? (JSON.parse(item) as T) : initialValue
    } catch {
      // Corrupted value in localStorage — fall back to initialValue
      return initialValue
    }
  })

  const setValue = useCallback(
    (value: T | ((prev: T) => T)) => {
      setStoredValue(prev => {
        const next = typeof value === 'function'
          ? (value as (prev: T) => T)(prev)
          : value
        try {
          window.localStorage.setItem(key, JSON.stringify(next))
        } catch {
          // Quota exceeded or private browsing — fail silently
          console.warn(`useLocalStorage: failed to write key "${key}"`)
        }
        return next
      })
    },
    [key],
  )

  return [storedValue, setValue]
}

// Usage — fully typed:
const [theme, setTheme] = useLocalStorage<'light' | 'dark'>('theme', 'light')
const [filters, setFilters] = useLocalStorage<{ category: string; minAmount: number }>(
  'tx-filters',
  { category: 'all', minAmount: 0 },
)
Worth highlighting: lazy initialization with useState(() => ...) so localStorage is only read once; the SSR guard (typeof window === 'undefined'); supporting the functional-updater form of the setter (setValue(prev => ...)); useCallback to stabilize the setter reference; graceful handling of quota errors. This hook is in the Expense Tracker for confidence-tracking state — the interviewers can ask you to walk through your actual implementation.