Executor Service를 사용하여 모든 스레드가 완료될 때까지 기다리는 방법
다음과 같은 작업을 한 번에 4개씩 수행해야 합니다.
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
//...wait for completion somehow
모두 완료하면 어떻게 알림을 받을 수 있나요?현시점에서는 글로벌 태스크카운터를 설정하고 모든 태스크의 마지막에 이 카운터를 줄이고 무한 루프에서 이 카운터를 모니터링하여0이 되거나 Futures 목록을 가져오거나 무한 루프 모니터에서 모든 태스크카운터를 실행하는 것보다 더 나은 방법은 없습니다.무한 루프를 수반하지 않는 보다 나은 솔루션은 무엇입니까?
고마워요.
기본적으로는, 다음에 콜을 실시합니다.
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
...
}
Count Down Latch 사용:
CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
try {
latch.await();
} catch (InterruptedException E) {
// handle
}
작업 범위 내(시도 중/최종 완료)
latch.countDown();
ExecutorService.invokeAll()
널 위해서야.
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
Lists of Future도 사용할 수 있습니다.
List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
public Void call() throws IOException {
// do something
return null;
}
}));
그 후, 모든 것에 참가하고 싶은 경우, 기본적으로는 각각에 참가하고 있는 것과 같습니다(자 스레드에서 메인 스레드로 예외를 재지정하는 이점도 있습니다).
for(Future f: this.futures) { f.get(); }
기본적으로는 (모두 또는 각)에서 isDone()을 무한 루프하는 대신 각 미래에서 한 번에 하나씩 .get()을 호출하는 것이 중요합니다.따라서 마지막 스레드가 완료되는 즉시 이 블록을 "이동"할 수 있습니다.은 .콜은 중 정지했을 스레드가에 이 .get()을 할 수 ).catch ExecutionException
전화통화를 할 수 있습니다.또 다른 주의사항은 스레드 로컬 변수가 있는 경우 모든 스레드에 대한 참조가 유지되므로 이 블록을 통과할 때까지 수집되지 않습니다(문제가 발생한 경우 ArrayList에서 Future's를 삭제하면 문제를 해결할 수 있습니다).어떤 미래에서 "우선 선택"을 원하는 경우 https://stackoverflow.com/a/31885029/32453과 같은 사이트를 이용할 수 있습니다.
Java8에서는 Completetable을 사용하여 실행할 수 있습니다.미래:
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
내 의견일 뿐이야.을 CountDownLatch
를 미리 수 있는 은 간단하게 .Semaphore
.
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
taskExecutor.execute(new MyTask());
numberOfTasks++;
}
try {
s.aquire(numberOfTasks);
...
에서는 그냥 하세요.s.release()
latch.countDown();
게임 시작은 조금 늦었지만 완성을 위해서...
모든 일이 끝나기를 기다리는 대신, 할리우드 원칙의 관점에서 생각할 수 있다. "전화하지 마, 전화할게" - 내가 끝나면.결과물이 더 우아한 것 같아요
Guava는 이를 달성하기 위한 몇 가지 흥미로운 도구를 제공합니다.
예:
ExecuterService를 ListeningExecutorService로 랩합니다.
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
실행할 콜러블 컬렉션을 송신합니다.:
for (Callable<Integer> callable : callables) {
ListenableFuture<Integer> lf = service.submit(callable);
// listenableFutures is a collection
listenableFutures.add(lf)
});
중요한 부분은 다음과 같습니다.
ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);
Listenable에 콜백 연결Future: 모든 선물 완료 시 알림을 받을 수 있습니다.
Futures.addCallback(lf, new FutureCallback<List<Integer>> () {
@Override
public void onSuccess(List<Integer> result) {
// do something with all the results
}
@Override
public void onFailure(Throwable t) {
// log failure
}
});
또한 처리가 완료되면 모든 결과를 한 곳에 수집할 수 있다는 이점도 있습니다.
자세한 내용은 이쪽
Java 5 이후의 CyclicBarrier 클래스는 이러한 용도로 설계되었습니다.
여기 두 가지 옵션이 있습니다. 어느 것이 가장 좋은지 조금 헷갈릴 뿐입니다.
옵션 1:
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
옵션 2:
ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
futures.add(es.submit(task));
}
for(Future<?> future : futures) {
try {
future.get();
}catch(Exception e){
// do logging and nothing else
}
}
es.shutdown();
여기에 future.get(); in try catch를 넣는 것은 좋은 생각이죠?
다음의 몇개의 어프로치를 따릅니다.
- 에서 반환된 모든 미래 작업을 반복합니다.
submit
ExecutorService
콜을 합니다.get()
Future
Kiran
invokeAll()
실행자 서비스- 카운트다운래치
- ForkJoinPool 또는 Executors.html#newWorkStealingPool
shutdown, awaitTermination, shutdownNow
시퀀스의 ThreadPoolExecutor 의 ThreadPoolExecutor 입니다.
관련 SE 질문:
Java 멀티스레딩에서 Count Down Latch는 어떻게 사용됩니까?
java Executor Service를 올바르게 셧다운하는 방법
작업을 다른 실행 가능 파일로 래핑하여 알림을 보낼 수 있습니다.
taskExecutor.execute(new Runnable() {
public void run() {
taskStartedNotification();
new MyTask().run();
taskFinishedNotification();
}
});
방금 당신의 문제를 해결하는 샘플 프로그램을 작성했습니다.간결한 구현이 주어지지 않았으므로 추가하겠습니다.「 」를 사용할 수 만,executor.shutdown()
★★★★★★★★★★★★★★★★★」executor.awaitTermination()
스레드마다 걸리는 시간은 예측할 수 없기 때문에 베스트 프랙티스는 아닙니다.
ExecutorService es = Executors.newCachedThreadPool();
List<Callable<Integer>> tasks = new ArrayList<>();
for (int j = 1; j <= 10; j++) {
tasks.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
System.out.println("Starting Thread "
+ Thread.currentThread().getId());
for (int i = 0; i < 1000000; i++) {
sum += i;
}
System.out.println("Stopping Thread "
+ Thread.currentThread().getId());
return sum;
}
});
}
try {
List<Future<Integer>> futures = es.invokeAll(tasks);
int flag = 0;
for (Future<Integer> f : futures) {
Integer res = f.get();
System.out.println("Sum: " + res);
if (!f.isDone())
flag = 1;
}
if (flag == 0)
System.out.println("SUCCESS");
else
System.out.println("FAILED");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
여기서 더 많은 대안을 제시하기 위해 래치/배리어 사용을 달리합니다.또한 모든 결과에서 Completion Service 사용이 완료될 때까지 부분 결과를 얻을 수도 있습니다.
Java Concurrency in practice: "실행자에게 제출해야 하는 계산의 배치가 있고 그 결과를 입수할 수 있게 되면 각 태스크와 관련된 Future를 유지하고 타임아웃 0으로 get을 호출하여 완료 여부를 반복적으로 폴링할 수 있습니다.이것은 가능하지만 지루하다.다행히 더 좋은 방법이 있습니다.완료 서비스입니다.
구현은 이쪽
public class TaskSubmiter {
private final ExecutorService executor;
TaskSubmiter(ExecutorService executor) { this.executor = executor; }
void doSomethingLarge(AnySourceClass source) {
final List<InterestedResult> info = doPartialAsyncProcess(source);
CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
for (final InterestedResult interestedResultItem : info)
completionService.submit(new Callable<PartialResult>() {
public PartialResult call() {
return InterestedResult.doAnOperationToGetPartialResult();
}
});
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<PartialResult> f = completionService.take();
PartialResult PartialResult = f.get();
processThisSegment(PartialResult);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (ExecutionException e) {
throw somethinghrowable(e.getCause());
}
}
}
이것은 Adam Skywalker의 힌트를 기반으로 한 솔루션이며, 이 솔루션은
package frss.main;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestHilos {
void procesar() {
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
System.out.println("FIN DEL PROCESO DE HILOS");
}
private List<Runnable> getTasks() {
List<Runnable> tasks = new ArrayList<Runnable>();
Hilo01 task1 = new Hilo01();
tasks.add(task1);
Hilo02 task2 = new Hilo02();
tasks.add(task2);
return tasks;
}
private class Hilo01 extends Thread {
@Override
public void run() {
System.out.println("HILO 1");
}
}
private class Hilo02 extends Thread {
@Override
public void run() {
try {
sleep(2000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("HILO 2");
}
}
public static void main(String[] args) {
TestHilos test = new TestHilos();
test.procesar();
}
}
이그제큐티브 서비스를 통한 클린웨이
List<Future<Void>> results = null;
try {
List<Callable<Void>> tasks = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(4);
results = executorService.invokeAll(tasks);
} catch (InterruptedException ex) {
...
} catch (Exception ex) {
...
}
다음 코드를 사용할 수 있습니다.
public class MyTask implements Runnable {
private CountDownLatch countDownLatch;
public MyTask(CountDownLatch countDownLatch {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
//Do somethings
//
this.countDownLatch.countDown();//important
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");
그래서 링크된 질문에 대한 답변을 여기에 올립니다.누군가 더 간단한 방법을 원하기 때문입니다.
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
futures[i++] = CompletableFuture.runAsync(runner, executor);
}
CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.
저는 다음과 같은 작업 예를 만들었습니다.여러 스레드가 있는 태스크 풀(예를 들어 큐를 사용 중)을 처리하고(프로그래밍 방식으로 OfTasks/threshold 수에 따라 결정됨) 모든 스레드가 완료될 때까지 기다렸다가 다른 처리를 계속할 수 있습니다.
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** Testing CountDownLatch and ExecutorService to manage scenario where
* multiple Threads work together to complete tasks from a single
* resource provider, so the processing can be faster. */
public class ThreadCountDown {
private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();
public static void main(String[] args) {
// Create a queue with "Tasks"
int numberOfTasks = 2000;
while(numberOfTasks-- > 0) {
tasks.add(numberOfTasks);
}
// Initiate Processing of Tasks
ThreadCountDown main = new ThreadCountDown();
main.process(tasks);
}
/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
threadsCountdown = new CountDownLatch(numberOfThreads);
ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);
//Initialize each Thread
while(numberOfThreads-- > 0) {
System.out.println("Initializing Thread: "+numberOfThreads);
threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
}
try {
//Shutdown the Executor, so it cannot receive more Threads.
threadExecutor.shutdown();
threadsCountdown.await();
System.out.println("ALL THREADS COMPLETED!");
//continue With Some Other Process Here
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
int threshold = 100;
int threads = size / threshold;
if( size > (threads*threshold) ){
threads++;
}
return threads;
}
/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
return tasks.poll();
}
/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {
private String threadName;
protected MyThread(String threadName) {
super();
this.threadName = threadName;
}
@Override
public void run() {
Integer task;
try{
//Check in the Task pool if anything pending to process
while( (task = getTask()) != null ){
processTask(task);
}
}catch (Exception ex){
ex.printStackTrace();
}finally {
/*Reduce count when no more tasks to process. Eventually all
Threads will end-up here, reducing the count to 0, allowing
the flow to continue after threadsCountdown.await(); */
threadsCountdown.countDown();
}
}
private void processTask(Integer task){
try{
System.out.println(this.threadName+" is Working on Task: "+ task);
}catch (Exception ex){
ex.printStackTrace();
}
}
}
}
도움이 됐으면 좋겠다!
독자적인 Executor Completion Service 서브클래스를 사용하여 랩할 수 있습니다.taskExecutor
Blocking Queue의 자체 구현에서는 각 작업이 완료되었을 때 알림을 받고 완료된 작업 수가 원하는 목표에 도달했을 때 원하는 콜백 또는 기타 액션을 수행합니다.
도 이렇게 써야 요.executorService.shutdown()
★★★★★★★★★★★★★★★★★」executorService.awaitTermination
★★★★★★ 。
다음은 예를 제시하겠습니다.
public class ScheduledThreadPoolExample {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
0, 1, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(10);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
}
}
더 많은 스레드 ExecutionServices를 순차적으로 사용하고 각 ExecutionSERVICE가 완료될 때까지 기다리려는 경우.가장 좋은 방법은 다음과 같습니다.
ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
executer1.execute(new Runnable() {
@Override
public void run() {
...
}
});
}
executer1.shutdown();
try{
executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
for (true) {
executer2.execute(new Runnable() {
@Override
public void run() {
...
}
});
}
executer2.shutdown();
} catch (Exception e){
...
}
Java 8 - 스트림 API를 사용하여 스트림을 처리할 수 있습니다.아래 토막을 참조해 주세요.
final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool
//alternatively to specify parallelism
new ForkJoinPool(15).submit(
() -> tasks.stream().parallel().forEach(Runnable::run)
).get();
ExecutorService WORKER_THREAD_POOL
= Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
// doSomething();
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// wait for the latch to be decremented by the two remaining threads
latch.await();
ifdoSomething()
몇 가지 다른 예외, 즉latch.countDown()
실행이 안 될 것 같은데 어떻게 해야 하나요?
" " "의 AutoCloseable
Project Loom을 사용한 실행자 서비스
프로젝트. Loom은 Java의 동시성 기능에 새로운 기능을 추가하고자 합니다.
그 기능 중 하나가 의 제조입니다.이것은 모든 것을 의미한다.ExecutorService
구현은 방법을 제공합니다.리소스 사용 구문을 사용하여 자동으로 종료할 수 있습니다.ExecutorService
★★★★★★ 。
메서드는 제출된 모든 작업이 완료될 때까지 차단합니다.사용.close
을 shutdown
&awaitTermination
.
되다AutoCloseable
Project Loom이 Java에 "구조화된 동시성"을 가져오는 데 기여합니다.
try (
ExecutorService executorService = Executors.… ;
) {
// Submit your `Runnable`/`Callable` tasks to the executor service.
…
}
// At this point, flow-of-control blocks until all submitted tasks are done/canceled/failed.
// After this point, the executor service will have been automatically shutdown, wia `close` method called by try-with-resources syntax.
Project Loom에 대한 자세한 내용은 Project Loom 팀에 대한 Ron Pressler 및 다른 사람들의 강연과 인터뷰를 검색하십시오.Project Loom이 진화함에 따라 보다 최근의 것에 초점을 맞추십시오.
Project Loom 테크놀로지의 실험적인 빌드는, 초기 액세스 Java 18에 근거하고 있습니다.
이것이 도움이 될 수 있다
Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
try {
Log.i(LOG_TAG, "Waiting for executor to terminate...");
if (executor.isTerminated())
break;
if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
break;
}
} catch (InterruptedException ignored) {}
}
다음 Runner 클래스에서 waitTillDone()을 호출할 수 있습니다.
Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool
while(...) {
runner.run(new MyTask()); // here you submit your task
}
runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)
runner.shutdown(); // once you done you can shutdown the runner
이 클래스를 재사용하여 shutdown()을 호출하기 전에 원하는 횟수만큼 waitTillDone()을 호출할 수 있으며 코드도 매우 간단합니다.또한 사전에 작업 수를 알 필요가 없습니다.
.compile 'com.github.matejtymes:javafixes:1.3.1'
프로젝트에 대한 의존도를 높일 수 있습니다.
상세한 것에 대하여는, 여기를 참조해 주세요.
https://github.com/MatejTymes/JavaFixes
.getActiveCount()
스레드 를 알 수 - 활성 스레드 수를 알 수 있습니다. - 활성 스레드 수를 알 수 있습니다.
후, 「」, 「」의 할 수 .activeCount()
는 「」입니다.0
값이 0이면 현재 실행 중인 활성 스레드가 없음을 의미하며, 이는 작업이 완료되었음을 의미합니다.
while (true) {
if (executor.getActiveCount() == 0) {
//ur own piece of code
break;
}
}
언급URL : https://stackoverflow.com/questions/1250643/how-to-wait-for-all-threads-to-finish-using-executorservice
'programing' 카테고리의 다른 글
플로트를 최소 증분(또는 그에 근접)으로 변경하는 방법은 무엇입니까? (0) | 2022.06.22 |
---|---|
C에서 연산자는 무엇을 합니까? (0) | 2022.06.22 |
vue-multicelect에 대한 쓰기 단위 테스트 (0) | 2022.06.22 |
ArrayList에 값이 있는지 확인합니다. (0) | 2022.06.22 |
긴 요소 목록에서 작은 변경 하나에 대해 렌더링 시간을 단축할 수 있는 방법이 있습니까? (0) | 2022.06.22 |