RxSwift核心
1. Observable - 可被监听的序列
Observable 可以用于描述元素异步产生的序列。这样我们生活中许多事物都可以通过它来表示: Observable<Double> 温度
例如:你可以将温度看作是一个序列,然后监测这个温度值,最后对这个值做出响应。例如:当室温高于 33 度时,打开空调降温
特征序列
- Single
Single 是 Observable 的另外一个版本。不像 Observable 可以发出多个元素,它要么只能发出一个元素,要么产生一个 error 事件。
A.发出一个元素,或一个 error 事件
B.不会共享状态变化
- Completable
Completable 是 Observable 的另外一个版本。不像 Observable 可以发出多个元素,它要么只能产生一个 completed 事件,要么产生一个 error 事件。
A.发出零个元素
B.发出一个 completed 事件或者一个 error 事件
C.不会共享状态变化
- Maybe
Maybe 是 Observable 的另外一个版本。它介于 Single 和 Completable 之间,它要么只能发出一个元素,要么产生一个 completed 事件,要么产生一个 error 事件。
A.发出一个元素或者一个 completed 事件或者一个 error 事件
B.不会共享状态变化”
- Driver
Driver(司机?) 是一个精心准备的特征序列。它主要是为了简化 UI 层的代码。不过如果你遇到的序列具有以下特征,你也可以使用它:
A.不会产生 error 事件
B.一定在 MainScheduler 监听(主线程监听)
C.共享状态变化
*ControlEvent
ControlEvent 专门用于描述 UI 控件所产生的事件,它具有以下特征:
A.不会产生 error 事件
B.一定在 MainScheduler 订阅(主线程订阅)
C.一定在 MainScheduler 监听(主线程监听)
D.共享状态变化
2. Observer - 观察者
观察者 是用来监听事件,然后它需要这个事件做出响应。例如:弹出提示框就是观察者,它对点击按钮这个事件做出响应。
例如:当室温高于 33 度时,打开空调降温
打开空调降温就是观察者 Observer<Double>。
创建观察者:
创建观察者最直接的方法就是在 Observable 的 subscribe 方法后面描述,事件发生时,需要如何做出响应。而观察者就是由后面的 onNext,onError,onCompleted的这些闭包构建出来的。
特征观察者
- AnyObserver
AnyObserver 可以用来描叙任意一种观察者
*Binder
Binder 主要有以下两个特征:
A.不会处理错误事件
B.确保绑定都是在给定 Scheduler 上执行(默认 MainScheduler)
一旦产生错误事件,在调试环境下将执行 fatalError,在发布环境下将打印错误信息。
let observer: AnyObserver<Bool> = AnyObserver { [weak self] (event) in
switch event {
case .next(let isHidden):
self?.usernameValidOutlet.isHidden = isHidden
default:
break
}
}
//observer
usernameValid.bind(to: observer).disposed(by: disposeBag)
extension Reactive where Base: UIView {
public var isHidden: Binder<Bool> {
return Binder(self.base) { view, hidden in
view.isHidden = hidden
}
}
}
usernameValid.bind(to: usernameValidOutlet.rx.isHidden).disposed(by: disposeBag)
3. Observable & Observer 既是可被监听的序列也是观察者
在我们所遇到的事物中,有一部分非常特别。它们既是可被监听的序列也是观察者。
例如:textField的当前文本。它可以看成是由用户输入,而产生的一个文本序列。也可以是由外部文本序列,来控制当前显示内容的观察者:
// 作为可被监听的序列
let observable = textField.rx.text
observable.subscribe(onNext: { text in show(text: text) })
// 作为观察者
let observer = textField.rx.text
let text: Observable<String?> = ...
text.bind(to: observer)
Subject
*AsyncSubject
AsyncSubject.png AsyncSubject.pngAsyncSubject 将在源 Observable 产生完成事件后,发出最后一个元素(仅仅只有最后一个元素),如果源 Observable 没有发出任何元素,只有一个完成事件。那 AsyncSubject 也只有一个完成事件。
它会对随后的观察者发出最终元素。如果源 Observable 因为产生了一个 error 事件而中止, AsyncSubject 就不会发出任何元素,而是将这个 error 事件发送出来。
let disposeBag = DisposeBag()
let subject = AsyncSubject<String>()
subject
.subscribe { print("Subscription: 1 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")
subject.onNext("🐹")
subject.onCompleted()
输出结果:
Subscription: 1 Event: next(🐹)
Subscription: 1 Event: completed
*PublishSubject
PublishSubject.png PublishSubject.pngPublishSubject 将对观察者发送订阅后产生的元素,而在订阅前发出的元素将不会发送给观察者。如果你希望观察者接收到所有的元素,你可以通过使用Observable 的 create 方法来创建 Observable,或者使用 ReplaySubject。
如果源 Observable 因为产生了一个 error 事件而中止, PublishSubject 就不会发出任何元素,而是将这个 error 事件发送出来。
let disposeBag = DisposeBag()
let subject = PublishSubject<String>()
subject
.subscribe { print("Subscription: 1 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")
subject
.subscribe { print("Subscription: 2 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")
输出结果:
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
*ReplaySubject
ReplaySubject.pngReplaySubject 将对观察者发送全部的元素,无论观察者是何时进行订阅的。
这里存在多个版本的 ReplaySubject,有的只会将最新的 n 个元素发送给观察者,有的只会将限制时间段内最新的元素发送给观察者。
如果把 ReplaySubject 当作观察者来使用,注意不要在多个线程调用 onNext, onError 或 onCompleted。这样会导致无序调用,将造成意想不到的结果。
let disposeBag = DisposeBag()
let subject = ReplaySubject<String>.create(bufferSize: 1)
subject
.subscribe { print("Subscription: 1 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")
subject
.subscribe { print("Subscription: 2 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")
输出结果:
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
*BehaviorSubject
98C97A0E-D2A0-4C84-964D-90A2BF88B2EC.png 413FA601-5878-433B-BDC1-FA57A4196385.png当观察者对 BehaviorSubject 进行订阅时,它会将源 Observable 中最新的元素发送出来(如果不存在最新的元素,就发出默认元素)。然后将随后产生的元素发送出来。如果源 Observable 因为产生了一个 error 事件而中止, BehaviorSubject 就不会发出任何元素,而是将这个 error 事件发送出来
let disposeBag = DisposeBag()
let subject = BehaviorSubject(value: "🔴")
subject
.subscribe { print("Subscription: 1 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")
subject
.subscribe { print("Subscription: 2 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")
subject
.subscribe { print("Subscription: 3 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🍐")
subject.onNext("🍊")
输出结果:
Subscription: 1 Event: next(🔴)
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
Subscription: 3 Event: next(🅱️)
Subscription: 1 Event: next(🍐)
Subscription: 2 Event: next(🍐)
Subscription: 3 Event: next(🍐)
Subscription: 1 Event: next(🍊)
Subscription: 2 Event: next(🍊)
Subscription: 3 Event: next(🍊)
Variable
在 Swift 中我们经常会用 var 关键字来声明变量。RxSwift 提供的 Variable 实际上是 var 的 Rx 版本,你可以将它看作是 RxVar。
我们来对比一下 var 以及 Variable 的用法:
使用 var:
// 在 ViewController 中
var model: Model? = nil {
didSet { updateUI(with: model) }
func updateUI(with model: Model?) { ... }
let model: Variable<Model?> = Variable(nil)
override func viewDidLoad() {
super.viewDidLoad()
model.asObservable()
.subscribe(onNext: { [weak self] model in
self?.updateUI(with: model)
})
.disposed(by: disposeBag)
func updateUI(with model: Model?) { ... }
func getModel() -> Model { ... }
第一种使用 var 的方式十分常见,在 ViewController 中监听 Model 的变化,然后刷新页面。
第二种使用 Variable 则是 RxSwift 独有的。Variable 几乎提供了 var 的所有功能。另外,加上一条非常重要的特性,就是可以通过调用 asObservable() 方法转换成序列。然后你可以对这个序列应用操作符,来合成其他的序列。所以,如果我们声明的变量需要提供 Rx 支持,那就选用 Variable 这个类型。
说明
Variable 封装了一个 BehaviorSubject,所以它会持有当前值,并且 Variable 会对新的观察者发送当前值。它不会产生 error 事件。Variable 在 deinit 时,会发出一个 completed 事件
Operator - 操作符
6800A280-ADA2-4F79-A87B-11891FCF0158.png操作符可以帮助大家创建新的序列,或者变化组合原有的序列,从而生成一个新的序列。
我们之前在输入验证例子中就多次运用到操作符。例如,通过 map 方法将输入的用户名,转换为用户名是否有效。然后用这个转化后来的序列来控制红色提示语是否隐藏。我们还通过 combineLatest 方法,将用户名是否有效和密码是否有效合并成两者是否同时有效。然后用这个合成后来的序列来控制按钮是否可点击。
这里 map 和 combineLatest 都是操作符,它们可以帮助我们构建所需要的序列。现在,我们再来看几个例子:
Filter:
99EE92D6-D016-4605-BB95-BA1FF5BAD869.png
Map:
D382E434-EC82-438B-B50A-A4297FFA708E.png
5. Disposable - 可被清除的资源
Disposable - 可被清除的资源
F5A2FE7F-5526-4347-A7AB-80A37D135B12.png通常来说,一个序列如果发出了 error 或者 completed 事件,那么所有内部资源都会被释放。如果你需要提前释放这些资源或取消订阅的话,那么你可以对返回的 可被清除的资源(Disposable) 调用 dispose 方法:
var disposable: Disposable?
override func viewWillAppear(_ animated: Bool) {
super.viewWillAppear(animated)
self.disposable = textField.rx.text.orEmpty
.subscribe(onNext: { text in print(text) })
}
override func viewWillDisappear(_ animated: Bool) {
super.viewWillDisappear(animated)
self.disposable?.dispose()
}
调用 dispose 方法后,订阅将被取消,并且内部资源都会被释放。通常情况下,你是不需要手动调用 dispose 方法的。我们推荐使用 清除包(DisposeBag) 或者 takeUntil 操作符 来管理订阅的生命周期。
DisposeBag - 清除包
404F9871-3ED0-4063-993F-3545B78FD9FD.png因为我们用的是 Swift ,所以我们更习惯于使用 ARC 来管理内存。那么我们能不能用 ARC 来管理订阅的生命周期了。答案是肯定了,你可以用 清除包(DisposeBag) 来实现这种订阅管理机制
var disposeBag = DisposeBag()
override func viewWillAppear(_ animated: Bool) {
super.viewWillAppear(animated)
textField.rx.text.orEmpty
.subscribe(onNext: { text in print(text) })
.disposed(by: self.disposeBag)
}
override func viewWillDisappear(_ animated: Bool) {
super.viewWillDisappear(animated)
self.disposeBag = DisposeBag()
}
当 清除包 被释放的时候,清除包 内部所有 可被清除的资源(Disposable) 都将被清除。
takeUntil
8E12A1D5-9ED6-4018-86A0-DC6D5FB1CF2F.png另外一种实现自动取消订阅的方法就是使用 takeUntil 操作符,上面那个输入验证的演示代码也可以通过使用 takeUntil 来实现:
override func viewDidLoad() {
“super.viewDidLoad()
...
_ = usernameValid
.takeUntil(self.rx.deallocated)
.bind(to: passwordOutlet.rx.isEnabled)
_ = usernameValid
.takeUntil(self.rx.deallocated)
.bind(to: usernameValidOutlet.rx.isHidden)
_ = passwordValid
.takeUntil(self.rx.deallocated)
.bind(to: passwordValidOutlet.rx.isHidden)
_ = everythingValid
.takeUntil(self.rx.deallocated)
.bind(to: doSomethingOutlet.rx.isEnabled)
}
这将使得订阅一直持续到控制器的 dealloc 事件产生为止。
6. Schedulers - 调度器
13FA831F-BA09-4DF5-9D93-2385BF472352.pngSchedulers 是 Rx 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。
let rxData: Observable<Data> = ...
rxData.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
.observeOn(MainScheduler.instance)
.subscribe(onNext: { [weak self] data in
self?.data = data
})
.disposed(by: disposeBag)
使用 subscribeOn
我们用 subscribeOn 来决定数据序列的构建函数在哪个 Scheduler 上运行。以上例子中,由于获取 Data 需要花很长的时间,所以用 subscribeOn 切换到 后台 Scheduler 来获取 Data。这样可以避免主线程被阻塞。
使用 observeOn
我们用 observeOn 来决定在哪个 Scheduler 监听这个数据序列。以上例子中,通过使用 observeOn 方法切换到主线程来监听并且处理结果。
一个比较典型的例子就是,在后台发起网络请求,然后解析数据,最后在主线程刷新页面。你就可以先用 subscribeOn 切到后台去发送请求并解析数据,最后用 observeOn 切换到主线程更新页面。
MainScheduler
MainScheduler 代表主线程。如果你需要执行一些和 UI 相关的任务,就需要切换到该 Scheduler 运行。
SerialDispatchQueueScheduler
SerialDispatchQueueScheduler 抽象了窜行 DispatchQueue。如果你需要执行一些窜行任务,可以切换到这个 Scheduler 运行。
ConcurrentDispatchQueueScheduler
ConcurrentDispatchQueueScheduler 抽象了并行 DispatchQueue。如果你需要执行一些并发任务,可以切换到这个 Scheduler 运行。
OperationQueueScheduler
OperationQueueScheduler 抽象了 NSOperationQueue。
它具备 NSOperationQueue 的一些特点,例如,你可以通过设置 maxConcurrentOperationCount,来控制同时执行并发任务的最大数量。
7. Error Handling - 错误处理
一旦序列里面产出了一个 error 事件,整个序列将被终止。RxSwift 主要有两种错误处理机制:
- retry - 重试
- catch - 恢复
retry - 重试
retry 可以让序列在发生错误后重试:
// 请求 JSON 失败时,立即重试,
// 重试 3 次后仍然失败,就将错误抛出
let rxJson: Observable<JSON> = ...
rxJson
.retry(3)
.subscribe(onNext: { json in
print("取得 JSON 成功: \(json)")
}, onError: { error in
print("取得 JSON 失败: \(error)")
})
.disposed(by: disposeBag)
以上的代码非常直接 retry(3) 就是当发生错误时,就进行重试操作,并且最多重试 3 次
retryWhen
如果我们需要在发生错误时,经过一段延时后重试,那可以这样实现:
let retryDelay: Double = 5 // 重试延时 5 秒
rxJson
.retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
return Observable.timer(retryDelay, scheduler: MainScheduler.instance)
}
.subscribe(...)
.disposed(by: disposeBag)
这里我们需要用到 retryWhen 操作符,这个操作符主要描述应该在何时重试,并且通过闭包里面返回的 Observable 来控制重试的时机:
闭包里面的参数是 Observable<Error> 也就是所产生错误的序列,然后返回值是一个 Observable。当这个返回的Observable 发出一个元素时,就进行重试操作。当它发出一个 error 或者 completed 事件时,就不会重试,并且将这个事件传递给到后面的观察者。
如果需要加上一个最大重试次数的限制:
// 请求 JSON 失败时,等待 5 秒后重试,
// 重试 4 次后仍然失败,就将错误抛出
let maxRetryCount = 4 // 最多重试 4 次
let retryDelay: Double = 5 // 重试延时 5 秒
rxJson
.retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
return rxError.flatMapWithIndex { (error, index) -> Observable<Int> in
guard index < maxRetryCount else {
return Observable.error(error)
}
return Observable<Int>.timer(retryDelay, scheduler: MainScheduler.instance)
}
}
.subscribe(...)
.disposed(by: disposeBag)
我们用 flatMapWithIndex 这个操作符,因为它可以给我们提供错误的索引数 index。然后用这个索引数判断是否超过最大重试数,如果超过了,就将错误抛出。如果没有超过,就等待 5 秒后重试。
catchError - 恢复
catchError 可以在错误产生时,用一个备用元素或者一组备用元素将错误替换掉:
searchBar.rx.text.orEmpty
...
.flatMapLatest { query -> Observable<[Repository]> in
...
return searchGitHub(query)
.catchErrorJustReturn([])
}
...
.bind(to: ...)
.disposed(by: disposeBag)
我们开头的 Github 搜索就用到了catchErrorJustReturn。当错误产生时,就返回一个空数组,于是就会显示一个空列表页。
你也可以使用 catchError,当错误产生时,将错误事件替换成一个备选序列:
// 先从网络获取数据,如果获取失败了,就从本地缓存获取数据
let rxData: Observable<Data> = ... // 网络请求的数据
let cahcedData: Observable<Data> = ... // 之前本地缓存的数据
rxData
.catchError { _ in cahcedData }
.subscribe(onNext: { date in
print("获取数据成功: \(date.count)")
})
.disposed(by: disposeBag)
常用操作符
-
catchError
从一个错误事件中恢复,将错误事件替换成一个备选序列
catchError
catchError 操作符将会拦截一个 error 事件,将它替换成其他的元素或者一组元素,然后传递给观察者。这样可以使得 Observable 正常结束,或者根本都不需要结束。
let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()
sequenceThatFails
.catchError {
print("Error:", $0)
return recoverySequence
}
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)
recoverySequence.onNext("😊")
输出结果:
next(😬)
next(😨)
next(😡)
next(🔴)
Error: test
next(😊)
- catchErrorJustReturn
catchErrorJustReturn 操作符会将error 事件替换成其他的一个元素,然后结束该序列。
let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchErrorJustReturn("😊")
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)
输出结果:
next(😬)
next(😨)
next(😡)
next(🔴)
next(😊)
completed
- combineLatest
当多个 Observables 中任何一个发出一个元素,就发出一个元素。这个元素是由这些 Observables 中最新的元素,通过一个函数组合起来的
combineLatest 操作符将多个 Observables 中最新的元素通过一个函数组合起来,然后将这个组合的结果发出来。这些源 Observables 中任何一个发出一个元素,他都会发出一个元素(前提是,这些 Observables 曾经发出过元素)。
let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<String>()
second) { $0 + $1 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
first.onNext("1")
second.onNext("A")
first.onNext("2")
second.onNext("B")
second.onNext("C")
second.onNext("D")
first.onNext("3")
first.onNext("4")
输出结果:
1A
2A
2B
2C
2D
3D
4D
- concat
让两个或多个 Observables 按顺序串连起来
concat 操作符将多个 Observables 按顺序串联起来,当前一个 Observable 元素发送完毕后,后一个 Observable 才可以开始发出元素。
concat 将等待前一个 Observable 产生完成事件后,才对后一个 Observable 进行订阅。如果后一个是“热” Observable ,在它前一个 Observable 产生完成事件前,所产生的元素将不会被发送出来。
startWith 和它十分相似。但是startWith不是在后面添加元素,而是在前面插入元素。
merge 和它也是十分相似。merge并不是将多个 Observables 按顺序串联起来,而是将他们合并到一起,不需要 Observables 按先后顺序发出元素。
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")
let variable = Variable(subject1)
variable.asObservable()
.concat()
.subscribe { print($0)
}.disposed(by: disposeBag)
subject1.onNext("🍐")
subject1.onNext("🍊")
variable.value = subject2
subject2.onNext("I would be ignored")
subject2.onNext("🐱")
subject1.onCompleted()
subject2.onNext("🐭")
输出结果:
next(🍎)
next(🍐)
next(🍊)
next(🐱)
next(🐭)
- concatMap
将 Observable 的元素转换成其他的 Observable,然后将这些 Observables 串连起来
concatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。然后让这些 Observables 按顺序的发出元素,当concatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。然后让这些 Observables 按顺序的发出元素,当前一个 Observable 元素发送完毕后,后一个 Observable 才可以开始发出元素。等待前一个 Observable 产生完成事件后,才对后一个 Observable 进行订阅。
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")
let variable = Variable(subject1)
variable.asObservable()
.concatMap { $0 }
.subscribe { print($0) }
.disposed(by: disposeBag)
subject1.onNext("🍐")
subject1.onNext("🍊")
variable.value = subject2
subject2.onNext("I would be ignored")
subject2.onNext("🐱")
subject1.onCompleted()
subject2.onNext("🐭")
输出结果:
next(🍎)
next(🍐)
next(🍊)
next(🐱)
next(🐭)
- connect
通知可被连接的 Observable 可以开始发出元素了
可被连接的 Observable 和普通的 Observable 十分相似,不过在被订阅后不会发出元素,直到 connect 操作符被应用为止。这样一来你可以等所有观察者全部订阅完成后,才发出元素。
- create
通过一个构建函数完整的创建一个 Observable
create 操作符将创建一个 Observable,你需要提供一个构建函数,在构建函数里面描述事件(next,error,completed)的产生过程。
通常情况下一个有限的序列,只会调用一次观察者的 onCompleted 或者 onError 方法。并且在调用它们后,不会再去调用观察者的其他方法。
创建一个 [0, 1, ... 8, 9] 的序列:
let id = Observable<Int>.create { observer in
observer.onNext(0)
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onNext(4)
observer.onNext(5)
observer.onNext(6)
observer.onNext(7)
observer.onNext(8)
observer.onNext(9)
observer.onCompleted()
return Disposables.create()
}
- debug
打印所有的订阅,事件以及销毁信息
let disposeBag = DisposeBag()
let sequence = Observable<String>.create { observer in
observer.onNext("🍎")
observer.onNext("🍐")
observer.onCompleted()
return Disposables.create()
}
sequence
.debug("Fruit")
.subscribe()
.disposed(by: disposeBag)
输出结果:
2017-11-06 20:49:43.187: Fruit -> subscribed
2017-11-06 20:49:43.188: Fruit -> Event next(🍎)
2017-11-06 20:49:43.188: Fruit -> Event next(🍐)
2017-11-06 20:49:43.188: Fruit -> Event completed
2017-11-06 20:49:43.189: Fruit -> isDisposed
- delay
将 Observable 的每一个元素拖延一段时间后发出
delaydelay 操作符将修改一个 Observable,它会将 Observable 的所有元素都拖延一段设定好的时间, 然后才将它们发送出来。
- delaySubscription
进行延时订阅
delaySubscriptiondelaySubscription 操作符将在经过所设定的时间后,才对 Observable 进行订阅操作。
- distinctUntilChanged
阻止 Observable 发出相同的元素
distinctUntilChangeddistinctUntilChanged 操作符将阻止 Observable 发出相同的元素。如果后一个元素和前一个元素是相同的,那么这个元素将不会被发出来。如果后一个元素和前一个元素不相同,那么这个元素才会被发出来。
let disposeBag = DisposeBag()
Observable.of("🐱", "🐷", "🐱", "🐱", "🐱", "🐵", "🐱")
.distinctUntilChanged()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
输出结果:
🐱
🐷
🐱
🐵
🐱
- elementAt
只发出 Observable 中的第 n 个元素
elementAtelementAt 操作符将拉取 Observable 序列中指定索引数的元素,然后将它作为唯一的元素发出。
let disposeBag = DisposeBag()
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
.elementAt(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
输出结果:
🐸
- empty
创建一个空 Observable
emptyempty 操作符将创建一个 Observable,这个 Observable 只有一个完成事件。
//创建一个空 Observable:
let id = Observable<Int>.empty()
//它相当于:
let id = Observable<Int>.create { observer in
observer.onCompleted()
return Disposables.create()
}
- error
创建一个只有 error 事件的 Observable
error 操作符将创建一个 Observable,这个 Observable 只会产生一个 error 事件。
//创建一个只有 error 事件的 Observable:
let error: Error = ...
let id = Observable<Int>.error(error)
//它相当于:
let error: Error = ...
let id = Observable<Int>.create { observer in
observer.onError(error)
return Disposables.create()
}
- filter
仅仅发出 Observable 中通过判定的元素
filterfilter 操作符将通过你提供的判定方法过滤一个 Observable。
let disposeBag = DisposeBag()
Observable.of(2, 30, 22, 5, 60, 1)
.filter { $0 > 10 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
输出结果:
30
22
60
- flatMap
将 Observable 的元素转换成其他的 Observable,然后将这些 Observables 合并
flatMapflatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。 然后将这些 Observables 的元素合并之后再发送出来。
这个操作符是非常有用的,例如,当 Observable 的元素本生拥有其他的 Observable 时,你可以将所有子 Observables 的元素发送出来。
let disposeBag = DisposeBag()
let first = BehaviorSubject(value: "👦🏻")
let second = BehaviorSubject(value: "🅰️")
let variable = Variable(first)
variable.asObservable()
.flatMap { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
first.onNext("🐱")
variable.value = second
second.onNext("🅱️")
first.onNext("🐶")
输出结果:
👦🏻
🐱
🅰️
🅱️
🐶
- flatMapLatest
将 Observable 的元素转换成其他的 Observable,然后取这些 Observables 中最新的一个
flatMapLatestflatMapLatest 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。一旦转换出一个新的 Observable,就只发出它的元素,旧的 Observables 的元素将被忽略掉。
tips:与 flatMap 比较更容易理解
let disposeBag = DisposeBag()
let first = BehaviorSubject(value: "👦🏻")
let second = BehaviorSubject(value: "🅰️")
let variable = Variable(first)
variable.asObservable()
.flatMapLatest { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
first.onNext("🐱")
variable.value = second
second.onNext("🅱️")
first.onNext("🐶")
输出结果:
👦🏻
🐱
🅰️
🅱️
- from
将其他类型或者数据结构转换为 Observable
from当你在使用 Observable 时,如果能够直接将其他类型转换为 Observable,这将是非常省事的。from 操作符就提供了这种功能。
//将一个数组转换为 Observable:
let numbers = Observable.from([0, 1, 2])
//它相当于:
let numbers = Observable<Int>.create { observer in
observer.onNext(0)
observer.onNext(1)
observer.onNext(2)
observer.onCompleted()
return Disposables.create()
}
//将一个可选值转换为 Observable
let optional: Int? = 1
let value = Observable.from(optional: optional)
//它相当于:
let optional: Int? = 1
let value = Observable<Int>.create { observer in
if let element = optional {
observer.onNext(element)
}
observer.onCompleted()
return Disposables.create()
}
- just
创建 Observable 发出唯一的一个元素
justjust 操作符将某一个元素转换为 Observable。
//一个序列只有唯一的元素 0:
let id = Observable.just(0)
//它相当于:
let id = Observable<Int>.create { observer in
observer.onNext(0)
observer.onCompleted()
return Disposables.create()
}
- map
通过一个转换函数,将 Observable 的每个元素转换一遍
mapmap 操作符将源 Observable 的每个元素应用你提供的转换方法,然后返回含有转换结果的 Observable。
let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
.map { $0 * 10 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
输出结果:
10
20
30
- merge
将多个 Observables 合并成一个
merge通过使用 merge 操作符你可以将多个 Observables 合并成一个,当某一个 Observable 发出一个元素时,他就将这个元素发出。
如果,某一个 Observable 发出一个 onError 事件,那么被合并的 Observable 也会将它发出,并且立即终止序列。
let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
Observable.of(subject1, subject2)
.merge()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("A")
subject1.onNext("B")
subject2.onNext("1")
subject2.onNext("2")
subject1.onNext("AB")
subject2.onNext("3")
输出结果:
A
B
1
2
AB
3