A semaphore is a data structure for solving concurrency problems. It constrains how multiple threads or processes access a shared resource (that is, how they enter the critical section).
It keeps a counter whose meaning depends on how you use it. Two operations are performed on the counter:
- P: decrement & wait, i.e. decrease the value of the counter; if the value becomes negative, the current thread or process must wait until it is woken
- V: increment & signal, i.e. increase the value of the counter and signal a waiting thread or process, if there is one
If the semaphore is bounded, the value of its counter cannot exceed its initial value: a V operation that would push the counter above its initial value should raise an error.
A semaphore also keeps a queue of the waiting threads or processes. It may wake them in FIFO order or pick one from the queue at random.
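To make the P and V operations and the bounded rule more concrete, here is a minimal sketch. It is only one possible shape, under a few assumptions: the class name BoundedSemaphoreSketch and the method value() are made up for illustration, the counter is kept non-negative (a thread waits instead of driving the counter below zero), and waiting threads sit in the object's monitor rather than in an explicit queue, so the wake-up order is unspecified.

```java
// Hypothetical sketch of a bounded counting semaphore built on Java's intrinsic monitor.
class BoundedSemaphoreSketch {
    private final int limit; // the initial value, which the counter may never exceed
    private int count;

    BoundedSemaphoreSketch(int initial) {
        this.limit = initial;
        this.count = initial;
    }

    // P: wait until the counter is positive, then decrement it.
    synchronized void p() throws InterruptedException {
        while (count == 0) {
            wait(); // releases the monitor while waiting
        }
        count--;
    }

    // V: increment the counter and wake one waiting thread, if any.
    synchronized void v() {
        if (count == limit) {
            throw new IllegalStateException("semaphore released too many times");
        }
        count++;
        notify();
    }

    // Assumed helper, only here so the counter can be inspected.
    synchronized int value() {
        return count;
    }
}
```

With an initial value of 1, one p() followed by one v() brings the counter back to 1, and a second v() in a row throws IllegalStateException.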
Now that we have a rough impression of what a semaphore is, let’s look at how it is implemented first and then at how it is applied to solve concurrency problems.
Implementation of semaphore
This is my implementation based on the definition of a semaphore. It is only a demo, since handling threads this way is inefficient.
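import java.util.LinkedList;
import java.util.Queue;

public class Semaphore {
    private int count;
    private final Queue<Thread> queue = new LinkedList<>();

    public Semaphore(int n) {
        this.count = n;
    }

    public void p() {
        Thread thread = Thread.currentThread();
        // count and queue are shared, so update them under a lock
        synchronized (this) {
            count--;
            if (count >= 0) {
                return; // a permit was available, no need to wait
            }
            queue.add(thread);
        }
        block(); // wait outside the lock so that v() can run
    }

    private void block() {
        // this is not a good way of blocking a thread
        try {
            while (true) {
                System.out.println("Being blocked");
                Thread.sleep(2000); // sleep() is static and always affects the current thread
            }
        } catch (InterruptedException e) {
            // v() interrupted this thread, which means it has been woken
        }
    }

    public synchronized void v() {
        count++;
        Thread thread = queue.poll(); // null if no thread is waiting
        if (thread != null) {
            thread.interrupt();
        }
    }
}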
Semaphore implemented in Java concurrent package
The class Semaphore is implemented as a wrapper around a subclass of AbstractQueuedSynchronizer. It uses the AbstractQueuedSynchronizer state to represent the number of permits.
I have simplified the implementation here for demonstration.
1 | public class Semaphore { |
1 | public abstract class AbstractQueuedSynchronizer { |
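As a rough illustration of the idea (state = number of permits), here is a minimal sketch of a semaphore built on top of AbstractQueuedSynchronizer. It is not the JDK source: the names AqsSemaphoreSketch, Sync and permits() are my own for illustration, and fairness and multi-permit operations are left out. Only the AbstractQueuedSynchronizer methods themselves (getState, setState, compareAndSetState, tryAcquireShared, tryReleaseShared, acquireSharedInterruptibly, releaseShared) are real API.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical, simplified sketch; not the JDK implementation.
class AqsSemaphoreSketch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // the AQS state holds the number of available permits
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // A negative result tells AQS to enqueue and park the caller.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true; // true lets AQS wake queued threads
                }
            }
        }

        int permits() {
            return getState();
        }
    }

    private final Sync sync;

    AqsSemaphoreSketch(int permits) {
        sync = new Sync(permits);
    }

    void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void release() {
        sync.releaseShared(1);
    }

    int availablePermits() {
        return sync.permits();
    }
}
```

The point of the design is that the wrapper only decides whether an acquire or release succeeds; queueing, parking and waking threads are all handled by AbstractQueuedSynchronizer.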
Semaphore implemented in Python threading module
This implementation uses a condition variable to block threads until they are notified by another thread.
1 | class Semaphore: |
Application of semaphore
There are two major applications of semaphore:
- protecting critical section
- signal - waking other waiting threads or processes
Protecting critical section
Scenario I. Each thread increases a shared counter by 1 when it starts. With a semaphore initialized to 1, only one thread can update the value of count at a time.
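// Demo written in Java
final Semaphore semaphore = new Semaphore(1);
final MutableInteger count = new MutableInteger(0); // self-defined class
for (int i = 0; i < 20; i++) {
    new Thread(new Runnable() {
        public void run() {
            // acquireUninterruptibly() avoids the checked InterruptedException thrown by acquire()
            semaphore.acquireUninterruptibly();
            try {
                count.setValue(count.getValue() + 1);
            } finally {
                semaphore.release(); // always give the permit back
            }
        }
    }).start();
}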
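MutableInteger is only described as a self-defined class, so the demo cannot be run as it stands. The following is a self-contained sketch of the same scenario, assuming MutableInteger is a plain, unsynchronized holder with getValue() and setValue(). The class CounterDemo and its run() method are hypothetical names added so that the result can be checked after all threads have finished.

```java
import java.util.concurrent.Semaphore;

// Assumed shape of the self-defined class: a plain, unsynchronized int holder.
class MutableInteger {
    private int value;

    MutableInteger(int value) {
        this.value = value;
    }

    int getValue() {
        return value;
    }

    void setValue(int value) {
        this.value = value;
    }
}

class CounterDemo {
    // Starts threadCount threads that each add 1 to a shared counter.
    static int run(int threadCount) throws InterruptedException {
        final Semaphore semaphore = new Semaphore(1);
        final MutableInteger count = new MutableInteger(0);
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                semaphore.acquireUninterruptibly(); // enter the critical section
                try {
                    count.setValue(count.getValue() + 1);
                } finally {
                    semaphore.release(); // leave the critical section
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join(); // wait until every thread has finished
        }
        return count.getValue();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(20)); // prints 20
    }
}
```

Without the semaphore, two threads could read the same old value and one of the increments would be lost.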
Signal
Scenario I. Thread1 can execute eventA only after Thread2 has executed eventB. How can Thread1 know whether Thread2 has executed eventB? With the help of a semaphore, Thread1 can block and wait for a signal from Thread2.
1 | // Demo written in Java |
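One way this signalling could look is sketched below, using java.util.concurrent.Semaphore with zero initial permits. The class SignalSketch, the method run() and the semaphore name bDone are assumptions for illustration, and eventA and eventB are stood in for by entries appended to a log so that the order can be observed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;

class SignalSketch {
    // Returns the order in which the two events ran.
    static List<String> run() throws InterruptedException {
        final List<String> log = new CopyOnWriteArrayList<>();
        final Semaphore bDone = new Semaphore(0); // 0 permits: acquiring blocks until a release

        Thread thread1 = new Thread(() -> {
            bDone.acquireUninterruptibly(); // wait for the signal from thread2
            log.add("eventA");
        });
        Thread thread2 = new Thread(() -> {
            log.add("eventB");
            bDone.release(); // signal that eventB has happened
        });

        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints [eventB, eventA]
    }
}
```

The order holds no matter which thread is scheduled first: if thread2 releases before thread1 tries to acquire, the permit simply stays available and thread1 passes straight through.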
Scenario II. Every thread runs the following code:
eventA();
eventB();
No thread may execute eventB until all threads have executed eventA.
1 | // Demo written in Java |
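A common way to get this behaviour is a barrier made of two semaphores: one protects a counter of arrived threads, the other stays closed until the last thread arrives. The sketch below is one possible version under my own assumptions: BarrierSketch, run(), mutex, barrier and arrived are illustrative names, and the events are stood in for by "A" and "B" entries in a log.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;

class BarrierSketch {
    // Runs n threads and returns the order in which the events were logged.
    static List<String> run(final int n) throws InterruptedException {
        final List<String> log = new CopyOnWriteArrayList<>();
        final Semaphore mutex = new Semaphore(1);   // protects arrived
        final Semaphore barrier = new Semaphore(0); // closed until the last thread arrives
        final int[] arrived = {0};

        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(() -> {
                log.add("A"); // stands in for eventA()
                mutex.acquireUninterruptibly();
                arrived[0]++;
                if (arrived[0] == n) {
                    barrier.release(); // the last arrival opens the barrier
                }
                mutex.release();
                barrier.acquireUninterruptibly(); // pass through the barrier...
                barrier.release();                // ...and let the next thread through
                log.add("B"); // stands in for eventB()
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return log;
    }
}
```

Calling run(5) should yield a log whose first five entries are "A" and whose last five are "B". The acquire followed immediately by a release on barrier is what lets one opened permit be passed from thread to thread.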
Scenario III. The main thread cannot proceed until all the subthreads finish their tasks.
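// Demo written in Java
final int n = 10; // number of subthreads
final Semaphore signalSemaphore = new Semaphore(0);
final Semaphore countSemaphore = new Semaphore(1);
final MutableInteger count = new MutableInteger(0);
for (int i = 0; i < n; i++) {
    new Thread(new Runnable() {
        public void run() {
            eventA();
            // acquireUninterruptibly() avoids the checked InterruptedException thrown by acquire()
            countSemaphore.acquireUninterruptibly();
            count.setValue(count.getValue() + 1);
            if (count.getValue() == n) {
                signalSemaphore.release(); // the last subthread wakes the main thread
            }
            countSemaphore.release();
        }
    }).start();
}
signalSemaphore.acquireUninterruptibly(); // the main thread waits here
eventB();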
The Java concurrent package provides the class CountDownLatch for solving this kind of problem. Here is a demo of the solution:
final CountDownLatch latch = new CountDownLatch(10);
final MutableInteger count = new MutableInteger(0);
for (int i = 0; i < 10; i++) {
    new Thread(new Runnable() {
        public void run() {
            // the latch does not protect count, so guard the update
            synchronized (count) {
                count.setValue(count.getValue() + 1);
            }
            latch.countDown();
        }
    }).start();
}
latch.await(); // throws InterruptedException, which the enclosing method must handle
System.out.println(count.getValue());
Similar to the class Semaphore, CountDownLatch wraps a class that controls synchronization by extending AbstractQueuedSynchronizer, but its implementation does not rely on a semaphore.
CountDownLatch is initialized with a count. Each call to countDown() reduces the count by 1 (it is a bit odd that tryReleaseShared() is overridden to decrement, the opposite of what release usually means) and wakes the waiting threads when the count reaches zero. When await() is called, it checks the count, and the current thread is blocked if the count is greater than zero.
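The behaviour described above can be sketched on top of AbstractQueuedSynchronizer as follows. This is a simplified illustration, not the JDK source: LatchSketch, Sync and count() are names I made up, and only the AbstractQueuedSynchronizer methods are real API.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical, simplified sketch of a count-down latch; not the JDK implementation.
class LatchSketch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count); // the AQS state holds the remaining count
        }

        int count() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Succeed only once the count has reached zero; otherwise the caller is parked.
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int current = getState();
                if (current == 0) {
                    return false; // already open, nothing to do
                }
                int next = current - 1;
                if (compareAndSetState(current, next)) {
                    return next == 0; // wake waiters only on the transition to zero
                }
            }
        }
    }

    private final Sync sync;

    LatchSketch(int count) {
        sync = new Sync(count);
    }

    void countDown() {
        sync.releaseShared(1);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    int getCount() {
        return sync.count();
    }
}
```

Here a "release" decrements the state, which is the reversed meaning noted above, and await() returns immediately once the count has reached zero.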