개발을 하다보면, 어깨너머 듣는 이야기가 '비동기 언제 사용해요?', 'Redis 의 스핀락 사용해보셨어요?', '메세지 큐를 활용한 비동기 프로그래밍' 등등의 뭔가 어려운 것들을 나한테 묻곤 하는데, 잘 사용해본 적도 없을 뿐더러, 이미 Java JDK 에서 충분히 제공되고 있다고 생각했다.
이래서 한번 언젠가는 정리해봐야지- 라고 생각만하다가 드디어 알아본다.
알아볼 Java의 비동기 프로그래밍
1. Thread
2. ExecutorService
3. CompletableFuture
4. Flow
상황은 다음과 같습니다. SomethingCounter 가 있습니다. 해당 객체는 다음과 같이 count 를 가집니다.
final class SomethingCounter {
// 횟수
Integer count = 0;
public SomethingCounter() {
}
int add(int t) {
count = count + t;
return count ;
}
public void printCount() {
System.out.println("모든 횟수:" + count);
}
}
이제 2개의 Thread 가 SomethingCounter의 count 를 경합(race condition) 없이 어떻게 count 값을 올릴 수 있는지 하나씩 알아보자.
일단 Thread 활용해서 아주 간단하게 이렇게 해볼 수 있을 것 같다.
@Test
void threadTest() throws InterruptedException {
SomethingCounter count = new SomethingCounter();
Thread thread = new Thread(() -> {
System.out.println("thread >>>");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
count.count = count.count + 1;
});
thread.start();
Thread thread2 = new Thread(() -> {
try {
System.out.println("thread2 >>>");
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
count.count = count.count + 5;
});
thread2.start();
thread.join();
thread2.join();
count.printCount();
}
이때 발생할 수 있는 문제점은 무엇이 있을까? 일단 겉보기에는 문제가 없어 보일 수 있다. thread.start(), thread.join() 으로 메인스레드가 각 thread 가 끝날때까지 기다려 준다.
하지만 아래와 같이 코드를 변경해보면 바로 알 수 있다.
Thread thread = new Thread(() -> {
System.out.println("thread >>>");
for (int i = 0; i < 1000; i++) {
count.count = count.count + 1;
}
});
2개의 Thread 가 1000씩 더했으면 2000 이 나와야 하지만, 결론적으로는 1674가 나왔다.
첫번째 문제는 SomethingCounter 의 count 가 MultiThread 환경에서 안전하지 않다는 것이다.
3가지 해결방식을 고민해볼 수 있는데요.
첫번째는 add 메소드에 synchronized 를 붙입니다.
synchronized void add(int t) {
count = count + t;
}
두번째는 int 가 아닌 AtomicInteger 을 활용하는 방법이다.
AtomicInteger count2 = new AtomicInteger();
void add2(int t) {
count2.addAndGet(t);
}
AtomicClass 에 대해서 왜 동시성 문제가 발생하지 않냐면 Compare And Set 알고리즘을 활용해 문제를 해결합니다.
마지막 3번째는 Lock 인터페이스를 활용하는 방법입니다.
void incrementWithLock(int value) {
lock.lock();
try {
count += value;
} finally {
lock.unlock();
}
}
Thread 를 비동기로 실행시킬 경우, 위 3가지 기법 중 하나가 필요합니다.
이렇게 코딩할 경우, 개발의 복잡도를 높히게 되므로, Java 에서는 이를 해결하기 위해 java.util.concurrent 패키지에 Executors 클래스가 1.5 에 발표되었습니다. Executors 클래스가 무엇을 해결해주냐면 Thread 생성과 start() 의 실행을 Executors 내에서 동시에 실행과 thread.join과 같은 코드를 작성하지 않아도 됩니다.
그렇다면 Executors 로 어떻게 해결해주는지 확인해봅시다.
@Test
void executorTest1() throws InterruptedException {
SomethingCounter counter = new SomethingCounter();
ExecutorService executor = Executors.newFixedThreadPool(2);
Runnable task1 = () -> {
System.out.println("thread >>>");
for (int i = 0; i < 1000; i++) {
counter.count = counter.count + 1;
}
};
Runnable task2 = () -> {
System.out.println("thread2 >>>");
for (int i = 0; i < 1000; i++) {
counter.count = counter.count + 1;
}
};
executor.submit(task1);
executor.submit(task2);
// Initiate an orderly shutdown
executor.shutdown();
// Wait until all tasks are finished
executor.awaitTermination(1, TimeUnit.MINUTES);
counter.printCount();
}
당연히 동기화 처리를 위한 앞서 말했던 3가지 중 하나를 적용해야만 경합이 발생하지 않겠지만 일단은 여러개의 Thread 를 하나씩 start(), join() 시키는 행위를 하지 않는 것을 확인할 수 있다.
또 다른 방법으로는 Future 를 활용하는 방법인데, 이는 CompletableFuture 구현체를 활용해 체이닝 방식으로 여러 Thread 를 동작시킬 수 있다. 바로 예제 코드를 살펴보자.
@Test
void xxxx() throws ExecutionException, InterruptedException {
SomethingCounter count = new SomethingCounter();
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("thread >>>");
for (int i = 0; i < 1000; i++) {
count.add(1);
}
});
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("thread2 >>>");
for (int i = 0; i < 1000; i++) {
count.add(1);
}
});
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);
combinedFuture.get();
count.printCount();
}
CompletableFuture의 경우 1.8에 등장하게 되었는데- 여기서 이런게 궁금할 수 있다. 코드 작성된 것을 보면 ExecutorService 와 CompletableFuture 가 크게 다르지 않고, 그렇다고 동기화 문제를 해결해주지 않는 것같은데, 왜 CompletableFuture 가 등장하게 된걸까?
Java 1.8 에서 가장 큰 변화는 함수형 Stream 이 등장할 적이다. 여기에 힌트가 있는데, CompletableFuture 또한 메서드 체이닝을 통해 ExecutorService 비해 간결하게 작성할 수 있고 ExecutorService 비해 개선된 일부 기능을 제공한다.
예를 들어 아래와 같이 ExecutorService 사용한다면
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Integer> future1 = executor.submit(() -> {
return doSomeWork();
});
Future<Integer> future2 = executor.submit(() -> {
return doSomeOtherWork();
});
try {
Integer result1 = future1.get();
Integer result2 = future2.get();
// Combine results
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
CompletableFuture 를 사용한다면 다음과 같이 작성할 수 있다.
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> doSomeWork());
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> doSomeOtherWork());
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);
combinedFuture.thenRun(() -> {
try {
Integer result1 = future1.get();
Integer result2 = future2.get();
// Combine results
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
더하여, 에러핸들링이나 CompletableFuture의 Combine 메서드(thenApply, thenCompose, thenCombine, allOf, anyOf) 또한 간결하게 작성할 수 있도록 도와준다.
마지막으로 Flow API 에 대해서 알아보려 한다. RxJava 와 유사한 API 를 가지고 있으며 JDK 1.9 부터 정식으로 지원하는 API 이다.
Flow 를 이해하기 위해서는 일단 3가지 인터페이스를 기억해야 한다. Publisher, Subscriber, Subscription 그럼 각각의 인터페이스를 활용해 비동기 프로그래밍을 만들어보자.
@Test
void flowTest() throws InterruptedException {
SomethingCounter count = new SomethingCounter();
// Publisher 생성
IncrementPublisher publisher = new IncrementPublisher();
// Subscriber 생성
IncrementSubscriber subscriber1 = new IncrementSubscriber(count);
IncrementSubscriber subscriber2 = new IncrementSubscriber(count);
// Publisher에 Subscriber 등록
publisher.subscribe(subscriber1);
publisher.subscribe(subscriber2);
// 비동기 작업 시작
System.out.println("thread >>>");
System.out.println("thread2 >>>");
publisher.publishData(1, 1000);
// 작업이 완료될 때까지 기다림
TimeUnit.SECONDS.sleep(3);
publisher.close();
count.printCount();
}
static class IncrementPublisher extends SubmissionPublisher<Integer> {
void publishData(int incrementBy, int times) {
for (int i = 0; i < times; i++) {
submit(incrementBy);
}
}
}
// Subscriber 클래스
static class IncrementSubscriber implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription;
private SomethingCounter counter;
IncrementSubscriber(SomethingCounter counter) {
this.counter = counter;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // 최초 요청
}
@Override
public void onNext(Integer item) {
counter.add(item);
subscription.request(1); // 다음 데이터 요청
}
@Override
public void onError(Throwable throwable) {
System.err.println("Error: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Processing complete");
}
}
당연히 syncronization, Lock 또는 AtomicClass 을 사용해야 한다. 그러지 않을 경우 RaceCondition 에 대해 역시나 해결할 수 없다.
지금까지 자바에서 사용하는 동시성 프로그래밍에 대한 가상스레드를 제외하고 JDK 를 알아보았는데, 추후 가상 스레드도 포스팅해보자!
'Programming Language 이해하기 > Java 이해하기' 카테고리의 다른 글
Java 에서 패키지(Package) 라는 것을 왜 사용해야만 할까? (2) | 2024.10.03 |
---|---|
#2 스터디 할래 - 2주차 (0) | 2021.11.14 |
Java LocalDateTime.toString()은 JavaScript에서 어떻게 호환될까? (0) | 2021.11.02 |
#1 스터디 할래 - 1주차 (0) | 2021.10.30 |
객체지향의 재사용성과 다이나믹 디스패치, 더블 디스패치에 관한 이야기 (0) | 2021.03.06 |
댓글