diff options
author | Juan Linietsky <reduzio@gmail.com> | 2019-07-29 12:59:18 -0300 |
---|---|---|
committer | Juan Linietsky <reduzio@gmail.com> | 2020-02-11 11:53:29 +0100 |
commit | c613ead5fa2361296cf8d9a80d4648492ff4e16f (patch) | |
tree | 974356b6840ecf764415d43277ef78ad3c6b6373 /core | |
parent | 4fe3ee1730167b90ec8ae70c871c1dad032981d5 (diff) |
Added a spinlock template as well as a thread work pool class.
Also, optimized shader compilation to happen on threads.
Diffstat (limited to 'core')
-rw-r--r-- | core/bind/core_bind.cpp | 2 | ||||
-rw-r--r-- | core/bind/core_bind.h | 2 | ||||
-rw-r--r-- | core/command_queue_mt.cpp | 4 | ||||
-rw-r--r-- | core/command_queue_mt.h | 4 | ||||
-rw-r--r-- | core/io/file_access_network.cpp | 6 | ||||
-rw-r--r-- | core/io/file_access_network.h | 6 | ||||
-rw-r--r-- | core/io/ip.cpp | 4 | ||||
-rw-r--r-- | core/os/semaphore.cpp | 6 | ||||
-rw-r--r-- | core/os/semaphore.h | 40 | ||||
-rw-r--r-- | core/os/thread_dummy.cpp | 4 | ||||
-rw-r--r-- | core/os/thread_dummy.h | 4 | ||||
-rw-r--r-- | core/rid_owner.h | 67 | ||||
-rw-r--r-- | core/spin_lock.h | 20 | ||||
-rw-r--r-- | core/thread_work_pool.cpp | 53 | ||||
-rw-r--r-- | core/thread_work_pool.h | 78 |
15 files changed, 268 insertions, 32 deletions
diff --git a/core/bind/core_bind.cpp b/core/bind/core_bind.cpp index e61e392a79..1dc48a95a0 100644 --- a/core/bind/core_bind.cpp +++ b/core/bind/core_bind.cpp @@ -2612,7 +2612,7 @@ void _Semaphore::_bind_methods() { _Semaphore::_Semaphore() { - semaphore = Semaphore::create(); + semaphore = SemaphoreOld::create(); } _Semaphore::~_Semaphore() { diff --git a/core/bind/core_bind.h b/core/bind/core_bind.h index 87da51f97e..955375cc99 100644 --- a/core/bind/core_bind.h +++ b/core/bind/core_bind.h @@ -642,7 +642,7 @@ public: class _Semaphore : public Reference { GDCLASS(_Semaphore, Reference); - Semaphore *semaphore; + SemaphoreOld *semaphore; static void _bind_methods(); diff --git a/core/command_queue_mt.cpp b/core/command_queue_mt.cpp index c20735939d..861ca8d1d3 100644 --- a/core/command_queue_mt.cpp +++ b/core/command_queue_mt.cpp @@ -111,11 +111,11 @@ CommandQueueMT::CommandQueueMT(bool p_sync) { for (int i = 0; i < SYNC_SEMAPHORES; i++) { - sync_sems[i].sem = Semaphore::create(); + sync_sems[i].sem = SemaphoreOld::create(); sync_sems[i].in_use = false; } if (p_sync) - sync = Semaphore::create(); + sync = SemaphoreOld::create(); else sync = NULL; } diff --git a/core/command_queue_mt.h b/core/command_queue_mt.h index ba43fa07f3..2b6e0201f0 100644 --- a/core/command_queue_mt.h +++ b/core/command_queue_mt.h @@ -297,7 +297,7 @@ class CommandQueueMT { struct SyncSemaphore { - Semaphore *sem; + SemaphoreOld *sem; bool in_use; }; @@ -342,7 +342,7 @@ class CommandQueueMT { uint32_t dealloc_ptr; SyncSemaphore sync_sems[SYNC_SEMAPHORES]; Mutex *mutex; - Semaphore *sync; + SemaphoreOld *sync; template <class T> T *allocate() { diff --git a/core/io/file_access_network.cpp b/core/io/file_access_network.cpp index 1b09ac7208..202eb89dbd 100644 --- a/core/io/file_access_network.cpp +++ b/core/io/file_access_network.cpp @@ -231,7 +231,7 @@ FileAccessNetworkClient::FileAccessNetworkClient() { singleton = this; last_id = 0; client.instance(); - sem = Semaphore::create(); + sem = SemaphoreOld::create(); lockcount = 0; } @@ -522,8 +522,8 @@ FileAccessNetwork::FileAccessNetwork() { eof_flag = false; opened = false; pos = 0; - sem = Semaphore::create(); - page_sem = Semaphore::create(); + sem = SemaphoreOld::create(); + page_sem = SemaphoreOld::create(); buffer_mutex = Mutex::create(); FileAccessNetworkClient *nc = FileAccessNetworkClient::singleton; nc->lock_mutex(); diff --git a/core/io/file_access_network.h b/core/io/file_access_network.h index e2da1d0893..f329abf7c5 100644 --- a/core/io/file_access_network.h +++ b/core/io/file_access_network.h @@ -49,7 +49,7 @@ class FileAccessNetworkClient { List<BlockRequest> block_requests; - Semaphore *sem; + SemaphoreOld *sem; Thread *thread; bool quit; Mutex *mutex; @@ -85,8 +85,8 @@ public: class FileAccessNetwork : public FileAccess { - Semaphore *sem; - Semaphore *page_sem; + SemaphoreOld *sem; + SemaphoreOld *page_sem; Mutex *buffer_mutex; bool opened; size_t total_size; diff --git a/core/io/ip.cpp b/core/io/ip.cpp index 23f6ca25d0..7d18117711 100644 --- a/core/io/ip.cpp +++ b/core/io/ip.cpp @@ -71,7 +71,7 @@ struct _IP_ResolverPrivate { } Mutex *mutex; - Semaphore *sem; + SemaphoreOld *sem; Thread *thread; //Semaphore* semaphore; @@ -319,7 +319,7 @@ IP::IP() { #ifndef NO_THREADS - resolver->sem = Semaphore::create(); + resolver->sem = SemaphoreOld::create(); if (resolver->sem) { resolver->thread_abort = false; diff --git a/core/os/semaphore.cpp b/core/os/semaphore.cpp index b2ba9716f0..2c20f234d0 100644 --- a/core/os/semaphore.cpp +++ b/core/os/semaphore.cpp @@ -32,14 +32,14 @@ #include "core/error_macros.h" -Semaphore *(*Semaphore::create_func)() = 0; +SemaphoreOld *(*SemaphoreOld::create_func)() = 0; -Semaphore *Semaphore::create() { +SemaphoreOld *SemaphoreOld::create() { ERR_FAIL_COND_V(!create_func, 0); return create_func(); } -Semaphore::~Semaphore() { +SemaphoreOld::~SemaphoreOld() { } diff --git a/core/os/semaphore.h b/core/os/semaphore.h index 9f3c0f549c..f16a15a6db 100644 --- a/core/os/semaphore.h +++ b/core/os/semaphore.h @@ -32,19 +32,53 @@ #define SEMAPHORE_H #include "core/error_list.h" +#include "core/typedefs.h" + +#include <condition_variable> +#include <mutex> class Semaphore { +private: + std::mutex mutex_; + std::condition_variable condition_; + unsigned long count_ = 0; // Initialized as locked. + +public: + _ALWAYS_INLINE_ void post() { + std::lock_guard<decltype(mutex_)> lock(mutex_); + ++count_; + condition_.notify_one(); + } + + _ALWAYS_INLINE_ void wait() { + std::unique_lock<decltype(mutex_)> lock(mutex_); + while (!count_) // Handle spurious wake-ups. + condition_.wait(lock); + --count_; + } + + _ALWAYS_INLINE_ bool try_wait() { + std::lock_guard<decltype(mutex_)> lock(mutex_); + if (count_) { + --count_; + return true; + } + return false; + } +}; + +class SemaphoreOld { protected: - static Semaphore *(*create_func)(); + static SemaphoreOld *(*create_func)(); public: virtual Error wait() = 0; ///< wait until semaphore has positive value, then decrement and pass virtual Error post() = 0; ///< unlock the semaphore, incrementing the value virtual int get() const = 0; ///< get semaphore value - static Semaphore *create(); ///< Create a mutex + static SemaphoreOld *create(); ///< Create a mutex - virtual ~Semaphore(); + virtual ~SemaphoreOld(); }; #endif diff --git a/core/os/thread_dummy.cpp b/core/os/thread_dummy.cpp index d4f65b0312..916aeeda30 100644 --- a/core/os/thread_dummy.cpp +++ b/core/os/thread_dummy.cpp @@ -48,12 +48,12 @@ void MutexDummy::make_default() { Mutex::create_func = &MutexDummy::create; }; -Semaphore *SemaphoreDummy::create() { +SemaphoreOld *SemaphoreDummy::create() { return memnew(SemaphoreDummy); }; void SemaphoreDummy::make_default() { - Semaphore::create_func = &SemaphoreDummy::create; + SemaphoreOld::create_func = &SemaphoreDummy::create; }; RWLock *RWLockDummy::create() { diff --git a/core/os/thread_dummy.h b/core/os/thread_dummy.h index c8b52ae4dd..9329cdaa32 100644 --- a/core/os/thread_dummy.h +++ b/core/os/thread_dummy.h @@ -58,9 +58,9 @@ public: static void make_default(); }; -class SemaphoreDummy : public Semaphore { +class SemaphoreDummy : public SemaphoreOld { - static Semaphore *create(); + static SemaphoreOld *create(); public: virtual Error wait() { return OK; }; diff --git a/core/rid_owner.h b/core/rid_owner.h index 7490416c19..8206e5b4fc 100644 --- a/core/rid_owner.h +++ b/core/rid_owner.h @@ -3,6 +3,8 @@ #include "core/print_string.h" #include "core/rid.h" +#include "core/spin_lock.h" + #include <typeinfo> class RID_AllocBase { @@ -28,7 +30,7 @@ public: virtual ~RID_AllocBase() {} }; -template <class T> +template <class T, bool THREAD_SAFE = false> class RID_Alloc : public RID_AllocBase { T **chunks; @@ -41,9 +43,15 @@ class RID_Alloc : public RID_AllocBase { const char *description; + SpinLock spin_lock; + public: RID make_rid(const T &p_value) { + if (THREAD_SAFE) { + spin_lock.lock(); + } + if (alloc_count == max_alloc) { //allocate a new chunk uint32_t chunk_count = alloc_count == 0 ? 0 : (max_alloc / elements_in_chunk); @@ -85,11 +93,19 @@ public: validator_chunks[free_chunk][free_element] = validator; alloc_count++; + if (THREAD_SAFE) { + spin_lock.unlock(); + } + return _make_from_id(id); } _FORCE_INLINE_ T *getornull(const RID &p_rid) { + if (THREAD_SAFE) { + spin_lock.lock(); + } + uint64_t id = p_rid.get_id(); uint32_t idx = uint32_t(id & 0xFFFFFFFF); if (unlikely(idx >= max_alloc)) { @@ -104,14 +120,27 @@ public: return NULL; } - return &chunks[idx_chunk][idx_element]; + T *ptr = &chunks[idx_chunk][idx_element]; + + if (THREAD_SAFE) { + spin_lock.unlock(); + } + + return ptr; } _FORCE_INLINE_ bool owns(const RID &p_rid) { + if (THREAD_SAFE) { + spin_lock.lock(); + } + uint64_t id = p_rid.get_id(); uint32_t idx = uint32_t(id & 0xFFFFFFFF); if (unlikely(idx >= max_alloc)) { + if (THREAD_SAFE) { + spin_lock.unlock(); + } return false; } @@ -119,11 +148,22 @@ public: uint32_t idx_element = idx % elements_in_chunk; uint32_t validator = uint32_t(id >> 32); - return validator_chunks[idx_chunk][idx_element] == validator; + + bool owned = validator_chunks[idx_chunk][idx_element] == validator; + + if (THREAD_SAFE) { + spin_lock.unlock(); + } + + return owned; } _FORCE_INLINE_ void free(const RID &p_rid) { + if (THREAD_SAFE) { + spin_lock.lock(); + } + uint64_t id = p_rid.get_id(); uint32_t idx = uint32_t(id & 0xFFFFFFFF); ERR_FAIL_COND(idx >= max_alloc); @@ -139,6 +179,10 @@ public: alloc_count--; free_list_chunks[alloc_count / elements_in_chunk][alloc_count % elements_in_chunk] = idx; + + if (THREAD_SAFE) { + spin_lock.unlock(); + } } _FORCE_INLINE_ uint32_t get_rid_count() const { @@ -147,8 +191,15 @@ public: _FORCE_INLINE_ T *get_rid_by_index(uint32_t p_index) { ERR_FAIL_INDEX_V(p_index, alloc_count, NULL); + if (THREAD_SAFE) { + spin_lock.lock(); + } uint64_t idx = free_list_chunks[p_index / elements_in_chunk][p_index % elements_in_chunk]; - return &chunks[idx / elements_in_chunk][idx % elements_in_chunk]; + T *ptr = &chunks[idx / elements_in_chunk][idx % elements_in_chunk]; + if (THREAD_SAFE) { + spin_lock.unlock(); + } + return ptr; } void get_owned_list(List<RID> *p_owned) { @@ -203,9 +254,9 @@ public: } }; -template <class T> +template <class T, bool THREAD_SAFE = false> class RID_PtrOwner { - RID_Alloc<T *> alloc; + RID_Alloc<T *, THREAD_SAFE> alloc; public: _FORCE_INLINE_ RID make_rid(T *p_ptr) { @@ -239,9 +290,9 @@ public: alloc(p_target_chunk_byte_size) {} }; -template <class T> +template <class T, bool THREAD_SAFE = false> class RID_Owner { - RID_Alloc<T> alloc; + RID_Alloc<T, THREAD_SAFE> alloc; public: _FORCE_INLINE_ RID make_rid(const T &p_ptr) { diff --git a/core/spin_lock.h b/core/spin_lock.h new file mode 100644 index 0000000000..07f865d91a --- /dev/null +++ b/core/spin_lock.h @@ -0,0 +1,20 @@ +#ifndef SPIN_LOCK_H +#define SPIN_LOCK_H + +#include "core/typedefs.h" +#include <atomic> + +class SpinLock { + std::atomic_flag locked = ATOMIC_FLAG_INIT; + +public: + _ALWAYS_INLINE_ void lock() { + while (locked.test_and_set(std::memory_order_acquire)) { + ; + } + } + _ALWAYS_INLINE_ void unlock() { + locked.clear(std::memory_order_release); + } +}; +#endif // SPIN_LOCK_H diff --git a/core/thread_work_pool.cpp b/core/thread_work_pool.cpp new file mode 100644 index 0000000000..aafb9c11d3 --- /dev/null +++ b/core/thread_work_pool.cpp @@ -0,0 +1,53 @@ +#include "thread_work_pool.h" +#include "core/os/os.h" + +void ThreadWorkPool::_thread_function(ThreadData *p_thread) { + + while (true) { + p_thread->start.wait(); + if (p_thread->exit.load()) { + break; + } + p_thread->work->work(); + p_thread->completed.post(); + } +} + +void ThreadWorkPool::init(int p_thread_count) { + ERR_FAIL_COND(threads != nullptr); + if (p_thread_count < 0) { + p_thread_count = OS::get_singleton()->get_processor_count(); + } + + thread_count = p_thread_count; + threads = memnew_arr(ThreadData, thread_count); + + for (uint32_t i = 0; i < thread_count; i++) { + threads[i].exit.store(false); + threads[i].thread = memnew(std::thread(ThreadWorkPool::_thread_function, &threads[i])); + } +} + +void ThreadWorkPool::finish() { + + if (threads == nullptr) { + return; + } + + for (uint32_t i = 0; i < thread_count; i++) { + threads[i].exit.store(true); + threads[i].start.post(); + } + for (uint32_t i = 0; i < thread_count; i++) { + threads[i].thread->join(); + memdelete(threads[i].thread); + } + + memdelete_arr(threads); + threads = nullptr; +} + +ThreadWorkPool::~ThreadWorkPool() { + + finish(); +} diff --git a/core/thread_work_pool.h b/core/thread_work_pool.h new file mode 100644 index 0000000000..165eb391e2 --- /dev/null +++ b/core/thread_work_pool.h @@ -0,0 +1,78 @@ +#ifndef THREAD_WORK_POOL_H +#define THREAD_WORK_POOL_H + +#include "core/os/memory.h" +#include "core/os/semaphore.h" +#include <atomic> +#include <thread> +class ThreadWorkPool { + + std::atomic<uint32_t> index; + + struct BaseWork { + std::atomic<uint32_t> *index; + uint32_t max_elements; + virtual void work() = 0; + }; + + template <class C, class M, class U> + struct Work : public BaseWork { + C *instance; + M method; + U userdata; + virtual void work() { + + while (true) { + uint32_t work_index = index->fetch_add(1, std::memory_order_relaxed); + if (work_index >= max_elements) { + break; + } + (instance->*method)(work_index, userdata); + } + } + }; + + struct ThreadData { + std::thread *thread; + Semaphore start; + Semaphore completed; + std::atomic<bool> exit; + BaseWork *work; + }; + + ThreadData *threads = nullptr; + uint32_t thread_count = 0; + + static void _thread_function(ThreadData *p_thread); + +public: + template <class C, class M, class U> + void do_work(uint32_t p_elements, C *p_instance, M p_method, U p_userdata) { + + ERR_FAIL_COND(!threads); //never initialized + + index.store(0); + + Work<C, M, U> *w = memnew((Work<C, M, U>)); + w->instance = p_instance; + w->userdata = p_userdata; + w->method = p_method; + w->index = &index; + w->max_elements = p_elements; + + for (uint32_t i = 0; i < thread_count; i++) { + threads[i].work = w; + threads[i].start.post(); + } + for (uint32_t i = 0; i < thread_count; i++) { + threads[i].completed.wait(); + threads[i].work = nullptr; + } + } + + void init(int p_thread_count = -1); + void finish(); + ~ThreadWorkPool(); +}; + +#endif // THREAD_POOL_H |