最近学了下Kotlin Flow,顺便在项目中进行了实践,做一下总结。
按顺序发出多个值的数据流。本质就是一个生产者消费者模型,生产者发送数据给消费者进行消费。
冷流:
当执行collect的时候(也就是有消费者的时候),生产者才开始发射数据流。生产者与消费者是一对一的关系。当生产者发送数据的时候,对应的消费者才可以收到数据。热流:
不管有没有执行collect(也就是不管有没有消费者),生产者都会发射数据流到内存中。生产者与消费者是一对多的关系。当生产者发送数据的时候,多个消费者都可以收到数据简单的列表显示场景,可以使用onStart
,onEmpty
,catch
,onCompletion
等回调操作符,监听数据流的状态,显示相应的加载状态UI。
onStart
:在数据发射之前触发,onStart所在的线程,是数据产生的线程onCompletio
n:在数据流结束时触发,onCompletion所在的线程,是数据产生的线程onEmpty
:当数据流结束了,缺没有发出任何元素的时候触发。catch
:数据流发生错误的时候触发flowOn
:指定上游数据流的CoroutineContext,下游数据流不会受到影响private fun coldFlowDemo() {
//创建一个冷流,在3秒后发射一个数据
val coldFlow = flow<Int> {
delay(3000)
emit(1)
}
lifecycleScope.launch(Dispatchers.IO) {
coldFlow.onStart {
Log.d(TAG, "coldFlow onStart, thread:${Thread.currentThread().name}")
mBinding.progressBar.isVisible = true
mBinding.tvLoadingStatus.text = "加载中"
}.onEmpty {
Log.d(TAG, "coldFlow onEmpty, thread:${Thread.currentThread().name}")
mBinding.progressBar.isVisible = false
mBinding.tvLoadingStatus.text = "数据加载为空"
}.catch {
Log.d(TAG, "coldFlow catch, thread:${Thread.currentThread().name}")
mBinding.progressBar.isVisible = false
mBinding.tvLoadingStatus.text = "数据加载错误:$it"
}.onCompletion {
Log.d(TAG, "coldFlow onCompletion, thread:${Thread.currentThread().name}")
mBinding.progressBar.isVisible = false
mBinding.tvLoadingStatus.text = "加载完成"
}
//指定上游数据流的CoroutineContext,下游数据流不会受到影响
.flowOn(Dispatchers.Main)
.collect {
Log.d(TAG, "coldFlow collect:$it, thread:${Thread.currentThread().name}")
}
}
}
比如上面的例子。使用flow构建起函数,创建一个冷流,3秒后发送一个值到数据流中。使用onStart``,onEmpty
,catch
,onCompletion
操作符,监听数据流的状态。日志输出:
coldFlow onStart, thread:main
coldFlow onCompletion, thread:main
coldFlow collect:1, thread:DefaultDispatcher-worker-1
在实际的开发场景中,经常会将一些网络数据保存到本地,下次加载数据的时候,优先使用本地数据,再使用网络数据。
但是本地数据和网络数据的加载完成时机不一样,所以可能会有下面几种场景。
本地数据比网络数据先加载完成:那先使用本地数据,再使用网络数据
网络数据比本地数据先加载完成:
网络数据加载成功,那只使用网络数据即可,不需要再使用本地数据了。
网络数据加载失败,可以继续尝试使用本地数据进行兜底。
本地数据和网络数据都加载失败:通知上层数据加载失败
将上面的逻辑进行简单封装成一个基类,CacheRepositity
。
相应的子类,只需要实现两个方法即可。
CResult:
代表加载结果,Success
或者 Error
。fetchDataFromLocal()
,实现本地数据读取的逻辑fetchDataFromNetWork()
,实现网络数据获取的逻辑abstract class CacheRepositity<T> {
private val TAG = "CacheRepositity"
fun getData() = channelFlow<CResult<T>> {
supervisorScope {
val dataFromLocalDeffer = async {
fetchDataFromLocal().also {
Log.d(TAG,"fetchDataFromLocal result:$it , thread:${Thread.currentThread().name}")
//本地数据加载成功
if (it is CResult.Success) {
send(it)
}
}
}
val dataFromNetDeffer = async {
fetchDataFromNetWork().also {
Log.d(TAG,"fetchDataFromNetWork result:$it , thread:${Thread.currentThread().name}")
//网络数据加载成功
if (it is CResult.Success) {
send(it)
//如果网络数据已加载,可以直接取消任务,就不需要处理本地数据了
dataFromLocalDeffer.cancel()
}
}
}
//本地数据和网络数据,都加载失败的情况
val localData = dataFromLocalDeffer.await()
val networkData = dataFromNetDeffer.await()
if (localData is CResult.Error && networkData is CResult.Error) {
send(CResult.Error(Throwable("load data error")))
}
}
}
protected abstract suspend fun fetchDataFromLocal(): CResult<T>
protected abstract suspend fun fetchDataFromNetWork(): CResult<T>
}
sealed class CResult<out R> {
data class Success<out T>(val data: T) : CResult<T>()
data class Error(val throwable: Throwable) : CResult<Nothing>()
}
写个TestRepositity
,实现CacheRepositity
的抽象方法。
通过delay延迟耗时来模拟各种场景,观察日志的输出顺序。
private fun cacheRepositityDemo(){
val repositity=TestRepositity()
lifecycleScope.launch {
repositity.getData().onStart {
Log.d(TAG, "TestRepositity: onStart")
}.onCompletion {
Log.d(TAG, "TestRepositity: onCompletion")
}.collect {
Log.d(TAG, "collect: $it")
}
}
}
复制代码
本地数据比网络数据加载快
class TestRepositity : CacheRepositity<String>() {
override suspend fun fetchDataFromLocal(): CResult<String> {
delay(1000)
return CResult.Success("data from fetchDataFromLocal")
}
override suspend fun fetchDataFromNetWork(): CResult<String> {
delay(2000)
return CResult.Success("data from fetchDataFromNetWork")
}
}
delay1
秒,网络加载delay2秒collec
t 执行两次,先收到本地数据,再收到网络数据。onStart
fetchDataFromLocal result:Success(data=data from fetchDataFromLocal) , thread:main
collect: Success(data=data from fetchDataFromLocal)
fetchDataFromNetWork result:Success(data=data from fetchDataFromNetWork) , thread:main
collect: Success(data=data from fetchDataFromNetWork)
onCompletion
网络数据比本地数据加载快
class TestRepositity : CacheRepositity<String>() {
override suspend fun fetchDataFromLocal(): CResult<String> {
delay(2000)
return CResult.Success("data from fetchDataFromLocal")
}
override suspend fun fetchDataFromNetWork(): CResult<String> {
delay(1000)
return CResult.Success("data from fetchDataFromNetWork")
}
}
模拟数据
:本地加载delay 2秒,网络加载delay 1秒日志输出
:collect 只执行1次,只收到网络数据。onStart
fetchDataFromNetWork result:Success(data=data from fetchDataFromNetWork) , thread:main
collect: Success(data=data from fetchDataFromNetWork)
onCompletion
网络数据加载失败,使用本地数据
class TestRepositity : CacheRepositity<String>() {
override suspend fun fetchDataFromLocal(): CResult<String> {
delay(2000)
return CResult.Success("data from fetchDataFromLocal")
}
override suspend fun fetchDataFromNetWork(): CResult<String> {
delay(1000)
return CResult.Error(Throwable("fetchDataFromNetWork Error"))
}
}
onStart
fetchDataFromNetWork result:Error(throwable=java.lang.Throwable: fetchDataFromNetWork Error) , thread:main
fetchDataFromLocal result:Success(data=data from fetchDataFromLocal) , thread:main
collect: Success(data=data from fetchDataFromLocal)
onCompletion
网络数据和本地数据都加载失败
class TestRepositity : CacheRepositity<String>() {
override suspend fun fetchDataFromLocal(): CResult<String> {
delay(2000)
return CResult.Error(Throwable("fetchDataFromLocal Error"))
}
override suspend fun fetchDataFromNetWork(): CResult<String> {
delay(1000)
return CResult.Error(Throwable("fetchDataFromNetWork Error"))
}
}
onStart
fetchDataFromNetWork result:Error(throwable=java.lang.Throwable: fetchDataFromNetWork Error) , thread:main
fetchDataFromLocal result:Error(throwable=java.lang.Throwable: fetchDataFromLocal Error) , thread:main
collect: Error(throwable=java.lang.Throwable: load data error)
onCompletion
在实际的开发场景中,经常一个页面的数据,是需要发起多个网络请求之后,组合数据之后再进行显示。比如类似这种页面,3种数据,需要由3个网络请求获取得到,然后再进行相应的显示。
实现目标:
可以合并多个不同的 Flow 数据流,生成一个新的流。只要其中某个子 Flow 数据流有产生新数据的时候,就会触发 combine 操作,进行重新计算,生成一个新的数据。
class HomeViewModel : ViewModel() {
//暴露给View层的列表数据
val list = MutableLiveData<List<String?>>()
//多个子Flow,这里简单都返回String,实际场景根据需要,返回相应的数据类型即可
private val bannerFlow = MutableStateFlow<String?>(null)
private val channelFlow = MutableStateFlow<String?>(null)
private val listFlow = MutableStateFlow<String?>(null)
init {
//使用combine操作符
viewModelScope.launch {
combine(bannerFlow, channelFlow, listFlow) { bannerData, channelData, listData ->
Log.d("HomeViewModel", "combine bannerData:$bannerData,channelData:$channelData,listData:$listData")
//只要子flow里面的数据不为空,就放到resultList里面
val resultList = mutableListOf<String?>()
if (bannerData != null) {
resultList.add(bannerData)
}
if (channelData != null) {
resultList.add(channelData)
}
if (listData != null) {
resultList.add(listData)
}
resultList
}.collect {
//收集combine之后的数据,修改liveData的值,通知UI层刷新列表
Log.d("HomeViewModel", "collect: ${it.size}")
list.postValue(it)
}
}
}
fun loadData() {
viewModelScope.launch(Dispatchers.IO) {
//模拟耗时操作
async {
delay(1000)
Log.d("HomeViewModel", "getBannerData success")
bannerFlow.emit("Banner")
}
async {
delay(2000)
Log.d("HomeViewModel", "getChannelData success")
channelFlow.emit("Channel")
}
async {
delay(3000)
Log.d("HomeViewModel", "getListData success")
listFlow.emit("List")
}
}
}
}
HomeViewModel
View层使用
private fun flowCombineDemo() {
val homeViewModel by viewModels<HomeViewModel>()
homeViewModel.list.observe(this) {
Log.d("HomeViewModel", "observe size:${it.size}")
}
homeViewModel.loadData()
}
简单的创建一个 ViewModel
,observe
列表数据对应的LiveData
。
通过输出的日志发现,触发数据加载之后,每次子 Flow 流生产数据的时候,都会触发一次 combine 操作,生成新的数据。
日志输出:
combine bannerData:null,channelData:null,listData:null
collect: 0
observe size:0
getBannerData success
combine bannerData:Banner,channelData:null,listData:null
collect: 1
observe size:1
getChannelData success
combine bannerData:Banner,channelData:Channel,listData:null
collect: 2
observe size:2
getListData success
combine bannerData:Banner,channelData:Channel,listData:List
collect: 3
observe size:3
具体场景,具体分析。刚好这几个场景,配合Flow进行使用,整体实现也相对简单了一些。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8