반응형
https://www.youtube.com/watch?v=8fenTR3KOJo
개념
- Duality: 쌍대성. 수학적 표현
- Observer Pattern: 디자인패턴
- Reactive Streams: 자바 진영에서 정한 표준
Duality
기능은 똑같은데, 반대방향으로 표현한 것 (에릭 마이어 정의 찾아보기)
Iterable <---> Observable
Iterable | Observable | |
데이터 방향 | Pull | Push |
함수 호출 | next() | notifyObservers(i) |
return 값 | 있음 | 없음 |
여러개의 Observer가 동시에 데이터 받기 | 어려움 | 쉬움 |
멀티 스레드 동작 만들기 | 어려움 | 쉬움 |
Iterable
import java.util.Iterator;
public class IteratorExample {
public static void main(String[] args) {
Iterable<Integer> iterable = () -> new Iterator() {
int i = 0;
final static int MAX = 10;
public boolean hasNext() {
return i < MAX;
}
public Integer next() {
return ++i;
}
};
for(Integer i : iterable) {
System.out.println(i); // 1...10 출력됨
}
for(Iterator<Integer> iterator = iterable.iterator(); iterator.hasNext();) {
System.out.println(iterator.next()); // pull
}
}
}
Observable
import java.util.Observable;
import java.util.Observer;
public class ObserverExample {
// Source --Event/Data--> Observer
// 데이터 보내는 쪽
static class IntObservable extends Observable implements Runnable { // java util의 observable
@Override
public void run() {
for(int i=1; i<=10; i++) {
setChanged();
notifyObservers(i); // push
}
}
}
public static void main(String[] args) {
// 데이터 받는 쪽
Observer observer = new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println(arg);
}
};
IntObservable observerable = new IntObservable();
observerable.addObserver(observer);
observerable.run();
}
}
1...10 출력됨
public static void main(String[] args) {
Observer observer = new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println(Thread.currentThread().getName() + " " + arg);
}
};
IntObservable observerable = new IntObservable();
observerable.addObserver(observer);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(observerable);
System.out.println(Thread.currentThread().getName() + " EXIT");
executorService.shutdown();
}
main EXIT
pool-1-thread-1 1...10출력됨
Reative Extension
Reative extension을 처음 만든 MS 엔지니어들은 Observer 패턴으로는 부족하다고 생각
- Complete는 언제?
- Error는 어떻게 표현?
이 두 가지를 추가해서 확장함
Reactive Streams
JVM 진영에서 만든 표준
http://www.reactive-streams.org/
https://github.com/reactive-streams/reactive-streams-jvm#specification
직접만들어보기
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
public class PubSub {
public static void main(String[] args) {
Iterable<Integer> itr = Arrays.asList(1, 2, 3, 4, 5);
ExecutorService executorService = Executors.newSingleThreadExecutor();
Publisher publisher = new Publisher() {
@Override
public void subscribe(Subscriber subscriber) {
Iterator<Integer> it = itr.iterator();
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) { // n: Back pressure
executorService.execute(() -> {
int i = 0;
try {
while (i++ < n) {
if (it.hasNext()) {
subscriber.onNext(it.next());
} else {
subscriber.onComplete();
break;
}
}
} catch (RuntimeException e) {
subscriber.onError(e);
}
});
}
@Override
public void cancel() {
}
});
}
};
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println(Thread.currentThread().getName() + " onSubscribe");
this.subscription = subscription;
this.subscription.request(1);
// subscription.request(Long.MAX_VALUaE); // 전부 다 받고 싶을 때
}
@Override
public void onNext(Integer item) {
System.out.println(Thread.currentThread().getName() + " onNext " + item);
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) { // Observer 패턴에는 없는 부분. 에러 처리
System.out.println("onError" + throwable.getMessage());
}
@Override
public void onComplete() { // Observer 패턴에는 없는 부분. 완료 처리
System.out.println(Thread.currentThread().getName() + " onComplete");
executorService.shutdown();
}
};
publisher.subscribe(subscriber);
try {
executorService.awaitTermination(10, TimeUnit.HOURS);
} catch (InterruptedException error) {
error.printStackTrace();
}
executorService.shutdown();
}
}
// Result
main onSubscribe
pool-1-thread-1 onNext 1
pool-1-thread-1 onNext 2
pool-1-thread-1 onNext 3
pool-1-thread-1 onNext 4
pool-1-thread-1 onNext 5
pool-1-thread-1 onComplete
- Back pressure: Publisher와 Subscriber의 속도가 다를 때 사용 가능. 예를 들어, Publisher가 발행 속도가 너무 빨라서 Subscriber가 바로 처리할 수 없으면 버퍼가 증가해서 메모리 사용량이 증가할 수 있음.
반응형
'iOS > Combine' 카테고리의 다른 글
[Combine 책 정리] Chapter 5: Combining Operators (0) | 2021.01.18 |
---|---|
[Combine 책 정리] Chapter 4: Filtering Operators (0) | 2021.01.13 |
[Combine 책 정리] Chatper 3: Transforming Operators (0) | 2021.01.12 |
[Combine 책 정리] Chapter 2: Publishers & Subscribers (0) | 2021.01.06 |
[Combine 책 정리] Chapter 1: Hello, Combine! (0) | 2021.01.03 |