IT박스

관찰 가능 및 흐름 가능 rxJava2

itboxs 2020. 7. 27. 07:41
반응형

관찰 가능 및 흐름 가능 rxJava2


나는 새로운 rx java 2를보고 있었고 backpressure더 이상 아이디어를 이해하지 못한다 .

나는 우리 Observablebackpressure지원 하지 않으며 그것을 가지고 있다는 것을 알고 Flowable있습니다.

예를 기반으로 내가 가지고 있다고 가정 해 flowable봅시다 interval.

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

이것은 약 128 값 이후에 충돌 할 것이므로 항목을 얻는 것보다 느리게 소비하고 있음이 분명합니다.

그러나 우리는 Observable

     Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

소비에 약간의 지연이 있어도 여전히 작동하지 않습니다. 일을하기 위해 연산자를 Flowable넣으라고하면 onBackpressureDrop충돌이 사라지지만 모든 값이 방출되는 것은 아닙니다.

따라서 현재 내 머리 속에 답을 찾을 수없는 기본 질문은 backpressure일반을 사용할 수있을 때 왜 관리해야 Observable하지 않고 모든 값을 받 buffer습니까? 아니면 다른 쪽에서 backpressure소비를 관리하고 처리하는 데 유리한 점은 무엇 입니까?


실제로 배압이 나타내는 것은 경계 버퍼이며 Flowable.observeOn, dowstream이 취할 수있는 한 빨리 배수되는 128 요소의 버퍼가 있습니다. 버스트 소스를 처리하기 위해이 버퍼 크기를 개별적으로 늘릴 수 있으며 모든 배압 관리 방법은 여전히 ​​1.x부터 적용됩니다. Observable.observeOn요소를 계속 수집하는 무제한 버퍼가 있으며 앱의 메모리가 부족할 수 있습니다.

Observable예를 들어 다음을 사용할 수 있습니다 .

  • GUI 이벤트 처리
  • 짧은 시퀀스 작업 (총 1000 개 미만 요소)

Flowable예를 들어 다음을 사용할 수 있습니다 .

  • 차갑고 시간이없는 소스
  • 소스와 같은 발전기
  • 네트워크 및 데이터베이스 접근 자

역 압력은 관찰 가능 (게시자)이 가입자가 처리 할 수있는 것보다 많은 이벤트를 생성하는 경우입니다. 따라서 가입자에게 누락 된 이벤트가 발생하거나 메모리 부족으로 이어지는 방대한 이벤트 대기열을 얻을 수 있습니다. Flowable배압을 고려합니다. Observable하지 않습니다. 그게 다야.

너무 많은 액체가 넘칠 때 깔때기를 상기시킵니다. Flowable은 그러한 일이 일어나지 않도록 도울 수 있습니다.

엄청난 역압으로 :

여기에 이미지 설명을 입력하십시오

그러나 유동성을 사용하면 배압이 훨씬 적습니다.

여기에 이미지 설명을 입력하십시오

Rxjava2 has a few backpressure strategies you can use depending on your usecase. by strategy i mean Rxjava2 supplies a way to handle the objects that cannot be processed because of the overflow (backpressure).

here are the strategies. I wont go through them all, but for example if you want to not worry about the items that are overflowed you can use a drop strategy like this:

observable.toFlowable(BackpressureStrategy.DROP)

As far as i know there should be a 128 item limit on the queue, after that there can be a overflow (backpressure). Even if its not 128 its close to that number. Hope this helps someone.

if you need to change the buffer size from 128 it looks like it can be done like this (but watch any memory constraints:

myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.  

in software developement usually back pressure strategy means your telling the emitter to slow down a bit as the consumer cannot handle the velocity your emitting events.


The fact that your Flowable crashed after emitting 128 values without backpressure handling doesn't mean it will always crash after exactly 128 values: sometimes it will crash after 10, and sometimes it will not crash at all. I believe this is what happened when you tried the example with Observable - there happened to be no backpressure, so your code worked normally, next time it may not. The difference in RxJava 2 is that there is no concept of backpressure in Observables anymore, and no way to handle it. If you're designing a reactive sequence that will probably require explicit backpressure handling - then Flowable is your best choice.

참고 URL : https://stackoverflow.com/questions/40323307/observable-vs-flowable-rxjava2

반응형