Semaphore

Semaphore is a data structure for solving concurrency problems. It enforces constraints on how multiple threads or processes access the common resource (or known as enter the critical section).

It keeps a counter whose meaning depends on how you want to use it. And there are two operations which are performend on the counter.

  • P: decrement & wait, i.e. decrease the value of the counter, if the value has become negative, then the current thread or process must wait until it is waked
  • V: increament & signal, i.e. increase the value of the counter, and signal a waiting thread or process if there is any

If the semaphore is bounded, then the value of its counter cannot exceed its initial value - if the value exceeds its initial value, an error should exceed its value.

Semaphore also keeps a queue to manage the waiting threads or processes - it might follow the FIFO ordering or randomly pick an element from the queue.

Now we have a rough impression of how semaphore is, let’s check out how it is implemented first and then how it is applied to solve concurrency problems.

Implementation of semaphore

This is my implementation based on the definition of semaphore, but it is just a demo for demonstration since it is inefficient to handle threads in this way.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Semaphore {

private int count;

private Queue<Thread> queue;

public Semaphore(int n) {
this.count = n;
queue = new LinkedList<>();
}

public void p() {
count--;
if (count < 0) {
Thread thread = Thread.currentThread();
queue.add(thread);
block(thread);
}
}

private void block(Thread thread) {
// this is not a good way of blocking a thread
try {
while (true) {
System.out.println("Being blocked");
thread.sleep(2000);
}
} catch (InterruptedException e) {
}
}

public void v() {
count++;
if (!queue.isEmpty()) {
Thread thread = queue.poll();
thread.interrupt();
}
}
}

Semaphore implemented in Java concurrent package

The class Semaphore is implemented as a wraper of a subclass of AbstractQueuedSynchronizer. It uses AbstractQueuedSynchronizer state to represent the number of permits.

I have simplified the implementation here for demonstration.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Semaphore {
// comply with the FIFO ordering
private final FairSync sync;
abstract statck class FairSync extends AbstractQueuedSynchronizer {
FairSync(int permits) {
setState(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}

public Semaphore(int permits) {
sync = new FairSync(permits);
}

public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public void release() {
sync.releaseShared(1);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public abstract class AbstractQueuedSynchronizer {

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
}

Semaphore implemented in Python threading module

This implementation uses a condition variable to block threads until they are notified by another thread.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class Semaphore:
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
self._value = value

def acquire(self, blocking=True, timeout=None):
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
endtime = None
with self._cond:
while self._value == 0:
if not blocking:
break
if timeout is not None:
if endtime is None:
endtime = _time() + timeout
else:
timeout = endtime - _time()
if timeout <= 0:
break
self._cond.wait(timeout)
else:
self._value -= 1
rc = True
return rc

def release(self):
with self._cond:
self._value += 1
self._cond.notify()

Application of semaphore

There are two major applications of semaphore:

  1. protecting critical section
  2. signal - waking other waiting threads or processes

Protecting critical section

Scenario I. Increase a counter by 1 if a thread is started. By introducing semaphore, only one thread can update the value of count at a time.

1
2
3
4
5
6
7
8
9
10
11
12
// Demo written in Java
Semaphore semaphore = new Semaphore(1);
final MutableInteger count = new MutableInteger(0); // self-defined class
for (int i = 0; i < 20; i++) {
new Thread(new Runnable() {
@Override public void run() {
semaphore.acquire();
count.setValue(count.getValue() + 1);
semaphore.release();
}
}).start();
}

Signal

Scenario I. Thread1 can execute eventA only if Thread2 has executed eventB. How can Thread1 know whether Thread2 executes eventB? With the help of semaphore, Thread1 is allowed to be blocked, waiting for signal from Thread2.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Demo written in Java
Semaphore semaphore = new Semaphore(0);
new Thread(new Runnable() {
@Override public void run() {
semaphore.acquire();
eventA();
}
}).start(); // Thread1
new Thread(new Runnable() {
@Override public void run() {
eventB();
semaphore.release();
}
}).start(); // Thread2

Scenario II. Every thread runs the following code:

1
2
eventA();
eventB();

Any thread cannot execute eventB unless all threads have executed eventA.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Demo written in Java
Semaphore signalSemaphore = new Semaphore(0);
Semaphore countSemaphore = new Semaphore(1);
final MutableInteger count = new MutableInteger(0);
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override public void run() {
eventA();
countSemaphore.acquire();
int value = count.getValue() + 1;
count.setValue(value);
if (count == 10) {
signalSemaphore.release();
}
countSemaphore.release();

signalSemaphore.acquire();
signalSemaphore.release();

eventB();
}
})
}

Scenario III. The main thread cannot proceed until all the subthreads finish their tasks.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Demo written in Java
Semaphore signalSemaphore = new Semaphore(0);
Semaphore countSemaphore = new Semaphore(1);
final MutableInteger count = new MutableInteger(0);
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override public void run() {
eventA();
countSemaphore.acquire();
count.setValue(value);
if (count == count.getValue() + 1) {
signalSemaphore.release();
}
countSemaphore.release();
}
})
}

signalSemaphore.require();
eventB();

Java concurrent package has provided a class CountDownLatch for solving such a problem. Here is a demo of the solution:

1
2
3
4
5
6
7
8
9
10
11
12
13
final CountDownLatch latch = new CountDownLatch(10);
final MutableInteger count = new MutableInteger(0);
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override public void run() {
count.setValue(count.getValue() + 1);
latch.countDown();
}
}).start();
}

latch.await();
System.out.println(count.getValue());

Similar to the class Semaphore, CountDownLatch encapsulates a class which controls synchronization by extending AbstractQueuedSynchronizer, but its implementation does not rely on semaphore.
CountDownLatch is initialized with the value of count. When the method countDown() is called, it reduces the count value by 1 (It’s a bit weird to overwrite tryReleaseShared() by using decrement, opposite to the original meaning of release) and sends out signal when the value is changed to zero. When await() is called, it checks the value of count, the current thread will be blocked if the value is not larger than zero.

References

1 The Little Book of Semaphores