Runtime and thread_pool mostly done

This commit is contained in:
2025-06-20 23:50:54 +01:00
parent 0be61851a4
commit 2ee6960223
7 changed files with 154 additions and 27 deletions

View File

@ -1,10 +1,12 @@
#pragma once #pragma once
typedef void (*system_tick_t)(int type); typedef void (*system_tick_t)();
void runtime_init(); void runtime_init();
void runtime_loop(); void runtime_loop();
void runtime_destroy(); void runtime_destroy();
int runtime_callback_single(system_tick_t callback, long delay); void runtime_request_stop();
int runtime_callback_many(system_tick_t callback, long period, long phase);
void runtime_callback_single(system_tick_t callback, long delay);
void runtime_callback_many(system_tick_t callback, long period, long phase);

4
include/systems/status.h Normal file
View File

@ -0,0 +1,4 @@
#pragma once
extern long status_last_tick_usec;
extern long status_avg_tick_usec;

View File

@ -1,5 +1,7 @@
#pragma once #pragma once
#include <stddef.h>
typedef struct thread_pool_t thread_pool_t; typedef struct thread_pool_t thread_pool_t;
thread_pool_t *thread_pool_create(size_t thread_count); thread_pool_t *thread_pool_create(size_t thread_count);

View File

@ -1,10 +1,10 @@
CC=gcc CC=gcc
C_FLAGS=-Wall -Wextra -pedantic -Wno-unknown-pragmas -g -DDEBUG C_FLAGS=-Wall -Wextra -pedantic -Wno-unknown-pragmas -g -DDEBUG -DSUS_TARGET_VERSION=10000
DIR_SRC=src DIR_SRC=src
DIR_INC=include DIR_INC=include
DIR_BUILD=build DIR_BUILD=build
DEPS=dl pthread m atomic pigpio rt sus DEPS=dl pthread m atomic rt sus
OUTBIN=$(DIR_BUILD)/bin/main OUTBIN=$(DIR_BUILD)/bin/main

View File

@ -1,10 +1,22 @@
#include "runtime.h" #include "runtime.h"
#include <sys/time.h>
#include <time.h>
#include <sus/hashtable.h> #include <sus/hashtable.h>
#include <sus/hashes.h> #include <sus/hashes.h>
#include <sus/ivector.h> #include <sus/ivector.h>
#include "thread_pool.h" #include "thread_pool.h"
#include "systems/status.h"
#define THREAD_POOL_SIZE 16
#define TICKS_P_SEC 100
#define TICK_USEC 10000
#define USEC_TO_SEC 1000000
typedef struct recurring_callback_t { typedef struct recurring_callback_t {
long period; long period;
@ -19,22 +31,33 @@ static long _tick;
static hashtable_t *_future_callbacks; static hashtable_t *_future_callbacks;
//ivector_t<recurring_callback_t> //ivector_t<recurring_callback_t>
static ivector_t *_recurring_callbacks; static ivector_t *_recurring_callbacks;
static thread_pool_t *callback_pool; static thread_pool_t *_callback_pool;
static int _should_run = 0;
static long _sum_tick_usec;
static void *callback_entry(void *func)
{
system_tick_t callback = (system_tick_t)func;
callback();
return NULL;
}
static void register_callback(long delay, system_tick_t callback) static void register_callback(long delay, system_tick_t callback)
{ {
long tick = delay + _tick; long tick = delay + _tick;
ivector_t *vector = hashtable_get(_future_callbacks, tick); ivector_t *vector = hashtable_get(_future_callbacks, (void*)tick);
if (!vector) if (!vector)
{ {
//ivector<system_tick_t>
vector = ivector_create(sizeof(sizeof(system_tick_t))); vector = ivector_create(sizeof(sizeof(system_tick_t)));
hashtable_add(_future_callbacks, tick, vector); hashtable_add(_future_callbacks, (void*)tick, vector);
} }
ivector_append(vector, callback); ivector_append(vector, (void*)callback);
} }
static void handle_callbacks(ivector_t *callbacks) static void handle_callbacks(ivector_t *callbacks)
@ -42,8 +65,8 @@ static void handle_callbacks(ivector_t *callbacks)
size_t count = ivector_get_count(callbacks); size_t count = ivector_get_count(callbacks);
for (size_t i = 0; i < count; ++i) for (size_t i = 0; i < count; ++i)
{ {
system_tick_t *tick = ivector_get(callbacks, i); system_tick_t callback = ivector_get(callbacks, i);
//TODO: Dispatch work thread_pool_run(_callback_pool, callback_entry, (void*)callback);
} }
} }
@ -62,39 +85,97 @@ static void register_recurring_callbacks()
} }
} }
void runtime_init() void runtime_init()
{ {
_tick = 0; _tick = 0;
_future_callbacks = hashtable_create(hash_ptr, compare_ptr); _future_callbacks = hashtable_create(hash_ptr, compare_ptr);
_recurring_callbacks = ivector_create(sizeof(recurring_callback_t)); _recurring_callbacks = ivector_create(sizeof(recurring_callback_t));
_callback_pool = thread_pool_create(THREAD_POOL_SIZE);
_should_run = 1;
} }
void runtime_loop() void runtime_loop()
{ {
struct timeval before;
gettimeofday(&before, NULL);
//Runtime loop //Runtime loop
//Ticks every 10ms //Ticks every 10ms
//Systems request X tick delay callbacks //Systems request X tick delay callbacks
//Callbacks executed on separate threads //Callbacks executed on separate threads
while (_should_run)
{
//Handle pending callbacks and register recurring //Handle pending callbacks and register recurring
ivector_t *callbacks = hashtable_get(_future_callbacks, _tick); ivector_t *callbacks = hashtable_get(_future_callbacks, (void*)_tick);
if (callbacks) if (callbacks)
{ {
handle_callbacks(callbacks); handle_callbacks(callbacks);
hashtable_remove(_future_callbacks, _tick, NULL, NULL); hashtable_remove(_future_callbacks, (void*)_tick, NULL, NULL);
ivector_destroy(callbacks); ivector_destroy(callbacks);
} }
register_recurring_callbacks(); register_recurring_callbacks();
//Advance tick //Advance tick
++_tick; ++_tick;
//TODO: Delay and loop //Delay and performance eval
struct timeval now;
gettimeofday(&now, NULL);
long elapsed_usec = now.tv_usec - before.tv_usec;
elapsed_usec += (now.tv_sec - before.tv_sec) * USEC_TO_SEC;
//Update performance metrics
status_last_tick_usec = elapsed_usec;
_sum_tick_usec += elapsed_usec;
if (_tick % TICKS_P_SEC == 0)
{
status_avg_tick_usec = _sum_tick_usec / TICKS_P_SEC;
_sum_tick_usec = 0;
}
//Skip delay if needed
if (elapsed_usec >= TICK_USEC)
{
//TODO: Log tick lag
goto tick_cycle_end;
}
struct timespec target_sleep = { .tv_sec = 0, .tv_nsec = TICK_USEC - elapsed_usec };
while (nanosleep(&target_sleep, &target_sleep)); //Sleep until target reached
tick_cycle_end:
//Reference last cycle end instead of start of next one
//Should reduce tick drift
before = now;
}
} }
void runtime_destroy() void runtime_destroy()
{ {
hashtable_destroy_free(_future_callbacks, NULL, ivector_destroy); _should_run = 0;
hashtable_destroy_free(_future_callbacks, NULL, (void(*)(void*))ivector_destroy);
ivector_destroy(_recurring_callbacks); ivector_destroy(_recurring_callbacks);
thread_pool_destroy(_callback_pool);
}
void runtime_request_stop()
{
_should_run = 0;
}
void runtime_callback_single(system_tick_t callback, long delay)
{
register_callback(delay, callback);
}
void runtime_callback_many(system_tick_t callback, long period, long phase)
{
recurring_callback_t recurring = { .callback = callback, .period = period, .phase = phase };
ivector_append(_recurring_callbacks, &recurring);
} }

4
src/systems/status.c Normal file
View File

@ -0,0 +1,4 @@
#include "systems/status.h"
long status_last_tick_usec;
long status_avg_tick_usec;

View File

@ -1,5 +1,6 @@
#include "thread_pool.h" #include "thread_pool.h"
#include <stddef.h>
#include <stdlib.h> #include <stdlib.h>
#include <pthread.h> #include <pthread.h>
@ -13,6 +14,8 @@ typedef struct thread_info_t
void *(*func)(void *); void *(*func)(void *);
void *data; void *data;
long job_id; long job_id;
thread_pool_t *pool;
size_t thread_idx;
} thread_info_t; } thread_info_t;
struct thread_pool_t struct thread_pool_t
@ -28,7 +31,36 @@ struct thread_pool_t
}; };
static void *pooled_thread_entry(void *data); static void *pooled_thread_entry(void *data)
{
thread_info_t *info = data;
while (1) //TODO: Exit condition
{
//Signal idle
pthread_mutex_lock(&info->pool->idle_mutex);
ivector_append(info->pool->idle_threads, &info->thread_idx);
pthread_cond_signal(&info->pool->idle_cond);
pthread_mutex_unlock(&info->pool->idle_mutex);
//Wait for work
pthread_mutex_lock(&info->mutex);
while (info->job_id == -1)
pthread_cond_wait(&info->cond, &info->mutex);
pthread_mutex_unlock(&info->mutex);
//Do work
void *returned = info->func(info->data);
(void)returned; //REVIEW: Do something with return value?
//Reset
info->job_id = -1;
info->func = NULL;
info->data = NULL;
}
return NULL;
}
thread_pool_t *thread_pool_create(size_t thread_count) thread_pool_t *thread_pool_create(size_t thread_count)
@ -58,7 +90,9 @@ thread_pool_t *thread_pool_create(size_t thread_count)
pthread_cond_init(&info->cond, NULL); pthread_cond_init(&info->cond, NULL);
info->func = NULL; info->func = NULL;
info->data = NULL; info->data = NULL;
info->job_id = 0; info->job_id = -1;
info->pool = pool;
info->thread_idx = i;
pthread_create(&info->thread, NULL, pooled_thread_entry, info); pthread_create(&info->thread, NULL, pooled_thread_entry, info);
} }