스레드 상태 주기
스레드의 상태 주기는 다음과 같다. 빨간색 표시는 Deprecated 된 메소드를 의미한다.
Thread 클래스에서 getState()가 제공하는 Thread.State 열거 상수는 스레드의 상태를 나타낸다.
상태 | 열거 상수 | 설명 |
객체 생성 | NEW | 스레드 객체가 생성, 아직 start() 메소드가 호출되지 않은 상태 |
실행 대기 | RUNNABLE | 실행 상태로 언제든지 갈 수 있는 상태 |
일지 정지 | WAITING | 다른 스레드가 통지할 때까지 기다리는 상태 |
TIMED_WAITING | 주어진 시간 동안 기다리는 상태 | |
BLOCKED | 사용하고자 하는 객체의 락이 풀릴 때까지 기다리는 상태 | |
종료 | TERMINATED | 실행을 마친 상태 |
간단하게 스레드의 State를 얻어서 출력해보자. 컴퓨터 사양이 좋아서 target 스레드가 너무 빨리 연산을 수행하면 RUNNABLE 시점이 보이지 않을 수도 있다.
아래 로깅에서는 볼 수 없지만 사실 작업 요청으로 생성된 스레드는 최초에 NEW에서 RUNNABLE 상태가 된다. 이후 실행과 일시정지, 실행 대기 상태를 오가면서 작업을 완료하고 종료한다.
public class SingleThreadTest {
public static void main(String[] args) {
Thread target = new Thread(() -> {
for(int i = 0; i < Integer.MAX_VALUE; i++) {}
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
for(int i = 0; i < Integer.MAX_VALUE; i++) {}
});
while(true) {
Thread.State state = target.getState();
System.out.println("타겟 스레드 상태: " + state);
if(state == Thread.State.NEW) {
target.start();
}
if(state == Thread.State.TERMINATED) {
break;
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
>>> log
타겟 스레드 상태: NEW
타겟 스레드 상태: TIMED_WAITING
타겟 스레드 상태: TIMED_WAITING
타겟 스레드 상태: TIMED_WAITING
타겟 스레드 상태: TIMED_WAITING
타겟 스레드 상태: TIMED_WAITING
타겟 스레드 상태: TIMED_WAITING
타겟 스레드 상태: TIMED_WAITING
타겟 스레드 상태: TERMINATED
Process finished with exit code 0
스레드의 상태를 조작하는 함수 사용에 대한 예제는 간단하기 때문에 넘어가도록 하겠다. 다만 스레드를 강제로 종료해야하는 경우 어떤식으로 접근할 수 있는지 알아보자.
interrupt() 사용
스레드의 인스턴스 메소드인 interrupt()를 호출하면 해당 스레드가 정지 상태에 있을 때 InterruptedException 예외가 발생한다. 이것을 이용하면 run() 메소드를 정상 종료시킬 수 있다.
public class SingleThreadInterruptTest {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
while(true) {
System.out.println(Thread.currentThread().getName() + ": running");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("InterruptedException: 스레드 종료");
break;
}
}
});
thread.start();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
thread.interrupt();
}
}
그런데 thread 인스턴스가 작업 도중에 일시정지 상태가 될 수 없는 경우에는 어떻게해야 할까? 이 경우에는 스레드의 정적 메소드인 interrupted() 또는 인스턴스 메소드인 isInterrupted() 메소드를 이용하면 현재 스레드가 interrupted 되었는지 확인할 수 있다.
public class SingleThreadInterruptTest {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
while(true) {
System.out.println(Thread.currentThread().getName() + ": running");
if(Thread.interrupted()) {
System.out.println("Interrupted: 스레드 종료");
break;
}
}
});
thread.start();
try {
Thread.sleep(8);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
thread.interrupt();
}
}
데몬 스레드
데몬 스레드는 주 스레드의 작업을 돕는 보조적인 역할을 수행하는 스레드이다. 데몬 스레드의 가장 큰 특징은 주 스레드가 종료되면 데몬 스레드도 강제 종료된다는 것이다. 데몬 스레드를 생성하는 방법은 주 스레드가 데몬이 될 스레드의 setDaemon(true) 를 호출해주면 된다. 주의할 점은 데몬 스레드를 시작하기 전에 setDaemon(true)를 먼저 호출해주어야 한다.
public class DaemonThreadTest {
public static void main(String[] args) {
Thread daemon = new Thread(() -> {
while(true) {
System.out.println("데몬 작업 수행 중");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
daemon.setDaemon(true);
daemon.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("메인 스레드 종료");
}
}
>>> log
데몬 작업 수행 중
데몬 작업 수행 중
데몬 작업 수행 중
메인 스레드 종료
Process finished with exit code 0
스레드 그룹
스레드 그룹은 관련된 스레드를 묶어서 관리할 목적으로 이용한다. JVM이 실행되면 system 스레드 그룹을 만들고, JVM 운영에 필요한 스레드들을 생성해서 system 스레드 그룹에 포함시킨다. 그리고 system의 하위 스레드 그룹으로 main을 만들고 메인 스레드를 main 스레드 그룹에 포함시킨다. 스레드는 반드시 하나의 그룹에 포함되는데, 명시적으로 그룹을 설정하지 않으면 기본적으로 자신을 생성한 스레드와 같은 스레드 그룹에 속하게 된다.
아래는 스레드 그룹을 확인해 볼 수 있는 예제이다. Thread.getAllStackTraces().keySet()을 하면 프로세스 내에 전체 스레드 목록을 얻을 수 있다.
public class ThreadGroupTest {
public static void main(String[] args) {
Thread daemon = new Thread(() -> {
while(true) {}
});
daemon.setDaemon(true);
daemon.start();
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
for(var T : threadSet) {
System.out.printf("Name: %s(%s)\n", T.getName(), (T.isDaemon() ? "데몬" : "주"));
System.out.printf("\t소속 그룹: %s\n\n", T.getThreadGroup().getName());
}
}
}
>>> log
Name: Thread-0(데몬)
소속 그룹: main
Name: Reference Handler(데몬)
소속 그룹: system
Name: Signal Dispatcher(데몬)
소속 그룹: system
Name: main(주)
소속 그룹: main
Name: Common-Cleaner(데몬)
소속 그룹: InnocuousThreadGroup
Name: Finalizer(데몬)
소속 그룹: system
Name: Monitor Ctrl-Break(데몬)
소속 그룹: main
Name: Attach Listener(데몬)
소속 그룹: system
Process finished with exit code 0
스레드 그룹 생성하기
스레드 그룹을 명시적으로 생성하고 싶다면 다음 생성자 중 하나를 이용하면 된다. 스레드 그룹을 생성할 때 부모 그룹을 설정할 수 있는데 만약 설정하지 않으면 현재 스레드가 속한 그룹의 하위 그룹으로 설정된다.
ThreadGroup tg = new ThreadGroup(String name);
ThreadGroup tg = new ThreadGroup(ThreadGroup parent, String name);
스레드 그룹을 생성하면 스레드를 생성할 때 생성자에 스레드 그룹을 매개값으로 넘겨주어 함께 설정할 수도 있다. stackSize는 JVM이 스레드에 할당할 stack 크기를 말한다.
Thread t = new Thread(ThreadGroup group, Runnable target, String name, long stackSize);
스레드 그룹을 생성하면 뭐가 좋을까? 스레드 그룹은 한 번에 interrupt()가 가능하다. 이것이 가능한 이유는 스레드 그룹의 interrupt() 메소드는 포함된 모든 스레드의 interrupt() 메소드를 내부적으로 호출해주기 때문이다. 다만 개별 스레드에서 발생하는 InterruptedException에 대한 처리는 해주지 않는다. 따라서 안전한 종료를 위해 개별 스레드가 직접 처리해야 한다. interrupt() 메소드 이외에도 다른 종료 메소드들이 있는데, 안정성의 이유로 모두 Deprecated되었다.
또한 스레드 그룹은 그룹 자체를 데몬 그룹으로도 설정이 가능하다.
스레드풀
만약 병렬 작업 처리가 많아지면 스레드 개수가 증가되고 스케줄링을 위해서 CPU가 바빠지고 메모리 사용량이 늘어난다. 따라서 애플리케이션 성능이 저하된다. 이때 스레드풀은 제한된 스레드를 이용해서 작업을 처리하기 때문에 작업 처리 요청이 폭증되어도 스레드의 전체 개수가 늘어나지 않는다.
자바는 스레드풀을 생성하고 사용할 수 있도록 java.util.concurrent 패키지에서 ExecutorService 인터페이스와 Executors 클래스를 제공하고 있다. Executors의 다양한 정적 메소드를 이용해서 ExecutorService 구현 객체를 만들 수 있고 이것이 스레드풀이다.
다음 두 가지 메소드 중 하나를 이용해서 스레드풀을 생성할 수 있다.
메소드명(매개 변수) | 초기 스레드 수 | 코어 스레드 수 | 최대 스레드 수 |
newCachedThreadPool() | 0 | 0 | Integer.MAX_VALUE |
newFixedThreadPool(int nThreads) | 0 | nThreads | nThreads |
생성자
newCachedThreadPool()은 초기 스레드 수가 0이고 작업 개수가 스레드 수 보다 많으면 스레드가 추가된다. 추가된 스레드는 60초 동안 아무 작업을 하지 않으면 스레드를 종료하고 풀에서 제거한다. 또한 이론 상 Integer.MAX_VALUE 까지 스레드가 추가되지만, 운영체제의 성능과 상황에 따라서 달라진다. 생성자는 아래와 같다. SynchronousQueue에 대해서는 다음에 알아본다.
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newFixedThreadPool은 초기 스레드 수가 0이고 작업 개수가 스레드 수 보다 많으면 스레드가 추가되는 것은 동일하다. 하지만 최대 생성될 수 있는 스레드 수는 nThreads로 직접 설정할 수 있고 스레드가 작업을 처리하지 않고 있더라도 제거되지 않는다. 여기서는 LinkedBlockingQueue가 쓰였는데 SynchronousQueue를 포스팅하면서 차이점을 알아보도록 한다.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
그런데 위 메소드를 통하지 않고도 스레드풀의 생성이 가능하다. 바로 직접 ThreadPoolExecutor 객체를 생성하는 것이다. 사실 위 두 가지 메소드도 내부에서 ThreadPoolExecutor 객체를 생성해서 반환하지만 세부적인 설정은 어렵다. 반면에ThreadPoolExecutor 생성자를 통해서 직접 스레드풀을 생성하면 더 세부적인 설정이 가능하다.
아래는 ThreadPoolExecutor 생성자를 통해 직접 스레드 풀을 생성하는 예제이다. 코어 스레드 3개, 최대 스레드 6개, 유휴 시간 최대 120초, 작업 큐를 매개값으로 전달했다. 코어 스레드 수는 스레드 수가 증가된 후 최소한으로 유지해야할 스레드의 수를 말한다. 최대 스레드 수는 이름 그대로 생성 가능한 스레드의 최대 스레드 개수이며 유휴시간은 스레드가 유휴 상태로 있을 수 있는 시간을 의미한다. 시간을 초과하면 삭제된다. 여기서도 SynchronousQueue가 사용됐는데, SynchronousQueue에 대해서는 따로 포스팅을 할 예정이다.
ExecutorService threadPool = new ThreadPoolExecutor(
3, // 코어 스레드 개수
6, // 최대 스레드 개수
120L, // 놀고 있는 시간
TimeUnit.SECONDS, // 시간 단위
new SynchronousQueue<Runnable>() // 작업 큐
);
작업 요청
스레드 풀을 생성하는 방법을 알게 되었으니 작업을 요청하는 방법을 알아야한다. 작업을 생성하는 건 Runnable 구현 클래스나 Callable 구현 클래스를 생성하는 방법이 있는데 Callable은 리턴값이 있다는 차이점이 있다. 작업은 어떻게 요청할 수 있을까? 작업 요청은 execute()나 submit()을 사용하면 되는데 submit()의 경우 결과를 Future로 리턴해주기 때문에 작업 처리 결과를 얻을 수 있다.
더 큰 차이점은 execute()의 경우 작업 처리 도중 예외가 발생하면 스레드가 종료되고 해당 스레드는 스레드풀에서 제거되는데, submit()의 경우 작업 처리 도중 예외가 발생하더라도 종료되지 않고 다음 작업을 위해 재사용된다. 따라서 가급적이면 오버헤드를 줄이기 위해 submit()을 사용하는 것이 좋다.
의문점
궁금한 점이 생긴다. newFixedThreadPool의 경우 설정한 코어 스레드 수까지는 스레드를 유지한다고 했는데 execute()로 작업을 처리할 때 예외가 발생한다면 코어 스레드라도 제거하는가? 라는 궁금증이 생겼다. 아래는 이를 확인하는 예제이다.
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) threadPool;
Supplier<Runnable> runnableSupplier = () -> (Runnable) () -> {
int poolSize = threadPoolExecutor.getPoolSize();
String threadName = Thread.currentThread().getName();
System.out.printf("[총 스레드 개수: %d] 작업 스레드 이름: %s\n", poolSize, threadName);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
} finally {
throw new RuntimeException();
}
};
Stream.generate(runnableSupplier)
.limit(5)
.collect(Collectors.toList())
.parallelStream()
.forEach(T -> threadPool.execute(T));
Thread.sleep(5000);
int poolSize = threadPoolExecutor.getPoolSize();
System.out.printf("[총 스레드 개수: %d]\n", poolSize);
threadPool.submit(() -> {
System.out.printf("[총 스레드 개수: %d] 작업 스레드 이름: %s\n", threadPoolExecutor.getPoolSize(), Thread.currentThread().getName());
});
Thread.sleep(500);
threadPool.shutdownNow();
}
}
>>> log
[총 스레드 개수: 1] 작업 스레드 이름: pool-1-thread-1
[총 스레드 개수: 5] 작업 스레드 이름: pool-1-thread-4
[총 스레드 개수: 5] 작업 스레드 이름: pool-1-thread-5
[총 스레드 개수: 5] 작업 스레드 이름: pool-1-thread-2
[총 스레드 개수: 5] 작업 스레드 이름: pool-1-thread-3
Exception in thread "pool-1-thread-2" java.lang.RuntimeException
at MutiThread.ThreadPoolTest.lambda$main$0(ThreadPoolTest.java:44)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-1-thread-4" java.lang.RuntimeException
at MutiThread.ThreadPoolTest.lambda$main$0(ThreadPoolTest.java:44)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-1-thread-5" java.lang.RuntimeException
at MutiThread.ThreadPoolTest.lambda$main$0(ThreadPoolTest.java:44)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-1-thread-3" Exception in thread "pool-1-thread-1" java.lang.RuntimeException
at MutiThread.ThreadPoolTest.lambda$main$0(ThreadPoolTest.java:44)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
java.lang.RuntimeException
at MutiThread.ThreadPoolTest.lambda$main$0(ThreadPoolTest.java:44)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[총 스레드 개수: 5]
[총 스레드 개수: 5] 작업 스레드 이름: pool-1-thread-6
Process finished with exit code 0
결론적으로 execute()를 통해 작업을 요청했을 때 예외가 발생하면 일단 스레드를 제거하고, 생성되었던 스레드의 개수가 코어 스레드 개수 이상이었다면 설정된 코어 스레드 수에 스레드 수를 맞추기 위하여 새로운 스레드를 생성한다. 이는 마지막에 새롭게 요청한 작업에서 사용된 스레드 번호를 통해서 알 수 있는 사실이다.
submit()의 경우 스레드가 제거되지 않고 재사용된다는 사실을 알기 때문에 예제는 스킵하도록 하자.
작업 완료 통보
블로킹 방식
ExecutorService의 submit() 메소드는 매개값으로 준 Runnable 또는 Callable 작업을 스레드 풀의 작업 큐에 저장하고 즉시 Future 객체를 리턴한다. Future 객체의 get() 메소드를 호출해서 작업 결과를 얻을 수 있는데, get() 메소드를 호출한 스레드는 작업 결과가 반환될 때 까지 블로킹되기 때문에 주의해야 한다. 따라서 작업 결과를 얻기 위해 다른 스레드를 통해서 get() 메소드를 호출하곤 한다. 아래는 예제이다.
public class ThreadFutureTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
System.out.println("main 스레드 시작");
Future<Boolean> res = threadPool.submit(() -> {
System.out.println("작업 스레드 시작");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return true;
});
try {
System.out.println(res.get());
} catch (InterruptedException e) {
throw new RuntimeException(e); // interrupt 발생 시
} catch (ExecutionException e) {
throw new RuntimeException(e); // 예외 발생
}
}
}
Future의 부가기능
res.cancel(true); // 작업이 진행 중일 경우에 interrupt 발생
res.cancel(false); // 작업이 진행 중일 경우에 interrupt 불가능
res.isCancelled(); // 작업이 완료되기 전에 작업이 취소되었을 경우에 true 리턴
res.isDone(); // 작업이 종료되었는지 확인
콜백 방식 (비동기 방식)
지금까지 살펴봤던 ExecutorService는 아쉽게도 콜백 기능을 제공하지 않는다. 하지만 Runnable 구현 클래스를 작성할 때 콜백 기능을 구현할 수 있다. 먼저 콜백 메소드를 가진 클래스가 있어야 하는데, 직접 정의해도 괜찮고 java.nio.channels.CompletionHandler를 이용해도 좋다. CompletionHandler 인터페이스는 비동기 통신에서 콜백 객체를 만들 때 사용된다.
참고로 비동기 방식은 작업을 요청하는 스레드가 작업 완료 여부를 신경쓰지 않는다. 다만 작업을 요청할 때 콜백 메소드를 함께 전달하고 작업을 처리하는 스레드는 작업을 완료하면 콜백 메소드를 실행한다.
아래는 CompletionHandler의 구현 객체의 completed()와 failed()를 콜백 메소드로 이용하는 비동기 작업 요청 방식의 예제이다. 작업을 비동기 방식으로 요청해두고 작업이 모두 완료되면 스레드 풀을 종료하도록 shutdown() 메소드를 호출하였다. shutdownNow()는 남은 작업이 있더라도 스레드 풀을 종료한다.
public class ThreadAsyncTest {
private ExecutorService threadPool;
public ThreadAsyncTest() {
threadPool = Executors.newFixedThreadPool(2);
}
private CompletionHandler<Integer, Void> callback = new CompletionHandler<>() {
@Override
public void completed(Integer result, Void attachment) {
System.out.printf("계산 결과: %d\n", result);
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("계산 실패");
}
};
private class Task implements Runnable {
private int num;
public Task(int num) {
this.num = num;
}
@Override
public void run() {
try {
if(num > 100) throw new RuntimeException();
int sum = 0;
for(int i = 1; i <= num; i++) {
sum += i;
}
callback.completed(sum, null);
}
catch (Exception e) {
callback.failed(e, null);
}
}
}
public void doWork(int num) {
threadPool.submit(new Task(num));
}
public void finish() {
threadPool.shutdown();
}
public static void main(String[] args) {
System.out.println("main 스레드 시작");
ThreadAsyncTest test = new ThreadAsyncTest();
test.doWork(99);
test.doWork(101);
test.finish();
}
}
결론
멀티 스레드를 다루는 것에 대하여 조금 알아보았는데, 능숙하게 사용하려면 아직 더 노력이 필요할 것 같다.
'JAVA' 카테고리의 다른 글
Java.nio Selector의 필요성 (0) | 2022.09.20 |
---|---|
[기본 시리즈] java.nio 버퍼 (1) | 2022.09.19 |
[기본 시리즈] java.io 자바 기본 네트워킹 TCP/IP (0) | 2022.09.06 |
[기본 시리즈] java.io 입력 스트림과 출력 스트림 (with 보조 스트림) (0) | 2022.09.02 |
[기본 시리즈] JVM의 메모리 사용 구조 : Thread 영역 (0) | 2022.08.23 |