Multi-Thread Programming

Process and Thread

  • Process: OS로부터 메모리를 할당받아 실행중인 프로그램
  • Thread: 프로세스 동작의 최소 단위로, 하나의 프로세스는 여러 스레드로 이루어질 수 있다.

멀티스레드 프로그래밍의 장단점

  • 장점
    • 여러 동작을 병렬적으로 처리하여 CPU의 사용률 향상
    • 시간이 걸리는 동작을 분리하여 프로그램의 응답성 향상
  • 단점
    • Context Switching 오버헤드 발생
    • 스레드 제어의 어려움

스레드 구현

  • 스레드 생성 방법

    1. 익명함수

      1
      2
      3
      4
      5
      Thread threadOne = new Thread(new Runnable() {
      public void run() {
      System.out.println("Hello Thread!");
      }
      });
    2. 람다식

      1
      2
      3
      Thread threadTwo = new Thread(() -> {
      System.out.println("Hello Again, Thread!");
      });
    3. Thread Class 상속

      1
      2
      3
      4
      5
      6
      class MyThread extends Thread {
      public void run() {
      System.out.println("Hello Again Again, Thread!");
      }
      }
      Thread threadThree = new MyThread();
  • 스레드 실행

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    Thread threadOne = new Thread(() -> {
    for (int i = 0; i < 10; i++) {
    System.out.print("1");
    }
    });

    Thread threadTwo = new Thread(() -> {
    for (int i = 0; i < 10; i++) {
    System.out.print("2");
    }
    });

    threadOne.start();
    threadTwo.start();
    System.out.println("Done!");

    Thread 객체는 일회성이기 때문에 여러 번 start()메소드를 호출 할 수 없다


스레드의 상태 및 제어

  • 스레드의 상태

    • getState() 메소드로 스레드의 상태를 확인할 수 있다.

      열거형 상수 설명
      NEW start() 메소드가 아직 호출되지 않음
      RUNNABLE JVM에 의해 실행 가능한 상태
      BLOCKED 객체가 블락된 상태
      WAITING sleep(), wait(), join() 등에 의해 무한히 대기 중인 상태
      TIMED_WAITING sleep(), wait(), join() 등에 의해 정해진 시간 동안 대기 중인 상태
      TERMINATE run() 메소드가 종료된 상태
  • 스레드의 우선순위 제어

    1
    2
    3
    public final static int MIN_PRIORITY = 1;
    public final static int NORM_PRIORITY = 5;
    public final static int MAX_PRIORITY = 10;
    메소드 설명
    void setPriority(int newPriority) 새로운 우선순위로 설정한다.
    int getPriority() 우선순위를 반환한다.
  • sleep()을 이용한 제어

    1
    2
    Thread.sleep(1000); // ms
    Thread.sleep(100, 200); // ms + ns
  • join()을 이용한 스레드 조인

    • 스레드 동작을 동기화하기 위해 사용

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      Thread t1 = new Thread(() -> System.out.println("A"));
      Thread t2 = new Thread(() -> System.out.println("B"));

      t1.start(); // t1 thread start

      t1.join(); // t1 thread 종료시 까지 다른 thread 대기

      t2.start(); // t2 thread start

      t2.join(100); // 100ms 기다린 후에도 t2가 종료되지 않으면 다른 쓰레드 시작

      System.out.println("C");
  • interrupt()를 이용한 대기 중지

    • 기존 동작을 방해하고 반응을 강제하는 메소드
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      Thread tSleep = new Thread(() -> {
      try {
      Thread.sleep(1000);
      } catch (InterruptedException e) {
      System.out.println("Interrupted");
      }
      });

      tSleep.start();

      try {
      Thread.sleep(500);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }

      tSleep.interrupt();
  • yield()를 이용한 상태 제어

    • sleep()과 달리 곧바로 RUNNABLE 상태로 변경
    • 다른 쓰레드에게 양보하고 바로 실행 대기
    • 테스트용으로 쓰임 -> 쓰레드의 동작이 명확하지 않음(바로 양보하지 않는다)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      new Thread(() -> {
      for (int i = 0; i < 20; i++) {
      if (i % 2 == 0) {
      System.out.print("1");
      } else {
      Thread.yield();
      }
      }
      }).start();

      new Thread(() -> {
      for (int i = 0; i < 20; i++) {
      if (i % 2 == 0) {
      System.out.print("2");
      } else {
      Thread.yield();
      }
      }
      }).start();
  • 스레드의 종료

    • run() 메소드의 종료
    • stop() 메소드 호출 (deprecated)

데몬 스레드

  • 정의 : 다른 스레드가 종료될 경우 함께 종료되는 보조 스레드

  • 활용법

    • 보통 대기하며 동작하는 무한 루프로 구현
    • 일정 시간 마다 동작
    • interrupt에 의해서 동작
  • setDaemon() 메소드로 데몬 스레드로 설정

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    class DaemonThread extends Thread {
    public DaemonThread() {
    this.setDaemon(true);
    }

    @Override
    public void run() {
    while (true) {
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("Daemon Thread Run");
    }
    }
    }

데이터 공유와 동기화

  • 스레드간 데이터 공유 시 신뢰성에 문제가 발생할 수 있음

    결과는 1000000이 나오지 않는다 -> count값이 동기화되지 않았기 때문

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    class Counter {
    int count = 0;
    }
    public class Main {
    public static void main(String args[]){
    Counter counter = new Counter();

    for (int i = 0; i < 1000; i++) {
    new Thread(() -> {
    for (int j = 0; j < 1000; j++) {
    counter.count = counter.count + 1;
    }
    }).start();
    }

    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    System.out.println(counter.count);
    }
    }
  • synchronized 키워드 사용

    1
    2
    3
    synchronized void method() {
    // 공유 데이터 사용
    }
    1
    2
    3
    4
    5
    void method() {
    synchronized(sharedObj) {
    // 공유 데이터 사용
    }
    }
    • Example

      • this 객체의 Intrinsic Lock을 이용한 구현

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        class Counter {
        private int count = 0;
        public int increaseCount() {
        synchronized (this) {
        return ++count; // 읽고, 수정하고, 쓰는 작업
        }
        }

        public int getCount() {
        return count;
        }
        }
      • 메소드에 synchronized 키워드 사용

        • synchronized 키워드가 사용된 메소드를 호출하기 위해서는 해당 객체를 소유해야만 호출이 가능. 소유하지 못하면 Blocking
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          class Counter {
          private int count = 0;
          public synchronized int increaseCount() {
          return ++count; // 읽고, 수정하고, 쓰는 작업
          }

          public int getCount() {
          return count;
          }
          }
  • wait(), notify(), notifyAll()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    class WorkObject {
    public synchronized void methodA() {
    System.out.println("ThreadA의 methodA() 작업 실행");
    notify(); // 일시정지 상태에 있는 ThreadB를 실행 대기상태로 만듬
    try {
    wait(); // ThreadA를 일시 정지 상태로 만듬
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    public synchronized void methodB() {
    System.out.println("ThreadB의 methodB() 작업 실행");
    notify(); // 일시정지 상태에 있는 ThreadA를 실행 대기상태로 만듬
    try {
    wait(); // ThreadB를 일시 정지 상태로 만듬
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }


    class ThreadA extends Thread {
    private WorkObject workObject;

    ThreadA(WorkObject workObject) {
    this.workObject = workObject;
    }

    public void run() {
    for(int i=0; i<10; i++) {
    workObject.methodA(); // 공유객체의 methodA를 반복적으로 호출
    }
    }
    }

    class ThreadB extends Thread{
    private WorkObject workObject;

    ThreadB(WorkObject workObject) {
    this.workObject = workObject;
    }

    public void run() {
    for(int i=0; i<10; i++) {
    workObject.methodB(); // 공유객체의 methodA를 반복적으로 호출
    }
    }
    }

    class Main {
    public static void main(String[] args) {
    WorkObject sharedObject = new WorkObject();

    ThreadA threadA = new ThreadA(sharedObject);
    ThreadB threadB = new ThreadB(sharedObject);

    threadA.start();
    threadB.start();
    }
    }

세마포어

  • 사전적 의미 : 횟대(깃발)

  • n개의 깃발을 놓고, 여러 스레드가 경쟁하도록 하는 sync 기법

  • n = 1이면, BinarySemaphore라고 하며, Lock과 유사하게 동작

  • 메소드

    • release(int permits) : permits만큼의 자원 개수 반환
    • acquire(int permits) : permits만큼의 자원 사용 (Blocking O)
    • tryAcquire(long timeout, TimeUnit unit) : unit단위의 timeout이후 자원을 얻지 못하면 false, 얻으면 true (Blocking X)
    • acquireUninterruptibly() : 자원을 얻지못해도 interrupt 발생 X
  • Example

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class Main {
    public static void main(String[] args) {
    Semaphore sem = new Semaphore(1);

    sem.release(10);
    sem.tryAcquire();
    System.out.println(sem.availablePermits());

    try { // Blocking으로 동작
    sem.acquire(12);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    sem.acquireUninterruptibly(); // interrupt에 반응하지 않음
    try {
    System.out.println(sem.tryAcquire(100, TimeUnit.MILLISECONDS)); // Blocking 하지 않고, 실패하면 false
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    sem.release();

    System.out.println(sem.availablePermits());
    }
    }
  • Dining Philosopher (식사하는 철학자)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    class Philosopher extends Thread {
    private final int id;
    private final Fork left;
    private final Fork right;

    public Philosopher(int id, Fork left, Fork right) {
    this.id = id;
    this.left = left;
    this.right = right;
    }

    @Override
    public void run() {
    while (true) {
    try {
    left.acquire();
    System.out.println(id + ": left taken");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    try {
    if(!right.tryAcquire(1000, TimeUnit.MILLISECONDS)){
    left.release();
    Thread.yield();
    continue;
    }
    System.out.println(id + ": right taken");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    try {
    Thread.sleep(2000);
    System.out.println(id + "is eating");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    left.release();
    right.release();
    Thread.yield();
    }
    }
    }

    class Fork extends Semaphore {

    public Fork() {
    super(1);
    }
    }
    public class Main {
    public static void main(String[] args) {
    Philosopher[] phils = new Philosopher[5];
    Fork[] forks = new Fork[5];

    for (int i = 0; i < 5; i++) {
    forks[i] = new Fork();
    }

    for (int i = 0; i < 5 - 1; i++) {
    phils[i] = new Philosopher(i, forks[i], forks[(i+1) % 5]);
    }

    phils[4] = new Philosopher(4, forks[0], forks[4]);
    for (int i = 0; i < 5; i++){
    phils[i].start();
    }
    }
    }

JCF와 Thread

  • Vector VS List

    • List
      • 메소드들이 Synchronized 되지 않음
      • 속도는 빠르다.
    • Vector
      • 메소드들이 Synchronized 되어 있음
      • 속도는 느리다.
  • List를 Synchronized 되게 하는 방법

    • Collections의 synchronizedList() 사용
      1
      2
      List<Integer> list1 = new ArrayList<>();
      List<Integer> list2 = Collections.synchronizedList(list1);

      Thread Pool

  • Thread Pool이란?

    • 미리 생성해 둔 Thread의 집합을 스레드 풀이라고 한다.
    • 미리 Thread를 생성해 두고, 작업만 Thread에 할당하여 동작
    • 사용 이유
      • Thread를 직접 만들어 사용할 경우, Multi-Thread 작업을 계속 할 때, Thread 생성/삭제 오버해드가 크다.
      • 배치작업 (모아두고 한 번에 처리하는 작업)에 유용하여 많이 사용된다.
  • Thread Pool 생성

    • Single Thread Pool

      1
      ExecutorService pool = Executors.newSingleThreadExecutor();
      • Thread가 1개인 ThreadPool 생성(SingleThread)
      • 실패 시, 새로운 Thread를 생성하지 않음
    • Cached Thread Pool

      1
      ExecutorService pool = Executors.newCachedThreadPool();
      • 초기 Thread 0개
      • 코어 Thread 0개 (일하지 않아도 종료시키지 않는 Thread)
      • 요청 작업보다 Thead가 부족하면 새 스레드 생성
      • 60초동안 일하지 않는 Thread 제거
    • Fixed Thread Pool

      1
      ExecutorService pool = Executors.newFixedThreadPool(int nThreads);
      • 최대 Thread nThreads개
      • 코어 Thread nThreads개
      • 작업하지 않는 Thread도 제거하지 않고 동작
    • Scheduler Thread Pool

      1
      ScheduledExecutorService pool = Executors.newScheduledThreadPool(int corePoolSize);
      • 지정된 delay후에 실행하도록함(주기적으로 실행하는 명령을 예약)
  • 작업 생성과 처리 요청

    • 작업 생성

      • Runnable 구현 클래스

        1
        2
        3
        4
        5
        6
        Runnable task = new Runnable(){
        @Override
        public void run(){
        // 작업 내용
        }
        }
      • Callable 구현 클래스

        1
        2
        3
        4
        5
        6
        7
        Callable<T> task = new Callable<T>(){
        @Override
        public T call() throws Exception{
        // 작업 내용
        return T;
        }

        Runnable run()은 Return 값이 없고, Callable call()은 Return 값이 있음

    • 처리 요청

      • 매소드

        리턴 타입 메소드 설명
        void execute(Runnable command) Runnable을 작업 큐에 저장, 작업 처리 결과를 받지 못함
        Future<?> submit(Runnable task) Runnable 작업 큐에 저장, 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음
        Future submit(Runnable task, V result) Runnable 작업 큐에 저장, 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음
        Future submit(Callable task) Callable 작업 큐에 저장, 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음
        • execute()

          • 작업 처리 결과를 반환하지 않는다.

          • 작업 처리 도중 예외가 발생하면 스레드가 종료되고 해당 스레드는 스레드 풀에서 제거된다.

          • 다른 작업을 처리하기 위해 새로운 스레드를 생성한다.

        • submit()

          • 작업 처리 결과를 반환한다.

          • 작업 처리 도중 예외가 발생하더라도 스레드는 종료되지 않고 다음 작업을 위해 재사용

          • 스레드의 생성 오버헤드를 방지하기 위해서라도 submit() 을 가급적 사용한다.

  • Thread Pool 종료

    Thread Pool에 속한 Thread는 데몬 스레드가 아니다

    따라서 주 스레드 종료시 강제종료 되지 않기 때문에 main 스레드가 종료되어도 실행상태가 유지된다.

    어플리케이션을 종료하기 위해서는 스레드 풀을 강제종료시켜야 한다.

    • excutorService.shutdown()

      작업큐에 남아있는 작업까지 모두 마무리 후 종료 (오버헤드를 줄이기 위해 일반적으로 많이 사용.)

    • excutorService.shoutdownNow()

      작업큐 작업 잔량 상관없이 강제 종료

    • excutorService.awaitTermination(long timeout, TimeUnit unit)

      모든 작업 처리를 timeout 시간안에 처리하면 true 리턴 ,처리하지 못하면 작업스레드들을 interrupt()시키고 false리턴