
Concurrency Patterns

Kotlin coroutines and Flow replace most classic concurrency primitives with simpler, structured alternatives. This chapter covers the patterns you'll actually use on Android — Producer-Consumer, Fan-out/in, Pipeline, Actor, Circuit Breaker, Retry, Debounce, Throttle — with real code.

1. Producer-Consumer — queue of work

Intent: One coroutine produces work items; one or more consumers process them.

class UploadQueue @Inject constructor(
    @IoDispatcher private val io: CoroutineDispatcher,
    private val uploader: Uploader
) {
    private val scope = CoroutineScope(SupervisorJob() + io)
    private val channel = Channel<UploadRequest>(capacity = Channel.BUFFERED)

    init {
        // Consumer — single worker
        scope.launch {
            for (request in channel) {
                runCatching { uploader.upload(request) }
                    .onFailure { Log.e("Upload", "failed for ${request.id}", it) }
            }
        }
    }

    suspend fun enqueue(request: UploadRequest) = channel.send(request)

    fun shutdown() {
        channel.close()
        scope.cancel()
    }
}

Multiple consumers — worker pool

class WorkerPool<T>(
    scope: CoroutineScope,
    private val workers: Int = 4,
    private val process: suspend (T) -> Unit
) {
    private val channel = Channel<T>(capacity = Channel.BUFFERED)

    init {
        repeat(workers) {
            scope.launch {
                for (item in channel) {
                    runCatching { process(item) }
                }
            }
        }
    }

    suspend fun submit(item: T) = channel.send(item)

    fun close() = channel.close()
}

// Usage
val thumbnailWorkers = WorkerPool<Image>(scope, workers = 4) { image ->
    thumbnailer.generate(image)
}

4 workers drain the same channel. Each item goes to exactly one worker.
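The exactly-once guarantee is easy to check concretely. Here is a minimal, self-contained sketch (processedCounts is a hypothetical test helper, not part of WorkerPool): it fans 100 items across 4 workers sharing one channel and counts how many times each item is processed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: workers share one channel; count per-item receipts.
// All coroutines run on runBlocking's single thread, so a plain map is safe.
suspend fun processedCounts(items: Int, workers: Int): Map<Int, Int> = coroutineScope {
    val channel = Channel<Int>(capacity = Channel.BUFFERED)
    val counts = mutableMapOf<Int, Int>()
    val jobs = List(workers) {
        launch {
            for (item in channel) counts[item] = (counts[item] ?: 0) + 1
        }
    }
    repeat(items) { channel.send(it) }
    channel.close()   // lets the workers' for-loops terminate
    jobs.joinAll()
    counts
}

fun main() = runBlocking {
    val counts = processedCounts(items = 100, workers = 4)
    check(counts.size == 100 && counts.values.all { it == 1 })
    println("each item processed exactly once")
}
```

Every item lands in exactly one count because a channel receive hands each element to a single receiver; that property is what makes a shared channel a worker pool rather than a broadcast.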


2. Fan-out — one input, many parallel paths

suspend fun fetchAll(ids: List<String>): List<Product> = coroutineScope {
    ids.map { id ->
        async { api.fetch(id) } // launch all in parallel
    }.awaitAll()                // gather results
}

coroutineScope + async + awaitAll is the idiomatic fan-out. All children cancel together if the parent scope cancels.

Bounded parallelism

Don't fan out 10,000 items to 10,000 network calls — throttle with a semaphore:

suspend fun fetchAllBounded(ids: List<String>, parallelism: Int = 10): List<Product> = coroutineScope {
    val semaphore = Semaphore(parallelism)
    ids.map { id ->
        async {
            semaphore.withPermit { api.fetch(id) }
        }
    }.awaitAll()
}

At most parallelism coroutines execute the API call at once; the rest suspend waiting for a permit.

Flow-based fan-out

suspend fun processFeed(posts: Flow<Post>) = coroutineScope {
    posts
        .flatMapMerge(concurrency = 8) { post ->
            flow {
                val enriched = enrichPost(post) // parallel, up to 8 at once
                emit(enriched)
            }
        }
        .collect { /* handle enriched post */ }
}

flatMapMerge(concurrency = N) fans out Flow items to N parallel inner flows.


3. Fan-in — many inputs, one output

fun combineFeeds(
    news: Flow<Post>,
    social: Flow<Post>,
    sponsored: Flow<Post>
): Flow<Post> = merge(news, social, sponsored) // fan-in
    .buffer(64)

// Or with combine when you need latest-from-each
val dashboard: Flow<Dashboard> = combine(
    userRepo.currentUser,
    notificationsRepo.unreadCount,
    ordersRepo.activeOrder
) { user, unread, order -> Dashboard(user, unread, order) }

merge interleaves emissions. combine pairs the latest value from each source.


4. Pipeline — staged processing

fun imageProcessingPipeline(uris: Flow<Uri>): Flow<ProcessedImage> = uris
    .map { uri -> decode(uri) }                     // stage 1
    .map { bitmap -> resize(bitmap, 1024, 1024) }   // stage 2
    .map { bitmap -> watermark(bitmap) }            // stage 3
    .map { bitmap -> encode(bitmap, quality = 85) } // stage 4
    .flowOn(Dispatchers.Default)                    // CPU-bound stages off main

Without buffer() between stages, a plain map chain processes each item through all stages sequentially in one coroutine; Flow's suspension-based backpressure means a slow stage pauses upstream naturally. Insert buffer() between stages if you want them to run concurrently.

Parallel pipeline stages

fun parallelImagePipeline(uris: Flow<Uri>): Flow<ProcessedImage> = uris
    .flatMapMerge(concurrency = 4) { uri ->
        flow { emit(decode(uri)) }
    }
    .flatMapMerge(concurrency = 8) { bitmap ->
        flow { emit(resize(bitmap, 1024, 1024)) }
    }
    .flatMapMerge(concurrency = 4) { bitmap ->
        flow {
            val watermarked = watermark(bitmap)
            emit(encode(watermarked, quality = 85))
        }
    }

Different stages can have different parallelism. Note that flatMapMerge emits results as inner flows complete, so output order is not preserved.


5. Actor — serialized state access

Intent: Encapsulate mutable state behind a channel so all mutations are serialized to one coroutine.

class CounterActor(scope: CoroutineScope) {
    private sealed interface Message {
        data object Increment : Message
        data class Get(val response: CompletableDeferred<Int>) : Message
    }

    private val mailbox = Channel<Message>(Channel.UNLIMITED)

    init {
        scope.launch {
            var count = 0
            for (message in mailbox) {
                when (message) {
                    Message.Increment -> count++
                    is Message.Get -> message.response.complete(count)
                }
            }
        }
    }

    suspend fun increment() { mailbox.send(Message.Increment) }

    suspend fun get(): Int {
        val response = CompletableDeferred<Int>()
        mailbox.send(Message.Get(response))
        return response.await()
    }
}

The actor pattern guarantees serial access to count without locks. Useful when shared state is touched from many coroutines and a Mutex would contend.

StateFlow + update { } — Kotlin's usual alternative

Most "serialize state" needs are met with StateFlow:

class Counter {
    private val _value = MutableStateFlow(0)
    val value: StateFlow<Int> = _value.asStateFlow()

    fun increment() { _value.update { it + 1 } }
}

MutableStateFlow.update { } is atomic — no actor channel plumbing. Use the Actor pattern only when you need richer behavior (e.g., complex state machines with queued work).
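The atomicity claim is verifiable: the sketch below (concurrentIncrements is a hypothetical helper) races 1,000 coroutines through update { } on a multi-threaded dispatcher and checks that no increment is lost.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update

// Hypothetical helper: n concurrent increments via update { },
// which retries its compare-and-set loop under contention.
fun concurrentIncrements(n: Int): Int = runBlocking(Dispatchers.Default) {
    val counter = MutableStateFlow(0)
    coroutineScope {
        repeat(n) {
            launch { counter.update { it + 1 } }
        }
    }
    counter.value
}

fun main() {
    check(concurrentIncrements(1_000) == 1_000) // no lost updates, no locks
    println("1000 concurrent increments landed on exactly 1000")
}
```

A plain `_value.value = _value.value + 1` would lose updates under the same race; update { } retries internally until its write wins.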


6. Circuit Breaker — fail fast when downstream is sick

Intent: After N consecutive failures, stop calling a failing dependency for a cool-down period.

class CircuitBreaker(
    private val failureThreshold: Int = 5,
    private val openDuration: Duration = 30.seconds,
    private val clock: Clock = Clock.System
) {
    private sealed interface State {
        data object Closed : State                     // normal
        data class Open(val openedAt: Instant) : State // fail fast
        data object HalfOpen : State                   // probe
    }

    private var state: State = State.Closed
    private var consecutiveFailures = 0
    private val mutex = Mutex()

    suspend fun <T> call(block: suspend () -> T): T = mutex.withLock { state }.let { s ->
        when (s) {
            is State.Open -> {
                if (clock.now() - s.openedAt >= openDuration) {
                    mutex.withLock { state = State.HalfOpen }
                    runProbe(block)
                } else throw CircuitOpenException()
            }
            State.HalfOpen -> runProbe(block)
            State.Closed -> runProtected(block)
        }
    }

    private suspend fun <T> runProtected(block: suspend () -> T): T = try {
        val result = block()
        mutex.withLock { consecutiveFailures = 0 }
        result
    } catch (e: CancellationException) {
        throw e
    } catch (t: Throwable) {
        mutex.withLock {
            consecutiveFailures++
            if (consecutiveFailures >= failureThreshold) {
                state = State.Open(clock.now())
            }
        }
        throw t
    }

    private suspend fun <T> runProbe(block: suspend () -> T): T = try {
        val result = block()
        mutex.withLock {
            state = State.Closed
            consecutiveFailures = 0
        }
        result
    } catch (e: CancellationException) {
        throw e
    } catch (t: Throwable) {
        mutex.withLock { state = State.Open(clock.now()) }
        throw t
    }
}

class CircuitOpenException : Exception("Circuit open; failing fast")

Wiring with a decorator

class CircuitBreakingPaymentGateway(
    private val delegate: PaymentGateway,
    private val breaker: CircuitBreaker
) : PaymentGateway {
    override suspend fun charge(amount: Money, method: PaymentMethod): PaymentReceipt =
        breaker.call { delegate.charge(amount, method) }
}

Circuit breakers protect downstream services from thundering-herd retries and give your app a clear "service down" signal to show users.


7. Retry with exponential backoff + jitter

suspend fun <T> withRetry(
    maxAttempts: Int = 3,
    initialDelay: Duration = 1.seconds,
    maxDelay: Duration = 30.seconds,
    factor: Double = 2.0,
    jitter: Double = 0.3,
    shouldRetry: (Throwable) -> Boolean = { it is IOException },
    block: suspend (attempt: Int) -> T
): T {
    var currentDelay = initialDelay
    repeat(maxAttempts - 1) { attempt ->
        try {
            return block(attempt + 1)
        } catch (c: CancellationException) {
            throw c
        } catch (t: Throwable) {
            if (!shouldRetry(t)) throw t
            // Uniform ±jitter window around the current delay (±30% with jitter = 0.3)
            val jitterMs = (currentDelay.inWholeMilliseconds * jitter * (Math.random() * 2 - 1)).toLong()
            delay((currentDelay.inWholeMilliseconds + jitterMs).coerceAtLeast(0))
            currentDelay = (currentDelay * factor).coerceAtMost(maxDelay)
        }
    }
    return block(maxAttempts) // last attempt — let it throw
}

// Usage
val user = withRetry(maxAttempts = 5) { attempt ->
    println("attempt $attempt")
    api.getUser(id)
}

Jitter is crucial: without it, N clients failing simultaneously retry at exactly the same moment, recreating the thundering herd. Spreading retry times across a ±30% window prevents that.
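The schedule can also be computed in isolation. This sketch (backoffDelayMs is a hypothetical helper mirroring the parameters above, not part of withRetry) makes the window arithmetic explicit and checkable.

```kotlin
import kotlin.math.min
import kotlin.math.pow
import kotlin.random.Random

// Hypothetical helper: exponential growth capped at maxMs,
// then a uniform ±jitter band around the base delay.
fun backoffDelayMs(
    attempt: Int,              // 1-based retry attempt
    initialMs: Long = 1_000,
    maxMs: Long = 30_000,
    factor: Double = 2.0,
    jitter: Double = 0.3,
    random: Random = Random.Default
): Long {
    val base = min((initialMs * factor.pow(attempt - 1)).toLong(), maxMs)
    val offset = (base * jitter * (random.nextDouble() * 2 - 1)).toLong() // ±30%
    return (base + offset).coerceAtLeast(0)
}

fun main() {
    // Attempt 3's base is 4 s; every draw must land in [2.8 s, 5.2 s].
    repeat(1_000) {
        check(backoffDelayMs(attempt = 3) in 2_800..5_200)
    }
    println("all delays within the ±30% window")
}
```

With two clients, the chance they pick the same millisecond inside a 2.4-second window is negligible, which is exactly the decorrelation the thundering-herd argument needs.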

Flow-based retry

fun usersFlow(ids: List<String>): Flow<User> = flow {
    for (id in ids) emit(api.getUser(id))
}.retryWhen { cause, attempt ->
    if (cause is IOException && attempt < 3) {
        val delayMs = (1L shl attempt.toInt()) * 1000L
        delay(delayMs + Random.nextLong(0, delayMs / 2))
        true
    } else false
}

8. Debounce — wait for quiet period

Intent: Emit only the last value from a burst after some inactivity.

@Composable
fun SearchBar(viewModel: SearchViewModel = hiltViewModel()) {
    val query by viewModel.query.collectAsStateWithLifecycle()
    TextField(value = query, onValueChange = viewModel::onQueryChanged)
}

@HiltViewModel
class SearchViewModel @Inject constructor(
    private val searchRepo: SearchRepository
) : ViewModel() {
    private val _query = MutableStateFlow("")
    val query: StateFlow<String> = _query.asStateFlow()

    @OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
    val results: StateFlow<List<SearchHit>> = _query
        .debounce(300)          // wait for typing to pause
        .distinctUntilChanged() // ignore identical queries
        .flatMapLatest { q ->   // cancel previous search on new query
            if (q.isBlank()) flowOf(emptyList())
            else flow { emit(searchRepo.search(q)) }
        }
        .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5_000), emptyList())

    fun onQueryChanged(new: String) { _query.value = new }
}

flatMapLatest — the cancellation superpower

Every time the outer flow emits, flatMapLatest cancels the in-flight inner flow and starts a new one. For typing: if the user types "p", "pi", "pix" in 300 ms, only the "pix" search executes; the "p" and "pi" flows are cancelled before they hit the network.
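That cancellation behavior can be demonstrated without any UI. A minimal sketch (executedSearches is a hypothetical helper; delay stands in for network latency): three "queries" arrive back to back, and only the last inner flow survives long enough to record itself.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo of flatMapLatest cancellation semantics.
@OptIn(ExperimentalCoroutinesApi::class)
fun executedSearches(): List<String> = runBlocking {
    val executed = mutableListOf<String>()
    flowOf("p", "pi", "pix")      // a burst of typing
        .flatMapLatest { q ->
            flow {
                delay(100)        // simulated search latency
                executed += q     // only reached if not cancelled
                emit(q)
            }
        }
        .collect()
    executed
}

fun main() {
    check(executedSearches() == listOf("pix"))
    println("only the final query executed")
}
```

Each new upstream value cancels the previous inner flow while it is still suspended in delay, so "p" and "pi" never record themselves.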


9. Throttle — rate-limit emissions

Intent: Emit at most one value per time window.

throttleFirst (leading-edge throttle)

Kotlin Flow doesn't ship throttleFirst in stdlib; roll your own:

fun <T> Flow<T>.throttleFirst(windowDuration: Duration): Flow<T> = flow {
    var lastEmission = 0L
    collect { value ->
        val now = System.currentTimeMillis()
        if (now - lastEmission >= windowDuration.inWholeMilliseconds) {
            lastEmission = now
            emit(value)
        }
    }
}

// Usage — button protection against rapid taps
val clicks = remember { MutableSharedFlow<Unit>() }
LaunchedEffect(clicks) {
    clicks
        .throttleFirst(500.milliseconds)
        .collect { viewModel.submit() }
}
Button(onClick = { scope.launch { clicks.emit(Unit) } }) { Text("Submit") }

sample — throttle to fixed intervals

// Emit the most recent value every 500 ms, regardless of input rate
slideChanges.sample(500)

10. Mutex — exclusive access

Use when you need to protect a non-thread-safe resource:

class TokenStore(
    private val dataStore: DataStore<Preferences>,
    private val api: AuthApi
) {
    private val refreshMutex = Mutex()

    suspend fun ensureValidToken(): String = refreshMutex.withLock {
        val current = dataStore.data.first()[TOKEN_KEY]
        if (current != null && !isExpired(current)) return@withLock current

        val refreshed = api.refreshToken()
        dataStore.edit { it[TOKEN_KEY] = refreshed }
        refreshed
    }
}

Without the mutex, simultaneous callers both detect the expired token, both refresh, and one overwrites the other. With it, only one refresh runs — others wait and see the fresh token.
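A stripped-down sketch shows the fix in action (FakeTokenStore is hypothetical; delay stands in for the network call): ten callers race on an expired token, and the counter proves only one refresh ran.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical demo of the single-refresh guarantee.
class FakeTokenStore {
    private val mutex = Mutex()
    private var token: String? = null
    val refreshCount = AtomicInteger(0)

    suspend fun ensureValidToken(): String = mutex.withLock {
        token ?: run {
            refreshCount.incrementAndGet() // simulated network refresh
            delay(50)
            "fresh-token".also { token = it }
        }
    }
}

fun main() = runBlocking {
    val store = FakeTokenStore()
    val tokens = (1..10).map { async { store.ensureValidToken() } }.awaitAll()
    check(tokens.all { it == "fresh-token" })
    check(store.refreshCount.get() == 1)   // exactly one refresh ran
    println("one refresh served ${tokens.size} callers")
}
```

The nine late callers suspend on the mutex during the first caller's refresh; when they acquire it, the cached token is already present and they return it without refreshing.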

Prefer StateFlow.update { } when possible

Most "protect mutable state" needs are better served by StateFlow + update { }, which is atomic without holding a lock:

private val _state = MutableStateFlow(State.Initial)
fun transition(event: Event) {
    _state.update { current -> reducer(current, event) } // atomic, lock-free
}

Use Mutex when you need to serialize side effects, not just state updates.


11. Semaphore — bounded concurrency

class BoundedApi(private val api: Api, maxInFlight: Int = 8) {
    private val semaphore = Semaphore(maxInFlight)

    suspend fun call(request: Request): Response = semaphore.withPermit {
        api.call(request)
    }
}

Caps the number of in-flight API calls. Prevents OOM from unbounded fan-out.


12. Deadline / Timeout

suspend fun fetchWithinBudget(): Result<User> = withTimeoutOrNull(5.seconds) {
    api.fetchUser(id)
}?.let { Result.success(it) } ?: Result.failure(TimeoutException())

withTimeout throws; withTimeoutOrNull returns null. Always budget network calls — an unbounded await() is a bug waiting to happen.
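The two behaviors side by side, as a runnable sketch (slowCall is a hypothetical stand-in for a network call):

```kotlin
import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical slow call: 200 ms of simulated latency.
suspend fun slowCall(): String { delay(200); return "done" }

fun main() = runBlocking {
    // withTimeoutOrNull: null on expiry, the value otherwise
    check(withTimeoutOrNull(50.milliseconds) { slowCall() } == null)
    check(withTimeoutOrNull(500.milliseconds) { slowCall() } == "done")

    // withTimeout: throws TimeoutCancellationException on expiry
    val thrown = runCatching { withTimeout(50.milliseconds) { slowCall() } }
        .exceptionOrNull()
    check(thrown is TimeoutCancellationException)
    println("timeout semantics verified")
}
```

Note that TimeoutCancellationException is a CancellationException subtype, so a blanket catch that re-throws CancellationException will also re-throw timeouts; catch it explicitly when a timeout should be handled rather than propagated.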

Per-operation vs end-to-end deadlines

// Per-step — each call gets its own 5 s budget
val user = withTimeout(5.seconds) { api.fetchUser(id) }
val orders = withTimeout(5.seconds) { api.fetchOrders(user.id) }

// End-to-end — the entire composite must finish in 5 s total
withTimeout(5.seconds) {
    val user = api.fetchUser(id)
    val orders = api.fetchOrders(user.id)
}

For user-facing flows, end-to-end deadlines match user expectations better.


13. Backpressure handling in Flow

// Default — upstream suspends when downstream slow
sensorStream.collect { processSlowly(it) }

// Buffer — let upstream run ahead; drop old if full
sensorStream
.buffer(capacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST)
.collect { processSlowly(it) }

// Conflate — keep only the latest value
sensorStream
.conflate()
.collect { processSlowly(it) }

// Sample — emit at a fixed cadence
sensorStream
.sample(100)
.collect { processSlowly(it) }

Choose based on the data semantics: sensor readings use conflate, events use buffer, high-rate telemetry uses sample.
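To see conflation dropping intermediate values, here is a self-contained sketch (the delays are arbitrary, chosen to make the producer much faster than the consumer):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: fast producer, slow consumer, conflated in between.
fun main() = runBlocking {
    val received = mutableListOf<Int>()
    (1..100).asFlow()
        .onEach { delay(1) }                    // fast "sensor"
        .conflate()
        .collect { received += it; delay(50) }  // slow consumer
    check(received.size < 100)   // intermediate readings were dropped
    check(received.last() == 100) // but the latest value always arrives
    println("received ${received.size} of 100 readings")
}
```

This is the right trade for sensor-style data: the consumer always sees the freshest reading, and stale intermediate values cost nothing.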


14. Structured supervision

Normally, if any child throws, the parent and siblings cancel:

coroutineScope {
    launch { loadProfile() }  // fails
    launch { loadOrders() }   // cancelled
    launch { loadWishlist() } // cancelled
}

Use supervisorScope when one child's failure shouldn't kill siblings:

supervisorScope {
    launch { runCatching { loadProfile() }.onFailure { log(it) } }
    launch { runCatching { loadOrders() }.onFailure { log(it) } }
    launch { runCatching { loadWishlist() }.onFailure { log(it) } }
}

Each side-panel load is independent; one failure doesn't break the page.


15. Cancellation-safe cleanup with NonCancellable

When you MUST complete an operation even after cancellation:

suspend fun updateAndCleanup(data: Data) {
    try {
        save(data)
    } finally {
        withContext(NonCancellable) {
            oldData.delete() // must run even if parent cancelled
            analytics.track("data_update", data.id)
        }
    }
}

Combining patterns — a real-world example

A robust API call for a payment:

class RobustPaymentGateway @Inject constructor(
    private val api: PaymentApi,
    private val breaker: CircuitBreaker,
    @IoDispatcher private val io: CoroutineDispatcher
) : PaymentGateway {

    override suspend fun charge(amount: Money, method: PaymentMethod): PaymentReceipt =
        withContext(io) {
            withTimeout(15.seconds) { // deadline
                breaker.call {        // circuit breaker
                    withRetry(        // retry with backoff
                        maxAttempts = 3,
                        shouldRetry = { it is IOException }
                    ) { _ ->
                        // Key must be stable across attempts so the server dedupes retries
                        api.charge(amount, method, idempotencyKey = stableKey(amount, method))
                    }
                }
            }
        }
}
  • withContext(io) — off the main thread
  • withTimeout(15.seconds) — deadline
  • breaker.call { } — fail fast if downstream is down
  • withRetry(...) — transient failures retry with exponential backoff
  • idempotencyKey — safe to retry; server dedupes

This is the production pattern. Miss any layer and you get hangs, thundering herd, duplicate charges, or cascading failures.


Common anti-patterns — what breaks in production

  • GlobalScope.launch — leaks, ignores lifecycle
  • try { } catch (Throwable) — swallows CancellationException
  • Unbounded fan-out (1000 parallel API calls)
  • No retry budget — retries retry forever
  • No deadline — one call hangs the whole flow
  • No jitter — thundering herd on retry

Best practices — production patterns

  • Launch from structured scopes (viewModelScope, etc.)
  • Re-throw CancellationException explicitly
  • Semaphore or flatMapMerge(concurrency = N) for bounded parallelism
  • maxAttempts + exponential backoff + random jitter (±30%)
  • withTimeout on every external call

Practice exercises

  1. Bounded fan-out — fetch 100 product details in parallel, capped at 10 concurrent calls, using a Semaphore + awaitAll.

  2. Circuit breaker — wrap your payments API in a CircuitBreaker with threshold=3 and open=60s. Verify it fails fast after 3 consecutive IOExceptions.

  3. Debounced search — build a search ViewModel using debounce(300) + distinctUntilChanged + flatMapLatest. Confirm only the final query hits the network.

  4. Retry with jitter — implement withRetry with exponential backoff and ±30% jitter. Write a test that fails 2 times then succeeds, verifying delays are spread.

  5. Combine them all — build a RobustApi wrapper combining withContext(io) + withTimeout + CircuitBreaker + Retry. Use it for one critical endpoint.
