// Condition Variables in C++
// Prerequisites: std::thread, std::mutex, std::unique_lock, lambda predicates.
// Standard: C++11 introduced std::condition_variable. It always works in
// conjunction with a std::mutex (via std::unique_lock).
using namespace std::chrono_literals;

// -----------------------------------------------
// 1. Producer-consumer queue
// Watch out: always check the condition in a while loop (or use the predicate
// overload of wait()). Spurious wakeups can occur -- the thread may wake
// without notify being called.
// -----------------------------------------------
// Unbounded FIFO of strings shared between producer and consumer threads.
// All access to the queue and the shutdown flag happens under mtx_; cv_ is
// used to park consumers while there is nothing to hand out.
class MessageQueue {
    std::queue<std::string> queue_;
    mutable std::mutex mtx_;
    std::condition_variable cv_;
    bool done_ = false;  // set once by shutdown(); never cleared

public:
    // Producer side: enqueue `msg`, then wake a single blocked consumer.
    void push(std::string msg) {
        std::unique_lock guard(mtx_);
        queue_.push(std::move(msg));
        // Release before notifying so the woken consumer can take the
        // mutex immediately instead of blocking on it.
        guard.unlock();
        cv_.notify_one();
    }

    // Consumer side: block until a message exists or shutdown() was called.
    // Messages still queued at shutdown are handed out first; once the queue
    // is drained an empty string is returned as the shutdown signal. (A
    // pushed empty message therefore looks the same as that signal.)
    std::string pop() {
        std::unique_lock guard(mtx_);
        // Re-test the condition after every wakeup: a wakeup may be spurious.
        while (queue_.empty() && !done_) {
            cv_.wait(guard);
        }
        if (!queue_.empty()) {
            std::string front = std::move(queue_.front());
            queue_.pop();
            return front;
        }
        return {};  // shut down and nothing left to deliver
    }

    // Announce that no further messages will arrive and release every
    // consumer currently blocked in pop().
    void shutdown() {
        std::unique_lock guard(mtx_);
        done_ = true;
        guard.unlock();
        cv_.notify_all();
    }
};
// 2. Bounded buffer (blocks producer when full)
// Uses two condition variables: one for "not full", one for "not empty".
// Watch out: you must hold the mutex when modifying the shared state AND when
// calling wait(), but you should NOT hold it when calling
// notify_one()/notify_all() (for performance).
// Fixed-capacity FIFO ring buffer shared between threads.
//
// put() blocks while the buffer holds Capacity items; take() blocks while it
// is empty. head_, tail_ and count_ are only touched with mtx_ held.
//
// Requirements visible from the implementation:
//   * T must be default-constructible (the storage is a plain T array) and
//     move-assignable / move-constructible (items are moved in and out).
//   * Capacity must be at least 1 (enforced at compile time below).
template<typename T, std::size_t Capacity>
class BoundedBuffer {
    // A zero capacity would declare a zero-length array, make put() wait
    // forever (count_ < 0 is never true) and turn the index arithmetic into
    // a modulo by zero, so reject it up front.
    static_assert(Capacity > 0, "BoundedBuffer requires a Capacity of at least 1");

    T buffer_[Capacity];
    std::size_t head_ = 0;   // index of the next item to take
    std::size_t tail_ = 0;   // index of the next free slot
    std::size_t count_ = 0;  // number of items currently stored
    std::mutex mtx_;
    std::condition_variable not_full_;   // signalled by take() after freeing a slot
    std::condition_variable not_empty_;  // signalled by put() after storing an item

public:
    // Store `item`, blocking until a slot is free, then wake one waiting
    // consumer.
    void put(T item) {
        std::unique_lock lock(mtx_);
        // The predicate overload re-checks after every (possibly spurious) wakeup.
        not_full_.wait(lock, [this] { return count_ < Capacity; });
        buffer_[tail_] = std::move(item);
        tail_ = (tail_ + 1) % Capacity;  // wrap around the ring
        ++count_;
        // Unlock before notifying so the woken thread can acquire mtx_ at once.
        lock.unlock();
        not_empty_.notify_one();
    }

    // Remove and return the oldest item, blocking until one is available,
    // then wake one waiting producer.
    T take() {
        std::unique_lock lock(mtx_);
        not_empty_.wait(lock, [this] { return count_ > 0; });
        T item = std::move(buffer_[head_]);
        head_ = (head_ + 1) % Capacity;  // wrap around the ring
        --count_;
        lock.unlock();
        not_full_.notify_one();
        return item;
    }
};
// 3. One-time event notification (like a gate/barrier)
// Threads wait until a signal is given, then all proceed.
// Watch out: if you notify_all() before any thread calls wait(), the
// notification is not "saved" -- but the predicate (open_ == true) ensures
// late-arriving threads see the gate is already open.
// One-shot start signal: wait() blocks callers until open() has been called.
// The open_ flag (guarded by mtx_) records the event, so a thread that calls
// wait() after open() returns immediately instead of missing the notification.
class Gate {
    std::mutex mtx_;
    std::condition_variable cv_;
    bool open_ = false;  // flips to true in open(); never reset

public:
    // Mark the gate as open and release every thread blocked in wait().
    void open() {
        std::unique_lock guard(mtx_);
        open_ = true;
        guard.unlock();  // notify without holding the mutex
        cv_.notify_all();
    }

    // Block the caller until the gate has been opened.
    void wait() {
        std::unique_lock guard(mtx_);
        // Loop rather than a single wait: wakeups can be spurious.
        while (!open_) {
            cv_.wait(guard);
        }
    }
};
// Demo driver: runs the three examples one after another -- a message queue
// with one consumer, a capacity-3 bounded buffer with a slow consumer, and a
// gate that releases three workers at once. Each section joins its threads
// before the next one starts.
int main() {
    // ---- Producer-Consumer ----
    std::cout << "--- Producer-Consumer ---\n";
    MessageQueue mq;

    // Consumer: print messages until pop() hands back the empty
    // shutdown marker.
    std::thread consumer([&mq] {
        for (std::string msg = mq.pop(); !msg.empty(); msg = mq.pop()) {
            std::cout << std::format("Received: {}\n", msg);
        }
        std::cout << "Consumer done\n";
    });

    // Producer (this thread): five messages, 50 ms apart, then shut down.
    for (int seq = 1; seq < 6; ++seq) {
        mq.push(std::format("Message #{}", seq));
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
    mq.shutdown();
    consumer.join();

    // ---- Bounded Buffer ----
    std::cout << "\n--- Bounded Buffer (capacity=3) ---\n";
    BoundedBuffer<int, 3> buffer;
    constexpr int kItemCount = 8;  // more items than slots, so put() has to block

    std::thread producer([&buffer] {
        for (int value = 1; value <= kItemCount; ++value) {
            buffer.put(value);
            std::cout << std::format("Produced: {}\n", value);
        }
    });
    std::thread consumer2([&buffer] {
        for (int taken = 0; taken < kItemCount; ++taken) {
            // Slow consumer: pause before every take.
            std::this_thread::sleep_for(std::chrono::milliseconds(30));
            const int val = buffer.take();
            std::cout << std::format("Consumed: {}\n", val);
        }
    });
    producer.join();
    consumer2.join();

    // ---- Gate / Barrier ----
    std::cout << "\n--- Gate (start signal) ---\n";
    Gate gate;
    auto worker = [&gate](int id) {
        std::cout << std::format("Worker {} waiting for start signal...\n", id);
        gate.wait();
        std::cout << std::format("Worker {} started!\n", id);
    };

    // Elements are constructed left to right, so the workers start in id order.
    std::thread workers[] = {
        std::thread(worker, 1),
        std::thread(worker, 2),
        std::thread(worker, 3),
    };

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::cout << "Opening the gate!\n";
    gate.open();  // every worker proceeds

    for (std::thread& w : workers) {
        w.join();
    }
    return 0;
}