관찰 가능한 목록 결합 후 모두 완료될 때까지 기다립니다.
TL;DR 변환 방법Task.whenAll(List<Task>)
RxJava
기존 코드는 볼트를 사용하여 비동기 작업 목록을 작성하고 이러한 작업이 모두 완료될 때까지 기다린 후 다른 단계를 수행합니다. 「중요한」, 「중요한」이됩니다.List<Task>
싱글을 반환한다.Task
볼트 사이트의 예에 따라 목록의 모든 작업이 완료되면 완료된 것으로 표시됩니다.
대신할 Bolts
RxJava
작업 수 ) 목록을 Observable
가능하긴 한데 어떻게 해야 할지 모르겠어요.
merge
,zip
,concat
기타... 을 사용하다List<Observable>
해도 될 것 Observables
내가 그 문서를 제대로 이해한다면 말이야.
는 배우려고 하고 있다RxJava
아직 처음 접하는 것이기 때문에, 이것이 명백한 질문인지, 어딘가에서 설명되고 있는 것인지 양해해 주십시오.검색해 보았습니다.어떤 도움이라도 주시면 감사하겠습니다.
하시면 됩니다.flatMap
★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★다음과 같이 합니다.
public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
return Observable.from(tasks)
//execute in parallel
.flatMap(task -> task.observeOn(Schedulers.computation()))
//wait, until all task are executed
//be aware, all your observable should emit onComplete event
//otherwise you will wait forever
.toList()
//could implement more intelligent logic. eg. check that everything is successful
.map(results -> true);
}
주의: 에러 처리의 요건은 잘 모릅니다.예를 들어 하나의 태스크만 실패할 경우 수행할 작업입니다.당신은 이 시나리오를 검증해야 한다고 생각합니다.
집 교환원을 찾으시는 것 같네요.
몇 가지 다른 사용법이 있으니 예를 들어 보겠습니다.다양한 유형의 간단한 관측 가능 변수가 몇 개 있다고 가정해 보겠습니다.
Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);
이 모든 것을 기다리는 가장 간단한 방법은 다음과 같습니다.
Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));
zip 함수에서는 매개변수가 zip되는 관측치의 유형에 해당하는 구체적인 유형을 가집니다.
관측 가능 목록의 지퍼링도 직접 수행할 수 있습니다.
List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);
Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));
을...로...또는 목록을 정리하여Observable<Observable<?>>
:
Observable<Observable<?>> obsObs = Observable.from(obsList);
Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));
두의 zip만 할 수 .Object[]
목록에 있는 관측 가능성의 유형 및 개수를 미리 알 수 없기 때문에 매개 변수입니다., .
은 모두 으로 인쇄될 것입니다.1 Blah true
편집: Zip을 사용할 때,Observables
모두 같은 수의 아이템을 방출합니다.위의 예에서 세 개의 관측 가능한 모든 항목이 단일 항목을 방출했습니다.다음과 같은 방법으로 변경할 수 있습니다.
Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items
★★★★★★★★★★★★★★★.1, Blah, True
★★★★★★★★★★★★★★★★★」2, Hello, True
지퍼 ★★★★★★3
다른 관측치가 완성됐기 때문에 지퍼가 잠기지 않습니다.
제안된 제안 중 zip()은 실제로 관측 가능한 결과를 서로 결합합니다.이것은 원하는 것일 수도 있고 그렇지 않을 수도 있지만 질문에서는 질문되지 않았습니다.질문에서 원하는 것은 각 작업을 하나씩 또는 병렬로 실행하는 것이었습니다(지정되지 않았지만 Linked Bolts의 예는 병렬 실행에 관한 것이었습니다).또한 zip()은 관찰 가능한 항목이 하나라도 완료되면 즉시 완료되므로 요구 사항을 위반합니다.
Observatables 병렬 실행의 경우 다른 응답에 제시된 platMap()은 괜찮지만 merge()가 더 간단합니다.Observatables 오류 시 Marge가 종료됩니다.모든 Observatable이 완료될 때까지 종료를 연기하면 mergeDelayError()가 표시됩니다.
1개씩은 Observable.concat() static 메서드를 사용해야 한다고 생각합니다.javadoc 상태는 다음과 같습니다.
concat(198.199)Itherable > sequence) 반복 가능한 관측 가능을 연속해서 하나의 관측 가능으로 평탄화한다.
병렬처형을 원하지 않는다면 당신이 원하는 것처럼 들리겠죠
또한 반환 값이 아닌 작업 완료에만 관심이 있는 경우 관찰 가능 대신 완료 테이블을 살펴봐야 합니다.
TLDR: 태스크와 완료 시 온컴플리트이벤트를 1대 1로 실행할 경우 Completetable.concat()이 가장 적합하다고 생각합니다.병렬 실행의 경우 Completable.merge() 또는 Completable.mergeDelayError()는 솔루션과 같이 발음됩니다.전자는 모든 complete 테이블의 오류에 대해 즉시 중지되고, 후자는 둘 중 하나에 오류가 있더라도 모두 실행된 후 오류를 보고합니다.
코틀린과 함께
Observable.zip(obs1, obs2, BiFunction { t1 : Boolean, t2:Boolean ->
})
함수의 인수에 대한 유형을 설정하는 것이 중요합니다. 그렇지 않으면 컴파일 오류가 발생합니다.
마지막 인수 유형은 인수 수: BiFunction for 2 Function 3 for 3 Function 4 for 4 ...에 따라 변경됩니다.
여러분들도 아마 보셨을 거예요zip
2개의 관측치를 사용하는 연산자.
스태틱 방식도 있습니다.Observable.zip
도움이 될 만한 폼이 하나 있습니다.
zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)
자세한 내용은 javadoc을 참조하십시오.
JavaRx Observatibles와 RxKotlin으로 코틀린에서 몇 가지 계산 코드를 작성하고 있습니다.완성해야 할 관찰 대상 목록을 보고 그 동안 진척상황과 최신 결과를 보고 싶습니다.마지막에 최적의 계산 결과를 반환합니다.또한 모든 CPU 코어를 사용하기 위해 Observatibles를 병렬로 실행해야 했습니다.저는 결국 다음과 같은 해결책을 찾았습니다.
@Volatile var results: MutableList<CalculationResult> = mutableListOf()
fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> {
return Observable.create { subscriber ->
Observable.concatEager(listOfCalculations.map { calculation: Calculation ->
doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result
}).subscribeBy(
onNext = {
results.add(it)
subscriber.onNext(Pair("A calculation is ready", it))
},
onComplete = {
subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results))
subscriber.onComplete()
},
onError = {
subscriber.onError(it)
}
)
}
}
비슷한 문제가 있었습니다.Resent Search Provider에서 저장된 제안을 통합하는 동시에 rest call에서 검색 항목을 가져와야 했습니다.AUTHORITY를 조합하여 하나의 통합 목록을 만듭니다.@MyDogTom 솔루션을 사용하려고 했는데 아쉽게도 Observable은 없습니다.RxJava에 있습니다.몇 가지 조사를 한 후에 나는 나에게 맞는 해결책을 찾았다.
fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>>
{
val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0)
fetchedItems.add(fetchSearchSuggestions(context,query).toObservable())
fetchedItems.add(getSearchResults(query).toObservable())
return Observable.fromArray(fetchedItems)
.flatMapIterable { data->data }
.flatMap {task -> task.observeOn(Schedulers.io())}
.toList()
.map { ArrayList(it) }
}
쿼리에 따라 인터넷에서 얻은 제안 및 결과 목록이 포함된 관찰 가능한 항목을 만들었습니다.그런 다음 platMapItable을 사용하여 작업을 검토하고 플랫맵을 사용하여 결과를 배열에 배치하면 나중에 재활용 보기로 가져올 수 있습니다.
Project Reactor를 사용하는 경우Mono.when
.
Mono.when(publisher1, publisher2)
.map(i-> {
System.out.println("everything is done!");
return i;
}).block()
언급URL : https://stackoverflow.com/questions/35357919/combine-a-list-of-observables-and-wait-until-all-completed
'programing' 카테고리의 다른 글
const char*연결 (0) | 2022.09.19 |
---|---|
JavaScript에서 정의되지 않은 변수를 확인하는 방법 (0) | 2022.09.19 |
#1273 – 불명확한 대조: 'utf8mb4_unicode_520_ci' (0) | 2022.09.19 |
SQL LIKE와 IN을 함께 사용 (0) | 2022.09.19 |
Debian 10 MariaDB '소켓 '/var/run/mysqld/mysqld를 통해 로컬 MySQL 서버에 연결할 수 없습니다.양말' (2)' (0) | 2022.09.19 |