programing

Executor Service를 사용하여 모든 스레드가 완료될 때까지 기다리는 방법

prostudy 2022. 6. 22. 21:43
반응형

Executor Service를 사용하여 모든 스레드가 완료될 때까지 기다리는 방법

다음과 같은 작업을 한 번에 4개씩 수행해야 합니다.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

모두 완료하면 어떻게 알림을 받을 수 있나요?현시점에서는 글로벌 태스크카운터를 설정하고 모든 태스크의 마지막에 이 카운터를 줄이고 무한 루프에서 이 카운터를 모니터링하여0이 되거나 Futures 목록을 가져오거나 무한 루프 모니터에서 모든 태스크카운터를 실행하는 것보다 더 나은 방법은 없습니다.무한 루프를 수반하지 않는 보다 나은 솔루션은 무엇입니까?

고마워요.

기본적으로는, 다음에 콜을 실시합니다.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}

Count Down Latch 사용:

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

작업 범위 내(시도 중/최종 완료)

latch.countDown();

ExecutorService.invokeAll() 널 위해서야.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);

Lists of Future도 사용할 수 있습니다.

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

그 후, 모든 것에 참가하고 싶은 경우, 기본적으로는 각각에 참가하고 있는 것과 같습니다(자 스레드에서 메인 스레드로 예외를 재지정하는 이점도 있습니다).

for(Future f: this.futures) { f.get(); }

기본적으로는 (모두 또는 각)에서 isDone()을 무한 루프하는 대신 각 미래에서 한 번에 하나씩 .get()을 호출하는 것이 중요합니다.따라서 마지막 스레드가 완료되는 즉시 이 블록을 "이동"할 수 있습니다.은 .콜은 중 정지했을 스레드가에 이 .get()을 할 수 ).catch ExecutionException전화통화를 할 수 있습니다.또 다른 주의사항은 스레드 로컬 변수가 있는 경우 모든 스레드에 대한 참조가 유지되므로 이 블록을 통과할 때까지 수집되지 않습니다(문제가 발생한 경우 ArrayList에서 Future's를 삭제하면 문제를 해결할 수 있습니다).어떤 미래에서 "우선 선택"을 원하는 경우 https://stackoverflow.com/a/31885029/32453과 같은 사이트를 이용할 수 있습니다.

Java8에서는 Completetable을 사용하여 실행할 수 있습니다.미래:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

내 의견일 뿐이야.CountDownLatch를 미리 수 있는 은 간단하게 .Semaphore.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

에서는 그냥 하세요.s.release()latch.countDown();

게임 시작은 조금 늦었지만 완성을 위해서...

모든 일이 끝나기를 기다리는 대신, 할리우드 원칙의 관점에서 생각할 수 있다. "전화하지 마, 전화할게" - 내가 끝나면.결과물이 더 우아한 것 같아요

Guava는 이를 달성하기 위한 몇 가지 흥미로운 도구를 제공합니다.

예:

ExecuterService를 ListeningExecutorService로 랩합니다.

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

실행할 콜러블 컬렉션을 송신합니다.:

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

중요한 부분은 다음과 같습니다.

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

Listenable에 콜백 연결Future: 모든 선물 완료 시 알림을 받을 수 있습니다.

Futures.addCallback(lf, new FutureCallback<List<Integer>> () {
    @Override
    public void onSuccess(List<Integer> result) {
        // do something with all the results
    }

    @Override
    public void onFailure(Throwable t) {
        // log failure
    }
});

또한 처리가 완료되면 모든 결과를 한 곳에 수집할 수 있다는 이점도 있습니다.

자세한 내용은 이쪽

Java 5 이후의 CyclicBarrier 클래스는 이러한 용도로 설계되었습니다.

여기 두 가지 옵션이 있습니다. 어느 것이 가장 좋은지 조금 헷갈릴 뿐입니다.

옵션 1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

옵션 2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();

여기에 future.get(); in try catch를 넣는 것은 좋은 생각이죠?

다음의 몇개의 어프로치를 따릅니다.

  1. 에서 반환된 모든 미래 작업을 반복합니다.submitExecutorService 콜을 합니다.get()FutureKiran
  2. invokeAll()실행자 서비스
  3. 카운트다운래치
  4. ForkJoinPool 또는 Executors.html#newWorkStealingPool
  5. shutdown, awaitTermination, shutdownNow 시퀀스의 ThreadPoolExecutor 의 ThreadPoolExecutor 입니다.

관련 SE 질문:

Java 멀티스레딩에서 Count Down Latch는 어떻게 사용됩니까?

java Executor Service를 올바르게 셧다운하는 방법

작업을 다른 실행 가능 파일로 래핑하여 알림을 보낼 수 있습니다.

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});

방금 당신의 문제를 해결하는 샘플 프로그램을 작성했습니다.간결한 구현이 주어지지 않았으므로 추가하겠습니다.「 」를 사용할 수 만,executor.shutdown() ★★★★★★★★★★★★★★★★★」executor.awaitTermination()스레드마다 걸리는 시간은 예측할 수 없기 때문에 베스트 프랙티스는 아닙니다.

ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Integer>> tasks = new ArrayList<>();

    for (int j = 1; j <= 10; j++) {
        tasks.add(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                System.out.println("Starting Thread "
                        + Thread.currentThread().getId());

                for (int i = 0; i < 1000000; i++) {
                    sum += i;
                }

                System.out.println("Stopping Thread "
                        + Thread.currentThread().getId());
                return sum;
            }

        });
    }

    try {
        List<Future<Integer>> futures = es.invokeAll(tasks);
        int flag = 0;

        for (Future<Integer> f : futures) {
            Integer res = f.get();
            System.out.println("Sum: " + res);
            if (!f.isDone()) 
                flag = 1;
        }

        if (flag == 0)
            System.out.println("SUCCESS");
        else
            System.out.println("FAILED");

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

여기서 더 많은 대안을 제시하기 위해 래치/배리어 사용을 달리합니다.또한 모든 결과에서 Completion Service 사용이 완료될 때까지 부분 결과를 얻을 수도 있습니다.

Java Concurrency in practice: "실행자에게 제출해야 하는 계산의 배치가 있고 그 결과를 입수할 수 있게 되면 각 태스크와 관련된 Future를 유지하고 타임아웃 0으로 get을 호출하여 완료 여부를 반복적으로 폴링할 수 있습니다.이것은 가능하지만 지루하다.다행히 더 좋은 방법이 있습니다.완료 서비스입니다.

구현은 이쪽

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}

이것은 Adam Skywalker의 힌트를 기반으로 한 솔루션이며, 이 솔루션은

package frss.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestHilos {

    void procesar() {
        ExecutorService es = Executors.newFixedThreadPool(4);
        List<Runnable> tasks = getTasks();
        CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        es.shutdown();

        System.out.println("FIN DEL PROCESO DE HILOS");
    }

    private List<Runnable> getTasks() {
        List<Runnable> tasks = new ArrayList<Runnable>();

        Hilo01 task1 = new Hilo01();
        tasks.add(task1);

        Hilo02 task2 = new Hilo02();
        tasks.add(task2);
        return tasks;
    }

    private class Hilo01 extends Thread {

        @Override
        public void run() {
            System.out.println("HILO 1");
        }

    }

    private class Hilo02 extends Thread {

        @Override
        public void run() {
            try {
                sleep(2000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("HILO 2");
        }

    }


    public static void main(String[] args) {
        TestHilos test = new TestHilos();
        test.procesar();
    }
}

이그제큐티브 서비스를 통한 클린웨이

 List<Future<Void>> results = null;
 try {
     List<Callable<Void>> tasks = new ArrayList<>();
     ExecutorService executorService = Executors.newFixedThreadPool(4);
     results = executorService.invokeAll(tasks);
 } catch (InterruptedException ex) {
     ...
 } catch (Exception ex) {
     ...
 }

다음 코드를 사용할 수 있습니다.

public class MyTask implements Runnable {

    private CountDownLatch countDownLatch;

    public MyTask(CountDownLatch countDownLatch {
         this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
         try {
             //Do somethings
             //
             this.countDownLatch.countDown();//important
         } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
         }
     }
}

CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
     taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");

그래서 링크된 질문에 대한 답변을 여기에 올립니다.누군가 더 간단한 방법을 원하기 때문입니다.

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
    futures[i++] =  CompletableFuture.runAsync(runner, executor);
}

CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.

저는 다음과 같은 작업 예를 만들었습니다.여러 스레드가 있는 태스크 풀(예를 들어 큐를 사용 중)을 처리하고(프로그래밍 방식으로 OfTasks/threshold 수에 따라 결정됨) 모든 스레드가 완료될 때까지 기다렸다가 다른 처리를 계속할 수 있습니다.

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Testing CountDownLatch and ExecutorService to manage scenario where
 * multiple Threads work together to complete tasks from a single
 * resource provider, so the processing can be faster. */
public class ThreadCountDown {

private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();

public static void main(String[] args) {
    // Create a queue with "Tasks"
    int numberOfTasks = 2000;
    while(numberOfTasks-- > 0) {
        tasks.add(numberOfTasks);
    }

    // Initiate Processing of Tasks
    ThreadCountDown main = new ThreadCountDown();
    main.process(tasks);
}

/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
    int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
    threadsCountdown = new CountDownLatch(numberOfThreads);
    ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);

    //Initialize each Thread
    while(numberOfThreads-- > 0) {
        System.out.println("Initializing Thread: "+numberOfThreads);
        threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
    }

    try {
        //Shutdown the Executor, so it cannot receive more Threads.
        threadExecutor.shutdown();
        threadsCountdown.await();
        System.out.println("ALL THREADS COMPLETED!");
        //continue With Some Other Process Here
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
    int threshold = 100;
    int threads = size / threshold;
    if( size > (threads*threshold) ){
        threads++;
    }
    return threads;
}

/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
    return tasks.poll();
}

/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {

    private String threadName;

    protected MyThread(String threadName) {
        super();
        this.threadName = threadName;
    }

    @Override
    public void run() {
        Integer task;
        try{
            //Check in the Task pool if anything pending to process
            while( (task = getTask()) != null ){
                processTask(task);
            }
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            /*Reduce count when no more tasks to process. Eventually all
            Threads will end-up here, reducing the count to 0, allowing
            the flow to continue after threadsCountdown.await(); */
            threadsCountdown.countDown();
        }
    }

    private void processTask(Integer task){
        try{
            System.out.println(this.threadName+" is Working on Task: "+ task);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
}
}

도움이 됐으면 좋겠다!

독자적인 Executor Completion Service 서브클래스를 사용하여 랩할 수 있습니다.taskExecutorBlocking Queue의 자체 구현에서는 각 작업이 완료되었을 때 알림을 받고 완료된 작업 수가 원하는 목표에 도달했을 때 원하는 콜백 또는 기타 액션을 수행합니다.

도 이렇게 써야 요.executorService.shutdown() ★★★★★★★★★★★★★★★★★」executorService.awaitTermination★★★★★★ 。

다음은 예를 제시하겠습니다.

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}

더 많은 스레드 ExecutionServices를 순차적으로 사용하고 각 ExecutionSERVICE가 완료될 때까지 기다리려는 경우.가장 좋은 방법은 다음과 같습니다.

ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
   executer1.execute(new Runnable() {
            @Override
            public void run() {
                ...
            }
        });
} 
executer1.shutdown();

try{
   executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

   ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
   for (true) {
      executer2.execute(new Runnable() {
            @Override
            public void run() {
                 ...
            }
        });
   } 
   executer2.shutdown();
} catch (Exception e){
 ...
}

Java 8 - 스트림 API를 사용하여 스트림을 처리할 수 있습니다.아래 토막을 참조해 주세요.

final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool

//alternatively to specify parallelism 
new ForkJoinPool(15).submit(
          () -> tasks.stream().parallel().forEach(Runnable::run) 
    ).get();

ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // doSomething();
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

ifdoSomething()몇 가지 다른 예외, 즉latch.countDown()실행이 안 될 것 같은데 어떻게 해야 하나요?

" " "의 AutoCloseableProject Loom을 사용한 실행자 서비스

프로젝트. Loom은 Java의 동시성 기능에 새로운 기능을 추가하고자 합니다.

그 기능 중 하나가 의 제조입니다.이것은 모든 것을 의미한다.ExecutorService구현은 방법을 제공합니다.리소스 사용 구문을 사용하여 자동으로 종료할 수 있습니다.ExecutorService★★★★★★ 。

메서드는 제출된 모든 작업이 완료될 때까지 차단합니다.사용.closeshutdown&awaitTermination.

되다AutoCloseableProject Loom이 Java에 "구조화된 동시성"을 가져오는 데 기여합니다.

try (
    ExecutorService executorService = Executors.… ;
) {
    // Submit your `Runnable`/`Callable` tasks to the executor service.
    …
}
// At this point, flow-of-control blocks until all submitted tasks are done/canceled/failed.
// After this point, the executor service will have been automatically shutdown, wia `close` method called by try-with-resources syntax.

Project Loom에 대한 자세한 내용은 Project Loom 팀에 대한 Ron Pressler 및 다른 사람들의 강연과 인터뷰를 검색하십시오.Project Loom이 진화함에 따라 보다 최근의 것에 초점을 맞추십시오.

Project Loom 테크놀로지의 실험적인 빌드는, 초기 액세스 Java 18에 근거하고 있습니다.

이것이 도움이 될 수 있다

Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
                try {
                    Log.i(LOG_TAG, "Waiting for executor to terminate...");
                    if (executor.isTerminated())
                        break;
                    if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                } catch (InterruptedException ignored) {}
            }

다음 Runner 클래스에서 waitTillDone()을 호출할 수 있습니다.

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool

while(...) {
    runner.run(new MyTask()); // here you submit your task
}


runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)


runner.shutdown(); // once you done you can shutdown the runner

이 클래스를 재사용하여 shutdown()을 호출하기 전에 원하는 횟수만큼 waitTillDone()을 호출할 수 있으며 코드도 매우 간단합니다.또한 사전에 작업 수를 알 필요가 없습니다.

.compile 'com.github.matejtymes:javafixes:1.3.1'프로젝트에 대한 의존도를 높일 수 있습니다.

상세한 것에 대하여는, 여기를 참조해 주세요.

https://github.com/MatejTymes/JavaFixes

.getActiveCount() 스레드 를 알 수 - 활성 스레드 수를 알 수 있습니다. - 활성 스레드 수를 알 수 있습니다.

후, 「」, 「」의 할 수 .activeCount()는 「」입니다.0값이 0이면 현재 실행 중인 활성 스레드가 없음을 의미하며, 이는 작업이 완료되었음을 의미합니다.

while (true) {
    if (executor.getActiveCount() == 0) {
    //ur own piece of code
    break;
    }
}

언급URL : https://stackoverflow.com/questions/1250643/how-to-wait-for-all-threads-to-finish-using-executorservice

반응형