์ƒ์‚ฐ์ž-์†Œ๋น„์ž ๋ฌธ์ œ๋Š” ์ƒ์‚ฐ์ž๊ฐ€ ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์‚ฐํ•˜๋ฉด ์†Œ๋น„์ž๋Š” ๊ทธ ๋ฐ์ดํ„ฐ๋ฅผ ์†Œ๋น„ํ•˜๋Š” ํ˜•ํƒœ์˜ ๋ฌธ์ œ์ด๋‹ค. ์ปดํ“จํ„ฐ ํ™˜๊ฒฝ์—์„œ ์˜ˆ๋ฅผ ๋ณด๋ฉด, ์ปดํŒŒ์ผ๋Ÿฌ โ†’ ์–ด์…ˆ๋ธ”๋Ÿฌ, ์›น ์„œ๋ฒ„ โ†’ ์›น ํด๋ผ์ด์–ธํŠธ ๋“ฑ์ด ์žˆ๋‹ค. ์ปดํŒŒ์ผ๋Ÿฌ์—์„œ ์ƒ์„ฑํ•œ ์–ด์…ˆ๋ธ”๋ฆฌ์–ด๋Š” ์–ด์…ˆ๋ธ”๋Ÿฌ์—์„œ ์ด๋ฅผ ์†Œ๋น„ํ•˜์—ฌ ๊ธฐ๊ณ„์–ด๋ฅผ ๋งŒ๋“ ๋‹ค.

์ƒ์‚ฐ์ž-์†Œ๋น„์ž ๊ด€๊ณ„๋ฅผ ๊ฐ„๋‹จํžˆ ๊ทธ๋ฆผ์œผ๋กœ ๋‚˜ํƒ€๋‚ด๋ฉด ์œ„์™€ ๊ฐ™๋‹ค. ์ด ๊ด€๊ณ„์˜ ๋Œ€๋ถ€๋ถ„์€ ์ƒ์‚ฐ์ž์—์„œ ์ƒ์‚ฐํ•œ ๋ฐ์ดํ„ฐ ์–‘์„ ์†Œ๋น„์ž๊ฐ€ ํ•œ ๋ฒˆ์— ์†Œ๋น„ํ•˜๋Š” ๊ฒฝ์šฐ๋Š” ๋“œ๋ฌผ๋‹ค. ์ƒ์‚ฐํ•œ ๋ฐ์ดํ„ฐ๋Š” ์ค‘๊ฐ„์˜ buffer ๋ผ๋Š” ์ €์žฅ ๊ณต๊ฐ„(๋ฉ”๋ชจ๋ฆฌ ๊ณต๊ฐ„)์— ์ €์žฅํ•ด๋‘๊ณ  ์†Œ๋น„์ž๋Š” ์—ฌ๊ธฐ์„œ ํ•„์š”ํ•œ ๋งŒํผ ๊ฐ€์ ธ๊ฐ„๋‹ค. ์ฐฝ๊ณ ์™€ ๊ฐ™๋‹ค.

๋ฒ„ํผ์˜ ํฌ๊ธฐ๋Š” ํ˜„์‹ค์ ์œผ๋กœ ์œ ํ•œํ•˜๋‹ค. ๊ทธ๋Ÿฌ๋ฏ€๋กœ ์ƒ์‚ฐ์ž๋Š” ๋ฒ„ํผ ๊ณต๊ฐ„์ด ๊ฐ€๋“ ์ฐจ๋ฉด ๋” ์ด์ƒ ์ €์žฅํ•  ์ˆ˜ ์—†๋‹ค. ์†Œ๋น„์ž๋Š” ๋ฒ„ํผ๊ฐ€ ๋น„์–ด ์žˆ์œผ๋ฉด ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์—†๋‹ค. ์ด๋Ÿฌํ•œ ์œ ํ•œํ•œ ๋ฒ„ํผ ํฌ๊ธฐ๋ฅผ bounded buffer ๋ผ๊ณ  ํ•œ๋‹ค.

์ •๋ฆฌ

  • ์ƒ์‚ฐ์ž ์†Œ๋น„์ž ๋ฌธ์ œ
    • ์ƒ์‚ฐ์ž : ๋ฐ์ดํ„ฐ ์ƒ์‚ฐ, ์†Œ๋น„์ž : ๋ฐ์ดํ„ฐ ์†Œ๋น„
    • ์ปดํŒŒ์ผ๋Ÿฌ โ†’ ์–ด์…ˆ๋ธ”๋Ÿฌ, ํŒŒ์ผ ์„œ๋ฒ„ โ†’ ํด๋ผ์ด์–ธํŠธ, ์›น์„œ๋ฒ„ โ†’ ์›น ํด๋ผ์ด์–ธํŠธ
  • Bounded Buffer
    • Buffer๋ž€ ์ƒ์‚ฐ์ž์™€ ์†Œ๋น„์ž ์‚ฌ์ด์— ์กด์žฌํ•˜๋Š” ์ฐฝ๊ณ ์™€ ๊ฐ™์Œ
    • ์ƒ์‚ฐ์ž์™€ ์†Œ๋น„์ž ์‚ฌ์ด์˜ ์†๋„ ์ฐจ์ด๋ฅผ ๋ณด์™„ํ•˜๊ธฐ ์œ„ํ•ด ํ•„์š”ํ•จ
    • ๋ฒ„ํผ ํฌ๊ธฐ๋Š” ์œ ํ•œํ•˜๋‹ค.
      • ์ƒ์‚ฐ์ž๋Š” ๊ฐ€๋“ ์ฐจ๋ฉด ๋„ฃ์„ ์ˆ˜ ์—†๋‹ค.
      • ์†Œ๋น„์ž๋Š” ๋ฒ„ํผ๊ฐ€ ๋น„๋ฉด ๋บ„ ์ˆ˜ ์—†๋‹ค.

1.1 Code

Main

class Test {
	public static void main(String[] arg) {
		Buffer b = new Buffer(100);
		Producer p = new Producer(b, 10000);
		Consumer c = new Consumer(b, 10000);
		p.start();
		c.start();
		try {
			p.join();
			c.join();
		} catch (InterruptedException e) {}
		System.out.println("Number of items in the buf is " + b.count);
	}
}

Buffer Class

์ด ๋ถ€๋ถ„์„ ์ฃผ๋ชฉํ•ด์„œ ๋ด์•ผ ํ•œ๋‹ค.

class Buffer {
	int[] buf; // buf: Bounded buffer
	int size;	// size: ๋ฒ„ํผ ํฌ๊ธฐ
	int count; // count: ๋ฒ„ํผ์— ์ €์žฅ๋œ ๋ฐ์ดํ„ฐ ๊ฐœ์ˆ˜
	int in; // in: ์ƒ์‚ฐํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ด์„ ๋ฒ„ํผ ์ธ๋ฑ์Šค
	int out; // out: ์†Œ๋น„ํ•  ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€๋ฆฌํ‚ค๋Š” ๋ฒ„ํผ ์ธ๋ฑ์Šค
	Buffer(int size) {
		buf = new int[size];
		this.size = size;
		count = in = out = 0;
	}
	void insert(int item) {
		/* check if buf is full */
		while (count == size)
			;
		/* buf is not full */
		buf[in] = item;
		in = (in+1)%size; //  Circular Queue
		count++;
	}
	int remove() {
		/* check if buf is empty */
		while (count == 0)
			;
		/* buf is not empty */
		int item = buf[out];
		out = (out+1)%size; //  Circular Queue
		count--;
		return item;
	}
}

Buffer ํด๋ž˜์Šค์˜ ๋ฉค๋ฒ„ ๋ณ€์ˆ˜๋ฅผ ๋ณด๋ฉด

  • buf: Bounded buffer
  • size: ๋ฒ„ํผ ํฌ๊ธฐ
  • count: ๋ฒ„ํผ์— ์ €์žฅ๋œ ๋ฐ์ดํ„ฐ ๊ฐœ์ˆ˜
  • in: ์ƒ์‚ฐํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ด์„ ๋ฒ„ํผ ์ธ๋ฑ์Šค
  • out: ์†Œ๋น„ํ•  ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€๋ฆฌํ‚ค๋Š” ๋ฒ„ํผ ์ธ๋ฑ์Šค

Producer

/****** ์ƒ์‚ฐ์ž ******/
class Producer extends Thread {
	Buffer b;
	int N;
	Producer(Buffer b, int N) {
		this.b = b; this.N = N;
	}
	public void run() {
		for (int i=0; i<N; i++)
			b.insert(i);
	}
}

Consumer

/****** ์†Œ๋น„์ž ******/
class Consumer extends Thread {
	Buffer b;
	int N;
	Consumer(Buffer b, int N) {
		this.b = b; this.N = N;
	}
	public void run() {
		int item;
		for (int i=0; i<N; i++)
			item = b.remove();
	}
}

๋งŒ์•ฝ ์ƒ์„ฑ์ž๊ฐ€ ๋ฐ์ดํ„ฐ๋ฅผ ๊ณ„์† ์ƒ์„ฑํ•˜์—ฌ ๋ฒ„ํผ์˜ ๋งˆ์ง€๋ง‰ ์ธ๋ฑ์Šค๋กœ ๊ฐ€๋ฉด ๊ทธ ๋‹ค์Œ์€ ์ฒ˜์Œ์œผ๋กœ ๋˜๋Œ์•„๊ฐ„๋‹ค. (circular buffer) ์†Œ๋น„ํ•˜๋Š” ๊ฒƒ๋„ ๋งˆ์ฐฌ๊ฐ€์ง€์ด๋‹ค.

main์„ ๋ณด๋ฉด ํฌ๊ธฐ๊ฐ€ 100์ธ ๋ฒ„ํผ๋ฅผ ์ƒ์„ฑํ•˜๊ณ  2๊ฐœ์˜ ์“ฐ๋ ˆ๋“œ๊ฐ€ ๊ฐ๊ฐ ์ƒ์‚ฐ์ž์™€ ์†Œ๋น„์ž ์—ญํ• ์„ ํ•˜์—ฌ ๊ฐ๊ฐ 10000๋ฒˆ์”ฉ ์ƒ์‚ฐํ•˜๊ณ  ์†Œ๋น„ํ•œ๋‹ค. ์ •์ƒ์ ์ธ ๊ฒฐ๊ณผ๋Š” count๊ฐ’์ด 0์ด ์ถœ๋ ฅ๋˜์•ผ ํ•œ๋‹ค.

ํ•˜์ง€๋งŒ ์‹ค์ œ ์ฝ”๋“œ๋ฅผ ์ˆ˜ํ–‰ํ•˜๋ฉด ๋ฌดํ•œ ๋ฃจํ”„์— ๋น ์ง€๊ฑฐ๋‚˜, count๊ฐ’์— ์ „ํ˜€ ์˜ˆ์ƒํ•˜์ง€ ์•Š์€ ๊ฐ’์ด ์ถœ๋ ฅ๋œ๋‹ค.

์ด ๋ฌธ์ œ ๋‹น์—ฐํ•˜๊ฒŒ๋„ ๋™๊ธฐํ™” ๋ฌธ์ œ์ด๋‹ค. ์ƒ์‚ฐ์ž์™€ ์†Œ๋น„์ž๊ฐ€ ๋™์‹œ์— ์ ‘๊ทผํ•˜๋Š” ๊ณตํ†ต ๋ณ€์ˆ˜์ธ buf, count ๋ฅผ ๋‘ ํ”„๋กœ์„ธ์Šค๊ฐ€ ๋™์‹œ์— ์—…๋ฐ์ดํŠธํ•˜๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค. ๋‹ค์‹œ ๋งํ•˜๋ฉด ์ž„๊ณ„๊ตฌ์—ญ์— ๋™์‹œ์— ์ ‘๊ทผํ•œ ๊ฒƒ์ด๋‹ค.

1.2 ๋™๊ธฐํ™” ํ•ด๊ฒฐ

ํ•ด๊ฒฐ๋ฐฉ๋ฒ•์€ ์•ž์„œ ๋ฐฐ์› ๋˜ ์„ธ๋งˆํฌ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ mutual exclusion์„ ๋ณด์žฅํ•˜๋Š” ๊ฒƒ์ด๋‹ค. ์ž„๊ณ„๊ตฌ์—ญ์„ ๋™์‹œ์— ์ ‘๊ทผํ•˜๋Š” ๊ฒƒ์„ ๋ฐฉ์ง€ํ•˜๊ณ  ํ•˜๋‚˜์˜ ํ”„๋กœ์„ธ์Šค๋งŒ ํ—ˆ์šฉํ•ด์•ผํ•œ๋‹ค. sem์ด๋ผ๊ณ  ์„ ์–ธํ•˜์ง€ ์•Š๊ณ  mutex๋ผ ์„ ์–ธํ•œ๋‹ค. ์ง€๊ธˆ ๋งŒ๋“œ๋Š” semaphore๋Š” **mut**ual **ex**clusion์„ ๋ง‰๋Š” ์—ญํ• ์„ ํ•˜๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.

Buffer Class

class Buffer {
	int[] buf;
	int size;
	int count;
	int in;
	int out;
	Semaphore mutex;   // ์„ธ๋งˆํฌ ์„ ์–ธ
 
	Buffer(int size) {
		buf = new int[size];
		this.size = size;
		count = in = out = 0;
		mutex = new Semaphore(1);
	}
 
	void insert(int item) {
		/* check if buf is full */
		while (count == size)
			;
		/* buf is not full */
		try {
            mutex.acquire();
            buf[in] = item;
            in = (in+1)%size;
            count++;
            mutex.release();
		} catch(InterruptedException e) {}
	}
 
	int remove() {
		/* check if buf is empty */
		while (count == 0)
			;
		/* buf is not empty */
		try {
			mutex.acquire();
			int item = buf[out];
			out = (out+1)%size;
			count--;
			mutex.release();
			return item;
		} catch(InterruptedException e) {}
 
		return -1;
	}
}

์œ„๋Š” ์ž„๊ณ„๊ตฌ์—ญ์— ์„ธ๋งˆํฌ๋ฅผ ์ถ”๊ฐ€ํ•œ ์ฝ”๋“œ์ด๋‹ค. ์ž„๊ณ„๊ตฌ์—ญ์€ ์œ„์—์„œ ๋งํ–ˆ๋“ฏ์ด buf, count ์— ์ ‘๊ทผํ•˜๋Š” ์˜์—ญ์ด๋ฏ€๋กœ insert(), remove() ํ•จ์ˆ˜ ๋‚ด๋ถ€์— ์„ ์–ธํ•œ ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

1.3 Busy waiting

์—ฌ๊ธฐ์„œ ํ•œ ๊ฐ€์ง€ ๋” ๋ฌธ์ œ์ ์ด ์žˆ๋‹ค. ๋ฐ”๋กœ busy waiting ์ด๋‹ค. ์œ„์—์„œ busy waiting์€ ์ƒ์‚ฐ๊ณผ ์†Œ๋น„ํ•˜๊ธฐ ์ „์— ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“ ์ฐผ๋Š”์ง€ ๋น„์–ด ์žˆ๋Š”์ง€ ํ™•์ธํ•˜๋Š” ๋ฌดํ•œ ๋ฐ˜๋ณต๋ฌธ์„ ๋งํ•œ๋‹ค. ์ด๋Š” ์•„๋ฌด ์ผ๋„ ํ•˜์ง€ ์•Š์œผ๋ฉด์„œ ๋ฌดํ•œ์œผ๋กœ ๋ฐ˜๋ณตํ•˜์—ฌ CPU๋ฅผ ์ ์œ ํ•˜๊ณ  ์žˆ์œผ๋ฏ€๋กœ ๋งค์šฐ ๋น„ํšจ์œจ์ ์ด๋‹ค. ์ด๋ฅผ ํ•ด๊ฒฐํ•  ์ˆ˜ ์žˆ๋Š” ๊ฒƒ๋„ ์„ธ๋งˆํฌ์ด๋‹ค.

class Buffer {
	int[] buf;
	int size;
	int count;
	int in;
	int out;
	Semaphore mutex, full, empty;
 
	Buffer(int size) {
		buf = new int[size];
		this.size = size;
		count = in = out = 0;
		mutex = new Semaphore(1);
		full = new Semaphore(0);
		empty = new Semaphore(size);
	}
 
	void insert(int item) {
		try {
            empty.acquire();    // ๋ฒ„ํผ์˜ ๋น„์–ด์žˆ๋Š” ๊ณต๊ฐ„์„ 1 ๊ฐ์†Œ์‹œํ‚จ๋‹ค.(๋น„์–ด์žˆ๋Š” ๊ณต๊ฐ„์ด ์—†์œผ๋ฉด block)
            mutex.acquire();
            buf[in] = item;
            in = (in+1)%size;
            count++;
            mutex.release();
            full.release();    // ๋ฒ„ํผ์˜ ์ฐฌ ๊ณต๊ฐ„์„ 1 ์ฆ๊ฐ€์‹œํ‚จ๋‹ค.
          } catch(InterruptedException e) {}
	}
 
	int remove() {
		try {
            full.acquire();    // ๋ฒ„ํผ์˜ ์ฐฌ ๊ณต๊ฐ„์„ 1 ๊ฐ์†Œ์‹œํ‚จ๋‹ค.(๋ฒ„ํผ๊ฐ€ ๋ชจ๋‘ ๋น„์–ด์žˆ์œผ๋ฉด block)
            mutex.acquire();
            int item = buf[out];
            out = (out+1)%size;
            count--;
            mutex.release();
            empty.release();   // ๋ฒ„ํผ์˜ ๋น„์–ด์žˆ๋Š” ๊ณต๊ฐ„์„ 1 ์ฆ๊ฐ€์‹œํ‚จ๋‹ค.
            return item;
          } catch(InterruptedException e) {}
		return -1;
	}
}

busy waiting์„ ์—†์• ๊ธฐ ์œ„ํ•ด ๋‘ ๊ฐœ์˜ ์„ธ๋งˆํฌ๋ฅผ ๋” ์ถ”๊ฐ€ํ•˜์˜€๋‹ค.

  • empty: ๋ฒ„ํผ์—์„œ ๋น„์–ด์žˆ๋Š” ๊ณต๊ฐ„์˜ ๊ฐœ์ˆ˜(์ดˆ๊ธฐ๊ฐ’ size)
  • full: ๋ฒ„ํผ์—์„œ ์ฐจ์žˆ๋Š” ๊ณต๊ฐ„์˜ ๊ฐœ์ˆ˜(์ดˆ๊ธฐ๊ฐ’ 0)

์ถ”๊ฐ€ํ•œ ์„ธ๋งˆํฌ ๋ณ€์ˆ˜๋Š” ์œ„์™€ ๊ฐ™๊ณ , empty๋Š” ์ดˆ๊ธฐํ™”ํ•  ๋•Œ ๋ฒ„ํผ๋Š” ๋ชจ๋‘ ๋น„์–ด์žˆ์œผ๋ฏ€๋กœ ๋ฒ„ํผ์˜ ํฌ๊ธฐ๋กœ ์ดˆ๊ธฐํ™”ํ•˜๊ณ  full์€ ์ดˆ๊ธฐ ๋ฒ„ํผ์—๋Š” ์•„๋ฌด๋Ÿฐ ๋ฐ์ดํ„ฐ๊ฐ€ ์—†์œผ๋ฏ€๋กœ 0์œผ๋กœ ์ดˆ๊ธฐํ™”ํ•œ๋‹ค.

๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์„ฑํ•˜๊ธฐ ์ „์— ๋น„์–ด์žˆ๋Š” ๊ณต๊ฐ„์ด ์žˆ๋Š”์ง€ ํ™•์ธํ•œ๋‹ค. ์—†๋‹ค๋ฉด empty์„ธ๋งˆํฌ์˜ value๊ฐ’์ด -1์ด ๋˜๋ฏ€๋กœ block์ด ๋˜๊ณ , ์žˆ๋‹ค๋ฉด ์ž„๊ณ„๊ตฌ์—ญ ๋‚ด๋ถ€๋กœ ์ง„์ž…ํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์„ฑํ•œ๋‹ค. ์ƒ์„ฑ์ด ์™„๋ฃŒ๋˜๋ฉด full์„ธ๋งˆํฌ์˜ value๊ฐ’์„ 1 ์ฆ๊ฐ€์‹œํ‚จ๋‹ค.(์†Œ๋น„์ž๋Š” ๋ฐ˜๋Œ€๋กœ ๋™์ž‘ํ•œ๋‹ค๊ณ  ๋ณผ ์ˆ˜ ์žˆ๋‹ค. ์ฝ”๋“œ์ฐธ๊ณ ) ์ด ์ฝ”๋“œ๋ฅผ ์‹คํ–‰์‹œ์ผœ๋ณด๋ฉด ์ •์ƒ์ ์œผ๋กœ ๊ฒฐ๊ณผ๊ฐ’์ด 0์ด ์ถœ๋ ฅ๋˜๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

Reference