반응형

CompletableFuture 하나만 있으면 비동기 처리 및 여러 병렬처리를 원하는 대로 할 수 가 있다.

 

여기서 코드 구현하려는 내용은

list 데이터가 있고, 각 list 의 item 이 http 커넥션을 통해 데이터를 가져오는 등 순차적으로 수행하기에는 시간소요가 오래걸랄때 쓰레드를 이용해 시간을 줄여보고자 한다.

 

List<Item> list = itemRepository.findAll();


ExecutorService executorService = Executors.newFixedThreadPool(10);
log.info(list.size() + "개");

List<CompletableFuture<Boolean>> resultList =  list.stream().map(
    item -> CompletableFuture.supplyAsync(() -> {
    	try {
    		return this.getDataByConnection(item.getParam());
    	} catch (IOException e) {
    		e.printStackTrace();
    	}
    	return false;
    }, executorService)
).collect(Collectors.toList());

resultList.stream().map(CompletableFuture::join).collect(Collectors.toList()); 
// join 을 해야 CompletableFuture 가 stream api 내부에서 수행됨. 



private boolean getDataByConnection(String param) throws IOException {
        // 해당 내용은 okhttp 라이브러리로 다른 http 커넥션 사용해도 됨.
        Request request = new Request.Builder()
                .url("https://~~~~~~~")
                .method("GET", null)
                .build();
        Response response = client.newCall(request).execute();
        String jsonString = response.body().string();
        
        ~~~~~~ 성공 및 실패에 대한 응답
        
        return true;

    }

1. ExecutorService 로 쓰레드 생성

ExecutorService 로 생성된 쓰레드 개수만큼만 병렬쓰레드로 실행 할 수 있다.

list 의 개수에 따라 더 늘려줄 수 도 있고, list 개수가 너무 많으면 성능 부하가 생길 수 있으므로 적정수의 쓰레드를 생성하자.

 

2. list 를 stream api 를 사용하여 stream 내부에서 CompletableFuture 를 실행하자.

stream api 내부에서 map api 로  supplyAsync 든 runAsync 를 하자.

둘다 CompletableFuture 의 비동기 메소드 이지만, supplyAsync 는 응답값을 받고, runAsync 는 응답을 받지 않는 메소드이다.

 

3. 만든 executorService 쓰레드를 CompletableFuture 파라미터에 넣어주자. 

쓰레드 개수를 10개ㄹ로 해주면 아래와 같이 10개까지만 쓰레드가 생성되서 실행이 된다.

 

쓰레드풀을 넣지 않아도 된다. 

넣지 않으면 적정수의 쓰레드가 생성.

7개의 쓰레드까지만 생성이 되었다.

 

4. 마지막으로 join 을 실행시키자

CompletableFuture 를 받는 변수를 생성 하고, join 을 해줘야 해당 쓰레드풀이 실행이 되고 결과값이 반환이 된다.

CompletableFuture 의 <> 안에 들어가 있는 객체를 통해 새로운 list 로도 만들수 있다.

 

EX) 아래와 같이 list 의 item 마다 병렬처리를 통해 새로운 list 를 구성할 수 있다.

List<CompletableFuture<Item>> resultList =  list.stream().map(
    item -> CompletableFuture.supplyAsync(() -> {
    	try {
    		return this.getDataByConnection(item.getParam());
    	} catch (IOException e) {
    		e.printStackTrace();
    	}
    	return false;
    }, executorService)
).collect(Collectors.toList());

List<Item> finalList = resultList.stream().map(CompletableFuture::join).collect(Collectors.toList());

 

 

 

반응형

+ Recent posts