1 Writer, Many Readers

Hello,

Programming-related question: let's imagine we have an "atomic" variable and "atomic" functions (increment, decrement, compare). "Atomic" means that the functions guarantee the consistency of the variable (we assume a multitasking environment).

What I want is to create a sync object, a sort of mutex or semaphore, which can be acquired multiple times by readers but allows only one writer:

syncobject_lock(&obj, LOCK_FOR_READING);
syncobject_lock(&obj, LOCK_FOR_WRITING);
syncobject_unlock(&obj, ...);

So different tasks can acquire the READING lock at the same time without blocking. The WRITING lock is more restrictive: if the object is locked for writing, then any readers or writers will be blocked if they try to acquire the lock.

The question:

How can that be implemented using basic atomic operations (++, --, OR, XOR and AND)? :)

Thank you!

It's not clear what that means :(
A single dedicated writer or one selectable out of many?

What makes you believe that those operations are atomic?

It's not clear what you mean by "lock" for either reading or writing. Neither operation requires locking for any appreciable amount of time ... just long enough to atomically copy the subject data to a local version or vice versa.

A multi-tasking environment presumes an operating system of some sort and context switching.

On a single-core processor, you can simply disable interrupts for just long enough to prevent a context switch while the code atomically copies the subject data to a local version or vice versa.

In a multi-core, multi-tasking configuration, the OS will most certainly have a feature to allow variable consistency across all cores and all tasks. For example, ESP32 with FreeRTOS has portENTER_CRITICAL.

Finally, consider the facilities provided by std::atomic.

Most multi-tasking operating systems have a mechanism to share variables between tasks.

One exclusive writer.

A bit more explanation:

I have lists: singly linked lists that are traversed (read) by multiple reader tasks. Rarely, another task (the writer) goes through the lists and applies some changes to them (i.e. adds or removes entries).

So what I want is a mutex which can be acquired multiple times for reading, or once, exclusively, for writing.

If I just use a simple mutex or critical section, then as soon as one reader enters the critical section, a second reader will be blocked. Same for mutexes.

doesn’t FreeRTOS have a mutex?

on esp32 check into the xQueue..
here's an example 2 producers 1 consumer..

have fun.. ~q

Maybe something like this. The Critical Sections only block long enough to determine if read or write access will be granted.

Read access is granted as long as no task has write access.

Write access is only granted if no tasks have read or write access.

#include "Arduino.h"
#include <vector>
#include <algorithm>

TaskHandle_t writingTask { 0 };
std::vector<TaskHandle_t> readingTasks;
portMUX_TYPE accessControl = portMUX_INITIALIZER_UNLOCKED;

bool getResourceForReading();
void releaseResourceForReading();
bool getResourceForWriting();
void releaseResourceForWriting();

void setup() {
	// Spool up the tasks that will want read and/or write access to resource here

}

bool getResourceForReading() {
	bool accessGranted { false };

	taskENTER_CRITICAL(&accessControl);
	if (writingTask == 0) {
		accessGranted = true;
		TaskHandle_t currentTask { xTaskGetCurrentTaskHandle() };
		if (std::find(readingTasks.begin(), readingTasks.end(), currentTask) == readingTasks.end()) {
			readingTasks.emplace_back(currentTask);
		}
	}
	taskEXIT_CRITICAL(&accessControl);
	return accessGranted;
}

void releaseResourceForReading() {
	taskENTER_CRITICAL(&accessControl);
	TaskHandle_t currentTask { xTaskGetCurrentTaskHandle() };
	auto it = std::find(readingTasks.begin(), readingTasks.end(), currentTask);
	if (it != readingTasks.end()) {
		readingTasks.erase(it);
	}
	taskEXIT_CRITICAL(&accessControl);
}

bool getResourceForWriting() {
	bool accessGranted { false };
	taskENTER_CRITICAL(&accessControl);
	if ((readingTasks.size() == 0) && (writingTask == 0)) {
		accessGranted = true;
		writingTask = xTaskGetCurrentTaskHandle();
	}
	taskEXIT_CRITICAL(&accessControl);
	return accessGranted;
}

void releaseResourceForWriting() {
	taskENTER_CRITICAL(&accessControl);
	if (writingTask == xTaskGetCurrentTaskHandle()) {
		writingTask = 0;
	}
	taskEXIT_CRITICAL(&accessControl);
}

void loop() {

}

Now I have some pseudo-code which should work, but it relies on a binary semaphore and a mutex ;-/. It should be doable in a simpler way.

typedef struct {
  mutex_t    mcnt; // simple mutex to protect /cnt/
  int         cnt; // -1=write_lock, 0=unlocked, >0 reader_lock
  semaphore_t sem; // binary semaphore, acts like blocking object
} rwlock_t;

void writer_lock(rwlock_t *rw) {
  // Wait for READER and WRITER unlock, yielding 
  do {
    mutex_lock(rw->mcnt);
    if (rw->cnt == 0)
      break;
    mutex_unlock(rw->mcnt);
    os_scheduler_yield(); // Let WRITER/READER task to do its job
  } while( true );

  rw->cnt--; //  rw->cnt = -1
  semaphore_lock(rw->sem);
  mutex_unlock(rw->mcnt);
}

void writer_unlock(rwlock_t *rw) {

  mutex_lock(rw->mcnt);

  if (rw->cnt >= 0)
    abort();

  rw->cnt++; //rw->cnt = 0;

  semaphore_unlock(rw->sem);

  mutex_unlock(rw->mcnt);
}

void reader_lock(rwlock_t *rw) {

  // Wait for WRITER unlock, yielding 
  do {

    mutex_lock(rw->mcnt);

    if (rw->cnt >= 0)
      break;

    mutex_unlock(rw->mcnt);
    os_scheduler_yield(); // Let WRITER task to do its job
  } while( true );

  if (!rw->cnt)
    semaphore_lock(rw->sem);

  rw->cnt++;
  mutex_unlock(rw->mcnt);
}

void reader_unlock(rwlock_t *rw) {

  mutex_lock(rw->mcnt);

  if (rw->cnt <= 0)
    abort();

  if (--rw->cnt == 0)
    semaphore_unlock(rw->sem);

  mutex_unlock(rw->mcnt);
}

Here's the code from Post #10 modified to use a mutex instead of a critical section. The latter would only be needed if ISR code needed access to the structures.

#include "Arduino.h"
#include <vector>
#include <algorithm>

TaskHandle_t writingTask { 0 };
std::vector<TaskHandle_t> readingTasks;
SemaphoreHandle_t accessMutex;
bool getResourceForReading();
void releaseResourceForReading();
bool getResourceForWriting();
void releaseResourceForWriting();

void setup() {
	accessMutex = xSemaphoreCreateMutex();
	assert((accessMutex != NULL) && "Couldn't create mutex");
	// A mutex from xSemaphoreCreateMutex() starts out available, so no initial xSemaphoreGive() is needed.

	// Spool up the tasks that will want read and/or write access to resource here

}

bool getResourceForReading() {
	bool accessGranted { false };
	xSemaphoreTake(accessMutex, portMAX_DELAY);
	if (writingTask == 0) {
		accessGranted = true;
		TaskHandle_t currentTask { xTaskGetCurrentTaskHandle() };
		if (std::find(readingTasks.begin(), readingTasks.end(), currentTask) == readingTasks.end()) {
			readingTasks.emplace_back(currentTask);
		}
	}
	xSemaphoreGive(accessMutex);
	return accessGranted;
}

void releaseResourceForReading() {
	xSemaphoreTake(accessMutex, portMAX_DELAY);
	TaskHandle_t currentTask { xTaskGetCurrentTaskHandle() };
	auto it = std::find(readingTasks.begin(), readingTasks.end(), currentTask);
	if (it != readingTasks.end()) {
		readingTasks.erase(it);
	}
	xSemaphoreGive(accessMutex);
}

bool getResourceForWriting() {
	bool accessGranted { false };
	xSemaphoreTake(accessMutex, portMAX_DELAY);
	if ((readingTasks.size() == 0) && (writingTask == 0)) {
		accessGranted = true;
		writingTask = xTaskGetCurrentTaskHandle();
	}
	xSemaphoreGive(accessMutex);
	return accessGranted;
}

void releaseResourceForWriting() {
	xSemaphoreTake(accessMutex, portMAX_DELAY);
	if (writingTask == xTaskGetCurrentTaskHandle()) {
		writingTask = 0;
	}
	xSemaphoreGive(accessMutex);
}

void loop() {

}

The same applies to write access. Every reader/writer will block all other attempts to read or write the protected data.

You may succeed if you add atomic INC/DEC instructions. A requester increments a counter, and if the result is 1, it has acquired exclusive access.
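A minimal sketch of that increment-based idea in standard C++ with std::atomic. The names ExclusiveGate, tryAcquire and release are made up for illustration, and this gives exclusive access only (it does not let several readers in at once).

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper: exclusive access decided by an atomic counter.
class ExclusiveGate {
public:
    // Returns true if the caller moved the counter from 0 to 1.
    bool tryAcquire() {
        if (count_.fetch_add(1) == 0) {
            return true;         // counter went 0 -> 1: we own the gate
        }
        count_.fetch_sub(1);     // someone else was first: undo our increment
        return false;
    }

    // Must only be called by the current owner.
    void release() {
        int previous = count_.fetch_sub(1);
        assert(previous >= 1);   // releasing an unowned gate is a bug
        (void)previous;          // silence unused warning when asserts are off
    }

private:
    std::atomic<int> count_{0};
};
```

A failed tryAcquire() briefly leaves the counter raised, so another task may see a failure even though the owner has just released. Callers are expected to yield and retry.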

Nope. All requests for read access will succeed as long as no task has write access, meaning multiple tasks may have read access simultaneously. And, write access is only granted if no tasks have read or write access.

Another question: let's imagine we have 3 readers that obtain and release the reader lock in such a way that there is never a window in which no reader holds it. That would mean the writer task never obtains the writer lock. So there should be a rule like: "block further reader lock requests while a writer task is waiting".

Complicated.

Currently I have implemented this rwlock_t object via one critical section, one counter and one binary semaphore (FreeRTOS).

But it looks overcomplicated for such a simple thing as an rwlock.

True, there would need to be some cooperation between the tasks. In particular, no task should hold on to its read (or write) access longer than it needs to.

I think your whole architecture is overcomplicated. Instead of giving all tasks potential access to the databases themselves, I'd have a single task that owns them. Read / write requests to the database from other tasks could be through a single queue to be serviced by the owning task. Results of reads could be pushed into individual queues (one per reading task) by the owner for the requesting task to pick up.
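A rough sketch of that single-owner layout in portable C++. Everything here is hypothetical (Op, Request, ListOwner and the method names are not from the thread), and std::queue only stands in for a thread-safe RTOS queue such as a FreeRTOS queue, so as written it models the message flow rather than real inter-task communication.

```cpp
#include <cassert>
#include <cstddef>
#include <forward_list>
#include <queue>
#include <vector>

// Hypothetical request format; the thread does not define one.
enum class Op { Add, Remove, Contains };
struct Request { Op op; int value; std::size_t client; };

// Only the owner touches the list, so the list itself needs no lock.
class ListOwner {
public:
    explicit ListOwner(std::size_t clients) : replies_(clients) {}

    // Called by client tasks; stands in for a send to the owner's queue.
    void submit(const Request& r) { inbox_.push(r); }

    // Body of the owning task's loop: drain the inbox and answer reads.
    void serviceAll() {
        while (!inbox_.empty()) {
            Request r = inbox_.front();
            inbox_.pop();
            assert(r.client < replies_.size());
            switch (r.op) {
            case Op::Add:    list_.push_front(r.value); break;
            case Op::Remove: list_.remove(r.value);     break;
            case Op::Contains: {
                bool found = false;
                for (int v : list_) {
                    if (v == r.value) { found = true; break; }
                }
                replies_[r.client].push(found);  // per-client reply queue
                break;
            }
            }
        }
    }

    // Called by a client to collect its next reply; false if none is waiting.
    bool takeReply(std::size_t client, bool& out) {
        assert(client < replies_.size());
        if (replies_[client].empty()) return false;
        out = replies_[client].front();
        replies_[client].pop();
        return true;
    }

private:
    std::forward_list<int> list_;
    std::queue<Request> inbox_;
    std::vector<std::queue<bool>> replies_;
};
```

Because requests are serviced strictly in arrival order, a read submitted after a write always sees that write, and no reader can starve the writer.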

Why complicated?

The writer task sets a flag that prevents all reader tasks from obtaining further locks (they are suspended). On the last reader unlock, the writer can do its job, then reset the flag and resume the suspended readers.
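The flag idea above can be sketched with nothing but atomic add, subtract, OR and AND on one word. This is an illustration under assumptions, not a tested FreeRTOS implementation: RwGate and its method names are invented, the top bit is the writer flag, the low bits count readers, and instead of suspending tasks the functions return false so the caller can yield and retry.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical polling reader-writer gate built on a single atomic word.
class RwGate {
public:
    // Reader: count ourselves in, then back out if a writer is pending or active.
    bool tryLockRead() {
        std::uint32_t prev = state_.fetch_add(1);
        if (prev & kWriter) {
            state_.fetch_sub(1);
            return false;
        }
        return true;
    }

    void unlockRead() {
        std::uint32_t prev = state_.fetch_sub(1);
        assert((prev & ~kWriter) >= 1);  // must have held a read lock
        (void)prev;
    }

    // Writer step 1: raise the flag so no new reader gets in.
    // Returns false if another writer already raised it.
    bool beginWrite() {
        return (state_.fetch_or(kWriter) & kWriter) == 0;
    }

    // Writer step 2: poll until the readers that were already inside have left.
    bool writeReady() const {
        return (state_.load() & ~kWriter) == 0;
    }

    // Writer step 3: drop the flag; readers may enter again.
    void endWrite() {
        std::uint32_t prev = state_.fetch_and(~kWriter);
        assert(prev & kWriter);          // only the flag holder may call this
        (void)prev;
    }

private:
    static constexpr std::uint32_t kWriter = 0x80000000u;
    std::atomic<std::uint32_t> state_{0};
};
```

A writer would call beginWrite() once, then yield in a loop until writeReady() returns true, modify the lists, and call endWrite(). A reader that gets false from tryLockRead() yields and tries again. A rejected reader raises the count for a moment, which can only make the writer wait slightly longer, never let it in early.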

Again check for or implement something like xQueues.

As I mentioned in Post #12, Critical Sections are only required if ISR code needs to take part in acquiring/sharing the resource. Otherwise, a regular mutex will suffice, and likely be lighter weight.

If you are on C++17 you can use std::shared_mutex; otherwise you can use pthread_rwlock_t.
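For reference, a small sketch of how std::shared_mutex is typically used, assuming a C++17 toolchain with thread support on the target. SharedList and its methods are hypothetical names wrapped around a singly linked list like the one described earlier in the thread.

```cpp
#include <cassert>
#include <forward_list>
#include <mutex>
#include <shared_mutex>

// Hypothetical list guarded by a reader-writer lock (C++17).
class SharedList {
public:
    // Writer path: exclusive lock, waits until all readers and writers have left.
    void add(int value) {
        std::unique_lock<std::shared_mutex> lock(mutex_);
        items_.push_front(value);
    }

    void remove(int value) {
        std::unique_lock<std::shared_mutex> lock(mutex_);
        items_.remove(value);
    }

    // Reader path: shared lock, any number of readers may hold it together.
    bool contains(int value) const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        for (int v : items_) {
            if (v == value) return true;
        }
        return false;
    }

    // Reader path; calling this on an empty list is a caller bug.
    int front() const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        assert(!items_.empty());
        return items_.front();
    }

private:
    mutable std::shared_mutex mutex_;  // mutable so const readers can lock it
    std::forward_list<int> items_;
};
```

Whether waiting writers get priority over new readers is left to the implementation, so the starvation concern raised earlier is not automatically solved by switching to std::shared_mutex.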

This is a good idea. Going to implement it :). It was a surprise that FreeRTOS does not have RW locks.