diff --git a/include/runtime.h b/include/runtime.h index 8021e29..b0a711b 100644 --- a/include/runtime.h +++ b/include/runtime.h @@ -1,10 +1,12 @@ #pragma once -typedef void (*system_tick_t)(int type); +typedef void (*system_tick_t)(); void runtime_init(); void runtime_loop(); void runtime_destroy(); -int runtime_callback_single(system_tick_t callback, long delay); -int runtime_callback_many(system_tick_t callback, long period, long phase); +void runtime_request_stop(); + +void runtime_callback_single(system_tick_t callback, long delay); +void runtime_callback_many(system_tick_t callback, long period, long phase); diff --git a/include/systems/status.h b/include/systems/status.h new file mode 100644 index 0000000..aad185a --- /dev/null +++ b/include/systems/status.h @@ -0,0 +1,4 @@ +#pragma once + +extern long status_last_tick_usec; +extern long status_avg_tick_usec; diff --git a/include/thread_pool.h b/include/thread_pool.h index c829ab6..f85b5ea 100644 --- a/include/thread_pool.h +++ b/include/thread_pool.h @@ -1,5 +1,7 @@ #pragma once +#include + typedef struct thread_pool_t thread_pool_t; thread_pool_t *thread_pool_create(size_t thread_count); diff --git a/makefile b/makefile index 2241e9c..6fc8768 100644 --- a/makefile +++ b/makefile @@ -1,10 +1,10 @@ CC=gcc -C_FLAGS=-Wall -Wextra -pedantic -Wno-unknown-pragmas -g -DDEBUG +C_FLAGS=-Wall -Wextra -pedantic -Wno-unknown-pragmas -g -DDEBUG -DSUS_TARGET_VERSION=10000 DIR_SRC=src DIR_INC=include DIR_BUILD=build -DEPS=dl pthread m atomic pigpio rt sus +DEPS=dl pthread m atomic rt sus OUTBIN=$(DIR_BUILD)/bin/main diff --git a/src/runtime.c b/src/runtime.c index a3a6ae7..22d75dd 100644 --- a/src/runtime.c +++ b/src/runtime.c @@ -1,10 +1,22 @@ #include "runtime.h" +#include +#include + #include #include #include #include "thread_pool.h" +#include "systems/status.h" + +#define THREAD_POOL_SIZE 16 + +#define TICKS_P_SEC 100 +#define TICK_USEC 10000 +#define USEC_TO_SEC 1000000 + + typedef struct recurring_callback_t { long period; @@ -19,22 +31,33 @@ static long _tick; static hashtable_t *_future_callbacks; //ivector_t static ivector_t *_recurring_callbacks; -static thread_pool_t *callback_pool; +static thread_pool_t *_callback_pool; +static int _should_run = 0; + +static long _sum_tick_usec; +static void *callback_entry(void *func) +{ + system_tick_t callback = (system_tick_t)func; + callback(); + return NULL; +} + static void register_callback(long delay, system_tick_t callback) { long tick = delay + _tick; - ivector_t *vector = hashtable_get(_future_callbacks, tick); + ivector_t *vector = hashtable_get(_future_callbacks, (void*)tick); if (!vector) { + //ivector vector = ivector_create(sizeof(sizeof(system_tick_t))); - hashtable_add(_future_callbacks, tick, vector); + hashtable_add(_future_callbacks, (void*)tick, vector); } - ivector_append(vector, callback); + ivector_append(vector, (void*)callback); } static void handle_callbacks(ivector_t *callbacks) @@ -42,8 +65,8 @@ static void handle_callbacks(ivector_t *callbacks) size_t count = ivector_get_count(callbacks); for (size_t i = 0; i < count; ++i) { - system_tick_t *tick = ivector_get(callbacks, i); - //TODO: Dispatch work + system_tick_t callback = ivector_get(callbacks, i); + thread_pool_run(_callback_pool, callback_entry, (void*)callback); } } @@ -62,39 +85,97 @@ static void register_recurring_callbacks() } } + + void runtime_init() { _tick = 0; _future_callbacks = hashtable_create(hash_ptr, compare_ptr); _recurring_callbacks = ivector_create(sizeof(recurring_callback_t)); + _callback_pool = thread_pool_create(THREAD_POOL_SIZE); + _should_run = 1; } void runtime_loop() { + struct timeval before; + gettimeofday(&before, NULL); + //Runtime loop //Ticks every 10ms //Systems request X tick delay callbacks //Callbacks executed on separate threads - //Handle pending callbacks and register recurring - ivector_t *callbacks = hashtable_get(_future_callbacks, _tick); - if (callbacks) + while (_should_run) { - handle_callbacks(callbacks); - hashtable_remove(_future_callbacks, _tick, NULL, NULL); - ivector_destroy(callbacks); + //Handle pending callbacks and register recurring + ivector_t *callbacks = hashtable_get(_future_callbacks, (void*)_tick); + if (callbacks) + { + handle_callbacks(callbacks); + hashtable_remove(_future_callbacks, (void*)_tick, NULL, NULL); + ivector_destroy(callbacks); + } + register_recurring_callbacks(); + + //Advance tick + ++_tick; + + //Delay and performance eval + struct timeval now; + gettimeofday(&now, NULL); + + long elapsed_usec = now.tv_usec - before.tv_usec; + elapsed_usec += (now.tv_sec - before.tv_sec) * USEC_TO_SEC; + + //Update performance metrics + status_last_tick_usec = elapsed_usec; + _sum_tick_usec += elapsed_usec; + if (_tick % TICKS_P_SEC == 0) + { + status_avg_tick_usec = _sum_tick_usec / TICKS_P_SEC; + _sum_tick_usec = 0; + } + + //Skip delay if needed + if (elapsed_usec >= TICK_USEC) + { + //TODO: Log tick lag + goto tick_cycle_end; + } + + struct timespec target_sleep = { .tv_sec = 0, .tv_nsec = TICK_USEC - elapsed_usec }; + while (nanosleep(&target_sleep, &target_sleep)); //Sleep until target reached + +tick_cycle_end: + //Reference last cycle end instead of start of next one + //Should reduce tick drift + before = now; } - - register_recurring_callbacks(); - - //Advance tick - ++_tick; - - //TODO: Delay and loop } void runtime_destroy() { - hashtable_destroy_free(_future_callbacks, NULL, ivector_destroy); + _should_run = 0; + hashtable_destroy_free(_future_callbacks, NULL, (void(*)(void*))ivector_destroy); ivector_destroy(_recurring_callbacks); + thread_pool_destroy(_callback_pool); +} + +void runtime_request_stop() +{ + _should_run = 0; +} + + + +void runtime_callback_single(system_tick_t callback, long delay) +{ + register_callback(delay, callback); +} + +void runtime_callback_many(system_tick_t callback, long period, long phase) +{ + recurring_callback_t recurring = { .callback = callback, .period = period, .phase = phase }; + ivector_append(_recurring_callbacks, &recurring); } diff --git a/src/systems/status.c b/src/systems/status.c new file mode 100644 index 0000000..fc26d02 --- /dev/null +++ b/src/systems/status.c @@ -0,0 +1,4 @@ +#include "systems/status.h" + +long status_last_tick_usec; +long status_avg_tick_usec; diff --git a/src/thread_pool.c b/src/thread_pool.c index 07c6ab7..6cd5a85 100644 --- a/src/thread_pool.c +++ b/src/thread_pool.c @@ -1,5 +1,6 @@ #include "thread_pool.h" +#include #include #include @@ -13,6 +14,8 @@ typedef struct thread_info_t void *(*func)(void *); void *data; long job_id; + thread_pool_t *pool; + size_t thread_idx; } thread_info_t; struct thread_pool_t @@ -28,7 +31,36 @@ struct thread_pool_t }; -static void *pooled_thread_entry(void *data); +static void *pooled_thread_entry(void *data) +{ + thread_info_t *info = data; + + while (1) //TODO: Exit condition + { + //Signal idle + pthread_mutex_lock(&info->pool->idle_mutex); + ivector_append(info->pool->idle_threads, &info->thread_idx); + pthread_cond_signal(&info->pool->idle_cond); + pthread_mutex_unlock(&info->pool->idle_mutex); + + //Wait for work + pthread_mutex_lock(&info->mutex); + while (info->job_id == -1) + pthread_cond_wait(&info->cond, &info->mutex); + pthread_mutex_unlock(&info->mutex); + + //Do work + void *returned = info->func(info->data); + (void)returned; //REVIEW: Do something with return value? + + //Reset + info->job_id = -1; + info->func = NULL; + info->data = NULL; + } + + return NULL; +} thread_pool_t *thread_pool_create(size_t thread_count) @@ -58,7 +90,9 @@ thread_pool_t *thread_pool_create(size_t thread_count) pthread_cond_init(&info->cond, NULL); info->func = NULL; info->data = NULL; - info->job_id = 0; + info->job_id = -1; + info->pool = pool; + info->thread_idx = i; pthread_create(&info->thread, NULL, pooled_thread_entry, info); }