FreeRTOS semaphores are actually queues. At least the binary semaphore is.
I thought FreeRTOS internally uses critical sections to implement mutex operation: they somehow should protect access to internal mutex structures within the kernel. Especially on multicore systems, like ESP32.
Isn't critical section faster than mutex?
First iteration. Readers can block the writer forever, but we are working on it.
// //////////////////// Misc utility
// Branch-prediction hints for hot paths (GCC/Clang builtins).
#undef likely
#undef unlikely
#define likely(_X)   __builtin_expect(!!(_X), 1)
#define unlikely(_X) __builtin_expect(!!(_X), 0)
// Report a violated invariant and terminate the program. Never returns.
static __attribute__((noreturn)) void must_not_happen(const char *message,
                                                      const char *file,
                                                      int line) {
  printf("Must not happen: %s, in\r\n%s:%d\r\n", message, file, line);
  abort();
}
// Assert-like guard: aborts via must_not_happen() when the condition is true.
// Variadic so that conditions containing commas need no extra parentheses.
#define MUST_NOT_HAPPEN(...)                                   \
  do {                                                         \
    if (__VA_ARGS__)                                           \
      must_not_happen(#__VA_ARGS__, __FILE__, __LINE__);       \
  } while (0)
// /////////////////// OS glue begin: ///////////////////
// Thin aliases over FreeRTOS primitives so the RW-lock code stays portable.
#define q_yield() vPortYield()
#define barrier_t portMUX_TYPE // Critical Section
#define sem_t SemaphoreHandle_t // Binary Semaphore
#define barrier_lock(_Name) portENTER_CRITICAL(&_Name)
#define barrier_unlock(_Name) portEXIT_CRITICAL(&_Name)
#define BARRIER_INIT portMUX_INITIALIZER_UNLOCKED // initializer for barrier_t
#define SEM_INIT NULL // initializer for sem_t
// Release binary semaphore (no-op when the semaphore was never created)
#define sem_unlock(_Name) \
  do { \
    if (likely(_Name != NULL)) \
      xSemaphoreGive(_Name); \
  } while( 0 )
// Create and/or lock.
// Lazily creates the semaphore on first use: xSemaphoreCreateBinary() returns
// the semaphore in the "empty" (taken) state, so a fresh one counts as locked.
// NOTE(review): if creation fails (_Name stays NULL) the caller proceeds as if
// it held the lock -- consider a MUST_NOT_HAPPEN(_Name == NULL) here.
// (The redundant "if (likely(_Name != NULL))" guard before the take loop was
// removed: after the NULL branch breaks out, _Name is provably non-NULL.)
#define sem_lock(_Name) \
  do { \
    if (unlikely(_Name == NULL)) { \
      _Name = xSemaphoreCreateBinary(); \
      break; /* created locked */\
    } \
    while (xSemaphoreTake(_Name, portMAX_DELAY) == pdFALSE) { \
      /* portMAX_DELAY elapsed (~1200 hours). Take again. */ \
    } \
  } while( 0 )
// Just for completeness.
#define sem_destroy(_Name) \
  do { \
    if (likely(_Name != NULL)) { \
      vSemaphoreDelete(_Name); \
      _Name = NULL; \
    } \
  } while ( 0 )
// /////////////////// OS glue end //////////////////////
// RW locks: 1 writer, many readers, writer does not preempt readers (writer can starve)
typedef struct {
  barrier_t csec; // critical section protecting /cnt/ (spin-based, multicore safe)
  int cnt;        // -1=write_lock, 0=unlocked, >0 = number of readers holding the lock
  sem_t sem;      // binary semaphore, "shared" blocking object for writers
} rwlock_t;
// Static initializer: "rwlock_t rw = RWLOCK_INIT;"
#define RWLOCK_INIT { BARRIER_INIT, 0, SEM_INIT}
// Obtain exclusive (A "Writer") access.
// If there were readers or writers, this function will block on the /rw->sem/ binary semaphore.
// If there are 0 readers/writers, we grab the binary semaphore /rw->sem/ and change /cnt/ to a
// negative value meaning the "Write" lock has been acquired.
//
void rw_lockw(rwlock_t *rw) {
  sem_lock(rw->sem); // block until no reader/writer holds the semaphore
  barrier_lock(rw->csec);
  // Owning /sem/ should imply nobody else holds the lock, so /cnt/ must be 0.
  // NOTE(review): a reader in rw_lockr() increments /cnt/ before taking /sem/;
  // a writer winning /sem/ in that window would trip this assert -- TODO confirm
  // (the author notes this iteration is a work in progress).
  MUST_NOT_HAPPEN(rw->cnt != 0);
  rw->cnt--; // rw->cnt = -1
  barrier_unlock(rw->csec);
}
// Writer unlock: restore /cnt/ to 0 and release the semaphore so a queued
// writer (or the first of the waiting readers) can proceed.
void rw_unlockw(rwlock_t *rw) {
  barrier_lock(rw->csec);
  MUST_NOT_HAPPEN(rw->cnt >= 0); // caller must currently hold the write lock (cnt == -1)
  rw->cnt++; //rw->cnt = 0;
  barrier_unlock(rw->csec);
  sem_unlock(rw->sem);
}
// Reader lock
void rw_lockr(rwlock_t *rw) {
  // Wait for WRITER unlock, yielding
  int cnt;
  do {
    barrier_lock(rw->csec);
    if ((cnt = rw->cnt) >= 0)
      break; // leave the loop still holding /csec/
    barrier_unlock(rw->csec);
    q_yield(); // Let WRITER task to do its job
  } while( true );
  // Increment READERS count (still inside the critical section)
  rw->cnt++;
  barrier_unlock(rw->csec);
  // First of readers acquires rw->sem, so subsequent rw_lockw() will block immediately.
  // NOTE(review): /sem/ is taken *after* /csec/ is released; a writer whose
  // sem_lock() wins in that window will then see cnt != 0 and abort via
  // MUST_NOT_HAPPEN in rw_lockw() -- to be confirmed (author marks this WIP).
  if (!cnt)
    sem_lock(rw->sem);
}
// Reader unlock: the last reader (the one that drops /cnt/ to 0) releases the semaphore.
void rw_unlockr(rwlock_t *rw) {
  int cnt;
  barrier_lock(rw->csec);
  cnt = --rw->cnt;
  MUST_NOT_HAPPEN(cnt < 0); // unlock without a matching rw_lockr()
  barrier_unlock(rw->csec);
  if (!cnt)
    sem_unlock(rw->sem);
}
I believe it's more heavy weight. Additionally, it's a spin lock. So, if (non-ISR) code in one core is in a critical section and (non-ISR) code in the other core tries to enter a critical section with the same portMUX_TYPE, it will spin (totally lock up) the second core. With a mutex, the second task will enter the Blocked state so other tasks on that core can still run.
So, again, unless ISR code is involved with the sharing, stick with a mutex.
Next code iteration, hopefully cross-platform.
Uses 1 binary semaphore and C11's Atomics (supported by ESP32). Has "write-request" flag (to stop readers from queueing)
// RW locks.
// Many readers, 1 writer. Queued writers prevent new readers from obtaining the lock
//
typedef struct {
  _Atomic int wreq; // 1=writer blocking, do not accept new readers.
  _Atomic int cnt;  // Number of readers holding this lock, or -1 (active writer holding lock)
  sem_t sem;        // Binary semaphore, used to block writers
} rwlock_t;
// Static initializer: "rwlock_t rw = RWLOCK_INIT;"
#define RWLOCK_INIT { 0, 0, NULL }
// Obtain exclusive ("Writer") access.
//
// If there are active readers or writers, this function will block on /rw->sem/,
// with /rw->wreq/ set to 1: the wreq flag prevents new readers from obtaining the
// reader lock (see rw_lockr()).
//
// Once /rw->sem/ is successfully acquired we change /rw->cnt/ to a negative value
// meaning the write lock has been acquired.
//
void rw_lockw(rwlock_t *rw) {
  rw->wreq++; // stop new readers from obtaining the lock
try_again:
  sem_lock(rw->sem); // grab main sync object. If it is held by readers we simply block here
  if (rw->cnt != 0) { // reader somehow sneaked in. redo.
    sem_unlock(rw->sem);
    q_yield();
    goto try_again;
  }
  rw->cnt--; // i.e. rw->cnt = -1;
  rw->wreq--; // clear our write request so readers may queue again
}
// WRITE unlock
// /cnt/ is expected to be -1 (Write Lock). If it isn't then we have bugs in our RW code
//
void rw_unlockw(rwlock_t *rw) {
  rw->cnt++; //i.e. rw->cnt = 0;
  sem_unlock(rw->sem); // wake a queued writer or the first blocked reader
}
// Obtain reader lock.
//
// Spins (with yield) while a writer holds the lock or write requests are queued
// (write-preferring). The readers count is advanced with a compare-and-swap so
// that "check, then increment" is one atomic step: the original plain read
// followed by rw->cnt++ left a window where two readers could both observe
// cnt == 0 (and both try to take /rw->sem/), or a writer could acquire the
// lock between the check and the increment.
//
void rw_lockr(rwlock_t *rw) {
  int cnt;
  for (;;) {
    // Wait for active writers to finish their jobs.
    // Let pending write requests happen first (writer-preferring).
    if ((cnt = rw->cnt) < 0 || rw->wreq) {
      q_yield();
      continue;
    }
    // Atomically advance the readers count; retry if /cnt/ changed meanwhile.
    if (atomic_compare_exchange_weak(&rw->cnt, &cnt, cnt + 1))
      break;
  }
  // First reader acquires /rw->sem/, last of readers releases /rw->sem/.
  if (!cnt)
    sem_lock(rw->sem);
}
// Reader unlock
//
void rw_unlockr(rwlock_t *rw) {
  MUST_NOT_HAPPEN(rw->cnt < 1); // unlock without a matching rw_lockr()
  // Last reader releases semaphore
  if (0 == --rw->cnt)
    sem_unlock(rw->sem);
}
OS glue (FreeRTOS)
#include <Arduino.h>
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdatomic.h>
// /////////////////// OS glue begin: ///////////////////
// Abort on a violated invariant. Wrapped in do/while(0) so the macro behaves as
// one statement: the bare "{ if ... }" form breaks "if (...) MACRO(); else ...".
#define MUST_NOT_HAPPEN( ... ) do { if ( __VA_ARGS__ ) abort(); } while (0)
#define q_yield() vPortYield() // Force a context switch
#define sem_t SemaphoreHandle_t // Binary Semaphore
// Release binary semaphore (no-op when it was never created).
// do/while(0) for the same single-statement reason as above.
#define sem_unlock(_Name) do { if (_Name != NULL) xSemaphoreGive(_Name); } while (0)
// Create and/or lock: xSemaphoreCreateBinary() returns the semaphore in the
// "empty" (taken) state, so a freshly created one counts as locked.
#define sem_lock(_Name) \
  do { \
    if (_Name == NULL) { \
      _Name = xSemaphoreCreateBinary(); \
      break; /* created locked */\
    } \
    while (xSemaphoreTake(_Name, portMAX_DELAY) == pdFALSE) { /* portMAX_DELAY elapsed. Take again. */ } \
  } while( 0 )
Per @frameworklabs suggestion, I'd go with std::shared_mutex.
This is what I eventually did — thanks for pointing out std::atomic. Not C++, but plain C (C11). It works well on ESP32 under FreeRTOS.
Below is the final code.
// ****************************** OS glue begin *******************************
// Maybe I missed some includes. This code was cut from a live project, and I was not able to check
// whether it compiles when extracted to a separate file
#include <Arduino.h>
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#define q_yield() vPortYield() // Force a context switch
#define sem_t SemaphoreHandle_t // Binary semaphore type
// Unlock semaphore _Name.
// Does nothing if the semaphore is not initialized (created).
// Wrapped in do/while(0) so the macro is a single statement and composes
// correctly with if/else (the bare "{ if ... }" form would not).
#define sem_unlock(_Name) do { if (_Name != NULL) xSemaphoreGive(_Name); } while (0)
// Lock semaphore _Name.
// If the semaphore was not created yet - it is created and then locked:
// xSemaphoreCreateBinary() returns the semaphore in the "taken" state.
#define sem_lock(_Name) \
  do { \
    if (_Name == NULL) { \
      _Name = xSemaphoreCreateBinary(); \
      break; /* created locked */\
    } \
    while (xSemaphoreTake(_Name, portMAX_DELAY) == pdFALSE) { /* portMAX_DELAY elapsed. Take again. */ } \
  } while( 0 )
// ****************************** OS glue end *******************************
// RW locks.
// Many readers, 1 writer. Write-preferring lock (queued writers prevent new readers
// from obtaining the lock until all write requests have been processed)
//
#include <stdatomic.h>
typedef struct {
  _Atomic uint32_t wreq; // Number of pending write requests
  _Atomic int cnt;       // <0 : write_lock, 0: unlocked, >0: reader_lock (number of readers)
  sem_t sem;             // binary semaphore, acts like blocking object
} rwlock_t;
// Initializer: e.g. "rwlock_t a = RWLOCK_INIT;"
#define RWLOCK_INIT { 0, 0, 0}
// Obtain exclusive ("Writer") access.
//
// Raising /rw->wreq/ before taking the semaphore signals write intent: readers
// observing a pending write request will not queue, so the writer is not
// starved while it blocks on /rw->sem/.
//
// When the semaphore is finally ours and no reader holds the lock, /cnt/ is
// moved to a negative value, marking an acquired write lock.
//
void rw_lockw(rwlock_t *rw) {
    rw->wreq++; // TODO: check for overflow
    for (;;) {
        sem_lock(rw->sem); // main sync object; blocks while readers hold it
        // Normally /rw->cnt/ is 0 at this point (nobody holds the lock), but
        // under high load a reader may have sneaked in: back off and retry.
        if (rw->cnt == 0)
            break;
        sem_unlock(rw->sem);
        q_yield();
    }
    rw->cnt--;  // i.e. rw->cnt = -1
    rw->wreq--; // Clear "Write Request" flag
}
// "Write" unlock
// /cnt/ is expected to be -1 (Write Lock). If it isn't then we have bugs in out RW code
//
void rw_unlockw(rwlock_t *rw) {
rw->cnt++; //same as rw->cnt = 0;
sem_unlock(rw->sem);
}
// Obtain shared reader lock.
// Yield & spin while a writer lock is held already.
//
// Dominant type of lock.
//
// Probably a race condition: results in an erroneous sem_lock(), but it is not critical,
// just one reader will be blocked as if it were a writer.
void rw_lockr(rwlock_t *rw) {
  int cnt;
  // Wait until there are no writers and no write-lock request is queued.
  // Let the "writer" task do its job (write-preferring).
  //
  while (atomic_load_explicit(&rw->cnt, memory_order_acquire) < 0 ||
         atomic_load_explicit(&rw->wreq, memory_order_acquire) > 0)
    q_yield();
  // Atomically increment READERS count.
  // NOTE(review): the checks above and this increment are not one atomic step;
  // a writer can slip in between them (see the author's note above).
  cnt = atomic_fetch_add_explicit(&rw->cnt, 1, memory_order_acq_rel);
  // First of readers acquires rw->sem, so subsequent rw_lockw() will block immediately.
  // If a concurrent rw_lockw() obtains /sem/ just after rw->cnt++ then we simply block here,
  // for a tiny amount of time while rw_lockw() goes through its "try_again:"
  //
  if (!cnt)
    sem_lock(rw->sem);
}
// Reader unlock
//
void rw_unlockr(rwlock_t *rw) {
  // fetch_sub returns the PREVIOUS value: 1 means we were the last reader
  if (atomic_fetch_sub(&rw->cnt, 1) == 1)
    sem_unlock(rw->sem);
}
Here's an implementation using std::shared_mutex and RAII concepts. The output shows how pending writers are prioritized over reader requests that arrive while the resource is locked for writing by another task. It also shows two reader tasks having access at the same time.
#include <atomic>
#include <shared_mutex>
// Shared lock state for the RAII lockers below: the shared_mutex itself plus a
// count of writers currently waiting, used by ReadLocker to give writers priority.
struct ControlBlock {
  friend class WriteLocker;
  friend class ReadLocker;
private:
  std::shared_mutex accessMutex;
  std::atomic_ulong queuedWriters {0}; // number of WriteLockers waiting for the mutex
};
// RAII exclusive ("writer") lock: registers itself in queuedWriters while
// waiting, so new readers back off, then blocks until it owns accessMutex
// exclusively. The mutex is released when the WriteLocker goes out of scope.
class WriteLocker {
public:
  WriteLocker(ControlBlock &controlBlock) : locker(controlBlock.accessMutex, std::defer_lock) {
    controlBlock.queuedWriters++;
    locker.lock(); // NOTE(review): if lock() throws, queuedWriters is never decremented
    controlBlock.queuedWriters--;
  }
private:
  std::unique_lock<std::shared_mutex> locker;
};
// RAII shared ("reader") lock: acquires accessMutex in shared mode, but backs
// off (unlock + yield) as long as any writer is queued, which makes the scheme
// writer-preferring. The mutex is released when the ReadLocker goes out of scope.
class ReadLocker {
public:
  ReadLocker(ControlBlock &controlBlock) : locker(controlBlock.accessMutex, std::defer_lock) {
    while (1) {
      locker.lock();
      if (controlBlock.queuedWriters == 0) {
        break; // no writers waiting: keep the shared lock
      }
      locker.unlock();
      taskYIELD(); // let the queued writer(s) run first
    }
  }
private:
  std::shared_lock<std::shared_mutex> locker;
};
ControlBlock cb1; // the single RW lock shared by all demo tasks
// Demo entry point: spawn a mix of reader and writer tasks, all pinned to the
// same core with equal priority, so the serial output shows the locking order.
void setup() {
  Serial.begin(115200);
  delay(3000); // give the serial monitor time to attach
  Serial.println();
  xTaskCreatePinnedToCore(readerTask, "Task 0", 1500, reinterpret_cast<void *>(0), 3, NULL, CONFIG_ARDUINO_RUNNING_CORE);
  xTaskCreatePinnedToCore(writerTask, "Task 1", 1500, reinterpret_cast<void *>(1), 3, NULL, CONFIG_ARDUINO_RUNNING_CORE);
  xTaskCreatePinnedToCore(readerTask, "Task 2", 1500, reinterpret_cast<void *>(2), 3, NULL, CONFIG_ARDUINO_RUNNING_CORE);
  xTaskCreatePinnedToCore(readerTask, "Task 3", 1500, reinterpret_cast<void *>(3), 3, NULL, CONFIG_ARDUINO_RUNNING_CORE);
  xTaskCreatePinnedToCore(writerTask, "Task 4", 1500, reinterpret_cast<void *>(4), 3, NULL, CONFIG_ARDUINO_RUNNING_CORE);
  xTaskCreatePinnedToCore(writerTask, "Task 5", 1500, reinterpret_cast<void *>(5), 3, NULL, CONFIG_ARDUINO_RUNNING_CORE);
}
// Nothing to do here: all the work happens in the FreeRTOS tasks created in setup().
void loop() {
}
// Reader demo task: takes a shared lock for 5 seconds, logging each phase.
// The task number is passed through the void* task parameter.
void readerTask(void *params) {
  auto taskNumber {reinterpret_cast<uint32_t>(params)};
  {
    Serial.printf("%10lu: Task %lu waiting for Read Access\n", millis(), taskNumber);
    ReadLocker lock(cb1); // shared lock held until the end of this scope (RAII)
    Serial.printf("%10lu: Task %lu granted Read Access\n", millis(), taskNumber);
    vTaskDelay(5000);
    Serial.printf("%10lu: Task %lu relinquishing Read Access\n", millis(), taskNumber);
  }
  vTaskDelete(NULL);
}
// Writer demo task: takes the exclusive lock for 5 seconds, logging each phase.
// The task number is passed through the void* task parameter.
void writerTask(void *params) {
  auto taskNumber {reinterpret_cast<uint32_t>(params)};
  {
    Serial.printf("%10lu: Task %lu waiting for Write Access\n", millis(), taskNumber);
    WriteLocker lock(cb1); // exclusive lock held until the end of this scope (RAII)
    Serial.printf("%10lu: Task %lu granted Write Access\n", millis(), taskNumber);
    vTaskDelay(5000);
    Serial.printf("%10lu: Task %lu relinquishing Write Access\n", millis(), taskNumber);
  }
  vTaskDelete(NULL);
}
Serial Port Output:
3036: Task 0 waiting for Read Access
3036: Task 0 granted Read Access
3036: Task 1 waiting for Write Access
3036: Task 2 waiting for Read Access
3036: Task 3 waiting for Read Access
3036: Task 4 waiting for Write Access
3036: Task 5 waiting for Write Access
8036: Task 0 relinquishing Read Access
8036: Task 1 granted Write Access
13036: Task 1 relinquishing Write Access
13036: Task 4 granted Write Access
18036: Task 4 relinquishing Write Access
18036: Task 5 granted Write Access
23036: Task 5 relinquishing Write Access
23036: Task 2 granted Read Access
23036: Task 3 granted Read Access
28036: Task 2 relinquishing Read Access
28036: Task 3 relinquishing Read Access
The only thing that scares me is that the locker gets unlocked as soon as it goes out of scope. Results in a bit confusing code :).
auto taskNumber {reinterpret_cast<uint32_t>(params)};
{
Serial.printf("%10lu: Task %lu waiting for Read Access\n", millis(), taskNumber);
ReadLocker lock(cb1);
Serial.printf("%10lu: Task %lu granted Read Access\n", millis(), taskNumber);
vTaskDelay(5000);
Serial.printf("%10lu: Task %lu relinquishing Read Access\n", millis(), taskNumber);
}
That's intentional, and the core concept of the RAII paradigm. Read up on it and see how the locker is used in the code I posted.
Is it about abusing constructors and destructors? I program mainly in C. But yeah, interesting idea. Makes your code shorter. But it may be hard to read and follow.
No, it's about the paradigm that an object owns a resource during its entire lifetime and ensuring the resource is automatically released at the end of that lifetime. This prevents resource leaks.
It's a widely-used construct in C++ programming. Another example is the use of smart pointers (eg std::unique_ptr).
My code above failed badly under high load.
A fixed version, which now has no race conditions, is below. It can be used as is (save this code as rwlock.c in your sketch directory) in Arduino projects.
It is a generic version that compiles and works on Xtensa (ESP32S3) and should work on other architectures as long as FreeRTOS is present. For other RTOSes one can replace FreeRTOS calls (there are few of them: SemaphoreCreate, Take, Give and Yield)
A pure lockless version showed worse performance (overall) than the binary-semaphore version
(i.e. a spinloop with yield() is worse than taking a locked semaphore).
// -- Readers/Writer lock --
//
// Implements the classic "many readers, one writer" scheme - write-preferring RW locks for FreeRTOS.
// Synchronization is lockless, but we also use a binary semaphore to make writers wait:
// instead of spinning in a vPortYield() loop, we take the semaphore, effectively removing
// our task from the scheduler's "active" list.
//
// RWLock rules:
//
// 1. Queued write requests (/rw->wreq/) prevent new readers from acquiring the lock
// (readers will block on /rw->sem/).
// 2. Queued writers block on the semaphore if there are active readers or other writers.
//
// Can be easily adjusted for other multitasking environments
// by replacing calls to sem_lock(), sem_unlock() and vPortYield()
//
//
// Initialize RWLock:
// rwlock_t rw = RWLOCK_INITIALIZER_UNLOCKED;
//
// Reader tasks:
// ....
// rw_lockr(&rw);
// do_something();
// rw_unlockr(&rw);
//
// Writer tasks:
// ....
// rw_lockw(&rw);
// do_something();
// rw_unlockw(&rw);
#include <stdatomic.h>
#include <stdint.h>
#include <freertos/FreeRTOS.h>
#include <freertos/semphr.h>
#include <freertos/task.h>
// RWLock type:
// Declare and initialize: "static rwlock_t my_lock = RWLOCK_INITIALIZER_UNLOCKED;"
//
typedef struct {
  _Atomic uint32_t wreq;  // Number of pending write requests (soft-stop for new readers)
  _Atomic int cnt;        // <0 : write_lock, 0: unlocked, >0: reader_lock (number of readers)
  SemaphoreHandle_t sem;  // binary semaphore, acts like blocking object
} rwlock_t;
#define RWLOCK_INITIALIZER_UNLOCKED { 0, 0, NULL }
// Branch-prediction hints for hot paths (GCC/Clang builtins).
#undef likely
#undef unlikely
#define unlikely(_X) __builtin_expect(!!(_X), 0)
#define likely(_X) __builtin_expect(!!(_X), 1)
// Acquire the semaphore. Creates a locked semaphore if not yet created:
// xSemaphoreCreateBinary() returns the semaphore in the "taken" state.
// Wrapped in do/while(0) so the macro is a single statement and composes
// safely with if/else (the bare "{ ... }" form would not).
//
#define sem_lock(_Name) do { \
  if (unlikely(_Name == NULL)) \
    _Name = xSemaphoreCreateBinary(); \
  else \
    while (xSemaphoreTake(_Name, portMAX_DELAY) == pdFALSE) { } \
} while (0)
// Release the semaphore (no-op when it was never created)
//
#define sem_unlock(_Name) do { \
  if (likely(_Name != NULL)) \
    xSemaphoreGive(_Name); \
} while (0)
// void rw_lockw(rwlock_t *rw);
//
// Obtain exclusive (i.e. "writer") access.
//
// If there are active readers or writers, this function blocks on /rw->sem/ and signals
// that further read requests must be postponed (via /rw->wreq++/).
//
// Once the semaphore is ours, /cnt/ is moved 0 -> -1 with a compare-and-swap rather
// than an unconditional store: a reader that has already incremented /cnt/ but has
// not yet reached its sem_lock() would otherwise have its increment silently
// overwritten, leaving /cnt/ inconsistent and /rw->sem/ never released by that
// reader. On CAS failure we release the semaphore, yield and retry.
//
void rw_lockw(rwlock_t *rw) {
  atomic_fetch_add_explicit(&rw->wreq, 1, memory_order_release); // Signal write intent.
  for (;;) {
    sem_lock(rw->sem); // Acquire the semaphore (or block).
    int expected = 0;
    // Claim exclusive ownership only when nobody holds the lock.
    if (atomic_compare_exchange_strong_explicit(&rw->cnt, &expected, -1,
                                                memory_order_acq_rel,
                                                memory_order_acquire))
      break;
    // A reader sneaked in between its cnt++ and its sem_lock(): back off.
    sem_unlock(rw->sem);
    vPortYield();
  }
  atomic_fetch_sub_explicit(&rw->wreq, 1, memory_order_release); // Clear our write intent.
}
// void rw_unlockw(rwlock_t *rw)
//
// Release the exclusive ("writer") lock previously acquired with rw_lockw(rwlock_t *).
//
// TODO: in DEBUG, verify that /cnt/ == -1 (it's expected to be -1 at this point)
//
void rw_unlockw(rwlock_t *rw) {
  atomic_store_explicit(&rw->cnt, 0, memory_order_release); // /cnt/ == "no readers, no writers"
  sem_unlock(rw->sem); // Unblock pending reader and/or writer
}
// void rw_lockr(rwlock_t *rw)
//
// Obtain a shared, non-exclusive ("reader") lock.
// The first reader also grabs the main sync object to block additional writers (and readers).
//
// This is the most frequently used lock type.
//
void rw_lockr(rwlock_t *rw) {
  int cnt;
  while ( true ) {
    // Spinloop here while we have an active writer or we see write intent
    //
    if (0 < atomic_load_explicit(&rw->wreq, memory_order_acquire) ||
        0 > (cnt = atomic_load_explicit(&rw->cnt, memory_order_acquire))) {
      // let other tasks do their job and retry
      vPortYield();
      continue;
    }
    // Try to increment the readers count: the CAS fails (and we retry) if /cnt/
    // changed since we sampled it above, making check+increment one atomic step.
    if (atomic_compare_exchange_weak_explicit(&rw->cnt, &cnt, cnt + 1, memory_order_acq_rel, memory_order_relaxed)) {
      // First reader grabs the semaphore; if a writer won /sem/ in the meantime,
      // we simply block here until that writer is done.
      if (cnt == 0)
        sem_lock(rw->sem);
      break;
    }
  }
}
// void rw_unlockr(rwlock_t *rw);
//
// Reader unlock. The last reader releases the semaphore.
//
void rw_unlockr(rwlock_t *rw) {
  // fetch_sub returns the PREVIOUS value: 1 => we were the last reader -> release the semaphore
  if (atomic_fetch_sub_explicit(&rw->cnt, 1, memory_order_acq_rel) == 1)
    sem_unlock(rw->sem);
}
Seems kind of convoluted and considerably more complex than the code in Post #28.
You use a C++ library which hides all the low-level stuff. Both are OK, but I bet my code is faster and smaller :). Plus, I am not a C++ programmer.