Rxjava对日常callback形式代码的改造
# 用Rxjava对callback形式代码的改造
# 核心:
Observable.create() + flatMap
# js类比
其实和js里用promise改造callback是一样的操作
上面js代码封装方法后:
js里的map reduce和rx里的也是差不多操作
# 问题1:
flatMap 中每个 Observable.create() 内部的 emitter 都必须调用 onComplete,否则最终 Observer 的 onComplete 不会被回调
// Demo: every inner Observable produced inside flatMap must complete on its own,
// otherwise the downstream observer's onComplete is never delivered.
List<Integer> nums = new ArrayList<>();
nums.add(1);
nums.add(9);
// Only relevant to the incorrect "count down and complete once" variant described below.
AtomicInteger atomicInteger = new AtomicInteger(2);

// Terminal observer: just logs each signal.
Observer<String> resultLogger = new Observer<String>() {
    @Override
    public void onSubscribe(@io.reactivex.annotations.NonNull Disposable d) {
    }

    @Override
    public void onNext(@io.reactivex.annotations.NonNull String s) {
        LogUtils.w("onNext", s);
    }

    @Override
    public void onError(@io.reactivex.annotations.NonNull Throwable e) {
        LogUtils.w("onError", e);
    }

    @Override
    public void onComplete() {
        LogUtils.i("oncomplete");
    }
};

Observable.fromIterable(nums)
        .subscribeOn(Schedulers.io())
        .flatMap(num -> Observable.<String>create(emitter -> {
            emitter.onNext(num + "+flatMap(Observable.create)");
            // Each inner source completes itself. Decrementing a shared counter and
            // completing only when it reaches zero is wrong: the inner sources that
            // skipped onComplete stay open, so the merged stream never completes.
            emitter.onComplete();
        }).observeOn(Schedulers.io()))
        .observeOn(AndroidSchedulers.mainThread())
        .timeout(5, TimeUnit.SECONDS)
        .subscribe(resultLogger);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 示例
try {
    // Ask every platform provider for a fix, in this fixed order.
    String[] systemProviders = {
            LocationManager.NETWORK_PROVIDER,
            LocationManager.GPS_PROVIDER,
            LocationManager.PASSIVE_PROVIDER
    };
    for (String providerName : systemProviders) {
        requestByType(providerName, locationManager, map, countSet, finalListener1);
    }
    // Additionally query Google Play services when isGmsAvaiable reports it usable.
    boolean gmsUsable = isGmsAvaiable(finalContext);
    if (gmsUsable) {
        requestGmsLocation(finalContext, locationManager, map, countSet, finalListener1);
    }
} catch (Throwable t) {
    t.printStackTrace();
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
改造:
flatMap内部使用Observable.create
/**
 * Requests one location fix from each entry of {@code providers} and, when
 * {@code isGmsAvaiable(context)} is true, one more from the fused location client,
 * merges all of them into a single stream and reports the results to {@code listener}
 * on the Android main thread.
 *
 * <p>Every inner source completes itself after its callback fires; the merged stream
 * only completes once all of them have completed. Each source also registers a
 * cancellable so its platform listener is unregistered when the stream is disposed
 * early (for example when the {@code timeout} operator fires).
 *
 * @param context         used for the Play services check and the fused location client
 * @param listener        receives the quickest fix, every fix, and the final success/failure
 * @param locationManager system location manager the per-provider requests are issued on
 * @param providers       provider names to query, one inner Observable per entry
 * @param start           timestamp (ms) used only to log the elapsed time of each fix
 * @param size            not used by this method
 */
private void byFlatMap(Context context, MyLocationCallback listener, LocationManager locationManager, List<String> providers, long start, int size) {
    AtomicInteger atomicInteger = new AtomicInteger(0);
    List<Pair<String,Location>> pairList = new ArrayList<>();
    Observable.fromIterable(providers)
            .flatMap(new Function<String, ObservableSource<Pair<String,Location>>>() {
                @Override
                public ObservableSource<Pair<String,Location>> apply(@io.reactivex.annotations.NonNull String provider) throws Exception {
                    return Observable.create(new ObservableOnSubscribe<Pair<String,Location>>() {
                        @Override
                        @SuppressLint("MissingPermission")
                        public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<Pair<String,Location>> emitter) throws Exception {
                            LogUtils.i("flatMap", "requestSingleUpdate-" + provider);
                            LocationListener singleUpdateListener = new LocationListener() {
                                @Override
                                public void onLocationChanged(@NonNull Location location) {
                                    LogUtils.d("onLocationChanged", location, provider, "耗时(ms):", (System.currentTimeMillis() - start));
                                    saveLocation2(location);
                                    emitter.onNext(new Pair<>(provider,location));
                                    locationManager.removeUpdates(this);
                                    // Important: flatMap creates one Observable per provider, and the final
                                    // observer's onComplete only fires after every one of them has completed.
                                    emitter.onComplete();
                                }

                                @Override
                                public void onStatusChanged(String provider, int status, Bundle extras) {
                                    LogUtils.d("onStatusChanged", provider, status, extras);
                                }

                                @Override
                                public void onProviderDisabled(@NonNull String provider) {
                                    LogUtils.w("onProviderDisabled", provider);
                                    locationManager.removeUpdates(this);
                                    // Reached when the provider is switched off while the request is pending.
                                    emitter.onComplete();
                                }

                                @Override
                                public void onProviderEnabled(@NonNull String provider) {
                                    LogUtils.d("onProviderEnabled", provider);
                                }
                            };
                            locationManager.requestSingleUpdate(provider, singleUpdateListener, handlerThread.getLooper());
                            // Unregister the listener when the stream is disposed before a fix arrives
                            // (e.g. on timeout). Set after registering: if the emitter is already
                            // disposed at this point the cancellable runs immediately.
                            emitter.setCancellable(() -> locationManager.removeUpdates(singleUpdateListener));
                        }
                        // subscribeOn must be applied here, on the inner source, so that each
                        // per-provider Observable is subscribed on its own io() worker.
                    }).subscribeOn(Schedulers.io());
                }
            }).mergeWith(Observable.create(new ObservableOnSubscribe<Pair<String,Location>>() {
                @SuppressLint("MissingPermission")
                @Override
                public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<Pair<String,Location>> emitter) throws Exception {
                    // The if/else of the callback version lives directly inside the source.
                    if (!isGmsAvaiable(context)) {
                        // Nothing to emit: complete right away, otherwise the merged stream
                        // could never reach onComplete.
                        emitter.onComplete();
                        return;
                    }
                    LocationCallback gmsCallback = new LocationCallback() {
                        @Override
                        public void onLocationResult(LocationResult result) {
                            LogUtils.i("gms result", result);
                            if (result != null && result.getLocations() != null && !result.getLocations().isEmpty()) {
                                Location location = result.getLocations().get(0);
                                saveLocation2(location);
                                emitter.onNext(new Pair<>("gms",location));
                            }
                            emitter.onComplete();
                        }
                    };
                    LocationServices.getFusedLocationProviderClient(context)
                            .requestLocationUpdates(new LocationRequest()
                                    .setExpirationDuration(timeOut)
                                    .setNumUpdates(1)
                                    .setMaxWaitTime(timeOut), gmsCallback, handlerThread.getLooper());
                    // Drop the fused-location callback as well when the stream is disposed early.
                    emitter.setCancellable(() ->
                            LocationServices.getFusedLocationProviderClient(context).removeLocationUpdates(gmsCallback));
                }})
                    .subscribeOn(Schedulers.io()))
            .timeout(timeOut, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Pair<String,Location>>() {
                @Override
                public void onSubscribe(@io.reactivex.annotations.NonNull Disposable d) {
                }

                @Override
                public void onNext(@io.reactivex.annotations.NonNull Pair<String,Location> location) {
                    LogUtils.i("onNext", location);
                    pairList.add(location);
                    // The first emission is reported separately as the quickest fix.
                    int i = atomicInteger.incrementAndGet();
                    if(i ==1){
                        LogUtils.i("onQuickest", location);
                        listener.onQuickestLocationCallback(location.second, location.first);
                    }
                    listener.onEachLocationChanged(location.second, location.first);
                }

                @Override
                public void onError(@io.reactivex.annotations.NonNull Throwable e) {
                    LogUtils.w("onError", e);
                    dealOnFail(e, pairList,listener);
                }

                @Override
                public void onComplete() {
                    LogUtils.i("onComplete");
                    // NOTE(review): handlerThread is only quit on this path, not in onError —
                    // confirm its owner releases it after a timeout/failure.
                    if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.JELLY_BEAN_MR2) {
                        handlerThread.quitSafely();
                    }
                    Pair<String,Location> mostAcurLocation2 = getMostAcurLocation2(pairList);
                    if(mostAcurLocation2 == null){
                        // No usable fix was collected: route through the same failure handling.
                        onError(new RuntimeException("getMostAcurLocation2 null"));
                    }else {
                        listener.onSuccess(mostAcurLocation2.second,"from sys api :"+ mostAcurLocation2.first);
                    }
                }
            });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# 问题2:如何连接多个observable?
一个方法本身返回Observable,其内部要执行一段逻辑,然后在回调里调用另外一个Observable
先用 Observable.create 包装前一段回调逻辑,再通过 flatMap 切换到下一个 Observable 即可
/**
 * Returns the location stream from {@code doRequestLocation}, asking for the location
 * permission first when it has not been granted yet.
 *
 * <p>When both coarse and fine location are already granted, {@code doRequestLocation}
 * is returned directly. Otherwise the permission request is wrapped in an Observable
 * that emits one marker item on grant (then completes) or fails with a
 * {@code LocationFailException} on denial; the marker item is flat-mapped into
 * {@code doRequestLocation}.
 */
private static Observable<Pair<String,Location>> checkPermission(Context context, int timeout, boolean showBeforeRequest, boolean showAfterRequest, MyLocationCallback callback) {
    boolean alreadyGranted = PermissionUtils.isGranted(Manifest.permission.ACCESS_COARSE_LOCATION)
            && PermissionUtils.isGranted(Manifest.permission.ACCESS_FINE_LOCATION);
    if (alreadyGranted) {
        return doRequestLocation(context, timeout, callback);
    }

    // Bridges the permission callback into a one-shot Observable.
    Observable<Pair<String, Location>> permissionGate = Observable.create(emitter -> {
        PermissionUtils.FullCallback permissionResult = new PermissionUtils.FullCallback() {
            @Override
            public void onGranted(@NonNull List<String> granted) {
                // The item only signals "granted"; its payload is not read downstream.
                emitter.onNext(new Pair<>("doRequestLocation", null));
                emitter.onComplete();
            }

            @Override
            public void onDenied(@NonNull List<String> deniedForever, @NonNull List<String> denied) {
                emitter.onError(new LocationFailException("no permission").setFailBeforeReallyRequest(true));
            }
        };
        MyPermissions.requestByMostEffort(
                showBeforeRequest,
                showAfterRequest,
                permissionResult,
                Manifest.permission.ACCESS_FINE_LOCATION);
    });

    return permissionGate
            .subscribeOn(AndroidSchedulers.mainThread())
            .flatMap(grantedMarker -> doRequestLocation(context, timeout, callback));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 问题3 : 如何利用rxjava对回调流程进行延迟重试
注意:retry 之前要加 subscribeOn(Schedulers.io()),否则重试前的延迟(sleep)会发生在原先的线程上;如果原先是主线程,就会造成界面卡顿
编辑 (opens new window)
上次更新: 2022/08/25, 20:20:31