10分钟学习掌握RxSwift的基础知识

原文:Learn & Master ⚔️ the Basics of RxSwift in 10 Minutes

每个人应该都听说过Rx。无论你是在开发者大会上听过,还是阅读过像这样的博客文章。你几乎不可能没有听说过它,但究竟什么是响应式编程(Reactive Programming)呢? 让我们先来看看响应式编程的定义:

在计算中,响应式编程是一种面向数据流和变化传递的编程范式。这意味着,我们应该可以轻松的使用编程语言来表达静态或动态数据流,并且底层执行模型将通过数据流自动传递更改。
– 维基百科

诚实的说,在阅读上面的定义后,大多数人可能依然不知道响应式编程究竟是什么,我也是这样。这就是为什么我要尝试写一个简单易懂的介绍。下面我就开始介绍RxSwift,这个现代的软件开发方法Rx的Swift版本。

可观察序列🎞(Observable Sequences)

你首先需要了解的是,RxSwift中的所有内容都是可观察的序列,或者是对可观察序列发出的事件进行操作或订阅的事物。

RxSwift中,数组,字符串或字典将被转换为的可观察序列。 您也可以为,符合Swift标准库中Sequence协议的任何对象,创建可观察序列。

我们来创建一些可观察的序列:

1
2
3
let helloSequence = Observable.just("Hello Rx")
let fibonacciSequence = Observable.from([0,1,1,2,3,5,8])
let dictSequence = Observable.from([1:"Hello",2:"World"])

你通过调用subscribe(on:(Event<T>)-> ())来订阅可观察的序列。传递的块将接收该序列发出的所有事件。

1
2
3
4
5
6
7
8
let helloSequence = Observable.of("Hello Rx")
let subscription = helloSequence.subscribe { event in
print(event)
}

OUTPUT:
next("Hello Rx")
completed

可观察序列可以在其生命期间内发出零个或多个事件。在RxSwift中,一个事件是一个枚举类型,有三种可能的状态:

  • .next(value:T) - 当一个值或一组值被添加到一个可观察的序列中时,它将向其订阅者发送下一个事件(next event)。 关联的值将包含序列中的实际值。
  • .error(error: Error) - 如果遇到错误,则会发出一个错误事件(error event)。 这也将终止序列。
  • .completed - 如果一个序列正常结束,它会向其订阅者发送完成事件(completed event)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
let helloSequence = Observable.from(["H","e","l","l","o"])
let subscription = helloSequence.subscribe { event in
switch event {
case .next(let value):
print(value)
case .error(let error):
print(error)
case .completed:
print("completed")
}
}

OUTPUT:
H e l l o
completed

如果你想取消订阅,你可以通过调用dispose来完成。您也可以将订阅添加到Disposebag中,Disposebag会在自己deinit中自动取消在它里面的所有订阅。另外,你还可以只订阅特定的事件。例如,如果只想接收序列发出的错误事件,则可以使用:subscribe(onError:(Error -> ()))

下面代码阐述了目前学到的所有内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 创建一个DisposeBag,加入其中的订阅将被自动取消
let bag = DisposeBag()

// 创建一个发出String值的可观察序列
let observable = Observable.just("Hello Rx!")

// 为下一个事件创建一个订阅
let subscription = observable.subscribe(onNext:{
print($0)
})

// 将订阅添加到DisposeBag中
subscription.addDisposableTo(bag)

主题📫(Subjects)

主题是一个可观测序列的特殊形式,您可以订阅并动态添加元素。RxSwift目前有4种不同的主题

  • PublishSubject:如果您订阅它,您将获得订阅后发生的所有事件。
  • BehaviourSubject:行为主题将为订阅者提供最近的元素以及订阅后发生的所有事件。
  • ReplaySubject:如果您想要在初始订阅中时得到更多之前的元素,则需要使用ReplaySubject。使用ReplaySubject,您可以定义要向新订户发送多少个最近的项目。
  • Variable:变量只是一个BehaviourSubject包装器,对于一个没有接触过响应编程的程序员来说感觉更自然。它可以像普通变量一样使用。

我将向您展示PublishSubject的工作原理。如果您想了解其他主题类型的更多信息,您需要查看GitHub上的相关资料。它们的基本区别仅在于初次订阅时订阅者接收的过去事件的数量。

  • Publish: 0
  • Behaviour & Variable: 1
  • Replay: N

我们来看看PublishSubject

我们需要做的第一件事是创建一个真实的PublishSubject实例。这很容易,我们可以使用默认的初始化程序。

1
2
let bag = DisposeBag()
var publishSubject = PublishSubject<String>()

你可以使用onNext()函数将新值添加到该序列。 onCompleted()将完成序列,而onError(error)将导致发出错误事件。让我们为我们的PublishSubject添加一些值。

1
2
publishSubject.onNext("Hello")
publishSubject.onNext("World")

如果您在使用onNext()添加“Hello”和“World”之后订阅该主题,则不会通过事件接收这两个值。而BehaviourSubject不同,它将接收最近发生的事件“World”。

现在让我们创建一个订阅并为主题添加一些新值。我们还创建第二个订阅并为其添加更多值。请仔细阅读注释以了解究竟发生了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
let subscription1 = publishSubject.subscribe(onNext:{
print($0)
}).addDisposableTo(bag)

// subscription1接收到2个事件, subscription2一个也没有接收到
publishSubject.onNext("Hello")
publishSubject.onNext("Again")

// subscription2不会接收到"Hello"和"Again",因为它的订阅发生在添加事件之后
let subscription2 = publishSubject.subscribe(onNext:{
print(#line,$0)
})

publishSubject.onNext("Both Subscriptions receive this message")

恭喜🎉你读到这里,你现在应该知道了RxSwift的基础知识。虽然还有很多东西要学习,但Rx的一切都基于这些简单的原则。你现在可以休息一下,并且仔细思考这些概念以充分理解它们。如果你准备好了,让我们继续下去,因为还有更多有趣的东西等待我们去发现。

弹子图🙌(Marble Diagrams)

如果你使用过RxSwiftRx,您应该了解弹子图弹子图可以可视化观察可观察序列的转换。它由顶部的输入流,底部的输出流和中间的实际变换函数组成。

对于实例,让我们看看一个操作,它将您的发射事件从可观察序列延迟150毫秒。请忽略调度程序参数,因为我将在后面的文章中介绍它:

弹子图

容易理解,对吧?

iOSAndroid都有很好的开源项目,允许您在移动设备上交互式地播放这些图表。和他们一起玩,我向你保证,你会在很短的时间内了解到很多有关Rx的知识。

Web-App: http://rxmarbles.com
iOS-App: https://itunes.apple.com/com/app/rxmarbles/id1087272442
Android: https://goo.gl/b5YD8K

转换⚙(Transformations)

有时候你想在用户收到它们之前,对可观察序列发出的元素进行变换,组合或过滤。我会向你介绍基本的转换操作符,告诉你一些关于过滤器和组合序列的可能性。最后我会告诉你如何在不同的线程上执行转换,组合等。让我们开始吧。

映射(Map)

要转换从可观察序列发出的元素,可以它们到达其订阅者之前使用映射(Map)运算符。想象一下,在发射之前将序列中每个值乘以10的转换。

弹子图

1
2
3
4
5
6
7
Observable<Int>.of(1,2,3,4).map { value in 
return value * 10
}.subscribe(onNext:{
print($0)
})

OUTPUT: 10 20 30 40

FlatMap

设想一个可观察序列,它由本身就是可观察序列的对象组成,并且您想用这些对象创建一个新序列。这就是FlatMap的作用。FlatMap合并这些可观察的结果的发射并将这些合并结果作为其自己的序列发布。

弹子图

1
2
3
4
5
6
7
8
9
10
let sequence1  = Observable<Int>.of(1,2)
let sequence2 = Observable<Int>.of(1,2)

let sequenceOfSequences = Observable.of(sequence1,sequence2)

sequenceOfSequences.flatMap{ return $0 }.subscribe(onNext:{
print($0)
})

OUTPUT: 1 2 1 2

扫描(Scan)

扫描从一个初始种子值开始,用于聚合值,就像Swift中的reduce一样。

弹子图

1
2
3
4
5
6
7
Observable.of(1,2,3,4,5).scan(0) { seed, value in
return seed + value
}.subscribe(onNext:{
print($0)
})

OUTPUT: 1 3 6 10 15

Buffer

Buffer操作符将发射元素转换为可观察序列,该可观察序列发射这些项目的缓冲集合。

弹子图

1
2
3
4
5
6
7
SequenceThatEmitsWithDifferentIntervals
.buffer(timeSpan: 150, count: 3, scheduler:s)
.subscribe(onNext:{
print($0)
})

OUTPUT: [1] [2,3] [4] [5,6] [7] []

过滤🚬(Filter)

如果您只想根据某些标准对下一个事件做出反应,则应使用过滤器运算符。

过滤

基本过滤器操作与Swift等价操作类似。您只需定义需要传递的条件,并且如果条件满足,则.next event事件将发送给其订阅者。

弹子图

1
2
3
4
5
Observable.of(2,30,22,5,60,1).filter { $0 > 10 }.subscribe(onNext: {
print($0)
})

OUTPUT: 30 22 60

DistinctUntilChanged

如果你只想在当前值跟上一个值不一样时,才会发出下一个事件时,你需要使用distinctUntilChanged

弹子图

1
2
3
4
5
Observable.of(1,2,2,1,3).distinctUntilChanged().subscribe(onNext: {
print($0)
})

OUTPUT: 1 2 1 3

其他过滤器操作符你可以尝试:

  • Debounce
  • TakeDuration
  • Skip

组合💑(Combine)

组合序列是一个常见的任务。RxSwift为此给您提供了许多操作符。这里有3个:

StartWith

如果您希望可观察序列在开始时发出特定的值,请使用startWith运算符。

弹子图

1
2
3
4
5
Observable.of(2,3).startWith(1).subscribe(onNext:{
print($0)
})

OUTPUT: 1 2 3

合并(Merge)

通过使用Merge运算符,可以合并多个可观测序列的输出,以使它们像单个可观察序列一样工作。

弹子图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
let publish1 = PublishSubject<Int>()
let publish2 = PublishSubject<Int>()

Observable.of(publish1,publish2).merge().subscribe(onNext:{
print($0)
})

publish1.onNext(20)
publish1.onNext(40)
publish1.onNext(60)
publish2.onNext(1)
publish1.onNext(80)
publish2.onNext(2)
publish1.onNext(100)

OUTPUT: 20 40 60 1 80 2 100

Zip

如果要将不同可观察序列发出的项目合并到一个可观察序列,则使用Zip方法。Zip将以严格的顺序操作,所以Zip发出的前两个元素将是第一个序列的第一个元素和第二个序列的第一个元素的组合。另外请注意,Zip只会发出与发出最少项目的源可观察序列一样的数目。

弹子图

1
2
3
4
5
6
7
8
let a = Observable.of(1,2,3,4,5)
let b = Observable.of("a","b","c","d")

Observable.zip(a, b){ return ($0, $1) }.subscribe {
print($0)
}

OUTPUT: (1, "a")(2, "b") (3, "c") (4, "d")

其他组合过滤器你可以尝试:

  • Concat
  • CombineLatest
  • SwitchLatests

副作用👉(Side Effects)

如果您想注册在某个事件发生在可观察序列上时将执行的回调,则需要使用doOn运算符。它不会修改发射的元素,而只是传递它们。您可以使用 …

  • do(onNext:) - 如果你想在下一个事件发生时做点什么
  • do(onError:) - 如果错误发生
  • do(onCompleted:) - 如果序列成功完成。
1
2
3
4
5
Observable.of(1,2,3,4,5).do(onNext: {
$0 * 10 // This has no effect on the actual subscription
}).subscribe(onNext:{
print($0)
})

调度器⏰(Schedulers)

操作符将在同一线程上创建订阅。在RxSwift中,您可以使用调度器强制操作符在特定队列上执行其工作。您也可以强制订阅发生在特定队列上。你使用subscribeOnobserveOn来完成这些任务。如果你熟悉operation-queuesdispatch-queues的概念,这对你来说应该没什么特别的。调度器可以是串行或并发的,类似于GCDOperationQueueRxSwift有5种类型的调度器:

  • MainScheduler – “需要在MainThread上执行工作。如果从主线程调用调度方法,它将立即执行操作,而无需调度。此调度程序通常用于执行UI工作。“
  • CurrentThreadScheduler – “安排当前线程的工作单元。这是生成元素运算符的默认调度器。“
  • SerialDispatchQueueScheduler – “需要在特定dispatch_queue_t上执行工作。它将确保即使并发调度队列被传递,它也会被转换为串行调度队列。串行调度器可以为observeOn启用某些优化。主调度器是SerialDispatchQueueScheduler的一个实例“
  • ConcurrentDispatchQueueScheduler – “需要在特定dispatch_queue_t上执行工作。你也可以传递一个串行调度队列,它不应该引起任何问题。 当需要在后台执行某些工作时,该调度器非常适合。“
  • OperationQueueScheduler – “需要在特定的NSOperationQueue上执行工作。此调度器适用于需要在后台执行某些更大块工作的情况,并且你还可以使用maxConcurrentOperationCount微调并发处理。“

以下一段代码,向您展示了如何在后台队列上同时观察并在主队列上订阅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
let publish1 = PublishSubject<Int>()
let publish2 = PublishSubject<Int>()

let concurrentScheduler = ConcurrentDispatchQueueScheduler(qos: .background)

Observable.of(publish1,publish2)
.observeOn(concurrentScheduler)
.merge()
.subscribeOn(MainScheduler())
.subscribe(onNext:{
print($0)
})

publish1.onNext(20)
publish1.onNext(40)

OUTPUT: 20 40

结束语🎁

恭喜你,你已经完成RxSwift基础知识的学习。Happy Coding🎉