반응형

비동기 작업에서 Executor를 사용하는 방법은 CompletableFuture의 비동기 작업을 특정 Executor를 통해 실행하도록 설정하는 것입니다. 기본적으로 CompletableFuture.supplyAsyncCompletableFuture.runAsync는 공용 ForkJoinPool의 공용 스레드 풀을 사용하지만, 특정 Executor를 지정하여 사용자 정의 스레드 풀을 사용할 수도 있습니다.

 

Executor를 사용하는 방법

 

1. Executor 생성

 

먼저, 사용할 Executor를 생성해야 합니다. 예를 들어, ExecutorService는 자주 사용되는 Executor 구현 중 하나입니다.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService executor = Executors.newFixedThreadPool(10);

2. CompletableFuture에서 Executor 사용

 

CompletableFuture의 비동기 작업을 시작할 때, Executor를 두 번째 인수로 전달합니다.

 

supplyAsync 사용 예

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureWithExecutorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 비동기 작업 수행
            return "Hello, World!";
        }, executor);

        future.thenAccept(result -> System.out.println("Result: " + result));

        // Executor 서비스 종료
        executor.shutdown();
    }
}

3. 여러 비동기 작업을 결합하여 Executor 사용

thenApplyAsync, thenAcceptAsync, thenRunAsync 등의 메서드도 Executor를 인수로 받아서 지정할 수 있습니다.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureWithMultipleAsyncExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        CompletableFuture.supplyAsync(() -> {
            // 첫 번째 비동기 작업
            return "Hello";
        }, executor).thenApplyAsync(result -> {
            // 두 번째 비동기 작업
            return result + ", World!";
        }, executor).thenAcceptAsync(result -> {
            // 세 번째 비동기 작업
            System.out.println("Result: " + result);
        }, executor);

        // Executor 서비스 종료
        executor.shutdown();
    }
}

 

요약

 

1. Executor 생성: ExecutorService와 같은 Executor 구현체를 생성합니다.

ExecutorService executor = Executors.newFixedThreadPool(10);

2. 비동기 작업에서 Executor 사용: supplyAsync, runAsync, thenApplyAsync, thenAcceptAsync, thenRunAsync 등의 메서드에서 Executor를 두 번째 인수로 전달합니다.

CompletableFuture.supplyAsync(() -> "Hello, World!", executor);

3. Executor 서비스 종료: 모든 비동기 작업이 완료되면 executor.shutdown()을 호출하여 Executor를 종료합니다.

executor.shutdown();

 

CompletableFuture Executor

CompletableFutureExecutor를 같이 사용하는 것은 여러 가지 이점을 제공합니다.

이 조합을 사용하면 비동기 작업의 효율성과 유연성을 극대화할 수 있습니다.

주요 이점

 

1. 병렬 처리의 효율성 증가

 

설명: 여러 비동기 작업을 병렬로 처리함으로써 전체 작업의 수행 시간을 단축할 수 있습니다. Executor를 사용하면 특정 스레드 풀을 통해 작업을 분산시켜 병렬 처리의 효율성을 극대화할 수 있습니다.

 

2. 작업 스케줄링 제어

 

설명: Executor를 사용하면 비동기 작업의 실행 정책을 세밀하게 제어할 수 있습니다. 예를 들어, 고정된 수의 스레드 풀, 캐시된 스레드 풀, 단일 스레드 풀 등을 사용하여 작업을 스케줄링할 수 있습니다.

 

3. 리소스 관리

 

설명: Executor를 사용하면 스레드 생성 및 관리를 중앙집중식으로 제어할 수 있어 시스템 리소스를 효율적으로 관리할 수 있습니다. 이를 통해 과도한 스레드 생성으로 인한 성능 저하를 방지할 수 있습니다.

 

4. 작업의 독립성 보장

 

설명: 각 비동기 작업을 별도의 스레드에서 실행하므로 작업 간의 간섭을 최소화할 수 있습니다. 이를 통해 독립적인 작업이 서로 영향을 주지 않고 안전하게 실행될 수 있습니다.

CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
    // 첫 번째 작업
}, executor);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
    // 두 번째 작업
}, executor);

5. 복잡한 비동기 작업 처리

 

설명: 여러 비동기 작업을 조합하여 복잡한 비동기 워크플로우를 쉽게 구현할 수 있습니다. CompletableFuture의 다양한 메서드를 사용하여 작업 간의 의존성을 설정하고, Executor를 통해 이러한 작업을 효율적으로 처리할 수 있습니다.

 

6. 예외 처리 및 복구

 

설명: 비동기 작업에서 발생하는 예외를 처리하고, 필요시 복구 작업을 수행할 수 있습니다. CompletableFutureexceptionally, handle 메서드와 함께 사용하여 예외 처리를 더 유연하게 할 수 있습니다.

 

종합 예시 코드

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureWithExecutorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        CompletableFuture.supplyAsync(() -> {
            // 비동기 작업 1
            return "Task 1 result";
        }, executor).thenApplyAsync(result -> {
            // 비동기 작업 2
            return result + " + Task 2 result";
        }, executor).thenAcceptAsync(result -> {
            // 비동기 작업 3
            System.out.println("Final Result: " + result);
        }, executor).exceptionally(ex -> {
            System.err.println("Exception: " + ex);
            return null;
        });

        executor.shutdown();
    }
}

 

요약

기본 스레드 풀: CompletableFutureExecutor와 함께 사용하지 않으면, 기본적으로 ForkJoinPool의 공용 스레드 풀을 사용합니다.

공용 스레드 풀의 크기는 시스템의 가용 프로세서 수에 따라 자동으로 결정됩니다.

반응형
반응형

CompletableFuture는 자바 8에서 도입된 java.util.concurrent 패키지의 클래스입니다. 비동기 프로그래밍을 쉽게 구현할 수 있도록 다양한 메서드와 기능을 제공합니다. CompletableFuture는 비동기 작업을 수행하고, 그 결과를 비동기적으로 처리할 수 있게 해줍니다.

 

1. CompletableFuture의 기본 개념

 

비동기 프로그래밍: 메인 스레드와는 별도로 작업을 수행하여 응답성을 높입니다.

비동기 작업의 관리: 작업의 완료 여부를 확인하고, 작업이 완료되면 후속 작업을 수행합니다.

콜백 등록: 작업이 완료되면 실행할 콜백 함수를 등록할 수 있습니다.

 

2. CompletableFuture의 생성

 

CompletableFuture 객체는 여러 가지 방법으로 생성할 수 있습니다.

 

직접 생성:

CompletableFuture<String> future = new CompletableFuture<>();

 

비동기 작업을 시작하면서 생성:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "Hello, World!";
});

 

비동기 작업의 결과를 미리 정의하면서 생성:

CompletableFuture<String> future = CompletableFuture.completedFuture("Hello, World!");

 

3. 주요 메서드

 

비동기 작업 실행

 

runAsync: 결과가 없는 작업을 비동기로 실행합니다.

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("Running asynchronously");
});

 

supplyAsync: 결과가 있는 작업을 비동기로 실행합니다.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "Hello, World!";
});

 

결과 처리

 

thenApply: 이전 작업의 결과를 받아서 변환합니다.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
                                                    .thenApply(result -> result + ", World!");

thenAccept: 이전 작업의 결과를 받아서 소비합니다.

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
                                                  .thenAccept(result -> System.out.println(result));

thenRun: 이전 작업의 결과를 사용하지 않고 실행할 작업을 정의합니다.

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
                                                  .thenRun(() -> System.out.println("Task completed"));

예외 처리

 

exceptionally: 예외가 발생했을 때 대체 값을 제공합니다.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) throw new RuntimeException("Error occurred");
    return "Hello";
}).exceptionally(ex -> "Recovered from error");

 

handle: 정상적인 결과와 예외를 모두 처리합니다.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) throw new RuntimeException("Error occurred");
    return "Hello";
}).handle((result, ex) -> {
    if (ex != null) return "Recovered from error";
    return result;
});

 

여러 작업의 조합

 

thenCombine: 두 비동기 작업의 결과를 조합합니다.

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2);

allOf: 여러 비동기 작업을 모두 완료할 때까지 기다립니다.

CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);

anyOf: 여러 비동기 작업 중 하나라도 완료되면 결과를 반환합니다.

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);

4. CompletableFuture 사용 예제

 

예제 1: 기본 사용

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
                                                            .thenApply(result -> result + ", World!")
                                                            .thenAccept(System.out::println);
    }
}

예제 2: 예외 처리

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExceptionExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (true) throw new RuntimeException("Error occurred");
            return "Hello";
        }).exceptionally(ex -> "Recovered from error")
          .thenAccept(System.out::println);
    }
}

예제 3: 여러 작업의 조합

import java.util.concurrent.CompletableFuture;

public class CompletableFutureCombineExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2);
        combinedFuture.thenAccept(System.out::println);
    }
}

 

요약

 

비동기 프로그래밍: CompletableFuture는 비동기 작업을 쉽게 관리하고 처리할 수 있는 다양한 기능을 제공합니다.

비동기 작업 실행: runAsyncsupplyAsync를 사용하여 비동기 작업을 시작합니다.

결과 처리: thenApply, thenAccept, thenRun 등의 메서드를 사용하여 비동기 작업의 결과를 처리합니다.

예외 처리: exceptionally, handle 메서드를 사용하여 예외를 처리합니다.

여러 작업의 조합: thenCombine, allOf, anyOf를 사용하여 여러 비동기 작업을 조합합니다.

반응형
반응형

CompletableFuture 하나만 있으면 비동기 처리 및 여러 병렬처리를 원하는 대로 할 수 가 있다.

 

여기서 코드 구현하려는 내용은

list 데이터가 있고, 각 list 의 item 이 http 커넥션을 통해 데이터를 가져오는 등 순차적으로 수행하기에는 시간소요가 오래걸랄때 쓰레드를 이용해 시간을 줄여보고자 한다.

 

List<Item> list = itemRepository.findAll();


ExecutorService executorService = Executors.newFixedThreadPool(10);
log.info(list.size() + "개");

List<CompletableFuture<Boolean>> resultList =  list.stream().map(
    item -> CompletableFuture.supplyAsync(() -> {
    	try {
    		return this.getDataByConnection(item.getParam());
    	} catch (IOException e) {
    		e.printStackTrace();
    	}
    	return false;
    }, executorService)
).collect(Collectors.toList());

resultList.stream().map(CompletableFuture::join).collect(Collectors.toList()); 
// join 을 해야 CompletableFuture 가 stream api 내부에서 수행됨. 



private boolean getDataByConnection(String param) throws IOException {
        // 해당 내용은 okhttp 라이브러리로 다른 http 커넥션 사용해도 됨.
        Request request = new Request.Builder()
                .url("https://~~~~~~~")
                .method("GET", null)
                .build();
        Response response = client.newCall(request).execute();
        String jsonString = response.body().string();
        
        ~~~~~~ 성공 및 실패에 대한 응답
        
        return true;

    }

1. ExecutorService 로 쓰레드 생성

ExecutorService 로 생성된 쓰레드 개수만큼만 병렬쓰레드로 실행 할 수 있다.

list 의 개수에 따라 더 늘려줄 수 도 있고, list 개수가 너무 많으면 성능 부하가 생길 수 있으므로 적정수의 쓰레드를 생성하자.

 

2. list 를 stream api 를 사용하여 stream 내부에서 CompletableFuture 를 실행하자.

stream api 내부에서 map api 로  supplyAsync 든 runAsync 를 하자.

둘다 CompletableFuture 의 비동기 메소드 이지만, supplyAsync 는 응답값을 받고, runAsync 는 응답을 받지 않는 메소드이다.

 

3. 만든 executorService 쓰레드를 CompletableFuture 파라미터에 넣어주자. 

쓰레드 개수를 10개ㄹ로 해주면 아래와 같이 10개까지만 쓰레드가 생성되서 실행이 된다.

 

쓰레드풀을 넣지 않아도 된다. 

넣지 않으면 적정수의 쓰레드가 생성.

7개의 쓰레드까지만 생성이 되었다.

 

4. 마지막으로 join 을 실행시키자

CompletableFuture 를 받는 변수를 생성 하고, join 을 해줘야 해당 쓰레드풀이 실행이 되고 결과값이 반환이 된다.

CompletableFuture 의 <> 안에 들어가 있는 객체를 통해 새로운 list 로도 만들수 있다.

 

EX) 아래와 같이 list 의 item 마다 병렬처리를 통해 새로운 list 를 구성할 수 있다.

List<CompletableFuture<Item>> resultList =  list.stream().map(
    item -> CompletableFuture.supplyAsync(() -> {
    	try {
    		return this.getDataByConnection(item.getParam());
    	} catch (IOException e) {
    		e.printStackTrace();
    	}
    	return false;
    }, executorService)
).collect(Collectors.toList());

List<Item> finalList = resultList.stream().map(CompletableFuture::join).collect(Collectors.toList());

 

 

 

반응형

+ Recent posts