programing

Java 8 병렬 스트림의 사용자 지정 스레드 풀

prostudy 2022. 5. 24. 21:55
반응형

Java 8 병렬 스트림의 사용자 지정 스레드 풀

Java 8 병렬 스트림에 대한 사용자 지정 스레드 풀을 지정할 수 있는가?어디에서도 찾을 수 없다.

서버 응용 프로그램이 있고 병렬 스트림을 사용하고 싶다고 상상해 보십시오.하지만 어플리케이션은 크고 다중 스레드라서 분리하고 싶어.나는 다른 모듈의 응용 프로그램 블록 작업 중 한 모듈에서 느리게 실행되는 작업을 원하지 않는다.

모듈마다 다른 스레드 풀을 사용할 수 없다면 대부분의 실제 상황에서 병렬 스트림을 안전하게 사용할 수 없다는 뜻이다.

다음 예를 들어 보십시오.별도의 스레드에서 실행되는 일부 CPU 집약적인 작업이 있다.태스크는 병렬 스트림을 활용한다.첫 번째 작업이 중단되어 각 단계가 1초(실드 슬립으로 시뮬레이션)가 소요된다.문제는 다른 나사산이 끼여 부서진 작업이 끝나기를 기다리는 것이다.이것은 조작된 예지만, 서블릿 앱과 누군가가 공유 포크 조인 풀에 장기 실행 작업을 제출한다고 상상해 보십시오.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

실제로 특정 포크 조인 풀에서 병렬 연산을 실행하는 방법이 있다.포크 조인 풀에서 태스크로 실행하면 포크 조인 풀에 그대로 남아 일반적인 것을 사용하지 않는다.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

트릭은 다음과 같이 지정한다: "해당되는 경우 현재 작업이 실행 중인 풀에서 이 작업을 비동기적으로 실행하거나ForkJoinPool.commonPool()아니라면inForkJoinPool()"

병렬 스트림이 기본값을 사용함ForkJoinPool.commonPool기본적으로 반환되는 프로세서가 있을 때 스레드가 하나적음Runtime.getRuntime().availableProcessors()(이는 병렬 스트림이 호출 스레드에 대해 하나의 프로세서를 남기는 것을 의미한다.)

별도의 또는 사용자 정의 풀이 필요한 애플리케이션의 경우, ForkJoinPool은 지정된 대상 병렬 처리 수준으로 구성될 수 있으며, 기본적으로 사용 가능한 프로세서 수와 동일하다.

이것은 또한 동시에 중첩된 병렬 스트림이나 다중 병렬 스트림이 시작되면 이들 스트림이 모두 동일한 풀을 공유한다는 것을 의미한다.장점: 기본값(사용 가능한 프로세서 수) 이상을 사용하지 마십시오.단점: 시작하는 각 병렬 스트림에 "모든 프로세서"가 할당되지 않을 수 있음(두 개 이상 있는 경우).(ManagedBlocker를 사용하여 이를 우회할 수 있음)

병렬 스트림 실행 방법을 변경하려면 다음 작업을 수행하십시오.

  • yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();또는
  • 시스템 속성을 사용하여 공통 풀의 크기를 변경할 수 있음:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")20개의 스레드의 목표 병렬화

프로세서가 8개인 내 컴퓨터의 후자의 예.다음 프로그램을 실행하는 경우:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

출력:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

따라서 병렬 스트림이 한 번에 8개의 항목을 처리한다는 것을 알 수 있다. 즉, 8개의 스레드를 사용한다.그러나 주석 줄의 압축을 풀면 다음과 같은 출력이 나온다.

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

이번에는 평행 스트림이 20개의 스레드를 사용하였고, 스트림의 20개의 모든 요소를 동시에 처리하였다.

자신의 포크JoinPool 내에서 병렬 컴퓨팅을 트리거하는 방법 대신 해당 풀을 Complete 테이블로 전달하십시오.Future.supplyAsync 방법은 다음과 같다.

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);

원래의 솔루션(ForkJoinPool 공통 병렬 속성 설정)은 더 이상 작동하지 않는다.원래 답변에 있는 링크를 보면, 이를 어기는 업데이트가 Java 8로 다시 포팅되었다.링크된 스레드에서 언급했듯이, 이 솔루션은 영원히 작동하도록 보장되지 않았다.이를 바탕으로 한 해법은 포크조인풀이다.수락한 답안에 대해 토의하다.나는 백포트가 이 해결책의 비신뢰성을 고친다고 생각한다.

ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
        .forEach((int theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)
{
    list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
        .forEach((theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();

기본 병렬은 다음 속성을 사용하여 변경할 수 있다.

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

좀 더 병렬적인 방법을 사용할 수 있도록 설정될 수 있다.

을 확인할 수 Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

이를 통해 4코어 CPU에서 다음과 같은 출력을 낼 수 있다.

5 // common pool
23 // custom pool

것은.parallel()다음과 같은 이점을 제공한다.

3 // common pool
4 // custom pool

지금까지 나는 이 질문의 답변에 설명된 해결책을 사용했다.이제, 나는 병렬 스트림 지원이라고 불리는 작은 도서관을 생각해냈다.

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

그러나 @PabloMatiasGomez가 논평에서 지적했듯이, 공통 풀의 크기에 크게 의존하는 병렬 스트림의 분할 메커니즘에 관한 단점이 있다.자세한 내용은 HashSet의 병렬 스트림이 병렬로 실행되지 않음을 참조하십시오.

나는 이 솔루션을 사용하는 이유는 여러 종류의 작업에 대해 별도의 풀을 갖기 위해서일 뿐인데 사용하지 않아도 공용 풀의 크기를 1로 설정할 수 없다.

참고: JDK 10에는 사용자 지정 스레드 풀이 예상 스레드 수를 사용하도록 하는 수정 사항이 구현되어 있는 것으로 표시됨.

사용자 정의 ForkJoinPool 내의 병렬 스트림 실행은 병렬 처리 https://bugs.openjdk.java.net/browse/JDK-8190974을 준수해야 함

구현 해킹에 의존하지 않으려면 조합할 사용자 지정 수집기를 구현하여 동일한 결과를 얻을 수 있는 방법이 항상 있음map그리고collect의미론...ForkJoinPool에만 국한되지 않을 경우:

list.stream()
  .collect(parallel(i -> process(i), executor, 4))
  .join()

운 좋게도, 이 일은 이미 여기서 끝났고 메이븐 센트럴에서 이용할 수 있다: http://github.com/pivovarit/parallel-collectors

고지 사항:나는 그것을 쓰고 그것에 대한 책임을 진다.

풀 크기를 조정하기 위해 다음과 같이 사용자 정의 ForkJoinPool을 사용해 보았다.

private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() {
    List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> {
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            })
            .reduce(0L, Long::sum);
}

private static void testForkJoinPool() {
    final int parallelism = 10;

    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        }
    }
    out.println(result);
    out.println(ThreadNameSet);
}

다음은 풀에서 기본 4개보다 많은 스레드를 사용하고 있다는 출력이다.

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

하지만 실제로 이상한 점이 있는데, 내가 같은 결과를 얻으려고 했을 때ThreadPoolExecutor아래와 같이

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

하지만 난 실패했어

새 실에 평행스트림만 시작하고 그 다음엔 다른 모든 것이 그대로라는 것을 다시 한번 증명해 보일 것이다.parallelStreamForkJoinPool을 사용하여 하위 스레드를 시작하십시오.

AbacusUtil을 구하러 가십시오.병렬 스트림에 대해 스레드 번호를 지정할 수 있다.다음은 샘플 코드:

LongStream.range(4, 1_000_000).parallel(threadNum)...

공개: 나는 AbacusUtil의 개발자다.

위에 언급한 최대 스레드 수 플래그를 프로그래밍 방식으로 설정하고 매개 변수가 준수되는지 확인하는 코드 캡처 방법

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
Set<String> threadNames = Stream.iterate(0, n -> n + 1)
  .parallel()
  .limit(100000)
  .map(i -> Thread.currentThread().getName())
  .collect(Collectors.toSet());
System.out.println(threadNames);

// Output -> [ForkJoinPool.commonPool-worker-1, Test worker, ForkJoinPool.commonPool-worker-3]

타사 라이브러리를 사용하는 것이 괜찮다면 사이클롭스 리액트(cyclops-react)를 사용하여 동일한 파이프라인 내에서 순차 및 병렬 스트림을 혼합하고 사용자 정의 ForkJoinPools를 제공할 수 있다.예를 들면

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

또는 순차 스트림 내에서 계속 처리하려는 경우

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[공개 나는 사이클롭스 리액션의 선도 개발자다]

사용자 정의 ThreadPool이 필요하지 않지만 동시 태스크 수를 제한하려는 경우 다음을 사용하십시오.

List<Path> paths = List.of("/path/file1.csv", "/path/file2.csv", "/path/file3.csv").stream().map(e -> Paths.get(e)).collect(toList());
List<List<Path>> partitions = Lists.partition(paths, 4); // Guava method

partitions.forEach(group -> group.parallelStream().forEach(csvFilePath -> {
       // do your processing   
}));

(이걸 묻는 중복질문이 잠겨있으니, 여기서 참으시오)

참조URL: https://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream

반응형