RxHttp + Flow 三步搞定任意请求

1、前言


RxHttp 在之前的版本中,已提供了RxHttp + Await协程RxHttp + RxJava两种请求方式,这一次,RxHttp 无缝适配了 Flow , RxHttp + Flow协程配合使用,使得请求更加简单,至此,RxHttp已集齐3架马车(Flow、Await、RxJava),且每架马车皆遵循请求三部曲,掌握请求三部曲,就掌握了RxHttp的精髓。


gradle依赖


1、必选


jitpack添加到项目的build.gradle文件中,如下:


allprojects {
repositories {
maven { url "https://jitpack.io" }
}
}

//使用kapt依赖rxhttp-compiler时必须
apply plugin: 'kotlin-kapt'

android {
//必须,java 8或更高
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}

dependencies {
implementation 'com.squareup.okhttp3:okhttp:4.9.1'
implementation 'com.github.liujingxing.rxhttp:rxhttp:2.7.0'
kapt 'com.github.liujingxing.rxhttp:rxhttp-compiler:2.7.0' //生成RxHttp类,纯Java项目,请使用annotationProcessor代替kapt
}

2、可选


//非必须,根据自己需求选择 RxHttp默认内置了GsonConverter
implementation 'com.github.liujingxing.rxhttp:converter-fastjson:2.7.0'
implementation 'com.github.liujingxing.rxhttp:converter-jackson:2.7.0'
implementation 'com.github.liujingxing.rxhttp:converter-moshi:2.7.0'
implementation 'com.github.liujingxing.rxhttp:converter-protobuf:2.7.0'
implementation 'com.github.liujingxing.rxhttp:converter-simplexml:2.7.0'

2、RxHttp + Flow 使用


2.1、请求三部曲


用过RxHttp的同学知道,RxHttp发送任意请求皆遵循请求三部曲,如下:
rxhttp_flow_chart.jpg
代码表示


 RxHttp.get("/service/...")  //第一步,确定请求方式,可以选择postForm、postJson等方法
.add("key", "value")
.toFlow<Student>() //第二步,调用toFlow方法并输入泛型类型,拿到Flow对象
.catch {
//异常回调
val throwable = it
}.collect { //第三步,调用collect方法发起请求
//成功回调
val student = it
}

协程请求三部曲详解




  • 第一步,选择get、postForm、postJson、postBody等方法来确定请求方式,随后便可通过add、addFile、addHeader等方法来添加参数、文件、请求头等信息




  • 第二步,调用toFlow/toFlowXxx系列方法,并传入泛型类型,以获取到Flow对象,toFlow有一系列重载方法,可以实现上传/下载及进度的监听,本文后续会详细介绍,在这一步后,可以调用catchonStartonCompletion等方法去监听异常、开始及结束回调,跟平时使用Flow对象没有任何区别




  • 第三步,调用collect方法就会开始发送请求,如果一些正常的话,就会收到成功回调




以上就是RxHttp在协程中最常规的操作,掌握请求三部曲,就掌握了RxHttp的精髓


2.2、BaseUrl处理


RxHttp通过@DefaultDomain、@Domain注解来配置默认域名及非默认域名,如下:


public class Url {

@DefaultDomain //通过该注解设置默认域名
public static String BASE_URL = "https://www.wanandroid.com";

// name 参数在这会生成 setDomainToGoogleIfAbsent方法,可随意指定名称
// className 参数在这会生成RxGoogleHttp类,可随意指定名称
@Domain(name = "Google", className = "Google")
public static String GOOGLE = "https://www.google.com";
}

以上配置http://www.wanandroid.com为默认域名,http://www.google.com为非默认域名


多BaseUrl处理


//1、使用默认域名,传入相对路径即可
//此时 url 为 https://www.wanandroid.com/service/...
RxHttp.get("/service/...")
...

//2、使用google域名方式一:传入绝对路径
RxHttp.get("https://wwww.google.com/service/...")
...

//3、使用google域名方式二:调用setDomainToGoogleIfAbsent方法
//该方法是通过 @Domain 注解的 name 字段生成的,命名规则为 setDomainTo{name}IfAbsent
RxHttp.get("/service/...")
.setDomainToGoogleIfAbsent()
...

//4、使用google域名方式三:直接使用RxGoogleHttp类发送请求,
//该类是通过 @Domain 注解的 className 字段生成的,命名规则为 Rx{className}http
RxGoogleHttp.get("/service/...")
...

注:以上4种配置域名的方式,优先级别为:2 > 3 > 4 > 1


动态域名处理


//直接对url重新赋值即可,改完立即生效
Url.BASE_URL = "https://www.baidu.com";
RxHttp.get("/service/...")
...
//此时 url 为 https://www.baidu.com/service/...

2.3、业务code统一判断


我想大部分人的接口返回格式都是这样的


class BaseResponse<T> {
var code = 0
var msg : String? = null
var data : T
}

拿到该对象的第一步就是对code做判断,如果code != 200(假设200代表数据正确),就会拿到msg字段给用户一些错误提示,如果等于200,就拿到data字段去更新UI,常规的操作是这样的


RxHttp.get("/service/...")
.toFlow<BaseResponse<Student>>()
.collect {
if (response.code == 200) {
//拿到data字段(Student)刷新UI
} else {
val msg = it.msg //拿到msg字段给出错误提示
}
}

试想一下,一个项目少说也有30+个这样的接口,如果每个接口读取这么判断,就显得不够优雅,也可以说是灾难,相信也没有人会这么干。而且对于UI来说,只需要data字段即可,错误提示啥的我管不着。


那有没有什么办法,能直接拿到data字段,并且对code做出统一判断呢?有的,直接上代码


RxHttp.get("/service/...")
.toFlowResponse<Student>() //调用此方法,直接拿到data字段,也就是Student对象
.catch {
// code非200时,走异常回调,在这可拿到msg及code字段
val msg = it.msg
val code = it.code
}.collect {
//直接拿到data字段,在这就是Student对象
val student = it
}

可以看到,以上调用toFlowResponse()方法,成功回调就可直接拿到data字段,也就是Student对象。


此时,相信很多人会有疑问,




  • 业务code哪里判断的?




  • 异常回调里的it是什么对象,为啥可以拿到msg、code字段?




先来回答第一个问题,业务code哪里判断的?


其实toFlowResponse()方法并不是RxHttp内部提供的,而是通过自定义解析器,并用@Parser注解标注,最后由注解处理器rxhttp-compiler自动生成的,听不懂?没关系,直接看代码


@Parser(name = "Response")
open class ResponseParser<T> : TypeParser<T> {

//以下两个构造方法是必须的
protected constructor() : super()
constructor(type: Type) : super(type)

@Throws(IOException::class)
override fun onParse(response: okhttp3.Response): T {
val data: BaseResponse<T> = response.convertTo(BaseResponse::class, *types)
val t = data.data //获取data字段
if (data.code != 200 || t == null) { //code不等于200,说明数据不正确,抛出异常
throw ParseException(data.code.toString(), data.msg, response)
}
return t //最后返回data字段
}
}

上面代码只需要关注两点即可,


第一点,我们在类开头使用了@Parser注解,并为解析器取名为Response,此时rxhttp-compiler就会生成toFlowResponse<T>()方法,命名规则为toFlow{name}


第二点,我们在if语句里,code != 200data == null时,就抛出ParseException异常,并带上了msg、code字段,所以我们在异常回调通过强转,就可以拿到这两个字段


接着回答第二个问题,异常回调里的it是什么对象,为啥可以拿到msg、code字段?


其实it就是Throwable对象,而msg、codeThrowable的扩展字段,这需要我们自己为其扩展,代码如下:


val Throwable.code: Int
get() =
when (this) {
is HttpStatusCodeException -> this.statusCode //Http状态码异常
is ParseException -> this.errorCode.toIntOrNull() ?: -1 //业务code异常
else -> -1
}

val Throwable.msg: String
get() {
return if (this is UnknownHostException) { //网络异常
"当前无网络,请检查你的网络设置"
} else if (
this is SocketTimeoutException //okhttp全局设置超时
|| this is TimeoutException //rxjava中的timeout方法超时
|| this is TimeoutCancellationException //协程超时
) {
"连接超时,请稍后再试"
} else if (this is ConnectException) {
"网络不给力,请稍候重试!"
} else if (this is HttpStatusCodeException) { //请求失败异常
"Http状态码异常"
} else if (this is JsonSyntaxException) { //请求成功,但Json语法异常,导致解析失败
"数据解析失败,请检查数据是否正确"
} else if (this is ParseException) { // ParseException异常表明请求成功,但是数据不正确
this.message ?: errorCode //msg为空,显示code
} else {
"请求失败,请稍后再试"
}
}

到这,业务code统一判断就介绍完毕,上面的代码,大部分人只需要简单修改后,就可用到自己的项目上,如ResponseParser解析器,只需要改下if语句的判断条件即可


3、上传/下载


RxHttp对文件的优雅操作是与生俱来的,配合Flow,可以说是如虎添翼,不多说,直接上代码


3.1、文件上传


RxHttp.postForm("/service/...")  
.addFile("file", File("xxx/1.png")) //添加单个文件
.addFiles("fileList", ArrayList<File>()) //添加多个文件
.toFlow<String>()
.catch { //异常回调 }
.collect { //成功回调 }

只需要通过addFile系列方法添加File对象即可,就是这么简单粗暴,想监听上传进度,toFlow方法传入进度回调即可,如下:


RxHttp.postForm("/service/...")      
.addFile("file", File("xxx/1.png"))
.addFiles("fileList", ArrayList<File>())
.toFlow<String> { //这里还可以选择自定义解析器对应的toFlowXxx方法
val process = it.progress //已上传进度 0-100
val currentSize = it.currentSize //已上传size,单位:byte
val totalSize = it.totalSize //要上传的总size 单位:byte
}.catch { //异常回调 }
.collect { //成功回调 }

3.2、文件下载


接着再来看看下载,直接贴代码


val localPath = "sdcard//android/data/..../1.apk"  
RxHttp.get("/service/...")
.toFlow(localPath) {
//it为Progress对象
val process = it.progress //已下载进度 0-100
val currentSize = it.currentSize //已下载size,单位:byte
val totalSize = it.totalSize //要下载的总size 单位:byte
}
.catch { //异常回调 }
.collect { //成功回调,这里可以拿到本地存储路径,也就是localPath }

你没看错,下载也是调用 toFlow方法,传入本地路径及进度回调即可,当然,如果不需要监听进度,进度回调也可不传,来看看用来下载的toFlow方法签名


/**
* @param destPath 本地存储路径
* @param append 是否追加下载,即是否断点下载
* @param capacity 队列size,仅监听进度回调时生效
* @param progress 进度回调
*/
fun CallFactory.toFlow(
destPath: String,
append: Boolean = false,
capacity: Int = 1,
progress: (suspend (Progress) -> Unit)? = null
): Flow<String>

以上4个参数,只有destPath是必须的,其它3个参数,根据实际需要传递,想要断点下载,append传入true,想要监听进度就传入进度回调,


至于capacity参数,这个需要额外说明一下,它是指定队列的缓存大小,什么队列?进度回调的队列,目的就是丢弃来不及消费的事件,在现实场景中,可能会存在下游消费速度 小于 上游生产速度的情况,这就会导致事件的堆积,翻译过来就是下载很快,但你处理进度回调的地方很慢,就有可能出现你还在处理进度为10的事件,但实际下载进度可能到了50甚至更高,capacity设置为1的话,10-50之间的事件就会被丢弃,接下来下游收到的可能就是进度为50的事件,这就保证了下游收到的始终的最新的事件,也就是最及时的下载进度,当然,如果你想收到全部的进度回调事件,将capacity设置为100即可。


3.3、暂停/恢复下载


很多会有暂停/恢复下载的需求,但对于下载来说,并没有真正意义的暂停及恢复,所谓的暂停,不过就是停止下载,也就是中断请求,而恢复,就是再次发起请求从上次中断的位置继续下载,也就是断点下载,所有,只需要知道如何取消请求及断点下载即可


取消请求


Flow的取消,就是外部协程的关闭


val job = lifecycleScope.launch {
val localPath = "sdcard//android/data/..../1.apk"
RxHttp.get("/service/...")
.toFlow(localPath) {
//it为Progress对象
val process = it.progress //已下载进度 0-100
val currentSize = it.currentSize //已下载size,单位:byte
val totalSize = it.totalSize //要下载的总size 单位:byte
}
.catch { //异常回调 }
.collect { //成功回调,这里可以拿到本地存储路径,也就是localPath }
}
//在需要的时候,调用job.cancel()就是取消请求
job.cancel()

断点下载


上面介绍过,想要断点下载,只需要额外将toFlow方法的第二个参数append设置为true即可,如下:


val localPath = "sdcard//android/data/..../1.apk"  
RxHttp.get("/service/...")
.toFlow(localPath, true) {
//it为Progress对象
val process = it.progress //已下载进度 0-100
val currentSize = it.currentSize //已下载size,单位:byte
val totalSize = it.totalSize //要下载的总size 单位:byte
}
.catch { //异常回调 }
.collect { //成功回调,这里可以拿到本地存储路径,也就是localPath }

注:断点下载需要服务器接口支持


对于Android 10文件上传/下载,请点击RxHttp 完美适配Android 10/11 上传/下载/进度监听


4、转LiveData 


Flow依赖于协程环境,如果不想使用协程,又想要使用Flow,那LiveData就是一个很好的选择,在官方androidx.lifecycle:lifecycle-livedata-ktx:x.x.x库中提供了asLiveData方法,可方便的将FlowLiveData对象,有了LiveData对象,就不再需要协程环境


4.1、普通请求转LiveData


//当前在FragmentActivity环境中
RxHttp.get("/service/...")
.toFlow<Student>()
.catch { }
.asLiveData()
.observe(this) {
val student = it;
//更新UI
}

由于调用了asLiveData,所以,以上代码,不需要协程环境也可执行;


4.2、带进度上传转LiveData


RxHttp.postForm("/service/...")      
.addFile("file", File("xxx/1.png"))
.addFiles("fileList", ArrayList<File>())
.toFlow<Student> { //这里还可以选择自定义解析器对应的toFlowXxx方法
val process = it.progress //已上传进度 0-100
val currentSize = it.currentSize //已上传size,单位:byte
val totalSize = it.totalSize //要上传的总size 单位:byte
}
.catch { //异常回调 }
.asLiveData()
.observe(this) {
val student = it;
//更新UI
}

上面代码中,转LiveData后,下游observe只能收到上传完成的回调,如果你想收到包括进度回调在内的所有事件,则需要使用toFlowProgress替代toFlow方法(toFlow内部是通过toFlowProgress方法实现的,有兴趣的自己查看源码),如下:


RxHttp.postForm("/service/...")      
.addFile("file", File("xxx/1.png"))
.addFiles("fileList", ArrayList<File>())
.toFlowProgress<Student>() //该方法没有进度回调参数
.catch { //异常回调 }
.asLiveData()
.observe(this) {
//此时这里将收到所有事件,这里的it为ProgressT<Student>对象
val process = it.progress //已上传进度 0-100
val currentSize = it.currentSize //已上传size,单位:byte
val totalSize = it.totalSize //要上传的总size 单位:byte
val student = it.result //接口返回的对象
if (student != null) {
//不为null,代表上传完成,接口请求结束
}
}

4.3、带进度下载转LiveData


下载也一样,RxHttp提供了一个下载对应的toFlowProgress方法,如下:


fun CallFactory.toFlowProgress(
destPath: String,
append: Boolean = false,
capacity: Int = 1
): Flow<ProgressT<String>>

跟上面介绍下载时对应的toFlow方法相比,少了一个进度回调的参数,这里悄悄告诉你,下载的toFlow方法,内部就是通过toFlowProgress方法实现的,想了解的自己去查看源码,这里不做介绍


结合asLiveData方法,使用如下:


val localPath = "sdcard//android/data/..../1.apk"  
RxHttp.get("/service/...")
.toFlowProgress(localPath)
.catch { //异常回调 }
.asLiveData()
.observe(this) {
//此时这里将收到所有事件,这里的it为ProgressT<String>对象
val process = it.progress //已下载进度 0-100
val currentSize = it.currentSize //已下载size,单位:byte
val totalSize = it.totalSize //要下载的总size 单位:byte
val path = it.result //本地存储路径
if (path != null) {
//不为null,代表下载完成,接口请求结束
}
}

5、小结


看完本文,相信你已经领悟到了RxHttp的优雅,不管上传/下载,还是进度的监听,通通三步搞懂,掌握请求三部曲,就掌握了RxHttp的精髓。


其实,RxHttp远不止这些,本文只介绍了RxHttp + Flow的配合使用,更多功能,如:公共参数/请求头的添加、请求加解密、缓存等等,请查看


作者:不怕天黑
链接:https://juejin.cn/post/7017604875764629540
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

0 个评论

要回复文章请先登录注册