Reading & Doing
Old Tiny Games
const fakeConcatAll = () => (obs$: Observable<any>) => {
let obsArr: Observable<any>[] = [];
let currentSubscription :Subscription | undefined;
let hoCompleted = false;
return Observable.create((observer: Subscriber<any>) => {
const start = () => {
if(obsArr.length <= 0) return;
return obsArr.shift()!.subscribe(
val => observer.next(val),
err => observer.error(err),
() => {
if(obsArr.length > 0) {
currentSubscription = start();
} else {
currentSubscription = undefined;
if(hoCompleted) {
observer.complete();
}
}
}
)
}
const hoSubscrption = obs$.subscribe(
subObs => {
obsArr.push(subObs);
if(!currentSubscription) {
currentSubscription = start();
}
},
err => observer.error(err),
() => hoCompleted = true
)
return {
unsubscribe() {
hoSubscrption.unsubscribe();
currentSubscription && currentSubscription.unsubscribe();
obsArr.length = 0;
}
}
})
}
const fakeMergeAll = () => (obs$:Observable<Observable<any>>) => {
const subscriptionArr: Subscription[] = [];
return Observable.create((observer: Subscriber<any>) => {
const hoSubscription = obs$.subscribe(
subObs$ => {
const sub = subObs$.subscribe(
val => observer.next(val),
err => observer.error(err),
() => {
if(hoSubscription.closed &&
subscriptionArr
// 执行到这里的时候,该Observable已经可以 "认为" completed了,
// 但是还没有被标记为closed
.filter(item => item !== sub)
.every(item => item.closed)) {
observer.complete();
}
}
)
subscriptionArr.push(sub);
}
)
return {
unsubscribe() {
subscriptionArr.forEach(sub => sub.unsubscribe());
hoSubscription.unsubscribe();
}
}
})
}