hss01248
2022-06-20

# Refactoring callback-style code with RxJava

# Core idea:

Observable.create() + flatMap

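# JS analogy

This is really the same operation as converting callbacks into Promises in JS.

[image: image-20220603110525793]

[image: image-20220603110501984]

The JS code above, after being wrapped into functions:

[image: image-20220603111047495]

map and reduce in JS work much like their Rx counterparts.

[image: image-20220603111226943]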
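
The wrapping step from the JS screenshots can also be sketched in plain Java with CompletableFuture, the JDK's closest counterpart to a Promise. This is only an analogy and not RxJava code: `fetchUserId`, `fetchUserIdAsync` and `describe` are hypothetical names standing in for any callback-style API. The future plays the role that the emitter plays inside Observable.create(), and `thenApply` corresponds to `map`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CallbackToFuture {

    // Hypothetical callback-style API, standing in for any legacy async call.
    static void fetchUserId(String name, Consumer<Integer> onSuccess, Consumer<Throwable> onError) {
        if (name.isEmpty()) {
            onError.accept(new IllegalArgumentException("empty name"));
        } else {
            onSuccess.accept(name.length());
        }
    }

    // Wrap the callback API once: success completes the future, failure completes it exceptionally.
    static CompletableFuture<Integer> fetchUserIdAsync(String name) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        fetchUserId(name, future::complete, future::completeExceptionally);
        return future;
    }

    // After wrapping, follow-up steps are chained instead of nested.
    static String describe(String name) {
        return fetchUserIdAsync(name)
                .thenApply(id -> "user#" + id) // comparable to Promise.then or Rx map
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe("alice"));
    }
}
```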

# Problem 1:

The emitter inside every Observable.create() must call onComplete; otherwise the final onComplete is never invoked.

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(9);
        AtomicInteger atomicInteger = new AtomicInteger(2);
        Observable.fromIterable(nums)
                .subscribeOn(Schedulers.io())
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@io.reactivex.annotations.NonNull Integer integer) throws Exception {
                        return Observable.create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<String> emitter) throws Exception {
                                //Thread.sleep(1500);
                                emitter.onNext(integer+"+flatMap(Observable.create)");
                                emitter.onComplete();
                                //todo every emitter must call onComplete; do not count them yourself. The code below is wrong
                                /*int i = atomicInteger.decrementAndGet();
                                if(i ==0){
                                    LogUtils.i("flatMap-emitter.onComplete()");
                                    emitter.onComplete();
                                }*/
                            }
                        }).observeOn(Schedulers.io());
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .timeout(5, TimeUnit.SECONDS)
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@io.reactivex.annotations.NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@io.reactivex.annotations.NonNull String s) {
                        LogUtils.w("onNext",s);
                    }

                    @Override
                    public void onError(@io.reactivex.annotations.NonNull Throwable e) {
                        LogUtils.w("onError",e);
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.i("oncomplete");

                    }
                });
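
Why a single missing onComplete stalls the whole chain can be seen in a toy model of the merge step. The sketch below is plain Java and deliberately not RxJava's implementation: `Emitter`, `merge` and the two demo methods are hypothetical names, everything runs on one thread, and the outer source is assumed to have already finished. The merged stream reports completion only once every inner source has done so.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class MergeCompletionModel {

    /** Minimal stand-in for an emitter: values and a completion signal. */
    interface Emitter {
        void onNext(String value);
        void onComplete();
    }

    /**
     * Subscribes to every inner source in turn and records "onComplete"
     * only after ALL of them have signalled completion.
     */
    static List<String> merge(List<Consumer<Emitter>> sources) {
        List<String> events = new ArrayList<>();
        AtomicInteger active = new AtomicInteger(sources.size());
        for (Consumer<Emitter> source : sources) {
            source.accept(new Emitter() {
                @Override
                public void onNext(String value) {
                    events.add("onNext:" + value);
                }

                @Override
                public void onComplete() {
                    // The counting belongs to the merge step, not to the individual sources.
                    if (active.decrementAndGet() == 0) {
                        events.add("onComplete");
                    }
                }
            });
        }
        return events;
    }

    // Both inner sources complete, so the merged stream completes.
    static List<String> allComplete() {
        List<Consumer<Emitter>> sources = new ArrayList<>();
        sources.add(e -> { e.onNext("1"); e.onComplete(); });
        sources.add(e -> { e.onNext("9"); e.onComplete(); });
        return merge(sources);
    }

    // The second source emits but never completes, so the merged stream never completes.
    static List<String> oneForgetsToComplete() {
        List<Consumer<Emitter>> sources = new ArrayList<>();
        sources.add(e -> { e.onNext("1"); e.onComplete(); });
        sources.add(e -> e.onNext("9"));
        return merge(sources);
    }

    public static void main(String[] args) {
        System.out.println(allComplete());
        System.out.println(oneForgetsToComplete());
    }
}
```

`allComplete()` records both values followed by `onComplete`, while `oneForgetsToComplete()` records the same two values and then stays silent. In the RxJava snippet above, that silence is what the `timeout(5, TimeUnit.SECONDS)` operator would be expected to surface as an onError.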

# Example

try {
    requestByType(LocationManager.NETWORK_PROVIDER, locationManager, map, countSet, finalListener1);
    requestByType(LocationManager.GPS_PROVIDER, locationManager, map, countSet, finalListener1);
    requestByType(LocationManager.PASSIVE_PROVIDER, locationManager, map, countSet, finalListener1);
    //requestByType("fused", locationManager, map, countSet, finalListener1);
    if (isGmsAvaiable(finalContext)) {
        requestGmsLocation(finalContext, locationManager, map, countSet, finalListener1);
        //return;
    }
} catch (Throwable throwable) {
    throwable.printStackTrace();
}

After refactoring:

Use Observable.create inside flatMap

private void byFlatMap(Context context,  MyLocationCallback listener, LocationManager locationManager, List<String> providers, long start, int size) {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        List<Pair<String,Location>> pairList = new ArrayList<>();

        Observable.fromIterable(providers)
                .flatMap(new Function<String, ObservableSource<Pair<String,Location>>>() {
                    @Override
                    public ObservableSource<Pair<String,Location>> apply(@io.reactivex.annotations.NonNull String provider) throws Exception {
                        return Observable.create(new ObservableOnSubscribe<Pair<String,Location>>() {
                            @Override
                            @SuppressLint("MissingPermission")
                            public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<Pair<String,Location>> emitter) throws Exception {

                                LogUtils.i("flatMap", "requestSingleUpdate-" + provider);
                                //locationManager.getCurrentLocation(provider,);
                                locationManager.requestSingleUpdate(provider, new LocationListener() {
                                    @Override
                                    public void onLocationChanged(@NonNull Location location) {
                                        LogUtils.d("onLocationChanged", location, provider, "elapsed(ms):", (System.currentTimeMillis() - start));
                                        saveLocation2(location);
                                        emitter.onNext(new Pair<>(provider,location));
                                        locationManager.removeUpdates(this);
                                        //todo important: Observable.create inside flatMap creates several Observables, and every one of them must call onComplete before the final observer's onComplete can fire
                                        emitter.onComplete();
                                    }

                                    @Override
                                    public void onStatusChanged(String provider, int status, Bundle extras) {
                                        LogUtils.d("onStatusChanged", provider, status, extras);
                                    }

                                    @Override
                                    public void onProviderDisabled(@NonNull String provider) {
                                        LogUtils.w("onProviderDisabled", provider);
                                        locationManager.removeUpdates(this);
                                        //triggered when the provider is switched off while it is in use
                                        emitter.onComplete();
                                    }

                                    @Override
                                    public void onProviderEnabled(@NonNull String provider) {
                                        LogUtils.d("onProviderEnabled", provider);
                                    }
                                }, handlerThread.getLooper());
                            }
                            //subscribeOn(Schedulers.io()) must be specified here so that each inner Observable works on its own thread
                        }).subscribeOn(Schedulers.io());
                    }
                }).mergeWith(Observable.create(new ObservableOnSubscribe<Pair<String,Location>>() {
                        @SuppressLint("MissingPermission")
                        @Override
                        public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<Pair<String,Location>> emitter) throws Exception {
                            //the if-else branch is implemented directly inside the Observable
                            if (isGmsAvaiable(context)) {
                                LocationServices.getFusedLocationProviderClient(context)
                                        .requestLocationUpdates(new LocationRequest()
                                                .setExpirationDuration(timeOut)
                                                .setNumUpdates(1)
                                                .setMaxWaitTime(timeOut), new LocationCallback() {
                                            @Override
                                            public void onLocationResult(LocationResult result) {
                                                LogUtils.i("gms result", result);
                                                if (result != null && result.getLocations() != null && !result.getLocations().isEmpty()) {
                                                    Location location = result.getLocations().get(0);
                                                    saveLocation2(location);
                                                    emitter.onNext(new Pair<>("gms",location));
                                                }
                                                emitter.onComplete();
                                            }

                                        }, handlerThread.getLooper());
                            } else {
                                //if there is nothing to emit, send onComplete right away; otherwise the final Observer's onComplete is never called
                                emitter.onComplete();
                            }

                        }})
                .subscribeOn(Schedulers.io()))
                .timeout(timeOut, TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Pair<String,Location>>() {
                    @Override
                    public void onSubscribe(@io.reactivex.annotations.NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@io.reactivex.annotations.NonNull Pair<String,Location> location) {
                        LogUtils.i("onNext", location);
                        pairList.add(location);
                        int i = atomicInteger.incrementAndGet();
                        if(i ==1){
                            LogUtils.i("onQuickest", location);
                            listener.onQuickestLocationCallback(location.second, location.first);
                        }
                        listener.onEachLocationChanged(location.second, location.first);
                    }

                    @Override
                    public void onError(@io.reactivex.annotations.NonNull Throwable e) {
                        LogUtils.w("onError", e);

                        dealOnFail(e, pairList,listener);
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.i("onComplete");
                        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.JELLY_BEAN_MR2) {
                            handlerThread.quitSafely();
                        }
                        Pair<String,Location> mostAcurLocation2 = getMostAcurLocation2(pairList);
                        if(mostAcurLocation2 == null){
                            onError(new RuntimeException("getMostAcurLocation2 null"));
                        }else {
                            listener.onSuccess(mostAcurLocation2.second,"from sys api :"+ mostAcurLocation2.first);
                        }

                    }
                });
    }

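# Problem 2: how to chain multiple Observables?

A method returns an Observable itself, but internally it first has to run some logic and then, inside a callback, invoke another Observable.

Wrap the first step with Observable.create, then call flatMap to switch to the next Observable.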

    private static Observable<Pair<String,Location>> checkPermission(Context context, int timeout, boolean showBeforeRequest, boolean showAfterRequest, MyLocationCallback callback) {
        if (PermissionUtils.isGranted(Manifest.permission.ACCESS_COARSE_LOCATION)
                && PermissionUtils.isGranted(Manifest.permission.ACCESS_FINE_LOCATION)) {
            return doRequestLocation(context, timeout, callback);
        } else {
            return Observable.create(new ObservableOnSubscribe<Pair<String, Location>>() {
                @Override
                public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<Pair<String, Location>> emitter) throws Exception {
                    MyPermissions.requestByMostEffort(
                            showBeforeRequest,
                            showAfterRequest,
                            new PermissionUtils.FullCallback() {
                                @Override
                                public void onGranted(@NonNull List<String> granted) {
                                    emitter.onNext(new Pair<>("doRequestLocation",null));
                                    emitter.onComplete();
                                }

                                @Override
                                public void onDenied(@NonNull List<String> deniedForever, @NonNull List<String> denied) {
                                    //callback.onFailed(1, "no permission",true);
                                    emitter.onError(new LocationFailException("no permission").setFailBeforeReallyRequest(true));
                                }
                            }, Manifest.permission.ACCESS_FINE_LOCATION);
                }
            }).subscribeOn(AndroidSchedulers.mainThread())
                    .flatMap(new Function<Pair<String, Location>, ObservableSource<Pair<String, Location>>>() {
                        @Override
                        public ObservableSource<Pair<String, Location>> apply(@io.reactivex.annotations.NonNull Pair<String, Location> stringLocationPair) throws Exception {

                            return doRequestLocation(context, timeout, callback);
                        }
                    });

        }
    }
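
The same shape can be tried without Android or RxJava on the classpath. The following is a JDK-only sketch in which CompletableFuture stands in for Observable and `thenCompose` for flatMap; `requestPermission`, `doRequestLocation` and the boolean flags are hypothetical simplifications of the method above, not its real API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class PermissionThenRequest {

    // Hypothetical callback-style permission prompt; userGrants simulates the user's choice.
    static void requestPermission(boolean userGrants, Runnable onGranted, Consumer<String> onDenied) {
        if (userGrants) {
            onGranted.run();
        } else {
            onDenied.accept("no permission");
        }
    }

    // Hypothetical second asynchronous step that should only run once permission is held.
    static CompletableFuture<String> doRequestLocation() {
        return CompletableFuture.completedFuture("location");
    }

    static CompletableFuture<String> checkPermission(boolean alreadyGranted, boolean userGrants) {
        if (alreadyGranted) {
            // Nothing to wait for: return the second step directly.
            return doRequestLocation();
        }
        // Step 1: wrap the callback, as Observable.create does in the method above.
        CompletableFuture<Void> granted = new CompletableFuture<>();
        requestPermission(userGrants,
                () -> granted.complete(null),
                reason -> granted.completeExceptionally(new IllegalStateException(reason)));
        // Step 2: chain the next asynchronous call; it is skipped if step 1 failed.
        return granted.thenCompose(ignored -> doRequestLocation());
    }

    public static void main(String[] args) {
        System.out.println(checkPermission(false, true).join());
    }
}
```

As in the RxJava version, the value produced by the first step is only a trigger; the caller sees a single result type whether or not the permission prompt was needed.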

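# Problem 3: how to use RxJava to retry a callback flow with a delay

[image: WeCom screenshot 4a0e0572-2dc8-4556-bc31-bd833c4f5a33]

Note: add subscribeOn(Schedulers.io()) before retry. Otherwise the sleep runs on the original thread, and if that is the main thread the UI will stutter.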
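
The code in the screenshot is not available as text, so the following is not the author's implementation. It is a JDK-only sketch of the underlying idea, built around a hypothetical helper `retryWithDelay`: the wait between attempts is scheduled on a background executor, so the calling thread never sleeps. The retry count, delay and failing task are made-up values for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class DelayedRetry {

    // Runs the task on the scheduler and retries up to maxRetries times, delayMs apart.
    static <T> CompletableFuture<T> retryWithDelay(Supplier<T> task, int maxRetries, long delayMs,
                                                   ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(task, maxRetries, delayMs, scheduler, result);
        return result;
    }

    private static <T> void attempt(Supplier<T> task, int retriesLeft, long delayMs,
                                    ScheduledExecutorService scheduler, CompletableFuture<T> result) {
        scheduler.execute(() -> {
            try {
                result.complete(task.get());
            } catch (RuntimeException e) {
                if (retriesLeft <= 0) {
                    result.completeExceptionally(e);
                } else {
                    // Schedule the next attempt instead of calling Thread.sleep.
                    scheduler.schedule(
                            () -> attempt(task, retriesLeft - 1, delayMs, scheduler, result),
                            delayMs, TimeUnit.MILLISECONDS);
                }
            }
        });
    }

    // Demo: the task fails on its first two calls and succeeds on the third.
    static int demo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        try {
            return retryWithDelay(() -> {
                if (calls.incrementAndGet() < 3) {
                    throw new IllegalStateException("not ready yet");
                }
                return calls.get();
            }, 5, 20, scheduler).join();
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("succeeded on attempt " + demo());
    }
}
```

The `join()` in `demo` blocks only to make the sketch easy to run; on Android the result would be handed back to the main thread through a callback instead.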

Last updated: 2022/08/25, 20:20:31