Java-Advanced10-Multi Thread
Multi-Thread Programming
Process and Thread
- Process: OS로부터 메모리를 할당받아 실행중인 프로그램
- Thread: 프로세스 동작의 최소 단위로, 하나의 프로세스는 여러 스레드로 이루어질 수 있다.
멀티스레드 프로그래밍의 장단점
- 장점
- 여러 동작을 병렬적으로 처리하여 CPU의 사용률 향상
- 시간이 걸리는 동작을 분리하여 프로그램의 응답성 향상
- 단점
- Context Switching 오버헤드 발생
- 스레드 제어의 어려움
스레드 구현
스레드 생성 방법
익명함수
1
2
3
4
5Thread threadOne = new Thread(new Runnable() {
public void run() {
System.out.println("Hello Thread!");
}
});람다식
1
2
3Thread threadTwo = new Thread(() -> {
System.out.println("Hello Again, Thread!");
});Thread Class 상속
1
2
3
4
5
6class MyThread extends Thread {
public void run() {
System.out.println("Hello Again Again, Thread!");
}
}
Thread threadThree = new MyThread();
스레드 실행
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15Thread threadOne = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.print("1");
}
});
Thread threadTwo = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.print("2");
}
});
threadOne.start();
threadTwo.start();
System.out.println("Done!");Thread 객체는 일회성이기 때문에 여러 번 start()메소드를 호출 할 수 없다
스레드의 상태 및 제어
스레드의 상태
getState() 메소드로 스레드의 상태를 확인할 수 있다.
열거형 상수 설명 NEW
start()
메소드가 아직 호출되지 않음RUNNABLE
JVM에 의해 실행 가능한 상태 BLOCKED
객체가 블락된 상태 WAITING
sleep()
,wait()
,join()
등에 의해 무한히 대기 중인 상태TIMED_WAITING
sleep()
,wait()
,join()
등에 의해 정해진 시간 동안 대기 중인 상태TERMINATE
run()
메소드가 종료된 상태
스레드의 우선순위 제어
1
2
3public final static int MIN_PRIORITY = 1;
public final static int NORM_PRIORITY = 5;
public final static int MAX_PRIORITY = 10;메소드 설명 void setPriority(int newPriority)
새로운 우선순위로 설정한다. int getPriority()
우선순위를 반환한다. sleep()
을 이용한 제어1
2Thread.sleep(1000); // ms
Thread.sleep(100, 200); // ms + nsjoin()
을 이용한 스레드 조인스레드 동작을 동기화하기 위해 사용
1
2
3
4
5
6
7
8
9
10
11
12Thread t1 = new Thread(() -> System.out.println("A"));
Thread t2 = new Thread(() -> System.out.println("B"));
t1.start(); // t1 thread start
t1.join(); // t1 thread 종료시 까지 다른 thread 대기
t2.start(); // t2 thread start
t2.join(100); // 100ms 기다린 후에도 t2가 종료되지 않으면 다른 쓰레드 시작
System.out.println("C");
interrupt()
를 이용한 대기 중지- 기존 동작을 방해하고 반응을 강제하는 메소드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17Thread tSleep = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("Interrupted");
}
});
tSleep.start();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
tSleep.interrupt();
- 기존 동작을 방해하고 반응을 강제하는 메소드
yield()
를 이용한 상태 제어sleep()
과 달리 곧바로RUNNABLE
상태로 변경- 다른 쓰레드에게 양보하고 바로 실행 대기
- 테스트용으로 쓰임 -> 쓰레드의 동작이 명확하지 않음(바로 양보하지 않는다)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19new Thread(() -> {
for (int i = 0; i < 20; i++) {
if (i % 2 == 0) {
System.out.print("1");
} else {
Thread.yield();
}
}
}).start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
if (i % 2 == 0) {
System.out.print("2");
} else {
Thread.yield();
}
}
}).start();
스레드의 종료
- run() 메소드의 종료
stop()
메소드 호출 (deprecated)
데몬 스레드
정의 : 다른 스레드가 종료될 경우 함께 종료되는 보조 스레드
활용법
- 보통 대기하며 동작하는 무한 루프로 구현
- 일정 시간 마다 동작
- interrupt에 의해서 동작
setDaemon()
메소드로 데몬 스레드로 설정1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17class DaemonThread extends Thread {
public DaemonThread() {
this.setDaemon(true);
}
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Daemon Thread Run");
}
}
}
데이터 공유와 동기화
스레드간 데이터 공유 시 신뢰성에 문제가 발생할 수 있음
결과는 1000000이 나오지 않는다 -> count값이 동기화되지 않았기 때문
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24class Counter {
int count = 0;
}
public class Main {
public static void main(String args[]){
Counter counter = new Counter();
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.count = counter.count + 1;
}
}).start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(counter.count);
}
}synchronized
키워드 사용1
2
3synchronized void method() {
// 공유 데이터 사용
}1
2
3
4
5void method() {
synchronized(sharedObj) {
// 공유 데이터 사용
}
}Example
this 객체의 Intrinsic Lock을 이용한 구현
1
2
3
4
5
6
7
8
9
10
11
12class Counter {
private int count = 0;
public int increaseCount() {
synchronized (this) {
return ++count; // 읽고, 수정하고, 쓰는 작업
}
}
public int getCount() {
return count;
}
}메소드에 synchronized 키워드 사용
- synchronized 키워드가 사용된 메소드를 호출하기 위해서는 해당 객체를 소유해야만 호출이 가능. 소유하지 못하면 Blocking
1
2
3
4
5
6
7
8
9
10class Counter {
private int count = 0;
public synchronized int increaseCount() {
return ++count; // 읽고, 수정하고, 쓰는 작업
}
public int getCount() {
return count;
}
}
- synchronized 키워드가 사용된 메소드를 호출하기 위해서는 해당 객체를 소유해야만 호출이 가능. 소유하지 못하면 Blocking
wait()
,notify()
,notifyAll()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62class WorkObject {
public synchronized void methodA() {
System.out.println("ThreadA의 methodA() 작업 실행");
notify(); // 일시정지 상태에 있는 ThreadB를 실행 대기상태로 만듬
try {
wait(); // ThreadA를 일시 정지 상태로 만듬
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void methodB() {
System.out.println("ThreadB의 methodB() 작업 실행");
notify(); // 일시정지 상태에 있는 ThreadA를 실행 대기상태로 만듬
try {
wait(); // ThreadB를 일시 정지 상태로 만듬
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class ThreadA extends Thread {
private WorkObject workObject;
ThreadA(WorkObject workObject) {
this.workObject = workObject;
}
public void run() {
for(int i=0; i<10; i++) {
workObject.methodA(); // 공유객체의 methodA를 반복적으로 호출
}
}
}
class ThreadB extends Thread{
private WorkObject workObject;
ThreadB(WorkObject workObject) {
this.workObject = workObject;
}
public void run() {
for(int i=0; i<10; i++) {
workObject.methodB(); // 공유객체의 methodA를 반복적으로 호출
}
}
}
class Main {
public static void main(String[] args) {
WorkObject sharedObject = new WorkObject();
ThreadA threadA = new ThreadA(sharedObject);
ThreadB threadB = new ThreadB(sharedObject);
threadA.start();
threadB.start();
}
}
세마포어
사전적 의미 : 횟대(깃발)
n개의 깃발을 놓고, 여러 스레드가 경쟁하도록 하는 sync 기법
n = 1이면, BinarySemaphore라고 하며, Lock과 유사하게 동작
메소드
- release(int permits) : permits만큼의 자원 개수 반환
- acquire(int permits) : permits만큼의 자원 사용 (Blocking O)
- tryAcquire(long timeout, TimeUnit unit) : unit단위의 timeout이후 자원을 얻지 못하면 false, 얻으면 true (Blocking X)
- acquireUninterruptibly() : 자원을 얻지못해도 interrupt 발생 X
Example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public class Main {
public static void main(String[] args) {
Semaphore sem = new Semaphore(1);
sem.release(10);
sem.tryAcquire();
System.out.println(sem.availablePermits());
try { // Blocking으로 동작
sem.acquire(12);
} catch (InterruptedException e) {
e.printStackTrace();
}
sem.acquireUninterruptibly(); // interrupt에 반응하지 않음
try {
System.out.println(sem.tryAcquire(100, TimeUnit.MILLISECONDS)); // Blocking 하지 않고, 실패하면 false
} catch (InterruptedException e) {
e.printStackTrace();
}
sem.release();
System.out.println(sem.availablePermits());
}
}Dining Philosopher (식사하는 철학자)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71class Philosopher extends Thread {
private final int id;
private final Fork left;
private final Fork right;
public Philosopher(int id, Fork left, Fork right) {
this.id = id;
this.left = left;
this.right = right;
}
public void run() {
while (true) {
try {
left.acquire();
System.out.println(id + ": left taken");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
if(!right.tryAcquire(1000, TimeUnit.MILLISECONDS)){
left.release();
Thread.yield();
continue;
}
System.out.println(id + ": right taken");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(2000);
System.out.println(id + "is eating");
} catch (InterruptedException e) {
e.printStackTrace();
}
left.release();
right.release();
Thread.yield();
}
}
}
class Fork extends Semaphore {
public Fork() {
super(1);
}
}
public class Main {
public static void main(String[] args) {
Philosopher[] phils = new Philosopher[5];
Fork[] forks = new Fork[5];
for (int i = 0; i < 5; i++) {
forks[i] = new Fork();
}
for (int i = 0; i < 5 - 1; i++) {
phils[i] = new Philosopher(i, forks[i], forks[(i+1) % 5]);
}
phils[4] = new Philosopher(4, forks[0], forks[4]);
for (int i = 0; i < 5; i++){
phils[i].start();
}
}
}
JCF와 Thread
Vector VS List
- List
- 메소드들이 Synchronized 되지 않음
- 속도는 빠르다.
- Vector
- 메소드들이 Synchronized 되어 있음
- 속도는 느리다.
- List
List를 Synchronized 되게 하는 방법
Thread Pool이란?
- 미리 생성해 둔 Thread의 집합을 스레드 풀이라고 한다.
- 미리 Thread를 생성해 두고, 작업만 Thread에 할당하여 동작
- 사용 이유
- Thread를 직접 만들어 사용할 경우, Multi-Thread 작업을 계속 할 때, Thread 생성/삭제 오버해드가 크다.
- 배치작업 (모아두고 한 번에 처리하는 작업)에 유용하여 많이 사용된다.
Thread Pool 생성
Single Thread Pool
1
ExecutorService pool = Executors.newSingleThreadExecutor();
- Thread가 1개인 ThreadPool 생성(SingleThread)
- 실패 시, 새로운 Thread를 생성하지 않음
Cached Thread Pool
1
ExecutorService pool = Executors.newCachedThreadPool();
- 초기 Thread 0개
- 코어 Thread 0개 (일하지 않아도 종료시키지 않는 Thread)
- 요청 작업보다 Thead가 부족하면 새 스레드 생성
- 60초동안 일하지 않는 Thread 제거
Fixed Thread Pool
1
ExecutorService pool = Executors.newFixedThreadPool(int nThreads);
- 최대 Thread nThreads개
- 코어 Thread nThreads개
- 작업하지 않는 Thread도 제거하지 않고 동작
Scheduler Thread Pool
1
ScheduledExecutorService pool = Executors.newScheduledThreadPool(int corePoolSize);
- 지정된 delay후에 실행하도록함(주기적으로 실행하는 명령을 예약)
작업 생성과 처리 요청
작업 생성
Runnable 구현 클래스
1
2
3
4
5
6Runnable task = new Runnable(){
public void run(){
// 작업 내용
}
}Callable 구현 클래스
1
2
3
4
5
6
7Callable<T> task = new Callable<T>(){
public T call() throws Exception{
// 작업 내용
return T;
}Runnable run()은 Return 값이 없고, Callable call()은 Return 값이 있음
처리 요청
매소드
리턴 타입 메소드 설명 void execute(Runnable command) Runnable을 작업 큐에 저장, 작업 처리 결과를 받지 못함 Future<?> submit(Runnable task) Runnable 작업 큐에 저장, 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음 Future submit(Runnable task, V result) Runnable 작업 큐에 저장, 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음 Future submit(Callable task) Callable 작업 큐에 저장, 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음 execute()
작업 처리 결과를 반환하지 않는다.
작업 처리 도중 예외가 발생하면 스레드가 종료되고 해당 스레드는 스레드 풀에서 제거된다.
다른 작업을 처리하기 위해 새로운 스레드를 생성한다.
submit()
작업 처리 결과를 반환한다.
작업 처리 도중 예외가 발생하더라도 스레드는 종료되지 않고 다음 작업을 위해 재사용
스레드의 생성 오버헤드를 방지하기 위해서라도 submit() 을 가급적 사용한다.
Thread Pool 종료
Thread Pool에 속한 Thread는 데몬 스레드가 아니다
따라서 주 스레드 종료시 강제종료 되지 않기 때문에 main 스레드가 종료되어도 실행상태가 유지된다.
어플리케이션을 종료하기 위해서는 스레드 풀을 강제종료시켜야 한다.
excutorService.shutdown()
작업큐에 남아있는 작업까지 모두 마무리 후 종료 (오버헤드를 줄이기 위해 일반적으로 많이 사용.)
excutorService.shoutdownNow()
작업큐 작업 잔량 상관없이 강제 종료
excutorService.awaitTermination(long timeout, TimeUnit unit)
모든 작업 처리를 timeout 시간안에 처리하면 true 리턴 ,처리하지 못하면 작업스레드들을 interrupt()시키고 false리턴