[Combine] Reducing Elements Operator - Operator 공부 3
안녕하세요 Pingu입니다.🐧
지난 글에서는 Combine의 Operator 중 Filtering Elements 역할을 하는 Operator들을 알아봤었습니다. 이름대로 Upstream에서 받은 값들을 어떤 조건에 의해 필터링한 뒤 Downstream으로 내려보내는 역할을 했었습니다.
이번 글에서는 이어서 Reducing Element로 분류된 Operator에 대해 알아보도록 하겠습니다.
Reducing Elements
일단 Reducing Elements로 분류된 Publisher에는 어떤 것들이 있는지부터 알아볼게요.
- Collect
- CollectByCount
- CollectByTime
- IgnoreOutput
- Reduce
- TryReduce
공식문서에는 위와 같이 6개의 Publisher가 Reducing Elements로 분류되어있습니다.
그리고 이를 활용해서 만든 Operator는 아래와 같습니다.
- collect()
- collect(_:)
- collect(_:options:)
- TimeGroupingStrategy
- ignoreOutput()
- reduce(_:_:)
- tryReduce(_:_:)
공식문서에는 위와 같이 6개의 Operator가 Reducing Elements로 분류되어있습니다.
그럼 하나씩 어떤 일을 하는 Publisher, Operator인지 알아볼게요!
Collect
첫 번째로 알아볼 것은 Collect입니다.
정의를 보면 그냥 간단하게 아이템들을 버퍼에 가지고 있는 Publisher라고만 되어있네요.
버퍼에 가지고 있어서 어쩌겠다는 건지는 이를 활용해서 만들어진 Operator를 사용해보면 알 수 있습니다.
collect()
정의를 보면 Upstream에서 받은 모든 값을 가지고 있다가 Upstream Publisher가 finish 되면 하나의 배열에 모두 담아서 Downstream으로 보낸다고 되어있네요.
바로 사용해보겠습니다.
let intPublisher = [1, 2, 3, 4, 5].publisher
intPublisher
.collect()
.sink(receiveValue: { print($0) })
위 코드를 실행해보면 결과는 다음과 같습니다.
// Collect 예제 코드
[1, 2, 3, 4, 5]
결과를 보면 정의대로 Upstream에서 받은 값을 모두 모아서 하나의 배열을 만들어 Downstream으로 보내주는 역할을 합니다.
Collect는 Upstream에서 받는 값들을 무제한으로 모아서 Downstream으로 내려보내게 되는데, 만약 Upstream에서 받는 값이 너무 많다면 메모리에 무리가 갈 수 있습니다. 이를 보완하기 위한 CollectByCount를 알아보겠습니다.
CollectByCount
CollectByCount는 방금 알아본 Collect과 비슷한데, 차이점은 Upstream으로부터 값을 받을 때 정해진 개수를 받을 때마다 Downstream으로 내려보낸다는 점입니다.
어떤 건지 이를 활용해서 만들어진 Operator를 바로 알아보죠.
collect(_:)
정의를 보면 Upstream에서 받은 값을 정해진 숫자만큼 모아서 Downstream으로 하나의 배열로 보내는 Operator입니다.
간단하게 사용해보면 어떻게 동작하는지 쉽게 알 수 있어요.
let intPublisher = [1, 2, 3, 4, 5].publisher
intPublisher
.collect(2)
.sink(receiveValue: { print($0) })
위 코드의 결과는 다음과 같습니다.
// CollectByCount 예제 코드
[1, 2]
[3, 4]
[5]
정의대로 매개변수로 받은 count 개수만큼 값을 모은 뒤 해당 개수만큼 값이 모이면 하나의 배열로 만들어서 Downstream으로 내려보냅니다.
그리고 예제에서 볼 수 있듯이 마지막 값의 경우에는 count 값보다 적은 값이 모여있더라도 그냥 Downstream으로 내려보내게 됩니다.
CollectByTime
다음으로 알아볼 것은 CollectByTime입니다. CollectByTime의 정의를 보면 Upstream에서 받은 값을 가지고 있다가 주기적으로 Downstream으로 내려보내는 Publisher라고 하네요. 그리고 정의에 Scheduler가 있는 걸 보면 스케줄러에 의해 주기가 정 해지는 거 같습니다.
collect(_:options:)
CollectByTime을 활용해서 만든 Operator는 collect(_:options:)입니다.
정의를 보면 strategy, options라는 매개변수가 있는데요, 사용하려면 어떤 게 필요한지 알아야 하니 어떤 타입들인지 알아볼게요.
strategy에 필요한 타입인 TimeGroupingStratgy을 찾아가 보면 아래와 같네요.
/// A strategy for collecting received elements.
public enum TimeGroupingStrategy<Context> where Context : Scheduler {
/// A grouping that collects and periodically publishes items.
case byTime(Context, Context.SchedulerTimeType.Stride)
/// A grouping that collects and publishes items periodically or when a buffer reaches a maximum size.
case byTimeOrCount(Context, Context.SchedulerTimeType.Stride, Int)
}
여기에 사용되는 Context라는 제네릭 타입은 Scheduler 타입인걸 볼 수 있습니다. 그리고 Stride도 사용하는 걸 볼 수 있습니다.
여기서 사용할 수 있는 Scheduler에는 ImmediateScheduler가 있네요.
두 개의 케이스가 있는데, byTime은 주어진 시간 동안 받은 값을 내보내는 단순한 녀석이고, byTimeOrCount는 주어진 시간 동안 받은 값을 내보내기도 하지만, 주어진 시간동안 count 개수만큼의 값을 받으면 시간이 지나지 않더라도 내보내는 녀석입니다.
간단하게 사용해보면 아래와 같이 사용할 수 있습니다.
var subscription = Set<AnyCancellable>()
let timerPublisher = Timer.publish(every: 0.5, on: .main, in: .default)
timerPublisher
.autoconnect()
.collect(.byTime(RunLoop.main, .seconds(1)))
.sink(receiveValue: { print($0) })
.store(in: &subscription)
위 코드를 실행하면 아래와 같은 결과를 보게 됩니다.
[2022-04-14 16:07:59 +0000]
[2022-04-14 16:07:59 +0000, 2022-04-14 16:08:00 +0000]
[2022-04-14 16:08:00 +0000, 2022-04-14 16:08:01 +0000]
...
결과는 따로 멈추지 않는 이상 1초마다 계속 추가될 거예요.
아까 CollectByCount는 개수가 채워지면 Downstream으로 모아둔 값을 보냈지만, CollectByTime은 정해진 시간 동안 모인 값을 Downstream으로 내려보내고 있습니다.
코드를 보면, 0.5초마다 값을 내보내는 timerPublisher를 만들었고 collect 메서드에서는 1초마다 모인 값들을 Downstream으로 내려보내게 만들어졌습니다. 실제로 결과를 보면 한 개의 배열에 2개의 값이 있는 것을 볼 수 있습니다. (물론 처음 거에는 1개만 있는 게 마음에 안 듭니다.)
var subscription = Set<AnyCancellable>()
let timerPublisher = Timer.publish(every: 0.5, on: .main, in: .default)
timerPublisher
.autoconnect()
.collect(.byTime(DispatchQueue.main, .seconds(1)))
.sink(receiveValue: { print($0) })
.store(in: &subscription)
이번에는 DispatchQueue를 Scheduler로 사용해봤더니 결과가 아래와 같이 아주 깔끔합니다.
[2022-04-14 16:48:48 +0000, 2022-04-14 16:48:49 +0000]
[2022-04-14 16:48:49 +0000, 2022-04-14 16:48:50 +0000]
[2022-04-14 16:48:50 +0000, 2022-04-14 16:48:51 +0000]
...
이번에는 byTimeOrCount도 사용해볼게요.
var subscription = Set<AnyCancellable>()
let timerPublisher = Timer.publish(every: 0.5, on: .main, in: .default)
timerPublisher
.autoconnect()
.collect(.byTimeOrCount(DispatchQueue.main, .seconds(4), 2))
.sink(receiveValue: { print($0) })
.store(in: &subscription)
timer가 0.5초마다 값을 내보내니까 4초 동안 모아서 내보내면 8개를 모아야 하겠지만, count로 2를 줬기 때문에 2개가 모이면 바로 Downstream으로 내려보내게 됩니다. 결과를 보면 예상대로 아래와 같습니다.
[2022-04-14 16:55:19 +0000, 2022-04-14 16:55:19 +0000]
[2022-04-14 16:55:20 +0000, 2022-04-14 16:55:20 +0000]
[2022-04-14 16:55:21 +0000, 2022-04-14 16:55:21 +0000]
...
다시 정의를 보면 이제 strategy는 알겠고, options를 살펴봅시다. 간단하게 Scheduler의 SchedulerOptions입니다.
이건 스케줄러에 따라 모두 다를 거니까, DispatchQueue의 SchedulerOptions를 살펴볼게요.
public struct SchedulerOptions {
/// The dispatch queue quality of service.
public var qos: DispatchQoS
/// The dispatch queue work item flags.
public var flags: DispatchWorkItemFlags
/// The dispatch group, if any, that should be used for performing actions.
public var group: DispatchGroup?
public init(qos: DispatchQoS = .unspecified, flags: DispatchWorkItemFlags = [], group: DispatchGroup? = nil)
}
qos도 보이고 group도 설정할 수 있네요. 이렇게 스케줄러에 맞는 옵션을 설정할 수 있습니다.
IgnoreOutput
다음으로 IgnoreOutput을 알아보겠습니다. 정의를 읽어보니 Upstream의 모든 값을 무시하고 Publisher의 completion만 무시하지 않는다고 되어있네요.
ignoreOutput()
IgnoreOutput Publisher를 활용해서 만들어진 Operator는 ignoreOutput()입니다.
정의는 IgnoreOutput Publisher의 설명과 거의 비슷하니 바로 사용해볼게요.
let intPublisher = [1, 2, 3, 4, 5].publisher
intPublisher
.ignoreOutput()
.sink(receiveCompletion: { print($0) },
receiveValue: { print($0) })
실행해보면 결과는 아래와 같습니다.
// IgnoreOutput 예제 코드
// 결과
finished
정말로 모든 값을 무시하고 completion만 출력한 것을 볼 수 있네요.
Reduce
그럼 다음으로 이번에는 Reduce를 알아봅시다.
Upstream에서 받은 값들을 클로저로 처리한 뒤 가지고 있다가 Upstream이 completion 되면 Downstream으로 마지막으로 처리된 값을 내려보내는 Publisher입니다.
reduce(_:_:)
Reduce로 만든 Operator는 아래와 같습니다.
정의를 보면 초기값이 있는 걸 볼 수 있습니다. 역할은 Upstream에서 받은 값을 클로저로 처리해서 Upstream이 finish 되면 그동안 처리해둔 값을 Downstream으로 보낸다고 되어있습니다.
바로 사용해보면 아래와 같습니다.
let intPublisher = [1, 2, 3, 4, 5].publisher
intPublisher
.reduce(0, { $0 + $1 })
.sink(receiveCompletion: { print($0) },
receiveValue: { print($0) })
위 코드를 보면 초기값이 0이고 클로저에서는 누적 값으로 현재 누적 값과 새로운 값을 더한 것을 사용하고 있습니다.
즉 시작 값이 0이고 1, 2, 3, 4, 5가 계속해서 더해지며 누적되는 거죠.
결과는 그럼 당연히 아래와 같습니다.
// Reduce 예제 코드
15
finished
간단하게 String에도 사용해보면 이해에 도움이 될 거 같아서 아래 코드로도 해봤습니다.
let stringPublisher = ["P", "I", "N", "G", "U"].publisher
stringPublisher
.reduce("", { $0 + $1 })
.sink(receiveCompletion: { print($0) },
receiveValue: { print($0) })
위 코드의 결과는 아래와 같습니다.
// Reduce-String 예제 코드
PINGU
finished
TryReduce
이제 벌써 이번 글의 마지막 Publisher네요.
사실 이제 Try가 붙어있으면 기존 거와 동일하면서 에러가 발생하면 Downstream으로 내려보낼 수 있는 차이밖에 없을 거라는 걸 느낍니다.😄
뭐 어쨌든 정의를 보면 아래와 같습니다.
역시나 예상대로입니다.
tryReduce(_:_:)
TryReduce를 사용해서 만든 Operator는 tryReduce(_:_:)입니다.
정의를 보면 예상대로 reduce(_:_:)에서 에러도 던질 수 있다는 차이만 존재합니다.
바로 사용해볼게요.
enum PinguError: Error {
case nagativeNumber
}
func checkNagativeNumber(_ number: Int) throws -> Int {
guard number < 0 else {
throw PinguError.nagativeNumber
}
return number
}
let intPublisher = [1, 2, 3, -10, 4].publisher
intPublisher
.tryReduce(0) { reduceValue, newValue in
try checkNagativeNumber(reduceValue + newValue)
return reduceValue + newValue
}
.sink(receiveCompletion: { print($0) },
receiveValue: { print($0) })
위 코드는 초기값이 0인 상태에서 Upstream에서 받은 정수 값을 계속해서 누적 값에 더하는데 만약 누적 값이 음수가 된다면 에러를 발생시키도록 만든 코드입니다. 1, 2, 3까지는 더하다가 -10을 더하면 음수가 되니까 거기서 에러가 발생하겠네요.
실제로 실행해보면 아래와 같은 결과를 볼 수 있습니다.
// TryReduce 예제 코드
failure(__lldb_expr_73.(unknown context at $10c030734).(unknown context at $10c03073c).(unknown context at $10c030744).PinguError.nagativeNumber)
이렇게 Combine의 Operator 중 Reducing Operator로 분류된 것들에 대해 알아봤습니다.
다음 글에서는 Applying Mathematical Operations on Elements라고 분류된 것들에 대해 알아보겠습니다.
전체 코드는 여기에서 볼 수 있습니다.
감사합니다~!