A pure Kotlin library that adds one sealed class
named Container, plus a few helper methods and classes for working with
Kotlin Flow and for converting async data loaders into reactive Flows.
Container is a simple class that represents the status of an async operation:
- Container.Pending - data is loading
- Container.Completed - data loading has finished:
  - Container.Success - data has been loaded successfully
  - Container.Error - loading has failed with an error
Both Container.Success and Container.Error extend Container.Completed.
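The hierarchy described above can be pictured with a small stand-alone sketch. The `LoadStatus` type and the `describe` and `isFinished` helpers below are hypothetical stand-ins written for illustration; they are not the library's actual Container declaration, which may differ in its details.

```kotlin
// Hypothetical stand-in for the hierarchy described above (not the library's own code).
sealed class LoadStatus<out T> {
    // Data is still loading.
    object Pending : LoadStatus<Nothing>()

    // Loading has finished, either successfully or with an error.
    sealed class Completed<out T> : LoadStatus<T>()

    data class Success<out T>(val value: T) : Completed<T>()
    data class Error(val exception: Exception) : Completed<Nothing>()
}

// An exhaustive `when` over the sealed hierarchy: no `else` branch is needed.
fun describe(status: LoadStatus<String>): String = when (status) {
    LoadStatus.Pending -> "loading"
    is LoadStatus.Success -> "loaded: ${status.value}"
    is LoadStatus.Error -> "failed: ${status.exception.message}"
}

// Success and Error can both be treated as "finished" through the shared parent type.
fun isFinished(status: LoadStatus<*>): Boolean = status is LoadStatus.Completed<*>
```

Grouping both final states under one parent lets calling code ask "is loading over?" without enumerating every outcome.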
Just add the following line to your build.gradle file:
implementation "com.elveum:container:2.0.0-beta10"
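If your project uses the Gradle Kotlin DSL (a build.gradle.kts file) instead of the Groovy DSL, the same dependency would be written as follows. This assumes the same coordinates and version as the line above.

```kotlin
// build.gradle.kts (Kotlin DSL form of the dependency line above)
dependencies {
    implementation("com.elveum:container:2.0.0-beta10")
}
```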
The library contains:
- Container<T> class - represents the status of an async operation (Pending, Error, Success)
- A couple of helper extension methods that simplify working (mapping, filtering, etc.) with Kotlin Flows containing Container<T> instances
- Two subjects:
  - FlowSubject - represents a simple finite flow whose emissions can be controlled from outside
  - LazyFlowSubject - provides a convenient way of transforming suspend functions into a Kotlin Flow, with the ability to update values, replace loaders, load multiple values, cache the latest loaded value, and load values only on demand
- LazyCache, which can hold multiple instances of LazyFlowSubject
- StateFlow extensions:
  - stateMap function, which converts StateFlow<T> into StateFlow<R> (the original map function converts StateFlow<T> into the more generic Flow<R>)
  - combineStates function, which merges 2 or more StateFlow<*> instances into one StateFlow<R> instance (the original combine function merges flows into the more generic Flow<R>, not into StateFlow<R>)
- ContainerReducer, which can observe values from input flows, convert them into a state, and update that state manually at any time
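To illustrate why a StateFlow-to-StateFlow mapping is useful, here is a minimal sketch built only from standard kotlinx.coroutines operators. The `mapToState` helper and the `demoMapToState` function are hypothetical names introduced for this example. Unlike the library's stateMap described above, this sketch needs a CoroutineScope; it is not the library's implementation.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical helper: the plain `map` operator returns Flow<R>, so the result
// is converted back into a StateFlow<R> with `stateIn`.
fun <T, R> StateFlow<T>.mapToState(
    scope: CoroutineScope,
    transform: (T) -> R,
): StateFlow<R> =
    map { transform(it) }
        .stateIn(scope, SharingStarted.Eagerly, transform(value))

// Small demo: the mapped StateFlow follows updates of the source StateFlow.
fun demoMapToState(): Int = runBlocking {
    val scope = CoroutineScope(Job())
    val source = MutableStateFlow(1)
    val doubled = source.mapToState(scope) { it * 2 }
    source.value = 5
    // Wait until the update has propagated to the mapped flow.
    val latest = withTimeout(2_000) { doubled.first { it == 10 } }
    scope.cancel()
    latest
}
```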
Container is a simple class which represents the status of an async operation:
- Container.Pending - data is loading
- Container.Success - data has been loaded successfully
- Container.Error - loading has failed with an error
You can instantiate all these subtypes by using pendingContainer(), successContainer(),
and errorContainer() functions.
There are a couple of methods which can simplify code that works with such statuses:
- fold, foldDefault, foldNullable
- transform
- map, mapException
- catch, catchAll
- getOrNull, exceptionOrNull, unwrap
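To give a feel for what helpers such as fold, map and getOrNull do, here is a minimal sketch on a simplified stand-in type. The `LoadStatus` class and the signatures below are assumptions made for illustration. The library's real functions may take different parameters.

```kotlin
// Simplified, hypothetical stand-in for a three-state container.
sealed class LoadStatus<out T> {
    object Pending : LoadStatus<Nothing>()
    data class Success<out T>(val value: T) : LoadStatus<T>()
    data class Error(val exception: Exception) : LoadStatus<Nothing>()
}

// fold: collapse all three states into a single result value.
inline fun <T, R> LoadStatus<T>.fold(
    onPending: () -> R,
    onError: (Exception) -> R,
    onSuccess: (T) -> R,
): R = when (this) {
    LoadStatus.Pending -> onPending()
    is LoadStatus.Error -> onError(exception)
    is LoadStatus.Success -> onSuccess(value)
}

// map: transform only the success value; Pending and Error pass through unchanged.
inline fun <T, R> LoadStatus<T>.map(mapper: (T) -> R): LoadStatus<R> = when (this) {
    LoadStatus.Pending -> LoadStatus.Pending
    is LoadStatus.Error -> this
    is LoadStatus.Success -> LoadStatus.Success(mapper(value))
}

// getOrNull: the success value, or null for any other state.
fun <T> LoadStatus<T>.getOrNull(): T? = (this as? LoadStatus.Success<T>)?.value
```

With helpers like these, UI code can render a status with one `fold` call instead of repeating a `when` expression at every call site.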
- Flow<Container<T>>.containerMap() converts a flow of type Container<T> into a flow of type Container<R>
- StateFlow<Container<T>>.containerStateMap() converts a StateFlow of type Container<T> into a flow of type Container<R>
- Flow<Container<T>>.containerFilter() filters all Container.Success<T> values by a given predicate
Other extension functions:
- Flow<Container<T>>.containerFilterNot
- Flow<Container<T>>.containerMapLatest
- Flow<Container<T>>.containerFold
- Flow<Container<T>>.containerFoldDefault
- Flow<Container<T>>.containerFoldNullable
- Flow<Container<T>>.containerTransform
- Flow<Container<T>>.containerCatch
- Flow<Container<T>>.containerCatchAll
Also, there are the following type aliases:
- ListContainer<T> = Container<List<T>>
- ContainerFlow<T> = Flow<Container<T>>
- ListContainerFlow<T> = Flow<Container<List<T>>>
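The idea behind a flow-level mapping extension and a flow type alias can be sketched with plain kotlinx.coroutines. Everything below (`LoadStatus`, `StatusFlow`, `mapSuccess`, `demoMapSuccess`) is a hypothetical stand-in for illustration and does not reproduce the library's containerMap.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Simplified, hypothetical stand-in for a three-state container.
sealed class LoadStatus<out T> {
    object Pending : LoadStatus<Nothing>()
    data class Success<out T>(val value: T) : LoadStatus<T>()
    data class Error(val exception: Exception) : LoadStatus<Nothing>()
}

// An alias in the same spirit as ContainerFlow<T>.
typealias StatusFlow<T> = Flow<LoadStatus<T>>

// Maps only success values inside the flow; Pending and Error are re-emitted as-is.
fun <T, R> StatusFlow<T>.mapSuccess(mapper: suspend (T) -> R): StatusFlow<R> =
    map<LoadStatus<T>, LoadStatus<R>> { status ->
        when (status) {
            LoadStatus.Pending -> LoadStatus.Pending
            is LoadStatus.Error -> status
            is LoadStatus.Success -> LoadStatus.Success(mapper(status.value))
        }
    }

// Small demo: a pending status followed by a loaded value.
fun demoMapSuccess(): List<LoadStatus<String>> = runBlocking {
    flowOf<LoadStatus<Int>>(LoadStatus.Pending, LoadStatus.Success(2))
        .mapSuccess { "value=$it" }
        .toList()
}
```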
Combine container flows:
val flow1: Flow<Container<String>> = ...
val flow2: Flow<Container<Int>> = ...
val combinedFlow: Flow<Container<String>> = 
    combineContainerFlows(flow1, flow2) { string, number ->
       "$string,$number"
    }
Combine a container flow with other flows:
val flow1: Flow<Container<String>> = ...
val flow2: Flow<Int> = ...
val combinedFlow: Flow<Container<String>> = flow1
    .containerCombineWith(flow2) { string, number ->
        "$string,$number"
    }
Container Reducers allow you to transform input flows into a StateFlow with the ability to update the state manually.
Simple example (1 flow, without converting into another state class).
interface UserRepository {
    fun getUser(): Flow<Container<User>>
}
// create a reducer
val reducer = userRepository.getUser().containerToReducer(
    scope = viewModelScope,
    started = SharingStarted.Lazily,
)
// observe users via StateFlow:
val stateFlow: StateFlow<Container<User>> = reducer.stateFlow
// update manually if needed:
reducer.updateValue { copy(username = "new-username") }
Example #2: convert input data into a state class.
data class State(
    val user: User,
    val otherData: String = "",
)
val reducer = userRepository.getUser().containerToReducer(
    initialState = ::State,
    nextState = State::copy,
    scope = viewModelScope,
    started = SharingStarted.Lazily,
)
// observe final state:
val stateFlow: StateFlow<Container<State>> = reducer.stateFlow
// update state manually if needed:
reducer.updateState { copy(otherData = "otherData") }
Example #3: convert 2 or more input flows into a state class.
interface Repository {
    fun getUser(): Flow<Container<User>>
    fun getSettings(): Flow<Container<Settings>>
}
data class State(
    val user: User,
    val settings: Settings,
    val otherData: String = "",
)
val reducer = combineContainersToReducer(
    repository.getUser(),
    repository.getSettings(),
    initialState = ::State,
    nextState = State::copy,
    scope = viewModelScope,
    started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5000)
)
// observe a final state via StateFlow:
val stateFlow: StateFlow<Container<State>> = reducer.stateFlow
// update the state manually if needed:
reducer.updateState { copy(otherData = "updated-data") }
Subjects are classes that control flow emissions (the name is taken from Reactive Streams).
FlowSubject represents a finite Flow whose emissions are controlled from outside
(like StateFlow and SharedFlow). FlowSubject holds the latest value, but
it differs from StateFlow in the following ways:
- FlowSubject is a finite flow and it can be completed by using the onComplete and onError methods
- FlowSubject doesn't need a starting default value
- FlowSubject doesn't hold the latest value if it has been completed with an error
Usage example:
val flowSubject = FlowSubject.create<String>()
flowSubject.onNext("first")
flowSubject.onNext("second")
flowSubject.onComplete()
flowSubject.listen().collect {
  // ...
}
LazyFlowSubject<T> provides a mechanism for converting a load
function into a Flow<Container<T>>.
Features:
1. The load function is usually executed only once (except in cases #6 and #8)
2. The load function is executed only when at least one subscriber starts collecting the flow
3. The load function can emit more than one value
4. The load function is cancelled when the last subscriber stops collecting the flow and the timeout expires (default timeout = 1 sec)
5. The latest result is cached, so any new subscriber receives the most recent loaded value without triggering the load function again
6. There is a timeout (default value is 1 sec) after the last subscriber stops collecting the flow. When the timeout expires, the cached value is cleared, so any further subscribers will execute the load function again
7. The flow can be collected by using the listen() method
8. You can replace the load function at any time by using the following methods:
   - newLoad - assigns a new load function which can emit more than one value. This method also returns a separate flow which differs from the flow returned by listen(): it emits only the values emitted by the new load function and completes as soon as the new load function completes
   - newAsyncLoad - the same as newLoad, but it returns Unit immediately
   - newSimpleLoad - assigns a new load function which can emit only one value. This is a suspend function: it waits until the new load function completes and returns its result (or throws an exception)
   - newSimpleAsyncLoad - the same as newSimpleLoad, but it doesn't wait for the load result and returns immediately
9. You can also use the updateWith method to cancel any active loader and immediately place your own value into the subject. The previous loader function will be used again if you call reload() or if the cache has expired.
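Several of the features above (loading on demand, caching the latest value, and clearing the cache after a timeout) resemble what the standard shareIn operator offers with SharingStarted.WhileSubscribed. The sketch below shows that analogy only. The `lazyCached` and `demoLazyCached` names are hypothetical, and this is not how LazyFlowSubject is implemented.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper built on standard operators (an analogy, not the library's code):
// - the upstream loader starts only when the first subscriber appears,
// - the latest value is replayed to new subscribers,
// - after `timeoutMillis` without subscribers the loader is stopped and the cache is cleared.
fun <T> Flow<T>.lazyCached(scope: CoroutineScope, timeoutMillis: Long = 1_000): Flow<T> =
    shareIn(
        scope = scope,
        started = SharingStarted.WhileSubscribed(
            stopTimeoutMillis = timeoutMillis,
            replayExpirationMillis = 0,
        ),
        replay = 1,
    )

// Demo: two consecutive subscribers, but the loader body runs only once.
fun demoLazyCached(): Pair<Int, List<String>> = runBlocking {
    val scope = CoroutineScope(Job())
    val loads = AtomicInteger(0)
    val products = flow {
        loads.incrementAndGet()
        emit("local")
        delay(10)
        emit("remote")
    }.lazyCached(scope)

    val firstSubscriber = products.take(2).toList() // triggers the loader
    val secondSubscriber = products.first()         // served from the cached value
    scope.cancel()
    loads.get() to (firstSubscriber + secondSubscriber)
}
```

The second subscriber arrives before the one-second timeout expires, so it receives the cached value and the loader is not started again.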
Usage example:
class ProductRepository(
  private val productsLocalDataSource: ProductsLocalDataSource,
  private val productsRemoteDataSource: ProductsRemoteDataSource,
) {
    private val productsSubject = LazyFlowSubject.create {
        val localProducts = productsLocalDataSource.getProducts()
        if (localProducts != null) emit(localProducts)
        val remoteProducts = productsRemoteDataSource.getProducts()
        productsLocalDataSource.saveProducts(remoteProducts)
        emit(remoteProducts)
    }
    // ListContainerFlow<T> is an alias to Flow<Container<List<T>>>
    fun listenProducts(): ListContainerFlow<Product> {
        return productsSubject.listen()
    }
    fun reload() {
        productsSubject.reloadAsync()
    }
}
You can access an additional field named loadTrigger within the
LazyFlowSubject.create { ... } block. Depending on its value, you can change the load
logic. For example, you can skip loading data from the local cache if the loading process has
been initiated by a reload call:
private val productsSubject = LazyFlowSubject.create {
    if (loadTrigger != LoadTrigger.Reload) {
        val localProducts = productsLocalDataSource.getProducts()
        if (localProducts != null) emit(localProducts)
    }
    val remoteProducts = productsRemoteDataSource.getProducts()
    // save the freshly loaded remote data (localProducts is not in scope here)
    productsLocalDataSource.saveProducts(remoteProducts)
    emit(remoteProducts)
}
Optionally, you can assign a SourceType to any Container.Success value
to let consumers know the actual source the data arrived from.
// in loader function:
val subject = LazyFlowSubject.create {
    val remoteProducts = productsRemoteDataSource.getProducts()
    emit(remoteProducts, RemoteSourceType)
}
// in `Container.Success()` directly:
subject.updateWith(Container.Success("hello", FakeSourceType))
// in `Container.updateIfSuccess`:
subject.updateIfSuccess(ImmediateSourceType) { oldValue ->
    oldValue.copy(isFavorite = true)
}
Source types can be accessed via a Container.Success instance:
subject.listen()
    .filterIsInstance<Container.Success<String>>()
    .collectLatest { successContainer ->
        val value = successContainer.value
        val sourceType = successContainer.source
        println("$value, isRemote = ${sourceType == RemoteSourceType}")
    }
All success and error containers have an additional property named reloadFunction.
By default, it is empty, but you can optionally pass an additional
ContainerConfiguration(emitReloadFunction = true) argument to the LazyFlowSubject.listen()
call. In this case, calling reloadFunction causes a full reload of the corresponding LazyFlowSubject
instance (similar to calling LazyFlowSubject.reloadAsync()).
You can re-assign the reloading function by:
- creating a new container:
  successContainer(value, reloadFunction = customReloadFunction)
- mapping an existing container using the update function:
  Container<T>.update {
      reloadFunction = {
          println("Reloading...")
          reloadFunction(it) // call origin reload function if needed
      }
  }
- mapping an existing container in a Kotlin Flow:
  val flow: Flow<Container<String>> = getFlow()
  return flow
      .containerUpdate {
          println("Reloading...")
          reloadFunction(it) // call origin reload function if needed
      }
LazyCache is a store of multiple LazyFlowSubject instances. It lets you
define listeners and loader functions with additional arguments.
val lazyCache = LazyCache.create<Long, User> { id ->
    val localUser = localUsersDataSource.getUserById(id)
    if (localUser != null) {
        emit(localUser)
    }
    val remoteUser = remoteUsersDataSource.getUserById(id)
    localUsersDataSource.save(remoteUser)
    emit(remoteUser)
}
fun getUserById(id: Long): Flow<Container<User>> {
    return lazyCache.listen(id)
}
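Conceptually, a store like this can be pictured as a map from an argument to a lazily created entry for that argument. The `KeyedStore` class below is a hypothetical, simplified illustration of that idea. In the library each entry would be a LazyFlowSubject, and the actual implementation may work differently.

```kotlin
// Hypothetical sketch: one entry per key, created on first access and reused afterwards.
class KeyedStore<K, V : Any>(private val create: (K) -> V) {
    private val entries = mutableMapOf<K, V>()

    // Returns the existing entry for the key, or creates and remembers a new one.
    fun get(key: K): V = synchronized(entries) {
        entries.getOrPut(key) { create(key) }
    }
}
```

Because the entry is created only on the first request for a given key, repeated calls such as getUserById(1) would share one underlying loader instead of starting a new one each time.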