
Crash Course Kotlin Flows, Channels and Shared/State Flows



Coroutines have become extremely popular in the Kotlin world. Where Rx used to be the tool of choice, everyone is now talking about suspending functions, alias coroutines.

In Rx everything is modeled as a stream: basically anything can become a stream, and every operation you perform on a stream returns a new stream. With coroutines, the paradigm is to write asynchronous code in a synchronous style. Let’s take an example where we fetch some user data and are only interested in the id:

--- RX style ---
fetchUserData()
  .map {  
     it.id
  }.subscribe()
--- Coroutine style ---
suspend fun fetchData(): String = fetchUserData().id
launch(Dispatchers.IO) { fetchData() }

It seems that “writing asynchronous code in a synchronous way” is a nice idea, but it turns out that the JetBrains folks were indeed missing the concept of streams. So JetBrains came out with their own implementation of streams, heavily inspired by Rx. This article gives a brief overview of the two main concepts and a third newcomer that seeks to replace one of the first two, namely Flows, Channels and Shared/State Flows.


Flows

A Flow object is what Observables / Flowables are in the Rx world: it represents a stream of values of a specific type.


val myIntFlow: Flow<Int> = flow { emit(1) }

can be translated to

val myIntStream: Observable<Int> = Observable.create { it.onNext(1) }

The Flow starts emitting data when collect() is called on the stream, so that

// Flow way
myIntFlow.collect { intValue -> /* intValue = 1 */ }
// Rx way
myIntStream.subscribe { intValue -> /* intValue = 1 */ }

You can collect the flow directly, or you can apply operators to it, each of which returns another flow. Because a flow only starts producing data when you start collecting it, we can conclude that flows are usually cold! In other words, a flow only emits data while it is being collected (or consumed).
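To make “cold” a bit more tangible, here’s a minimal sketch. The names `producerRuns`, `coldFlow` and `collectTwice` are made up for this illustration. It assumes the kotlinx.coroutines library is on the classpath and simply counts how often the builder block runs.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical counter, only here to observe how often the builder block runs.
var producerRuns = 0

val coldFlow: Flow<Int> = flow {
    producerRuns++   // runs once per collector, not when the flow is created
    emit(1)
    emit(2)
}

fun collectTwice(): Pair<List<Int>, List<Int>> = runBlocking {
    val first = coldFlow.toList()    // toList() collects the flow: the block runs
    val second = coldFlow.toList()   // a second collection runs the block again
    first to second
}
```

Creating `coldFlow` does not touch the counter at all. Each call to `toList()` is a terminal operation that collects the flow and therefore restarts the block from the top.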

Update - 26th April 2021: A flow can now also be a hot flow

Before collecting a typical flow, you can now call

// This will keep the flow alive as long as the attached scope is not canceled.
// You can basically re-use that flow and keep emitting items. In other words,
// shareIn transforms that flow into a SharedFlow, yet the type reflected by
// the API can be a simple `Flow`
myNormalFlow.shareIn(coroutineScope, SharingStarted.Lazily)

This transforms a cold flow into a hot flow (a SharedFlow), because it can now emit for as long as the attached coroutine scope is alive. That’s why we can no longer conclude that an API returning a Flow really returns a cold flow.
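Here’s a rough sketch of what that looks like in practice. The counter `upstreamRuns` and the function `collectSharedTwice` are hypothetical, and I’m assuming `replay = 1` so that a collector arriving late still gets the latest value. Your use case may want a different replay setting.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.runBlocking

// Hypothetical counter, only here to observe how often the upstream block runs.
val upstreamRuns = AtomicInteger(0)

fun collectSharedTwice(): Pair<Int, Int> = runBlocking {
    val myNormalFlow = flow {
        upstreamRuns.incrementAndGet()
        emit(1)
    }
    val coroutineScope = CoroutineScope(Dispatchers.Default)
    // replay = 1 keeps the latest value around for collectors that arrive later.
    val hotFlow = myNormalFlow.shareIn(coroutineScope, SharingStarted.Lazily, replay = 1)

    val first = hotFlow.first()    // the first collector triggers the single upstream run
    val second = hotFlow.first()   // served from the replay cache, upstream is not re-run
    coroutineScope.cancel()        // the hot flow only lives as long as its scope
    first to second
}
```

Compared to a plain cold flow, the upstream block runs once, no matter how many collectors show up afterwards.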

So we just talked about flows, but what the heck are those things called channels, and why do we need them? Both can be used for streams: a flow only emits, whereas a channel can both receive and emit data. The main difference between flows and channels is this: flows are usually cold and channels are hot. In other words, with a flow the data is produced inside the stream, while with a channel the data is produced outside of the stream.

Here’s an example of a cold stream using flow

val dataFlow = flow {
    // code block is only executed when flow
    // is being collected
    val data = dataSource.fetchData()
    emit(data)
}
...
dataFlow.collect { ... }

And here’s the same with a channel

// the channel lives and can receive and send
// data
val dataChannel = Channel<Data>()

suspend fun fetchData() {
    val data = dataSource.fetchData()
    dataChannel.send(data)
}

One can call receive() on the channel to obtain the data that was sent through it by another suspending function. A channel is like a queue: one component sends data into it and another component consumes it. Additionally, a channel can have a buffer, and it suspends the sender when the buffer is full and the receiver when it is empty. A channel buffers the data until someone calls receive() on it, and it can keep accepting data until close() gets called on it. There are several forms of channels, such as BroadcastChannel and ConflatedBroadcastChannel, that support different but similar use cases and share the same concept. Each value sent to a regular Channel is delivered to exactly one consumer, unlike BroadcastChannel, which delivers every value to multiple listeners at once. Also, channels have a more complex API and are considered low-level primitives; if you’re in need of a hot flow, consider using a (Mutable) Shared/StateFlow. Read more here about what channels bring to the table.
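The queue-like behaviour can be sketched as follows. The function name `channelRoundTrip` and the buffer capacity of 2 are arbitrary choices for this illustration, not something the original example prescribes.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun channelRoundTrip(): List<Int> = runBlocking {
    // Buffered channel: up to two values can wait here without a receiver.
    val channel = Channel<Int>(capacity = 2)

    // Producer: the data is created outside of the channel and sent into it.
    launch {
        for (i in 1..3) channel.send(i)   // send suspends while the buffer is full
        channel.close()                   // signals that no more values will follow
    }

    // Consumer: iterating calls receive() under the hood until the channel is closed.
    val received = mutableListOf<Int>()
    for (value in channel) received += value
    received
}
```

Values come out in the order they were sent, and the loop ends once the producer has closed the channel.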


MutableSharedFlow / MutableStateFlow


There are these new things called MutableSharedFlow and MutableStateFlow. Don’t be fooled by the naming: even though they contain Flow in their name, they are, unlike the original flows, hot streams. They were introduced to deprecate BroadcastChannel, as their API is way simpler (see reference). These flows can live without having an active consumer. In other words, the data is produced outside of the stream and then passed to the flow, which is a clear indicator that we’re dealing with a hot stream here.

// Constructor takes no value
val mySharedFlow = MutableSharedFlow<Int>()
// Constructor takes a value
val myStateFlow = MutableStateFlow<Int>(0)
...
mySharedFlow.emit(1)
myStateFlow.emit(1)

As you see, the main difference between a SharedFlow and a StateFlow is that a StateFlow takes an initial value through the constructor and emits its current value immediately when someone starts collecting, while a SharedFlow takes no value and emits nothing by default. This is a very important difference, as it forces you to pay attention to whether you model your data as a cold or a hot stream and whether your data is a state (use StateFlow then) or an event (use SharedFlow). As an example, a state could be the visibility of your UI component, which always has a value (shown/hidden), while an event is only triggered once one or multiple preconditions are fulfilled.
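A small sketch of that difference from the point of view of a collector that shows up late. The function name `lateCollector` and the 100 ms timeout are assumptions for this illustration, and the SharedFlow uses the default configuration without replay.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

fun lateCollector(): Pair<Int, Int?> = runBlocking {
    val myStateFlow = MutableStateFlow(0)
    val mySharedFlow = MutableSharedFlow<Int>()   // default: nothing is replayed

    // Both values are emitted before anyone is collecting.
    myStateFlow.emit(1)
    mySharedFlow.emit(1)   // no collector and no replay: the value is dropped

    // The state still has a current value, the event is gone.
    val fromState = myStateFlow.first()
    val fromShared = withTimeoutOrNull(100) { mySharedFlow.first() }
    fromState to fromShared
}
```

The StateFlow hands over its current value right away, while the collector of the SharedFlow waits until the timeout and ends up with `null`.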


State vs Event

To have a better understanding of when to use MutableSharedFlow and when to use MutableStateFlow, let’s look at the following scenario:

  1. User clicks a button

  2. Remote data is fetched

  3. User gets navigated to another screen

We could now model the navigation (3.) as a state or as an event. Let’s model it as a state first. For this we’ll assume that there is a general NavigationState class with two subclasses, EmptyNavigationState and ViewNavigationState, and a MutableStateFlow that emits the state:

sealed class NavigationState
object EmptyNavigationState : NavigationState()
data class ViewNavigationState(...navigation arguments...) : NavigationState()

// A state always has a value, in this case it is the empty state
val navigationState = MutableStateFlow<NavigationState>(EmptyNavigationState)

// ... User button click happened, remote data is fetched, and now
// we want to update the state.
navigationState.emit(ViewNavigationState(...navigation arguments...))

When navigationState first gets collected, the consumer receives an EmptyNavigationState object by default and later on a ViewNavigationState. It is now up to the consumer to handle state updates properly (usually within a big when { .. } statement). Because of this individual state handling, MVI (Model-View-Intent) / Redux / Flux are common architecture patterns here, as you have one big immutable state. (Going into detail about these patterns would go beyond the scope of this article; let me know in the comments if you would like an extra article about architecture patterns and coroutines.)
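On the consumer side, such a when block could look roughly like this. The `targetScreen` argument, the `describe` function and the returned strings are all invented for this sketch, since the real navigation arguments depend on your app.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

sealed class NavigationState
object EmptyNavigationState : NavigationState()
// targetScreen is a hypothetical stand-in for the real navigation arguments.
data class ViewNavigationState(val targetScreen: String) : NavigationState()

// The consumer has to handle every possible state, including the default one.
fun describe(state: NavigationState): String = when (state) {
    EmptyNavigationState -> "stay"
    is ViewNavigationState -> "navigate to ${state.targetScreen}"
}

fun observeNavigation(): List<String> = runBlocking {
    val navigationState = MutableStateFlow<NavigationState>(EmptyNavigationState)
    val seen = async { navigationState.take(2).map { describe(it) }.toList() }
    yield()   // let the collector pick up the initial state first
    navigationState.value = ViewNavigationState("details")
    seen.await()
}
```

Because the sealed class makes the when exhaustive, the compiler complains as soon as a new state is added and not handled.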

Now we’ll model the same scenario only with navigation as an event


// No default value, as this is an event
val navigationEvent = MutableSharedFlow<NavigationTarget>()

// ... User button click happened, remote data is fetched, and now
// we want to emit an event
navigationEvent.emit(NavigationTarget(..arguments))

Whoever collects the flow will only receive a value once the user has clicked the button and the remote data has been fetched successfully. We don’t have to write that big when block anymore to handle multiple state updates, such as the default EmptyNavigationState. Since we’re still using a hot flow, MVVM (Model-View-ViewModel) is a common architecture pattern, as we’re emitting values and don’t know anything about the consumer or even if there is one.
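For comparison, here’s a sketch of the consuming side for the event variant. `NavigationTarget` with a single `screen` field and the function `awaitNavigationEvent` are assumptions for this illustration. Note that the consumer has to be subscribed before the event is emitted, otherwise it would miss it.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical event payload; the real arguments depend on your navigation setup.
data class NavigationTarget(val screen: String)

fun awaitNavigationEvent(): NavigationTarget = runBlocking {
    val navigationEvent = MutableSharedFlow<NavigationTarget>()

    // Consumer: no default value to filter out, it just waits for the event.
    val consumer = async { navigationEvent.first() }

    // Wait until the consumer is actually subscribed before emitting.
    navigationEvent.subscriptionCount.first { it > 0 }

    // ... user clicked, remote data was fetched, now emit the event.
    navigationEvent.emit(NavigationTarget("details"))
    consumer.await()
}
```

There is no empty state to handle here: the first thing the consumer ever sees is the actual navigation event.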

We could also use cold flows and stick to simple MVP (Model-View-Presenter) but that’s another topic :-)


The bottom line is that a procedure can be modeled as a state or as an event. I recommend always picking the simplest approach that suits your application. I’d like to quote Terry Davis, founder of TempleOS and to me one of the best developers that has ever walked this planet:

An idiot admires complexity. A genius admires simplicity.

Take this quote with a grain of salt though: some projects are complex by their nature, or have grown in an uncoordinated way and are difficult to operate.

Comparison to Rx

Let’s compare the concepts we just talked about with what we know from Rx:

Flow = (cold) Flowable / Observable / Single

Channel = Subjects

StateFlow = BehaviorSubjects (Always emits something)

SharedFlow = PublishSubjects (Starts with no value)

primitive suspend functions = Single / Maybe / Completable

I wrote these things down as I was studying a new code base that relied heavily on the concepts we just talked about. I would recommend paying close attention when using these new tools. For instance, does your stream really need to be hot when you only want to get data when you actively ask for it (lazily)? A cold stream would probably be the better decision then. If you build your API around flow and want to introduce it on every layer of your system, avoid exposing a StateFlow / SharedFlow and rather have your layers return a flow, for example:


DO:
interface XRepository {
  fun getX(): Flow<X>
}

DON'T:
interface XRepository {
  val xFlow: Flow<X>
  suspend fun getX()   // get is also the wrong naming in this case
}
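An implementation of the DO variant could be sketched like this. `X` with an `id` field, `LazyXRepository`, the `loadX` lambda and `loadOnce` are placeholders I made up; in a real project `loadX` would be your network or database call.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical model class standing in for whatever X is in your code base.
data class X(val id: Int)

interface XRepository {
    fun getX(): Flow<X>
}

// loadX is a placeholder for the real data source call.
class LazyXRepository(private val loadX: suspend () -> X) : XRepository {
    // Cold flow: loadX only runs when somebody collects the returned flow.
    override fun getX(): Flow<X> = flow { emit(loadX()) }
}

// Example caller: collects the flow once and returns the first value.
fun loadOnce(repository: XRepository): X = runBlocking {
    repository.getX().first()
}
```

Calling `getX()` is cheap and does no work by itself, so every layer can pass the flow along and the data is only fetched when the final consumer collects it.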

Before we end, I’d like to say that the flow library is still very new. Even though Flow and Shared + State flows were just made stable, their usage still differs a lot between projects. I’ve seen channels in some big projects, and now parts of that API, such as BroadcastChannel, are being deprecated. Many people are porting their whole Rx API over to flows, but I would be very careful here. The concepts of flows and Rx are similar yet very different. As an example, Rx has extensive testing utilities such as TestObserver, but testing a hot flow is rather hacky: within your test you need to launch a new coroutine that starts collecting the flow before you call the method you’re about to test. While flows support suspending functions / coroutines out of the box, you might not need coroutines at all if you’re doing fine with Rx already. Take into consideration that your team will need to learn a new API.
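To show what “start collecting first, then call the method under test” means, here’s a sketch that only uses plain `runBlocking` and no dedicated test library. The `Counter` class and its `increment` method are invented for this example.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical class under test that exposes a hot flow of events.
class Counter {
    private val _events = MutableSharedFlow<Int>()
    val events: SharedFlow<Int> = _events.asSharedFlow()

    suspend fun increment(by: Int) {
        _events.emit(by)
    }
}

fun collectIncrements(): List<Int> = runBlocking {
    val counter = Counter()

    // 1. Start collecting first. UNDISPATCHED runs the coroutine right away
    //    up to its first suspension, so the subscription exists before step 2.
    val collected = async(start = CoroutineStart.UNDISPATCHED) {
        counter.events.take(2).toList()
    }

    // 2. Only now call the methods under test.
    counter.increment(1)
    counter.increment(2)

    // 3. Await what the collector has seen.
    collected.await()
}
```

If you swap steps 1 and 2, the events are emitted into the void and the collector waits forever, which is exactly the trap when testing hot flows.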


Thank you for reading

