Skip to main content

System Design Questions

Design Distributed Counter — System Design Interview Guide

Design Distributed Counter is a system-design interview that asks you to build a high-throughput counter service: any client can increment a named counter, any client can read its current value, and the system handles millions of increments per second across a global fleet without losing counts. The hard part is that a single counter can't naively live on one machine at this rate — sharding plus aggregation is the only way to scale, and the tradeoff is read freshness vs write throughput.

By Alex Chen, Founder, InterviewChamp.AI · Last verified

Reported in interviews at

  • Google
  • Meta
  • Amazon
  • LinkedIn
  • Microsoft

Sourced from Glassdoor, Levels.fyi, and Blind interview reports.

Functional requirements

  • Increment a named counter by N (default 1); returns success after the increment is durably recorded
  • Read the current value of a named counter; reads are allowed to be slightly stale (eventually-consistent)
  • Atomic compare-and-set option for strict-counter use cases (e.g. inventory decrement that must not go negative)
  • Support millions of distinct counter names with arbitrary cardinality of values
  • Time-windowed counters: 'increments in the last 1 minute' or 'increments per second over the last hour'
  • Reset a counter to a specified value (admin operation; audited)

Non-functional requirements

  • Write throughput: ~1M increments/sec aggregate across the cluster; ~100K/sec on a single hot counter
  • Read latency: <50ms p99 for the current value, with up to 1-2 seconds of staleness allowed
  • Write latency: <10ms p99 producer ack for non-strict increments; higher (50-100ms) for strict CAS
  • Durability: every acked increment must survive a single-node failure; no double-counting on retry
  • Availability: 99.99%; writes must remain available even when reads are temporarily stale
  • Storage: long-term retention of historical counter values for time-windowed queries (30 days typical)

Capacity estimation

Anchor: a busy platform tracks ~10M distinct counter names (one per article/post for view counts, one per ad campaign for impression counts, one per user-action type for analytics). Aggregate write rate at peak: ~1M increments/sec.

The distribution is skewed. A 'top-10 viral post' might absorb ~100K increments/sec by itself while the long tail of millions of counters sees <1 increment/sec each. Capacity planning must accommodate both: the cluster must serve 1M aggregate increments AND a single counter must handle 100K/sec without becoming a hot-shard bottleneck.

Storage: each counter has a current value (~16 bytes for a uint64 plus metadata). 10M counters × 50 bytes = 500 MB of current state — easily in-memory.

If we keep a per-minute history for time-window queries: 10M counters × 1440 minutes/day × 16 bytes = 230 GB/day. With 30-day retention = ~7 TB. Tier the older windows to cheap storage; keep last 24 hours in fast storage.

Increment event log: every increment is also persisted to a durable log for replay and audit. 1M increments/sec × 100 bytes/event = 100 MB/sec = 8.6 TB/day of raw log data. Compressed 3-4x = ~2 TB/day. Retain in the log for ~7 days for replay.

Read QPS: typically much lower than writes for analytics-style counters because dashboards refresh on a slow cadence. ~10K reads/sec aggregate. Reads from the in-memory state cache; almost never touches the durable storage.

Counter cardinality: if counter names include user_id (per-user counters), cardinality can hit billions. Production systems shard the counter-name space across hundreds of storage nodes — each node owns ~1% of the namespace.

High-level design

Two-layer architecture: a sharded write-path that absorbs increments at scale, and a read-path that exposes current values via aggregated state.

Write path: increments hit a frontend API. Each counter is sharded across N internal sub-counters; the frontend picks a sub-counter for each increment (round-robin or random). The increment is appended to a per-shard log and applied to the shard's in-memory counter. Multiple shards per counter means the per-shard write rate is the global rate divided by N.

For a hot counter doing 100K increments/sec: shard it across 100 sub-counters, each absorbing 1K/sec — trivial per shard. The trick is that no single shard ever sees the whole counter.

Read path: when a client reads the counter, the read service queries all N sub-counter shards, sums their current values, and returns the total. With N=100 and intra-DC parallel queries, read latency is bounded by the slowest shard — typically <50ms.

For very hot counters where even N=100 isn't enough, the system can further shard. The downside is read latency grows linearly with N (more shards to sum). Practical ceiling is N~1000 shards per counter.

Durability: each per-shard increment is appended to a durable log (replicated 3x) before the shard's in-memory counter is updated. On node restart, the in-memory counter is rebuilt by replaying the log. The log also serves as the source of truth for forensic queries ('show me all increments to counter X between time T1 and T2').

Aggregation worker: a background process periodically (every 1-10 seconds) reads each shard's local counter, sums across shards for each counter name, and writes the aggregated value to a fast read-cache. Reads consult the cache for the current total instead of fanning out to all shards every time.

The cache is eventually-consistent — bounded by the aggregation interval. For most analytics use cases this is fine; for use cases requiring fresh-to-the-second values, reads can opt to fan-out instead of using the cache.

Time-windowed counters: each shard maintains per-minute buckets in addition to the running total. Bucket = (minute_timestamp, increment_count). Older buckets get tiered to cheap storage. Queries like 'increments per minute over the last hour' read the last 60 buckets from each shard and sum.

Deep dive — the hard problem

Three deep dives: shard count vs read latency, exactly-once increment under retries, and strict counters with CAS.

Shard count vs read latency — the central tradeoff.

Higher N (more shards per counter) → lower per-shard write rate → no hot-shard problem on writes. But reads must fan out to all N shards and sum, so read latency grows with N. With parallel queries, latency is bounded by the slowest shard, which under variance becomes the tail latency (e.g. P99 of N requests).

For N=10: very rare hot-shard risk for non-extreme counters; reads fan out to 10 shards, P99 latency ~10ms intra-DC. Good for moderate-traffic counters.

For N=100: handles ~100K/sec/counter without hot shards; reads fan out to 100 shards, P99 latency ~30-50ms. Good for high-traffic counters.

For N=1000: handles ~1M/sec/counter, the upper end. Reads fan out to 1000 shards, P99 latency ~100-200ms. The aggregated cache becomes mandatory because direct reads are too slow.

Production systems pick N per-counter based on observed write rate. Cold counters use N=1 (no sharding); hot counters get autoscaled up. The control plane monitors per-counter write rate and adjusts N over minutes — when it goes up, new shards are spun up; the old aggregated value is split evenly across shards as a starting point.

Exactly-once increment — what happens when a producer retries?

Naive design: producer sends increment, gets timeout, retries. The first increment may or may not have applied. Default behavior is at-least-once: both increments apply, the counter is over-counted.

For counters where over-counting is unacceptable (paid impressions, billing meters), the producer attaches a unique increment_id to each call. The shard maintains a recent-id dedup set (Bloom filter + small TTL'd cache); if the id has been seen, the increment is silently dropped. Combined with idempotent retry on the producer side, this gives exactly-once with high probability.

The dedup window is bounded — typically minutes. Beyond the window, the id is forgotten. Producers that retry after the window expires can still cause double-counts. In practice 5-minute windows cover essentially all retry scenarios.

Strict counters — CAS for cases where the counter can't drift.

Use case: inventory countdown ('this product has 100 units; decrement on purchase, must not go negative'). Sharded counters don't work because no single shard knows the total.

Strict counters are unsharded — one counter, one owner shard, with leader-follower replication for durability. Writes are serialized through the leader; CAS is implemented as 'read current value, compare to expected, write new value if match, fail if mismatch.' Throughput per strict counter is bounded by the single-leader's serialization: ~10K ops/sec.

If the use case needs both 'no over-sell' (CAS guarantee) and high throughput, the application layer is responsible for partitioning the inventory across N sub-counters and reasoning about the partition tradeoff. The counter service offers the primitives; the application composes them.

Fourth tradeoff: aggregation lag and the read-after-write contract.

A client increments a counter, then immediately reads it. Does the read see the increment? With pure aggregated-cache reads, no — the cache hasn't refreshed yet. Two solutions: (1) the client-API attaches a 'just-incremented' header so the read fans out to shards directly, bypassing the cache; (2) on increment, the API also updates the cache directly (synchronous cache update). Production systems typically support option (1) as an opt-in for tests and debugging.

Common mistakes

  • Designing a single counter on a single node — doesn't scale past ~50K/sec writes
  • Sharding without an aggregation cache — every read fans out to N shards and pays N x network roundtrip
  • Forgetting idempotency keys — at-least-once retries silently over-count without dedup
  • Treating strict counters and analytics counters the same — they need different architectures (single-leader CAS vs sharded aggregation)
  • Hard-coding the shard count — production systems autoscale N per-counter based on observed write rate

Likely follow-up questions

  • How would you implement a global rate-limit counter where the limit must never be exceeded by more than 5%?
  • What changes if reads must be strongly consistent (read-after-write within the same session)?
  • How would you support counter rollover at midnight UTC so daily counters reset cleanly?
  • How would you implement 'top N counters by current value' (a leaderboard query) without scanning all 10M counters?
  • How would your design handle a single counter that suddenly receives 10M increments/sec (a viral video)?
  • How would you implement counter decrement and ensure the decrement and the increment that paired with it are both committed atomically?

Practice Design Distributed Counter live with an AI interviewer

Free, no sign-up required. Get real-time feedback on your design.

Practice these live

Frequently asked questions

Why can't a single counter just live on one machine?
A single machine handles ~50-100K writes/sec to a single counter — beyond that, the lock or atomic-increment instruction becomes the bottleneck. Sharding across N sub-counters bypasses the per-counter contention and scales linearly with N.
Is exactly-once increment possible across all failure modes?
Approximately — bounded by the dedup window. Producers attach an idempotency key; shards dedup within a few minutes. Retries within the window are deduplicated; retries beyond the window can double-count. In practice the window is wide enough to cover essentially all real retry scenarios.
How long is the Design Distributed Counter interview?
30-45 minutes. Often used as a focused problem to drill the sharding-vs-read-latency tradeoff. Expect at least one of (exactly-once retry, strict CAS, time-windowed queries) as a follow-up.
What is the most important concept for Design Distributed Counter?
The sharding-and-aggregation pattern: N sub-counters per counter, an aggregation cache for fast reads, and the explicit tradeoff between write throughput (favors high N) and read latency (favors low N). Without this pattern the design collapses to a single-machine bottleneck.