Java 실행자: 작업이 완료되었을 때 차단 없이 알림을 받으려면 어떻게 해야 합니까?
실행자 서비스에 제출해야 하는 태스크가 큐에 가득 찼다고 가정합니다.한 번에 하나씩 처리해 주세요.생각할 수 있는 가장 간단한 방법은 다음과 같은 방법이 있습니다.
- 큐에서 작업 가져오기
- 실행자에게 제출합니다.
- .get을 호출하여 결과를 얻을 수 있을 때까지 차단합니다.
- 대기열에서 다른 작업 수행...
그러나 저는 완전히 차단되는 것을 피하려고 합니다.작업을 한 번에 하나씩 처리해야 하는 10,000개의 큐가 있는 경우 대부분의 큐가 차단된 스레드에 고정되어 있기 때문에 스택 공간이 부족합니다.
제가 원하는 것은 작업을 제출하고 작업이 완료되면 호출되는 콜백을 제공하는 것입니다.그 콜백 알림을 플래그로 사용하여 다음 작업을 전송합니다.(functionaljava와 jetlang은 그런 논블로킹 알고리즘을 사용하는 것 같은데 코드를 이해할 수 없습니다.)
어떻게 하면 JDK의 java.util.concurrent를 사용하여 실행할 수 있습니까?
(이러한 태스크의 큐 자체는 차단할 수 있지만, 이것은 나중에 대처해야 할 문제입니다.)
완료 알림으로 전달할 파라미터를 수신하는 콜백인터페이스를 정의합니다.그런 다음 태스크가 끝날 때 호출합니다.
한 태스크의 하여 '실행 가능 태스크'에 .ExecutorServiceJava 8 에 짜넣어진 메카니즘에 대해서는, 이하를 참조해 주세요.
class CallbackTask implements Runnable {
private final Runnable task;
private final Callback callback;
CallbackTask(Runnable task, Callback callback) {
this.task = task;
this.callback = callback;
}
public void run() {
task.run();
callback.complete();
}
}
그럼 Java 8은 프로세스를 비동기적으로 조건부로 완료할 수 있는 파이프라인을 구성하는 보다 정교한 수단을 포함했습니다.다음은 계획적이지만 완전한 알림의 예입니다.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class GetTaskNotificationWithoutBlocking {
public static void main(String... argv) throws Exception {
ExampleService svc = new ExampleService();
GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
f.thenAccept(listener::notify);
System.out.println("Exiting main()");
}
void notify(String msg) {
System.out.println("Received message: " + msg);
}
}
class ExampleService {
String work() {
sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
char[] str = new char[5];
ThreadLocalRandom current = ThreadLocalRandom.current();
for (int idx = 0; idx < str.length; ++idx)
str[idx] = (char) ('A' + current.nextInt(26));
String msg = new String(str);
System.out.println("Generated message: " + msg);
return msg;
}
public static void sleep(long average, TimeUnit unit) {
String name = Thread.currentThread().getName();
long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
try {
unit.sleep(timeout);
System.out.println(name + " awoke.");
} catch (InterruptedException abort) {
Thread.currentThread().interrupt();
System.out.println(name + " interrupted.");
}
}
public static long exponential(long avg) {
return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
}
}
Java 8에서는 Complete table을 사용할 수 있습니다.Future. 다음은 사용자 서비스에서 사용자를 가져와 뷰 객체에 매핑한 다음 보기를 업데이트하거나 오류 대화 상자를 표시하기 위해 사용하는 코드의 예입니다(GUI 응용 프로그램).
CompletableFuture.supplyAsync(
userService::listUsers
).thenApply(
this::mapUsersToUserViews
).thenAccept(
this::updateView
).exceptionally(
throwable -> { showErrorDialogFor(throwable); return null; }
);
비동기적으로 실행됩니다.두 가지 방법을 .mapUsersToUserViews ★★★★★★★★★★★★★★★★★」updateView.
Guava의 청취 가능한 미래 API를 사용하여 콜백을 추가합니다.웹사이트에서 참조:
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
public Explosion call() {
return pushBigRedButton();
}
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion) {
walkAwayFrom(explosion);
}
public void onFailure(Throwable thrown) {
battleArchNemesis(); // escaped the explosion!
}
});
'연장하다'라고 하면 됩니다.FutureTask.done()method를한 후 method를 합니다.FutureTaskExecutorService '이러다'는done()메서드는 다음 경우에 호출됩니다.FutureTask즉시 완료됩니다.
ThreadPoolExecutor또한 가지고 있다beforeExecute ★★★★★★★★★★★★★★★★★」afterExecute덮어쓰고 사용할 수 있는 후크 방식입니다.하겠습니다.ThreadPoolExecutor의 자바독
후크 방식
이 클래스는 각 작업의 실행 전후에 호출되는 보호된 재정의 가능 및 메서드를 제공합니다.예를 들어 재초기화와 같은 실행 환경을 조작하는 데 사용할 수 있습니다.
ThreadLocals통계정보를 수집하거나 로그엔트리를 추가합니다.또한 메서드를 덮어쓰면 특정 처리를 실행할 수 있습니다.Executor가 완전히 종료되었습니다.후크 방식 또는 콜백 방식이 예외를 슬로우하면 내부 워커 스레드에 장애가 발생하여 갑자기 종료될 수 있습니다.
를 사용합니다.
에서 왔습니다.java.util.concurrent여러 스레드의 실행이 완료될 때까지 기다렸다가 계속하는 것이 바로 이 방법입니다.
당신이 생각하는 콜백 효과를 얻기 위해서는 약간의 추가 작업이 필요합니다. 이 때, 른, 른, 른, 른, 른, 른, 른, which, which, which, which, which, which, which, which, which, which, which, which, which, which,CountDownLatch그리고 그것을 기다리고 나서, 통지할 필요가 있는 것을 통지하는 것에 대해서 계속합니다.콜백에 대한 네이티브 지원이나 이와 유사한 효과는 없습니다.
편집: 이제 당신의 질문을 더 이해하게 되었습니다. 당신이 너무 멀리, 불필요하게 도달했다고 생각합니다.regular를 사용할 경우 모든 작업을 수행합니다. 그러면 큐잉이 네이티브하게 수행됩니다.
동시에 실행되는 태스크가 없는지 확인하려면 SingleThreaded를 사용합니다.실행자작업은 전송된 순서대로 처리됩니다.작업을 보류할 필요도 없고, 경영진에게 제출하기만 하면 됩니다.
Callback를 사용한 ExecutorService
import java.util.concurrent.*;
import java.util.*;
public class CallBackDemo{
public CallBackDemo(){
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(5);
try{
for ( int i=0; i<5; i++){
Callback callback = new Callback(i+1);
MyCallable myCallable = new MyCallable((long)i+1,callback);
Future<Long> future = service.submit(myCallable);
//System.out.println("future status:"+future.get()+":"+future.isDone());
}
}catch(Exception err){
err.printStackTrace();
}
service.shutdown();
}
public static void main(String args[]){
CallBackDemo demo = new CallBackDemo();
}
}
class MyCallable implements Callable<Long>{
Long id = 0L;
Callback callback;
public MyCallable(Long val,Callback obj){
this.id = val;
this.callback = obj;
}
public Long call(){
//Add your business logic
System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
callback.callbackMethod();
return id;
}
}
class Callback {
private int i;
public Callback(int i){
this.i = i;
}
public void callbackMethod(){
System.out.println("Call back:"+i);
// Add your business logic
}
}
출력:
creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4
주요 사항:
- 순서로 처리 FIFO를 .
newFixedThreadPool(5)newFixedThreadPool(1) 분석 후
callback경우, 아래 됩니다.//System.out.println("future status:"+future.get()+":"+future.isDone());할 수 요.
newFixedThreadPool()한Executors.newCachedThreadPool() Executors.newWorkStealingPool() ThreadPoolExecutor사용 사례에 따라 다릅니다.
콜백 메서드를 비동기적으로 처리하는 경우
a. 공유 전달
ExecutorService or ThreadPoolExecutor호출 가능한 태스크로b. 사용자 변환
Callable하는 방법Callable/Runnable작업c. 콜백 태스크 푸시 대상
ExecutorService or ThreadPoolExecutor
이것은 Pache가 Guava를 사용한 대답의 연장선입니다.ListenableFuture.
특히,Futures.transform()돌아온다ListenableFuture비동기 콜 체인에 사용할 수 있습니다. Futures.addCallback()돌아온다void는 체인에 사용할 수 없지만 비동기 완료 시 성공/실패 처리에는 적합합니다.
// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());
// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
Futures.transform(database, database -> database.query(table, ...));
// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
Futures.transform(cursor, cursor -> cursorToFooList(cursor));
// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
public void onSuccess(List<Foo> foos) {
doSomethingWith(foos);
}
public void onFailure(Throwable thrown) {
log.error(thrown);
}
});
메모: 비동기 작업 체인과 더불어Futures.transform()에서는 각 작업을 개별 실행기로 스케줄 할 수도 있습니다(이 예에서는 표시되지 않습니다).
Matt의 답변에 덧붙이자면, 여기 콜백의 사용법을 보여주는 좀 더 구체적인 예가 있습니다.
private static Primes primes = new Primes();
public static void main(String[] args) throws InterruptedException {
getPrimeAsync((p) ->
System.out.println("onPrimeListener; p=" + p));
System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
CompletableFuture.supplyAsync(primes::getNextPrime)
.thenApply((prime) -> {
System.out.println("getPrimeAsync(); prime=" + prime);
if (listener != null) {
listener.onPrime(prime);
}
return prime;
});
}
출력은 다음과 같습니다.
getPrimeAsync(); prime=241
onPrimeListener; p=241
Adios mi amigito
다음과 같이 Callable 구현을 사용할 수 있습니다.
public class MyAsyncCallable<V> implements Callable<V> {
CallbackInterface ci;
public MyAsyncCallable(CallbackInterface ci) {
this.ci = ci;
}
public V call() throws Exception {
System.out.println("Call of MyCallable invoked");
System.out.println("Result = " + this.ci.doSomething(10, 20));
return (V) "Good job";
}
}
여기서 콜백인터페이스는 다음과 같은 매우 기본적인 것입니다.
public interface CallbackInterface {
public int doSomething(int a, int b);
}
그리고 이제 메인 클래스는 이렇게 될 것이다.
ExecutorService ex = Executors.newFixedThreadPool(2);
MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
언급URL : https://stackoverflow.com/questions/826212/java-executors-how-to-be-notified-without-blocking-when-a-task-completes
'programing' 카테고리의 다른 글
| 비트 시프트와 덧셈만을 사용하여 어떻게 곱하고 나눌 수 있습니까? (0) | 2022.07.16 |
|---|---|
| Vuex에서 여러 돌연변이를 호출하는 적절한 방법 (0) | 2022.07.16 |
| 불완전한 유형에 대한 참조 포인터 (0) | 2022.07.16 |
| Array List 또는 String Array에서 모든 null 요소를 효율적으로 삭제하는 방법 (0) | 2022.07.16 |
| vue js dev server 시작 시 .plugins [ 0 ]는 2 태플 또는 3 태플만 제공할 수 있습니다. (0) | 2022.07.16 |