본문 바로가기
Programming Language 이해하기/Java 이해하기

Java의 비동기(Async) 프로그래밍을 제공하는 JDK를 다시는 무시하지 말자!

by simplify-len 2024. 7. 28.

 

 개발을 하다보면, 어깨너머 듣는 이야기가 '비동기 언제 사용해요?', 'Redis 의 스핀락 사용해보셨어요?', '메세지 큐를 활용한 비동기 프로그래밍' 등등의 뭔가 어려운 것들을 나한테 묻곤 하는데, 잘 사용해본 적도 없을 뿐더러, 이미 Java JDK 에서 충분히 제공되고 있다고 생각했다.

 이래서 한번 언젠가는 정리해봐야지- 라고 생각만하다가 드디어 알아본다.

알아볼 Java의 비동기 프로그래밍
1. Thread
2. ExecutorService
3. CompletableFuture
4.  Flow

상황은 다음과 같습니다. SomethingCounter 가 있습니다. 해당 객체는 다음과 같이 count 를 가집니다.

final class SomethingCounter {

    // 횟수
    Integer count = 0;

    public SomethingCounter() {
    }

    int add(int t) {
    	count = count + t;
        return count ;
    }

    public void printCount() {
        System.out.println("모든 횟수:" + count);
    }
}

이제 2개의 Thread 가 SomethingCounter의 count 를 경합(race condition) 없이 어떻게 count 값을 올릴 수 있는지 하나씩 알아보자.

일단 Thread 활용해서 아주 간단하게 이렇게 해볼 수 있을 것 같다.

@Test
void threadTest() throws InterruptedException {
    SomethingCounter count = new SomethingCounter();

    Thread thread = new Thread(() -> {
        System.out.println("thread >>>");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        count.count = count.count + 1;
    });
    thread.start();

    Thread thread2 = new Thread(() -> {
        try {
            System.out.println("thread2 >>>");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        count.count = count.count + 5;
    });
    thread2.start();

    thread.join();
    thread2.join();

    count.printCount();
}

이때 발생할 수 있는 문제점은 무엇이 있을까? 일단 겉보기에는 문제가 없어 보일 수 있다. thread.start(), thread.join() 으로 메인스레드가 각 thread 가 끝날때까지 기다려 준다.

하지만 아래와 같이 코드를 변경해보면 바로 알 수 있다.

Thread thread = new Thread(() -> {
    System.out.println("thread >>>");
    for (int i = 0; i < 1000; i++) {
        count.count = count.count + 1;
    }
});

2개의 Thread 가 1000씩 더했으면 2000 이 나와야 하지만, 결론적으로는 1674가 나왔다.

첫번째 문제는 SomethingCounter 의 count 가 MultiThread 환경에서 안전하지 않다는 것이다. 

3가지 해결방식을 고민해볼 수 있는데요.

첫번째는 add 메소드에 synchronized 를 붙입니다.

synchronized void add(int t) {
    count = count + t;
}

두번째는 int 가 아닌 AtomicInteger 을 활용하는 방법이다.

AtomicInteger count2 = new AtomicInteger();

void add2(int t) {
    count2.addAndGet(t);
}

AtomicClass 에 대해서 왜 동시성 문제가 발생하지 않냐면 Compare And Set 알고리즘을 활용해 문제를 해결합니다.

마지막 3번째는 Lock 인터페이스를 활용하는 방법입니다.

void incrementWithLock(int value) {
    lock.lock();
    try {
        count += value;
    } finally {
        lock.unlock();
    }
}

 

Thread 를 비동기로 실행시킬 경우, 위 3가지 기법 중 하나가 필요합니다. 

이렇게 코딩할 경우, 개발의 복잡도를 높히게 되므로, Java 에서는 이를 해결하기 위해 java.util.concurrent 패키지에 Executors 클래스가 1.5 에 발표되었습니다. Executors 클래스가 무엇을 해결해주냐면 Thread 생성과 start() 의 실행을 Executors 내에서 동시에 실행과 thread.join과 같은 코드를 작성하지 않아도 됩니다.

그렇다면 Executors 로 어떻게 해결해주는지 확인해봅시다.

@Test
void executorTest1() throws InterruptedException {
    SomethingCounter counter = new SomethingCounter();
    ExecutorService executor = Executors.newFixedThreadPool(2);

    Runnable task1 = () -> {
        System.out.println("thread >>>");
        for (int i = 0; i < 1000; i++) {
            counter.count = counter.count + 1;
        }
    };

    Runnable task2 = () -> {
        System.out.println("thread2 >>>");
        for (int i = 0; i < 1000; i++) {
            counter.count = counter.count + 1;
        }
    };

    executor.submit(task1);
    executor.submit(task2);

    // Initiate an orderly shutdown
    executor.shutdown();
    // Wait until all tasks are finished
    executor.awaitTermination(1, TimeUnit.MINUTES);
    counter.printCount();
}

당연히 동기화 처리를 위한 앞서 말했던 3가지 중 하나를 적용해야만 경합이 발생하지 않겠지만 일단은 여러개의 Thread 를 하나씩 start(), join() 시키는 행위를 하지 않는 것을 확인할 수 있다.

또 다른 방법으로는 Future 를 활용하는 방법인데, 이는 CompletableFuture 구현체를 활용해 체이닝 방식으로 여러 Thread 를 동작시킬 수 있다. 바로 예제 코드를 살펴보자.

@Test
void xxxx() throws ExecutionException, InterruptedException {
    SomethingCounter count = new SomethingCounter();

    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
        System.out.println("thread >>>");
        for (int i = 0; i < 1000; i++) {
            count.add(1);
        }
    });

    CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
        System.out.println("thread2 >>>");
        for (int i = 0; i < 1000; i++) {
            count.add(1);
        }
    });

    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);
    combinedFuture.get();

    count.printCount();
}

CompletableFuture의 경우 1.8에 등장하게 되었는데- 여기서 이런게 궁금할 수 있다. 코드 작성된 것을 보면 ExecutorService 와 CompletableFuture 가 크게 다르지 않고, 그렇다고 동기화 문제를 해결해주지 않는 것같은데, 왜 CompletableFuture 가 등장하게 된걸까?

Java 1.8 에서 가장 큰 변화는 함수형 Stream 이 등장할 적이다. 여기에 힌트가 있는데, CompletableFuture 또한 메서드 체이닝을 통해 ExecutorService 비해 간결하게 작성할 수 있고 ExecutorService 비해 개선된 일부 기능을 제공한다.

예를 들어 아래와 같이 ExecutorService 사용한다면

ExecutorService executor = Executors.newFixedThreadPool(2);

Future<Integer> future1 = executor.submit(() -> {
    return doSomeWork();
});

Future<Integer> future2 = executor.submit(() -> {
    return doSomeOtherWork();
});

try {
    Integer result1 = future1.get();
    Integer result2 = future2.get();
    // Combine results
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
executor.shutdown();

CompletableFuture 를 사용한다면 다음과 같이 작성할 수 있다.

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> doSomeWork());
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> doSomeOtherWork());

CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);

combinedFuture.thenRun(() -> {
    try {
        Integer result1 = future1.get();
        Integer result2 = future2.get();
        // Combine results
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
});

더하여, 에러핸들링이나 CompletableFuture의 Combine 메서드(thenApply, thenCompose, thenCombine, allOf, anyOf) 또한 간결하게 작성할 수 있도록 도와준다.

마지막으로 Flow API 에 대해서 알아보려 한다. RxJava 와 유사한 API 를 가지고 있으며 JDK 1.9 부터 정식으로 지원하는 API 이다. 
Flow 를 이해하기 위해서는 일단 3가지 인터페이스를 기억해야 한다. Publisher, Subscriber, Subscription 그럼 각각의 인터페이스를 활용해 비동기 프로그래밍을 만들어보자.

@Test
void flowTest() throws InterruptedException {
    SomethingCounter count = new SomethingCounter();

    // Publisher 생성
    IncrementPublisher publisher = new IncrementPublisher();

    // Subscriber 생성
    IncrementSubscriber subscriber1 = new IncrementSubscriber(count);
    IncrementSubscriber subscriber2 = new IncrementSubscriber(count);

    // Publisher에 Subscriber 등록
    publisher.subscribe(subscriber1);
    publisher.subscribe(subscriber2);

    // 비동기 작업 시작
    System.out.println("thread >>>");
    System.out.println("thread2 >>>");
    publisher.publishData(1, 1000);
    // 작업이 완료될 때까지 기다림
    TimeUnit.SECONDS.sleep(3);
    publisher.close();
    count.printCount();
}
static class IncrementPublisher extends SubmissionPublisher<Integer> {
    void publishData(int incrementBy, int times) {
        for (int i = 0; i < times; i++) {
            submit(incrementBy);
        }
    }
}

// Subscriber 클래스
static class IncrementSubscriber implements Flow.Subscriber<Integer> {
    private Flow.Subscription subscription;
    private SomethingCounter counter;

    IncrementSubscriber(SomethingCounter counter) {
        this.counter = counter;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // 최초 요청
    }

    @Override
    public void onNext(Integer item) {
        counter.add(item);
        subscription.request(1); // 다음 데이터 요청
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Processing complete");
    }
}

당연히 syncronization, Lock 또는 AtomicClass 을 사용해야 한다. 그러지 않을 경우 RaceCondition 에 대해 역시나 해결할 수 없다.

지금까지 자바에서 사용하는 동시성 프로그래밍에 대한 가상스레드를 제외하고 JDK 를 알아보았는데, 추후 가상 스레드도 포스팅해보자!

댓글