Diffstat (limited to 'thirdparty/bullet/LinearMath/TaskScheduler')
4 files changed, 1296 insertions, 1344 deletions
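Note on the change below: the bulk of this commit is a mechanical reformat of the Bullet TaskScheduler sources to the upstream clang-format style (tabs, brace placement, spacing inside parentheses), which is why the insertion and deletion counts are nearly symmetric. The substantive changes are in btThreadSupportPosix.cpp: a shared btCriticalSection (m_cs) is threaded into btThreadStatus so that writes to m_status in threadFunction() and the reads in waitForResponse() are now mutex-guarded rather than racing, and the debug printf noise in thread startup/shutdown is dropped.

For orientation while reading the btTaskScheduler.cpp hunk, the core scheduling idea is a per-worker job queue with neighbor stealing: JobQueue::consumeJob() drains the thread's own queue under a lock, then probes a small set of neighboring queues. The following standalone sketch illustrates that consume-then-steal pattern; it is not Bullet's API — the names are hypothetical and std::mutex / std::function stand in for btSpinMutex, btCriticalSection, and IJob.

```cpp
// Minimal sketch (hypothetical names, not Bullet's actual classes) of the
// consume-then-steal pattern implemented by JobQueue::consumeJob() below.
#include <deque>
#include <functional>
#include <mutex>
#include <vector>

struct SketchJobQueue
{
	std::mutex m_lock;  // stands in for btSpinMutex / btCriticalSection
	std::deque<std::function<void(int)>> m_jobs;
	std::vector<SketchJobQueue*> m_neighbors;  // nearby queues eligible for stealing

	// Pop from our own queue; returns an empty function if nothing is queued.
	std::function<void(int)> consumeFromOwnQueue()
	{
		std::lock_guard<std::mutex> guard(m_lock);
		if (m_jobs.empty())
			return nullptr;
		std::function<void(int)> job = std::move(m_jobs.front());
		m_jobs.pop_front();
		return job;
	}

	// Own queue first, then steal from neighbors -- mirrors consumeJob().
	std::function<void(int)> consume()
	{
		if (auto job = consumeFromOwnQueue())
			return job;
		for (SketchJobQueue* other : m_neighbors)
			if (auto job = other->consumeFromOwnQueue())
				return job;
		return nullptr;  // all reachable queues are empty
	}
};
```

In the real code the neighbor set is limited to at most two adjacent queues (offsets -1, +1, then -2, +2, ...), which keeps stealing traffic local to queues that are likely on nearby cores.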
diff --git a/thirdparty/bullet/LinearMath/TaskScheduler/btTaskScheduler.cpp b/thirdparty/bullet/LinearMath/TaskScheduler/btTaskScheduler.cpp index 49510d1660..5f1115c402 100644 --- a/thirdparty/bullet/LinearMath/TaskScheduler/btTaskScheduler.cpp +++ b/thirdparty/bullet/LinearMath/TaskScheduler/btTaskScheduler.cpp @@ -6,13 +6,11 @@ #include <stdio.h> #include <algorithm> - - #if BT_THREADSAFE #include "btThreadSupportInterface.h" -#if defined( _WIN32 ) +#if defined(_WIN32) #define WIN32_LEAN_AND_MEAN @@ -20,404 +18,399 @@ #endif - typedef unsigned long long btU64; static const int kCacheLineSize = 64; void btSpinPause() { -#if defined( _WIN32 ) - YieldProcessor(); +#if defined(_WIN32) + YieldProcessor(); #endif } - struct WorkerThreadStatus { - enum Type - { - kInvalid, - kWaitingForWork, - kWorking, - kSleeping, - }; + enum Type + { + kInvalid, + kWaitingForWork, + kWorking, + kSleeping, + }; }; - -ATTRIBUTE_ALIGNED64(class) WorkerThreadDirectives +ATTRIBUTE_ALIGNED64(class) +WorkerThreadDirectives { - static const int kMaxThreadCount = BT_MAX_THREAD_COUNT; - // directives for all worker threads packed into a single cacheline - char m_threadDirs[kMaxThreadCount]; + static const int kMaxThreadCount = BT_MAX_THREAD_COUNT; + // directives for all worker threads packed into a single cacheline + char m_threadDirs[kMaxThreadCount]; public: - enum Type - { - kInvalid, - kGoToSleep, // go to sleep - kStayAwakeButIdle, // wait for not checking job queue - kScanForJobs, // actively scan job queue for jobs - }; - WorkerThreadDirectives() - { - for ( int i = 0; i < kMaxThreadCount; ++i ) - { - m_threadDirs[ i ] = 0; - } - } - - Type getDirective(int threadId) - { - btAssert(threadId < kMaxThreadCount); - return static_cast<Type>(m_threadDirs[threadId]); - } - - void setDirectiveByRange(int threadBegin, int threadEnd, Type dir) - { - btAssert( threadBegin < threadEnd ); - btAssert( threadEnd <= kMaxThreadCount ); - char dirChar = static_cast<char>(dir); - for ( int i = threadBegin; i < threadEnd; ++i ) - { - m_threadDirs[ i ] = dirChar; - } - } + enum Type + { + kInvalid, + kGoToSleep, // go to sleep + kStayAwakeButIdle, // wait for not checking job queue + kScanForJobs, // actively scan job queue for jobs + }; + WorkerThreadDirectives() + { + for (int i = 0; i < kMaxThreadCount; ++i) + { + m_threadDirs[i] = 0; + } + } + + Type getDirective(int threadId) + { + btAssert(threadId < kMaxThreadCount); + return static_cast<Type>(m_threadDirs[threadId]); + } + + void setDirectiveByRange(int threadBegin, int threadEnd, Type dir) + { + btAssert(threadBegin < threadEnd); + btAssert(threadEnd <= kMaxThreadCount); + char dirChar = static_cast<char>(dir); + for (int i = threadBegin; i < threadEnd; ++i) + { + m_threadDirs[i] = dirChar; + } + } }; class JobQueue; -ATTRIBUTE_ALIGNED64(struct) ThreadLocalStorage +ATTRIBUTE_ALIGNED64(struct) +ThreadLocalStorage { - int m_threadId; - WorkerThreadStatus::Type m_status; - int m_numJobsFinished; - btSpinMutex m_mutex; - btScalar m_sumResult; - WorkerThreadDirectives * m_directive; - JobQueue* m_queue; - btClock* m_clock; - unsigned int m_cooldownTime; + int m_threadId; + WorkerThreadStatus::Type m_status; + int m_numJobsFinished; + btSpinMutex m_mutex; + btScalar m_sumResult; + WorkerThreadDirectives* m_directive; + JobQueue* m_queue; + btClock* m_clock; + unsigned int m_cooldownTime; }; - struct IJob { - virtual void executeJob(int threadId) = 0; + virtual void executeJob(int threadId) = 0; }; class ParallelForJob : public IJob { - const btIParallelForBody* m_body; - int 
m_begin; - int m_end; + const btIParallelForBody* m_body; + int m_begin; + int m_end; public: - ParallelForJob( int iBegin, int iEnd, const btIParallelForBody& body ) - { - m_body = &body; - m_begin = iBegin; - m_end = iEnd; - } - virtual void executeJob(int threadId) BT_OVERRIDE - { - BT_PROFILE( "executeJob" ); - - // call the functor body to do the work - m_body->forLoop( m_begin, m_end ); - } -}; + ParallelForJob(int iBegin, int iEnd, const btIParallelForBody& body) + { + m_body = &body; + m_begin = iBegin; + m_end = iEnd; + } + virtual void executeJob(int threadId) BT_OVERRIDE + { + BT_PROFILE("executeJob"); + // call the functor body to do the work + m_body->forLoop(m_begin, m_end); + } +}; class ParallelSumJob : public IJob { - const btIParallelSumBody* m_body; - ThreadLocalStorage* m_threadLocalStoreArray; - int m_begin; - int m_end; + const btIParallelSumBody* m_body; + ThreadLocalStorage* m_threadLocalStoreArray; + int m_begin; + int m_end; public: - ParallelSumJob( int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls ) - { - m_body = &body; - m_threadLocalStoreArray = tls; - m_begin = iBegin; - m_end = iEnd; - } - virtual void executeJob( int threadId ) BT_OVERRIDE - { - BT_PROFILE( "executeJob" ); - - // call the functor body to do the work - btScalar val = m_body->sumLoop( m_begin, m_end ); + ParallelSumJob(int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls) + { + m_body = &body; + m_threadLocalStoreArray = tls; + m_begin = iBegin; + m_end = iEnd; + } + virtual void executeJob(int threadId) BT_OVERRIDE + { + BT_PROFILE("executeJob"); + + // call the functor body to do the work + btScalar val = m_body->sumLoop(m_begin, m_end); #if BT_PARALLEL_SUM_DETERMINISTISM - // by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision) - const float TRUNC_SCALE = float(1<<19); - val = floor(val*TRUNC_SCALE+0.5f)/TRUNC_SCALE; // truncate some bits + // by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision) + const float TRUNC_SCALE = float(1 << 19); + val = floor(val * TRUNC_SCALE + 0.5f) / TRUNC_SCALE; // truncate some bits #endif - m_threadLocalStoreArray[threadId].m_sumResult += val; - } + m_threadLocalStoreArray[threadId].m_sumResult += val; + } }; - -ATTRIBUTE_ALIGNED64(class) JobQueue +ATTRIBUTE_ALIGNED64(class) +JobQueue { - btThreadSupportInterface* m_threadSupport; - btCriticalSection* m_queueLock; - btSpinMutex m_mutex; - - btAlignedObjectArray<IJob*> m_jobQueue; - char* m_jobMem; - int m_jobMemSize; - bool m_queueIsEmpty; - int m_tailIndex; - int m_headIndex; - int m_allocSize; - bool m_useSpinMutex; - btAlignedObjectArray<JobQueue*> m_neighborContexts; - char m_cachePadding[kCacheLineSize]; // prevent false sharing - - void freeJobMem() - { - if ( m_jobMem ) - { - // free old - btAlignedFree(m_jobMem); - m_jobMem = NULL; - } - } - void resizeJobMem(int newSize) - { - if (newSize > m_jobMemSize) - { - freeJobMem(); - m_jobMem = static_cast<char*>(btAlignedAlloc(newSize, kCacheLineSize)); - m_jobMemSize = newSize; - } - } + btThreadSupportInterface* m_threadSupport; + btCriticalSection* m_queueLock; + btSpinMutex m_mutex; + + btAlignedObjectArray<IJob*> m_jobQueue; + char* m_jobMem; + int m_jobMemSize; + bool m_queueIsEmpty; + int m_tailIndex; + int m_headIndex; + int m_allocSize; + bool m_useSpinMutex; + btAlignedObjectArray<JobQueue*> m_neighborContexts; + char m_cachePadding[kCacheLineSize]; // prevent false sharing + + void 
freeJobMem() + { + if (m_jobMem) + { + // free old + btAlignedFree(m_jobMem); + m_jobMem = NULL; + } + } + void resizeJobMem(int newSize) + { + if (newSize > m_jobMemSize) + { + freeJobMem(); + m_jobMem = static_cast<char*>(btAlignedAlloc(newSize, kCacheLineSize)); + m_jobMemSize = newSize; + } + } public: - - JobQueue() - { - m_jobMem = NULL; - m_jobMemSize = 0; - m_threadSupport = NULL; - m_queueLock = NULL; - m_headIndex = 0; - m_tailIndex = 0; - m_useSpinMutex = false; - } - ~JobQueue() - { + JobQueue() + { + m_jobMem = NULL; + m_jobMemSize = 0; + m_threadSupport = NULL; + m_queueLock = NULL; + m_headIndex = 0; + m_tailIndex = 0; + m_useSpinMutex = false; + } + ~JobQueue() + { exit(); - } + } void exit() - { + { freeJobMem(); - if (m_queueLock && m_threadSupport) - { - m_threadSupport->deleteCriticalSection(m_queueLock); - m_queueLock = NULL; + if (m_queueLock && m_threadSupport) + { + m_threadSupport->deleteCriticalSection(m_queueLock); + m_queueLock = NULL; m_threadSupport = 0; - } - } - - void init(btThreadSupportInterface* threadSup, btAlignedObjectArray<JobQueue>* contextArray) - { - m_threadSupport = threadSup; - if (threadSup) - { - m_queueLock = m_threadSupport->createCriticalSection(); - } - setupJobStealing(contextArray, contextArray->size()); - } - void setupJobStealing(btAlignedObjectArray<JobQueue>* contextArray, int numActiveContexts) - { - btAlignedObjectArray<JobQueue>& contexts = *contextArray; - int selfIndex = 0; - for (int i = 0; i < contexts.size(); ++i) - { - if ( this == &contexts[ i ] ) - { - selfIndex = i; - break; - } - } - int numNeighbors = btMin(2, contexts.size() - 1); - int neighborOffsets[ ] = {-1, 1, -2, 2, -3, 3}; - int numOffsets = sizeof(neighborOffsets)/sizeof(neighborOffsets[0]); - m_neighborContexts.reserve( numNeighbors ); - m_neighborContexts.resizeNoInitialize(0); - for (int i = 0; i < numOffsets && m_neighborContexts.size() < numNeighbors; i++) - { - int neighborIndex = selfIndex + neighborOffsets[i]; - if ( neighborIndex >= 0 && neighborIndex < numActiveContexts) - { - m_neighborContexts.push_back( &contexts[ neighborIndex ] ); - } - } - } - - bool isQueueEmpty() const {return m_queueIsEmpty;} - void lockQueue() - { - if ( m_useSpinMutex ) - { - m_mutex.lock(); - } - else - { - m_queueLock->lock(); - } - } - void unlockQueue() - { - if ( m_useSpinMutex ) - { - m_mutex.unlock(); - } - else - { - m_queueLock->unlock(); - } - } - void clearQueue(int jobCount, int jobSize) - { - lockQueue(); - m_headIndex = 0; - m_tailIndex = 0; - m_allocSize = 0; - m_queueIsEmpty = true; - int jobBufSize = jobSize * jobCount; - // make sure we have enough memory allocated to store jobs - if ( jobBufSize > m_jobMemSize ) - { - resizeJobMem( jobBufSize ); - } - // make sure job queue is big enough - if ( jobCount > m_jobQueue.capacity() ) - { - m_jobQueue.reserve( jobCount ); - } - unlockQueue(); - m_jobQueue.resizeNoInitialize( 0 ); - } - void* allocJobMem(int jobSize) - { - btAssert(m_jobMemSize >= (m_allocSize + jobSize)); - void* jobMem = &m_jobMem[m_allocSize]; - m_allocSize += jobSize; - return jobMem; - } - void submitJob( IJob* job ) - { - btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize ); - m_jobQueue.push_back( job ); - lockQueue(); - m_tailIndex++; - m_queueIsEmpty = false; - unlockQueue(); - } - IJob* consumeJobFromOwnQueue() - { - if ( m_queueIsEmpty ) - { - // lock free path. 
even if this is taken erroneously it isn't harmful - return NULL; - } - IJob* job = NULL; - lockQueue(); - if ( !m_queueIsEmpty ) - { - job = m_jobQueue[ m_headIndex++ ]; - btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize ); - if ( m_headIndex == m_tailIndex ) - { - m_queueIsEmpty = true; - } - } - unlockQueue(); - return job; - } - IJob* consumeJob() - { - if (IJob* job = consumeJobFromOwnQueue()) - { - return job; - } - // own queue is empty, try to steal from neighbor - for (int i = 0; i < m_neighborContexts.size(); ++i) - { - JobQueue* otherContext = m_neighborContexts[ i ]; - if ( IJob* job = otherContext->consumeJobFromOwnQueue() ) - { - return job; - } - } - return NULL; - } -}; + } + } + void init(btThreadSupportInterface * threadSup, btAlignedObjectArray<JobQueue> * contextArray) + { + m_threadSupport = threadSup; + if (threadSup) + { + m_queueLock = m_threadSupport->createCriticalSection(); + } + setupJobStealing(contextArray, contextArray->size()); + } + void setupJobStealing(btAlignedObjectArray<JobQueue> * contextArray, int numActiveContexts) + { + btAlignedObjectArray<JobQueue>& contexts = *contextArray; + int selfIndex = 0; + for (int i = 0; i < contexts.size(); ++i) + { + if (this == &contexts[i]) + { + selfIndex = i; + break; + } + } + int numNeighbors = btMin(2, contexts.size() - 1); + int neighborOffsets[] = {-1, 1, -2, 2, -3, 3}; + int numOffsets = sizeof(neighborOffsets) / sizeof(neighborOffsets[0]); + m_neighborContexts.reserve(numNeighbors); + m_neighborContexts.resizeNoInitialize(0); + for (int i = 0; i < numOffsets && m_neighborContexts.size() < numNeighbors; i++) + { + int neighborIndex = selfIndex + neighborOffsets[i]; + if (neighborIndex >= 0 && neighborIndex < numActiveContexts) + { + m_neighborContexts.push_back(&contexts[neighborIndex]); + } + } + } + + bool isQueueEmpty() const { return m_queueIsEmpty; } + void lockQueue() + { + if (m_useSpinMutex) + { + m_mutex.lock(); + } + else + { + m_queueLock->lock(); + } + } + void unlockQueue() + { + if (m_useSpinMutex) + { + m_mutex.unlock(); + } + else + { + m_queueLock->unlock(); + } + } + void clearQueue(int jobCount, int jobSize) + { + lockQueue(); + m_headIndex = 0; + m_tailIndex = 0; + m_allocSize = 0; + m_queueIsEmpty = true; + int jobBufSize = jobSize * jobCount; + // make sure we have enough memory allocated to store jobs + if (jobBufSize > m_jobMemSize) + { + resizeJobMem(jobBufSize); + } + // make sure job queue is big enough + if (jobCount > m_jobQueue.capacity()) + { + m_jobQueue.reserve(jobCount); + } + unlockQueue(); + m_jobQueue.resizeNoInitialize(0); + } + void* allocJobMem(int jobSize) + { + btAssert(m_jobMemSize >= (m_allocSize + jobSize)); + void* jobMem = &m_jobMem[m_allocSize]; + m_allocSize += jobSize; + return jobMem; + } + void submitJob(IJob * job) + { + btAssert(reinterpret_cast<char*>(job) >= &m_jobMem[0] && reinterpret_cast<char*>(job) < &m_jobMem[0] + m_allocSize); + m_jobQueue.push_back(job); + lockQueue(); + m_tailIndex++; + m_queueIsEmpty = false; + unlockQueue(); + } + IJob* consumeJobFromOwnQueue() + { + if (m_queueIsEmpty) + { + // lock free path. 
even if this is taken erroneously it isn't harmful + return NULL; + } + IJob* job = NULL; + lockQueue(); + if (!m_queueIsEmpty) + { + job = m_jobQueue[m_headIndex++]; + btAssert(reinterpret_cast<char*>(job) >= &m_jobMem[0] && reinterpret_cast<char*>(job) < &m_jobMem[0] + m_allocSize); + if (m_headIndex == m_tailIndex) + { + m_queueIsEmpty = true; + } + } + unlockQueue(); + return job; + } + IJob* consumeJob() + { + if (IJob* job = consumeJobFromOwnQueue()) + { + return job; + } + // own queue is empty, try to steal from neighbor + for (int i = 0; i < m_neighborContexts.size(); ++i) + { + JobQueue* otherContext = m_neighborContexts[i]; + if (IJob* job = otherContext->consumeJobFromOwnQueue()) + { + return job; + } + } + return NULL; + } +}; -static void WorkerThreadFunc( void* userPtr ) +static void WorkerThreadFunc(void* userPtr) { - BT_PROFILE( "WorkerThreadFunc" ); - ThreadLocalStorage* localStorage = (ThreadLocalStorage*) userPtr; - JobQueue* jobQueue = localStorage->m_queue; - - bool shouldSleep = false; - int threadId = localStorage->m_threadId; - while (! shouldSleep) - { - // do work - localStorage->m_mutex.lock(); - while ( IJob* job = jobQueue->consumeJob() ) - { - localStorage->m_status = WorkerThreadStatus::kWorking; - job->executeJob( threadId ); - localStorage->m_numJobsFinished++; - } - localStorage->m_status = WorkerThreadStatus::kWaitingForWork; - localStorage->m_mutex.unlock(); - btU64 clockStart = localStorage->m_clock->getTimeMicroseconds(); - // while queue is empty, - while (jobQueue->isQueueEmpty()) - { - // todo: spin wait a bit to avoid hammering the empty queue - btSpinPause(); - if ( localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep ) - { - shouldSleep = true; - break; - } - // if jobs are incoming, - if ( localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs ) - { - clockStart = localStorage->m_clock->getTimeMicroseconds(); // reset clock - } - else - { - for ( int i = 0; i < 50; ++i ) - { - btSpinPause(); - btSpinPause(); - btSpinPause(); - btSpinPause(); - if (localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty()) - { - break; - } - } - // if no jobs incoming and queue has been empty for the cooldown time, sleep - btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart; - if (timeElapsed > localStorage->m_cooldownTime) - { - shouldSleep = true; - break; - } - } - } - } + BT_PROFILE("WorkerThreadFunc"); + ThreadLocalStorage* localStorage = (ThreadLocalStorage*)userPtr; + JobQueue* jobQueue = localStorage->m_queue; + + bool shouldSleep = false; + int threadId = localStorage->m_threadId; + while (!shouldSleep) + { + // do work + localStorage->m_mutex.lock(); + while (IJob* job = jobQueue->consumeJob()) + { + localStorage->m_status = WorkerThreadStatus::kWorking; + job->executeJob(threadId); + localStorage->m_numJobsFinished++; + } + localStorage->m_status = WorkerThreadStatus::kWaitingForWork; + localStorage->m_mutex.unlock(); + btU64 clockStart = localStorage->m_clock->getTimeMicroseconds(); + // while queue is empty, + while (jobQueue->isQueueEmpty()) + { + // todo: spin wait a bit to avoid hammering the empty queue + btSpinPause(); + if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep) + { + shouldSleep = true; + break; + } + // if jobs are incoming, + if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kScanForJobs) + { + 
clockStart = localStorage->m_clock->getTimeMicroseconds(); // reset clock + } + else + { + for (int i = 0; i < 50; ++i) + { + btSpinPause(); + btSpinPause(); + btSpinPause(); + btSpinPause(); + if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty()) + { + break; + } + } + // if no jobs incoming and queue has been empty for the cooldown time, sleep + btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart; + if (timeElapsed > localStorage->m_cooldownTime) + { + shouldSleep = true; + break; + } + } + } + } { BT_PROFILE("sleep"); // go sleep @@ -427,376 +420,373 @@ static void WorkerThreadFunc( void* userPtr ) } } - class btTaskSchedulerDefault : public btITaskScheduler { - btThreadSupportInterface* m_threadSupport; - WorkerThreadDirectives* m_workerDirective; - btAlignedObjectArray<JobQueue> m_jobQueues; - btAlignedObjectArray<JobQueue*> m_perThreadJobQueues; - btAlignedObjectArray<ThreadLocalStorage> m_threadLocalStorage; - btSpinMutex m_antiNestingLock; // prevent nested parallel-for - btClock m_clock; - int m_numThreads; - int m_numWorkerThreads; - int m_numActiveJobQueues; - int m_maxNumThreads; - int m_numJobs; - static const int kFirstWorkerThreadId = 1; + btThreadSupportInterface* m_threadSupport; + WorkerThreadDirectives* m_workerDirective; + btAlignedObjectArray<JobQueue> m_jobQueues; + btAlignedObjectArray<JobQueue*> m_perThreadJobQueues; + btAlignedObjectArray<ThreadLocalStorage> m_threadLocalStorage; + btSpinMutex m_antiNestingLock; // prevent nested parallel-for + btClock m_clock; + int m_numThreads; + int m_numWorkerThreads; + int m_numActiveJobQueues; + int m_maxNumThreads; + int m_numJobs; + static const int kFirstWorkerThreadId = 1; + public: + btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport") + { + m_threadSupport = NULL; + m_workerDirective = NULL; + } - btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport") - { - m_threadSupport = NULL; - m_workerDirective = NULL; - } - - virtual ~btTaskSchedulerDefault() - { - waitForWorkersToSleep(); - - for ( int i = 0; i < m_jobQueues.size(); ++i ) - { - m_jobQueues[i].exit(); - } - - if (m_threadSupport) - { - delete m_threadSupport; - m_threadSupport = NULL; - } - if (m_workerDirective) - { - btAlignedFree(m_workerDirective); - m_workerDirective = NULL; - } - } - - void init() - { - btThreadSupportInterface::ConstructionInfo constructionInfo( "TaskScheduler", WorkerThreadFunc ); - m_threadSupport = btThreadSupportInterface::create( constructionInfo ); - m_workerDirective = static_cast<WorkerThreadDirectives*>(btAlignedAlloc(sizeof(*m_workerDirective), 64)); - - m_numWorkerThreads = m_threadSupport->getNumWorkerThreads(); - m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1; - m_numThreads = m_maxNumThreads; - // ideal to have one job queue for each physical processor (except for the main thread which needs no queue) - int numThreadsPerQueue = m_threadSupport->getLogicalToPhysicalCoreRatio(); - int numJobQueues = (numThreadsPerQueue == 1) ? 
(m_maxNumThreads-1) : (m_maxNumThreads / numThreadsPerQueue); - m_jobQueues.resize(numJobQueues); - m_numActiveJobQueues = numJobQueues; - for ( int i = 0; i < m_jobQueues.size(); ++i ) - { - m_jobQueues[i].init( m_threadSupport, &m_jobQueues ); - } - m_perThreadJobQueues.resize(m_numThreads); - for ( int i = 0; i < m_numThreads; i++ ) - { - JobQueue* jq = NULL; - // only worker threads get a job queue - if (i > 0) - { - if (numThreadsPerQueue == 1) - { - // one queue per worker thread - jq = &m_jobQueues[ i - kFirstWorkerThreadId ]; - } - else - { - // 2 threads share each queue - jq = &m_jobQueues[ i / numThreadsPerQueue ]; - } - } - m_perThreadJobQueues[i] = jq; - } - m_threadLocalStorage.resize(m_numThreads); - for ( int i = 0; i < m_numThreads; i++ ) - { - ThreadLocalStorage& storage = m_threadLocalStorage[i]; - storage.m_threadId = i; - storage.m_directive = m_workerDirective; - storage.m_status = WorkerThreadStatus::kSleeping; - storage.m_cooldownTime = 100; // 100 microseconds, threads go to sleep after this long if they have nothing to do - storage.m_clock = &m_clock; - storage.m_queue = m_perThreadJobQueues[i]; - } - setWorkerDirectives( WorkerThreadDirectives::kGoToSleep ); // no work for them yet - setNumThreads( m_threadSupport->getCacheFriendlyNumThreads() ); - } - - void setWorkerDirectives(WorkerThreadDirectives::Type dir) - { - m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir); - } - - virtual int getMaxNumThreads() const BT_OVERRIDE - { - return m_maxNumThreads; - } - - virtual int getNumThreads() const BT_OVERRIDE - { - return m_numThreads; - } - - virtual void setNumThreads( int numThreads ) BT_OVERRIDE - { - m_numThreads = btMax( btMin(numThreads, int(m_maxNumThreads)), 1 ); - m_numWorkerThreads = m_numThreads - 1; - m_numActiveJobQueues = 0; - // if there is at least 1 worker, - if ( m_numWorkerThreads > 0 ) - { - // re-setup job stealing between queues to avoid attempting to steal from an inactive job queue - JobQueue* lastActiveContext = m_perThreadJobQueues[ m_numThreads - 1 ]; - int iLastActiveContext = lastActiveContext - &m_jobQueues[0]; - m_numActiveJobQueues = iLastActiveContext + 1; - for ( int i = 0; i < m_jobQueues.size(); ++i ) - { - m_jobQueues[ i ].setupJobStealing( &m_jobQueues, m_numActiveJobQueues ); - } - } - m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep); - } - - void waitJobs() - { - BT_PROFILE( "waitJobs" ); - // have the main thread work until the job queues are empty - int numMainThreadJobsFinished = 0; - for ( int i = 0; i < m_numActiveJobQueues; ++i ) - { - while ( IJob* job = m_jobQueues[i].consumeJob() ) - { - job->executeJob( 0 ); - numMainThreadJobsFinished++; - } - } - - // done with jobs for now, tell workers to rest (but not sleep) - setWorkerDirectives( WorkerThreadDirectives::kStayAwakeButIdle ); - - btU64 clockStart = m_clock.getTimeMicroseconds(); - // wait for workers to finish any jobs in progress - while ( true ) - { - int numWorkerJobsFinished = 0; - for ( int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread ) - { - ThreadLocalStorage* storage = &m_threadLocalStorage[iThread]; - storage->m_mutex.lock(); - numWorkerJobsFinished += storage->m_numJobsFinished; - storage->m_mutex.unlock(); - } - if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs) - { - break; - } - btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart; - btAssert(timeElapsed < 1000); - if (timeElapsed > 100000) - { - break; - } - 
btSpinPause(); - } - } - - void wakeWorkers(int numWorkersToWake) - { - BT_PROFILE( "wakeWorkers" ); - btAssert( m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs ); - int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads); - int numActiveWorkers = 0; - for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker ) - { - // note this count of active workers is not necessarily totally reliable, because a worker thread could be - // just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare. - ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ]; - if (storage.m_status != WorkerThreadStatus::kSleeping) - { - numActiveWorkers++; - } - } - for ( int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker ) - { - ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ]; - if (storage.m_status == WorkerThreadStatus::kSleeping) - { - m_threadSupport->runTask( iWorker, &storage ); - numActiveWorkers++; - } - } - } - - void waitForWorkersToSleep() - { - BT_PROFILE( "waitForWorkersToSleep" ); - setWorkerDirectives( WorkerThreadDirectives::kGoToSleep ); - m_threadSupport->waitForAllTasks(); - for ( int i = kFirstWorkerThreadId; i < m_numThreads; i++ ) - { - ThreadLocalStorage& storage = m_threadLocalStorage[i]; - btAssert( storage.m_status == WorkerThreadStatus::kSleeping ); - } - } - - virtual void sleepWorkerThreadsHint() BT_OVERRIDE - { - BT_PROFILE( "sleepWorkerThreadsHint" ); - // hint the task scheduler that we may not be using these threads for a little while - setWorkerDirectives( WorkerThreadDirectives::kGoToSleep ); - } - - void prepareWorkerThreads() - { - for ( int i = kFirstWorkerThreadId; i < m_numThreads; ++i ) - { - ThreadLocalStorage& storage = m_threadLocalStorage[i]; - storage.m_mutex.lock(); - storage.m_numJobsFinished = 0; - storage.m_mutex.unlock(); - } - setWorkerDirectives( WorkerThreadDirectives::kScanForJobs ); - } - - virtual void parallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBody& body ) BT_OVERRIDE - { - BT_PROFILE( "parallelFor_ThreadSupport" ); - btAssert( iEnd >= iBegin ); - btAssert( grainSize >= 1 ); - int iterationCount = iEnd - iBegin; - if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() ) - { - typedef ParallelForJob JobType; - int jobCount = ( iterationCount + grainSize - 1 ) / grainSize; - m_numJobs = jobCount; - btAssert( jobCount >= 2 ); // need more than one job for multithreading - int jobSize = sizeof( JobType ); - - for (int i = 0; i < m_numActiveJobQueues; ++i) - { - m_jobQueues[i].clearQueue( jobCount, jobSize ); - } - // prepare worker threads for incoming work - prepareWorkerThreads(); - // submit all of the jobs - int iJob = 0; - int iThread = kFirstWorkerThreadId; // first worker thread - for ( int i = iBegin; i < iEnd; i += grainSize ) - { - btAssert( iJob < jobCount ); - int iE = btMin( i + grainSize, iEnd ); - JobQueue* jq = m_perThreadJobQueues[ iThread ]; - btAssert(jq); - btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues); - void* jobMem = jq->allocJobMem(jobSize); - JobType* job = new ( jobMem ) ParallelForJob( i, iE, body ); // placement new - jq->submitJob( job ); - iJob++; - iThread++; - if ( iThread >= m_numThreads ) - { - iThread = kFirstWorkerThreadId; // first worker thread - } - } - wakeWorkers( jobCount - 1 ); - - // put the main thread to work on emptying the job queue and then wait 
for all workers to finish - waitJobs(); - m_antiNestingLock.unlock(); - } - else - { - BT_PROFILE( "parallelFor_mainThread" ); - // just run on main thread - body.forLoop( iBegin, iEnd ); - } - } - virtual btScalar parallelSum( int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body ) BT_OVERRIDE - { - BT_PROFILE( "parallelSum_ThreadSupport" ); - btAssert( iEnd >= iBegin ); - btAssert( grainSize >= 1 ); - int iterationCount = iEnd - iBegin; - if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() ) - { - typedef ParallelSumJob JobType; - int jobCount = ( iterationCount + grainSize - 1 ) / grainSize; - m_numJobs = jobCount; - btAssert( jobCount >= 2 ); // need more than one job for multithreading - int jobSize = sizeof( JobType ); - for (int i = 0; i < m_numActiveJobQueues; ++i) - { - m_jobQueues[i].clearQueue( jobCount, jobSize ); - } - - // initialize summation - for ( int iThread = 0; iThread < m_numThreads; ++iThread ) - { - m_threadLocalStorage[iThread].m_sumResult = btScalar(0); - } - - // prepare worker threads for incoming work - prepareWorkerThreads(); - // submit all of the jobs - int iJob = 0; - int iThread = kFirstWorkerThreadId; // first worker thread - for ( int i = iBegin; i < iEnd; i += grainSize ) - { - btAssert( iJob < jobCount ); - int iE = btMin( i + grainSize, iEnd ); - JobQueue* jq = m_perThreadJobQueues[ iThread ]; - btAssert(jq); - btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues); - void* jobMem = jq->allocJobMem(jobSize); - JobType* job = new ( jobMem ) ParallelSumJob( i, iE, body, &m_threadLocalStorage[0] ); // placement new - jq->submitJob( job ); - iJob++; - iThread++; - if ( iThread >= m_numThreads ) - { - iThread = kFirstWorkerThreadId; // first worker thread - } - } - wakeWorkers( jobCount - 1 ); - - // put the main thread to work on emptying the job queue and then wait for all workers to finish - waitJobs(); - - // add up all the thread sums - btScalar sum = btScalar(0); - for ( int iThread = 0; iThread < m_numThreads; ++iThread ) - { - sum += m_threadLocalStorage[ iThread ].m_sumResult; - } - m_antiNestingLock.unlock(); - return sum; - } - else - { - BT_PROFILE( "parallelSum_mainThread" ); - // just run on main thread - return body.sumLoop( iBegin, iEnd ); - } - } -}; + virtual ~btTaskSchedulerDefault() + { + waitForWorkersToSleep(); + + for (int i = 0; i < m_jobQueues.size(); ++i) + { + m_jobQueues[i].exit(); + } + + if (m_threadSupport) + { + delete m_threadSupport; + m_threadSupport = NULL; + } + if (m_workerDirective) + { + btAlignedFree(m_workerDirective); + m_workerDirective = NULL; + } + } + void init() + { + btThreadSupportInterface::ConstructionInfo constructionInfo("TaskScheduler", WorkerThreadFunc); + m_threadSupport = btThreadSupportInterface::create(constructionInfo); + m_workerDirective = static_cast<WorkerThreadDirectives*>(btAlignedAlloc(sizeof(*m_workerDirective), 64)); + + m_numWorkerThreads = m_threadSupport->getNumWorkerThreads(); + m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1; + m_numThreads = m_maxNumThreads; + // ideal to have one job queue for each physical processor (except for the main thread which needs no queue) + int numThreadsPerQueue = m_threadSupport->getLogicalToPhysicalCoreRatio(); + int numJobQueues = (numThreadsPerQueue == 1) ? 
(m_maxNumThreads - 1) : (m_maxNumThreads / numThreadsPerQueue); + m_jobQueues.resize(numJobQueues); + m_numActiveJobQueues = numJobQueues; + for (int i = 0; i < m_jobQueues.size(); ++i) + { + m_jobQueues[i].init(m_threadSupport, &m_jobQueues); + } + m_perThreadJobQueues.resize(m_numThreads); + for (int i = 0; i < m_numThreads; i++) + { + JobQueue* jq = NULL; + // only worker threads get a job queue + if (i > 0) + { + if (numThreadsPerQueue == 1) + { + // one queue per worker thread + jq = &m_jobQueues[i - kFirstWorkerThreadId]; + } + else + { + // 2 threads share each queue + jq = &m_jobQueues[i / numThreadsPerQueue]; + } + } + m_perThreadJobQueues[i] = jq; + } + m_threadLocalStorage.resize(m_numThreads); + for (int i = 0; i < m_numThreads; i++) + { + ThreadLocalStorage& storage = m_threadLocalStorage[i]; + storage.m_threadId = i; + storage.m_directive = m_workerDirective; + storage.m_status = WorkerThreadStatus::kSleeping; + storage.m_cooldownTime = 100; // 100 microseconds, threads go to sleep after this long if they have nothing to do + storage.m_clock = &m_clock; + storage.m_queue = m_perThreadJobQueues[i]; + } + setWorkerDirectives(WorkerThreadDirectives::kGoToSleep); // no work for them yet + setNumThreads(m_threadSupport->getCacheFriendlyNumThreads()); + } + + void setWorkerDirectives(WorkerThreadDirectives::Type dir) + { + m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir); + } + + virtual int getMaxNumThreads() const BT_OVERRIDE + { + return m_maxNumThreads; + } + virtual int getNumThreads() const BT_OVERRIDE + { + return m_numThreads; + } + + virtual void setNumThreads(int numThreads) BT_OVERRIDE + { + m_numThreads = btMax(btMin(numThreads, int(m_maxNumThreads)), 1); + m_numWorkerThreads = m_numThreads - 1; + m_numActiveJobQueues = 0; + // if there is at least 1 worker, + if (m_numWorkerThreads > 0) + { + // re-setup job stealing between queues to avoid attempting to steal from an inactive job queue + JobQueue* lastActiveContext = m_perThreadJobQueues[m_numThreads - 1]; + int iLastActiveContext = lastActiveContext - &m_jobQueues[0]; + m_numActiveJobQueues = iLastActiveContext + 1; + for (int i = 0; i < m_jobQueues.size(); ++i) + { + m_jobQueues[i].setupJobStealing(&m_jobQueues, m_numActiveJobQueues); + } + } + m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep); + } + + void waitJobs() + { + BT_PROFILE("waitJobs"); + // have the main thread work until the job queues are empty + int numMainThreadJobsFinished = 0; + for (int i = 0; i < m_numActiveJobQueues; ++i) + { + while (IJob* job = m_jobQueues[i].consumeJob()) + { + job->executeJob(0); + numMainThreadJobsFinished++; + } + } + + // done with jobs for now, tell workers to rest (but not sleep) + setWorkerDirectives(WorkerThreadDirectives::kStayAwakeButIdle); + + btU64 clockStart = m_clock.getTimeMicroseconds(); + // wait for workers to finish any jobs in progress + while (true) + { + int numWorkerJobsFinished = 0; + for (int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread) + { + ThreadLocalStorage* storage = &m_threadLocalStorage[iThread]; + storage->m_mutex.lock(); + numWorkerJobsFinished += storage->m_numJobsFinished; + storage->m_mutex.unlock(); + } + if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs) + { + break; + } + btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart; + btAssert(timeElapsed < 1000); + if (timeElapsed > 100000) + { + break; + } + btSpinPause(); + } + } + + void wakeWorkers(int 
numWorkersToWake) + { + BT_PROFILE("wakeWorkers"); + btAssert(m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs); + int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads); + int numActiveWorkers = 0; + for (int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker) + { + // note this count of active workers is not necessarily totally reliable, because a worker thread could be + // just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare. + ThreadLocalStorage& storage = m_threadLocalStorage[kFirstWorkerThreadId + iWorker]; + if (storage.m_status != WorkerThreadStatus::kSleeping) + { + numActiveWorkers++; + } + } + for (int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker) + { + ThreadLocalStorage& storage = m_threadLocalStorage[kFirstWorkerThreadId + iWorker]; + if (storage.m_status == WorkerThreadStatus::kSleeping) + { + m_threadSupport->runTask(iWorker, &storage); + numActiveWorkers++; + } + } + } + + void waitForWorkersToSleep() + { + BT_PROFILE("waitForWorkersToSleep"); + setWorkerDirectives(WorkerThreadDirectives::kGoToSleep); + m_threadSupport->waitForAllTasks(); + for (int i = kFirstWorkerThreadId; i < m_numThreads; i++) + { + ThreadLocalStorage& storage = m_threadLocalStorage[i]; + btAssert(storage.m_status == WorkerThreadStatus::kSleeping); + } + } + + virtual void sleepWorkerThreadsHint() BT_OVERRIDE + { + BT_PROFILE("sleepWorkerThreadsHint"); + // hint the task scheduler that we may not be using these threads for a little while + setWorkerDirectives(WorkerThreadDirectives::kGoToSleep); + } + + void prepareWorkerThreads() + { + for (int i = kFirstWorkerThreadId; i < m_numThreads; ++i) + { + ThreadLocalStorage& storage = m_threadLocalStorage[i]; + storage.m_mutex.lock(); + storage.m_numJobsFinished = 0; + storage.m_mutex.unlock(); + } + setWorkerDirectives(WorkerThreadDirectives::kScanForJobs); + } + + virtual void parallelFor(int iBegin, int iEnd, int grainSize, const btIParallelForBody& body) BT_OVERRIDE + { + BT_PROFILE("parallelFor_ThreadSupport"); + btAssert(iEnd >= iBegin); + btAssert(grainSize >= 1); + int iterationCount = iEnd - iBegin; + if (iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock()) + { + typedef ParallelForJob JobType; + int jobCount = (iterationCount + grainSize - 1) / grainSize; + m_numJobs = jobCount; + btAssert(jobCount >= 2); // need more than one job for multithreading + int jobSize = sizeof(JobType); + + for (int i = 0; i < m_numActiveJobQueues; ++i) + { + m_jobQueues[i].clearQueue(jobCount, jobSize); + } + // prepare worker threads for incoming work + prepareWorkerThreads(); + // submit all of the jobs + int iJob = 0; + int iThread = kFirstWorkerThreadId; // first worker thread + for (int i = iBegin; i < iEnd; i += grainSize) + { + btAssert(iJob < jobCount); + int iE = btMin(i + grainSize, iEnd); + JobQueue* jq = m_perThreadJobQueues[iThread]; + btAssert(jq); + btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues); + void* jobMem = jq->allocJobMem(jobSize); + JobType* job = new (jobMem) ParallelForJob(i, iE, body); // placement new + jq->submitJob(job); + iJob++; + iThread++; + if (iThread >= m_numThreads) + { + iThread = kFirstWorkerThreadId; // first worker thread + } + } + wakeWorkers(jobCount - 1); + + // put the main thread to work on emptying the job queue and then wait for all workers to finish + waitJobs(); + m_antiNestingLock.unlock(); + } + else + { + 
BT_PROFILE("parallelFor_mainThread"); + // just run on main thread + body.forLoop(iBegin, iEnd); + } + } + virtual btScalar parallelSum(int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body) BT_OVERRIDE + { + BT_PROFILE("parallelSum_ThreadSupport"); + btAssert(iEnd >= iBegin); + btAssert(grainSize >= 1); + int iterationCount = iEnd - iBegin; + if (iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock()) + { + typedef ParallelSumJob JobType; + int jobCount = (iterationCount + grainSize - 1) / grainSize; + m_numJobs = jobCount; + btAssert(jobCount >= 2); // need more than one job for multithreading + int jobSize = sizeof(JobType); + for (int i = 0; i < m_numActiveJobQueues; ++i) + { + m_jobQueues[i].clearQueue(jobCount, jobSize); + } + + // initialize summation + for (int iThread = 0; iThread < m_numThreads; ++iThread) + { + m_threadLocalStorage[iThread].m_sumResult = btScalar(0); + } + + // prepare worker threads for incoming work + prepareWorkerThreads(); + // submit all of the jobs + int iJob = 0; + int iThread = kFirstWorkerThreadId; // first worker thread + for (int i = iBegin; i < iEnd; i += grainSize) + { + btAssert(iJob < jobCount); + int iE = btMin(i + grainSize, iEnd); + JobQueue* jq = m_perThreadJobQueues[iThread]; + btAssert(jq); + btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues); + void* jobMem = jq->allocJobMem(jobSize); + JobType* job = new (jobMem) ParallelSumJob(i, iE, body, &m_threadLocalStorage[0]); // placement new + jq->submitJob(job); + iJob++; + iThread++; + if (iThread >= m_numThreads) + { + iThread = kFirstWorkerThreadId; // first worker thread + } + } + wakeWorkers(jobCount - 1); + + // put the main thread to work on emptying the job queue and then wait for all workers to finish + waitJobs(); + + // add up all the thread sums + btScalar sum = btScalar(0); + for (int iThread = 0; iThread < m_numThreads; ++iThread) + { + sum += m_threadLocalStorage[iThread].m_sumResult; + } + m_antiNestingLock.unlock(); + return sum; + } + else + { + BT_PROFILE("parallelSum_mainThread"); + // just run on main thread + return body.sumLoop(iBegin, iEnd); + } + } +}; btITaskScheduler* btCreateDefaultTaskScheduler() { - btTaskSchedulerDefault* ts = new btTaskSchedulerDefault(); - ts->init(); - return ts; + btTaskSchedulerDefault* ts = new btTaskSchedulerDefault(); + ts->init(); + return ts; } -#else // #if BT_THREADSAFE +#else // #if BT_THREADSAFE btITaskScheduler* btCreateDefaultTaskScheduler() { - return NULL; + return NULL; } -#endif // #else // #if BT_THREADSAFE +#endif // #else // #if BT_THREADSAFE diff --git a/thirdparty/bullet/LinearMath/TaskScheduler/btThreadSupportInterface.h b/thirdparty/bullet/LinearMath/TaskScheduler/btThreadSupportInterface.h index a0ad802b1e..1fe49335a1 100644 --- a/thirdparty/bullet/LinearMath/TaskScheduler/btThreadSupportInterface.h +++ b/thirdparty/bullet/LinearMath/TaskScheduler/btThreadSupportInterface.h @@ -16,55 +16,49 @@ subject to the following restrictions: #ifndef BT_THREAD_SUPPORT_INTERFACE_H #define BT_THREAD_SUPPORT_INTERFACE_H - - class btCriticalSection { public: - btCriticalSection() {} - virtual ~btCriticalSection() {} + btCriticalSection() {} + virtual ~btCriticalSection() {} - virtual void lock() = 0; - virtual void unlock() = 0; + virtual void lock() = 0; + virtual void unlock() = 0; }; - class btThreadSupportInterface { public: - - virtual ~btThreadSupportInterface() {} - - virtual int getNumWorkerThreads() const = 0; // number of worker threads (total number of logical processors - 1) - 
virtual int getCacheFriendlyNumThreads() const = 0; // the number of logical processors sharing a single L3 cache - virtual int getLogicalToPhysicalCoreRatio() const = 0; // the number of logical processors per physical processor (usually 1 or 2) - virtual void runTask( int threadIndex, void* userData ) = 0; - virtual void waitForAllTasks() = 0; - - virtual btCriticalSection* createCriticalSection() = 0; - virtual void deleteCriticalSection( btCriticalSection* criticalSection ) = 0; - - typedef void( *ThreadFunc )( void* userPtr ); - - struct ConstructionInfo - { - ConstructionInfo( const char* uniqueName, - ThreadFunc userThreadFunc, - int threadStackSize = 65535 - ) - :m_uniqueName( uniqueName ), - m_userThreadFunc( userThreadFunc ), - m_threadStackSize( threadStackSize ) - { - } - - const char* m_uniqueName; - ThreadFunc m_userThreadFunc; - int m_threadStackSize; - }; - - static btThreadSupportInterface* create( const ConstructionInfo& info ); + virtual ~btThreadSupportInterface() {} + + virtual int getNumWorkerThreads() const = 0; // number of worker threads (total number of logical processors - 1) + virtual int getCacheFriendlyNumThreads() const = 0; // the number of logical processors sharing a single L3 cache + virtual int getLogicalToPhysicalCoreRatio() const = 0; // the number of logical processors per physical processor (usually 1 or 2) + virtual void runTask(int threadIndex, void* userData) = 0; + virtual void waitForAllTasks() = 0; + + virtual btCriticalSection* createCriticalSection() = 0; + virtual void deleteCriticalSection(btCriticalSection* criticalSection) = 0; + + typedef void (*ThreadFunc)(void* userPtr); + + struct ConstructionInfo + { + ConstructionInfo(const char* uniqueName, + ThreadFunc userThreadFunc, + int threadStackSize = 65535) + : m_uniqueName(uniqueName), + m_userThreadFunc(userThreadFunc), + m_threadStackSize(threadStackSize) + { + } + + const char* m_uniqueName; + ThreadFunc m_userThreadFunc; + int m_threadStackSize; + }; + + static btThreadSupportInterface* create(const ConstructionInfo& info); }; -#endif //BT_THREAD_SUPPORT_INTERFACE_H - +#endif //BT_THREAD_SUPPORT_INTERFACE_H diff --git a/thirdparty/bullet/LinearMath/TaskScheduler/btThreadSupportPosix.cpp b/thirdparty/bullet/LinearMath/TaskScheduler/btThreadSupportPosix.cpp index 50ca060dfe..02f4ed1631 100644 --- a/thirdparty/bullet/LinearMath/TaskScheduler/btThreadSupportPosix.cpp +++ b/thirdparty/bullet/LinearMath/TaskScheduler/btThreadSupportPosix.cpp @@ -1,3 +1,4 @@ + /* Bullet Continuous Collision Detection and Physics Library Copyright (c) 2003-2018 Erwin Coumans http://bulletphysics.com @@ -13,9 +14,7 @@ subject to the following restrictions: 3. This notice may not be removed or altered from any source distribution. 
*/ - -#if BT_THREADSAFE && !defined( _WIN32 ) - +#if BT_THREADSAFE && !defined(_WIN32) #include "LinearMath/btScalar.h" #include "LinearMath/btAlignedObjectArray.h" @@ -27,14 +26,12 @@ subject to the following restrictions: #include <errno.h> #include <unistd.h> - #ifndef _XOPEN_SOURCE -#define _XOPEN_SOURCE 600 //for definition of pthread_barrier_t, see http://pages.cs.wisc.edu/~travitch/pthreads_primer.html -#endif //_XOPEN_SOURCE +#define _XOPEN_SOURCE 600 //for definition of pthread_barrier_t, see http://pages.cs.wisc.edu/~travitch/pthreads_primer.html +#endif //_XOPEN_SOURCE #include <pthread.h> #include <semaphore.h> -#include <unistd.h> //for sysconf - +#include <unistd.h> //for sysconf /// /// getNumHardwareThreads() @@ -48,318 +45,309 @@ subject to the following restrictions: int btGetNumHardwareThreads() { - return btMin<int>(BT_MAX_THREAD_COUNT, std::thread::hardware_concurrency()); + return btMin<int>(BT_MAX_THREAD_COUNT, std::thread::hardware_concurrency()); } #else int btGetNumHardwareThreads() { - return btMin<int>(BT_MAX_THREAD_COUNT, sysconf( _SC_NPROCESSORS_ONLN )); + return btMin<int>(BT_MAX_THREAD_COUNT, sysconf(_SC_NPROCESSORS_ONLN)); } #endif - // btThreadSupportPosix helps to initialize/shutdown libspe2, start/stop SPU tasks and communication class btThreadSupportPosix : public btThreadSupportInterface { public: - struct btThreadStatus - { - int m_taskId; - int m_commandId; - int m_status; - - ThreadFunc m_userThreadFunc; - void* m_userPtr; //for taskDesc etc - - pthread_t thread; - //each tread will wait until this signal to start its work - sem_t* startSemaphore; - - // this is a copy of m_mainSemaphore, - //each tread will signal once it is finished with its work - sem_t* m_mainSemaphore; - unsigned long threadUsed; - }; -private: - typedef unsigned long long UINT64; - - btAlignedObjectArray<btThreadStatus> m_activeThreadStatus; - // m_mainSemaphoresemaphore will signal, if and how many threads are finished with their work - sem_t* m_mainSemaphore; - int m_numThreads; - UINT64 m_startedThreadsMask; - void startThreads( const ConstructionInfo& threadInfo ); - void stopThreads(); - int waitForResponse(); + struct btThreadStatus + { + int m_taskId; + int m_commandId; + int m_status; + + ThreadFunc m_userThreadFunc; + void* m_userPtr; //for taskDesc etc + + pthread_t thread; + //each tread will wait until this signal to start its work + sem_t* startSemaphore; + btCriticalSection* m_cs; + // this is a copy of m_mainSemaphore, + //each tread will signal once it is finished with its work + sem_t* m_mainSemaphore; + unsigned long threadUsed; + }; +private: + typedef unsigned long long UINT64; + + btAlignedObjectArray<btThreadStatus> m_activeThreadStatus; + // m_mainSemaphoresemaphore will signal, if and how many threads are finished with their work + sem_t* m_mainSemaphore; + int m_numThreads; + UINT64 m_startedThreadsMask; + void startThreads(const ConstructionInfo& threadInfo); + void stopThreads(); + int waitForResponse(); + btCriticalSection* m_cs; public: - btThreadSupportPosix( const ConstructionInfo& threadConstructionInfo ); - virtual ~btThreadSupportPosix(); + btThreadSupportPosix(const ConstructionInfo& threadConstructionInfo); + virtual ~btThreadSupportPosix(); - virtual int getNumWorkerThreads() const BT_OVERRIDE { return m_numThreads; } - // TODO: return the number of logical processors sharing the first L3 cache - virtual int getCacheFriendlyNumThreads() const BT_OVERRIDE { return m_numThreads + 1; } - // TODO: detect if CPU has hyperthreading enabled - 
virtual int getLogicalToPhysicalCoreRatio() const BT_OVERRIDE { return 1; } + virtual int getNumWorkerThreads() const BT_OVERRIDE { return m_numThreads; } + // TODO: return the number of logical processors sharing the first L3 cache + virtual int getCacheFriendlyNumThreads() const BT_OVERRIDE { return m_numThreads + 1; } + // TODO: detect if CPU has hyperthreading enabled + virtual int getLogicalToPhysicalCoreRatio() const BT_OVERRIDE { return 1; } - virtual void runTask( int threadIndex, void* userData ) BT_OVERRIDE; - virtual void waitForAllTasks() BT_OVERRIDE; + virtual void runTask(int threadIndex, void* userData) BT_OVERRIDE; + virtual void waitForAllTasks() BT_OVERRIDE; - virtual btCriticalSection* createCriticalSection() BT_OVERRIDE; - virtual void deleteCriticalSection( btCriticalSection* criticalSection ) BT_OVERRIDE; + virtual btCriticalSection* createCriticalSection() BT_OVERRIDE; + virtual void deleteCriticalSection(btCriticalSection* criticalSection) BT_OVERRIDE; }; - -#define checkPThreadFunction(returnValue) \ - if(0 != returnValue) { \ - printf("PThread problem at line %i in file %s: %i %d\n", __LINE__, __FILE__, returnValue, errno); \ - } +#define checkPThreadFunction(returnValue) \ + if (0 != returnValue) \ + { \ + printf("PThread problem at line %i in file %s: %i %d\n", __LINE__, __FILE__, returnValue, errno); \ + } // The number of threads should be equal to the number of available cores // Todo: each worker should be linked to a single core, using SetThreadIdealProcessor. - -btThreadSupportPosix::btThreadSupportPosix( const ConstructionInfo& threadConstructionInfo ) +btThreadSupportPosix::btThreadSupportPosix(const ConstructionInfo& threadConstructionInfo) { - startThreads( threadConstructionInfo ); + m_cs = createCriticalSection(); + startThreads(threadConstructionInfo); } // cleanup/shutdown Libspe2 btThreadSupportPosix::~btThreadSupportPosix() { - stopThreads(); + stopThreads(); + deleteCriticalSection(m_cs); + m_cs=0; } -#if (defined (__APPLE__)) +#if (defined(__APPLE__)) #define NAMED_SEMAPHORES #endif - -static sem_t* createSem( const char* baseName ) +static sem_t* createSem(const char* baseName) { - static int semCount = 0; + static int semCount = 0; #ifdef NAMED_SEMAPHORES - /// Named semaphore begin - char name[ 32 ]; - snprintf( name, 32, "/%8.s-%4.d-%4.4d", baseName, getpid(), semCount++ ); - sem_t* tempSem = sem_open( name, O_CREAT, 0600, 0 ); - - if ( tempSem != reinterpret_cast<sem_t *>( SEM_FAILED ) ) - { - // printf("Created \"%s\" Semaphore %p\n", name, tempSem); - } - else - { - //printf("Error creating Semaphore %d\n", errno); - exit( -1 ); - } - /// Named semaphore end + /// Named semaphore begin + char name[32]; + snprintf(name, 32, "/%8.s-%4.d-%4.4d", baseName, getpid(), semCount++); + sem_t* tempSem = sem_open(name, O_CREAT, 0600, 0); + + if (tempSem != reinterpret_cast<sem_t*>(SEM_FAILED)) + { + // printf("Created \"%s\" Semaphore %p\n", name, tempSem); + } + else + { + //printf("Error creating Semaphore %d\n", errno); + exit(-1); + } + /// Named semaphore end #else - sem_t* tempSem = new sem_t; - checkPThreadFunction( sem_init( tempSem, 0, 0 ) ); + sem_t* tempSem = new sem_t; + checkPThreadFunction(sem_init(tempSem, 0, 0)); #endif - return tempSem; + return tempSem; } -static void destroySem( sem_t* semaphore ) +static void destroySem(sem_t* semaphore) { #ifdef NAMED_SEMAPHORES - checkPThreadFunction( sem_close( semaphore ) ); + checkPThreadFunction(sem_close(semaphore)); #else - checkPThreadFunction( sem_destroy( semaphore ) ); - delete 
semaphore; + checkPThreadFunction(sem_destroy(semaphore)); + delete semaphore; #endif } -static void *threadFunction( void *argument ) +static void* threadFunction(void* argument) { - btThreadSupportPosix::btThreadStatus* status = ( btThreadSupportPosix::btThreadStatus* )argument; - - while ( 1 ) - { - checkPThreadFunction( sem_wait( status->startSemaphore ) ); - void* userPtr = status->m_userPtr; - - if ( userPtr ) - { - btAssert( status->m_status ); - status->m_userThreadFunc( userPtr ); - status->m_status = 2; - checkPThreadFunction( sem_post( status->m_mainSemaphore ) ); - status->threadUsed++; - } - else - { - //exit Thread - status->m_status = 3; - checkPThreadFunction( sem_post( status->m_mainSemaphore ) ); - printf( "Thread with taskId %i exiting\n", status->m_taskId ); - break; - } - } - - printf( "Thread TERMINATED\n" ); - return 0; + btThreadSupportPosix::btThreadStatus* status = (btThreadSupportPosix::btThreadStatus*)argument; + + while (1) + { + checkPThreadFunction(sem_wait(status->startSemaphore)); + void* userPtr = status->m_userPtr; + + if (userPtr) + { + btAssert(status->m_status); + status->m_userThreadFunc(userPtr); + status->m_cs->lock(); + status->m_status = 2; + status->m_cs->unlock(); + checkPThreadFunction(sem_post(status->m_mainSemaphore)); + status->threadUsed++; + } + else + { + //exit Thread + status->m_cs->lock(); + status->m_status = 3; + status->m_cs->unlock(); + checkPThreadFunction(sem_post(status->m_mainSemaphore)); + break; + } + } + + return 0; } ///send messages to SPUs -void btThreadSupportPosix::runTask( int threadIndex, void* userData ) +void btThreadSupportPosix::runTask(int threadIndex, void* userData) { - ///we should spawn an SPU task here, and in 'waitForResponse' it should wait for response of the (one of) the first tasks that finished - btThreadStatus& threadStatus = m_activeThreadStatus[ threadIndex ]; - btAssert( threadIndex >= 0 ); - btAssert( threadIndex < m_activeThreadStatus.size() ); - - threadStatus.m_commandId = 1; - threadStatus.m_status = 1; - threadStatus.m_userPtr = userData; - m_startedThreadsMask |= UINT64( 1 ) << threadIndex; - - // fire event to start new task - checkPThreadFunction( sem_post( threadStatus.startSemaphore ) ); + ///we should spawn an SPU task here, and in 'waitForResponse' it should wait for response of the (one of) the first tasks that finished + btThreadStatus& threadStatus = m_activeThreadStatus[threadIndex]; + btAssert(threadIndex >= 0); + btAssert(threadIndex < m_activeThreadStatus.size()); + threadStatus.m_cs = m_cs; + threadStatus.m_commandId = 1; + threadStatus.m_status = 1; + threadStatus.m_userPtr = userData; + m_startedThreadsMask |= UINT64(1) << threadIndex; + + // fire event to start new task + checkPThreadFunction(sem_post(threadStatus.startSemaphore)); } - ///check for messages from SPUs int btThreadSupportPosix::waitForResponse() { - ///We should wait for (one of) the first tasks to finish (or other SPU messages), and report its response - ///A possible response can be 'yes, SPU handled it', or 'no, please do a PPU fallback' - - btAssert( m_activeThreadStatus.size() ); - - // wait for any of the threads to finish - checkPThreadFunction( sem_wait( m_mainSemaphore ) ); - // get at least one thread which has finished - size_t last = -1; - - for ( size_t t = 0; t < size_t( m_activeThreadStatus.size() ); ++t ) - { - if ( 2 == m_activeThreadStatus[ t ].m_status ) - { - last = t; - break; - } - } - - btThreadStatus& threadStatus = m_activeThreadStatus[ last ]; - - btAssert( threadStatus.m_status > 1 
); - threadStatus.m_status = 0; - - // need to find an active spu - btAssert( last >= 0 ); - m_startedThreadsMask &= ~( UINT64( 1 ) << last ); - - return last; + ///We should wait for (one of) the first tasks to finish (or other SPU messages), and report its response + ///A possible response can be 'yes, SPU handled it', or 'no, please do a PPU fallback' + + btAssert(m_activeThreadStatus.size()); + + // wait for any of the threads to finish + checkPThreadFunction(sem_wait(m_mainSemaphore)); + // get at least one thread which has finished + size_t last = -1; + + for (size_t t = 0; t < size_t(m_activeThreadStatus.size()); ++t) + { + m_cs->lock(); + bool hasFinished = (2 == m_activeThreadStatus[t].m_status); + m_cs->unlock(); + if (hasFinished) + { + last = t; + break; + } + } + + btThreadStatus& threadStatus = m_activeThreadStatus[last]; + + btAssert(threadStatus.m_status > 1); + threadStatus.m_status = 0; + + // need to find an active spu + btAssert(last >= 0); + m_startedThreadsMask &= ~(UINT64(1) << last); + + return last; } - void btThreadSupportPosix::waitForAllTasks() { - while ( m_startedThreadsMask ) - { - waitForResponse(); - } + while (m_startedThreadsMask) + { + waitForResponse(); + } } - -void btThreadSupportPosix::startThreads( const ConstructionInfo& threadConstructionInfo ) +void btThreadSupportPosix::startThreads(const ConstructionInfo& threadConstructionInfo) { - m_numThreads = btGetNumHardwareThreads() - 1; // main thread exists already - printf( "%s creating %i threads.\n", __FUNCTION__, m_numThreads ); - m_activeThreadStatus.resize( m_numThreads ); - m_startedThreadsMask = 0; - - m_mainSemaphore = createSem( "main" ); - //checkPThreadFunction(sem_wait(mainSemaphore)); - - for ( int i = 0; i < m_numThreads; i++ ) - { - printf( "starting thread %d\n", i ); - btThreadStatus& threadStatus = m_activeThreadStatus[ i ]; - threadStatus.startSemaphore = createSem( "threadLocal" ); - checkPThreadFunction( pthread_create( &threadStatus.thread, NULL, &threadFunction, (void*) &threadStatus ) ); - - threadStatus.m_userPtr = 0; - threadStatus.m_taskId = i; - threadStatus.m_commandId = 0; - threadStatus.m_status = 0; - threadStatus.m_mainSemaphore = m_mainSemaphore; - threadStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc; - threadStatus.threadUsed = 0; - - printf( "started thread %d \n", i ); - } + m_numThreads = btGetNumHardwareThreads() - 1; // main thread exists already + m_activeThreadStatus.resize(m_numThreads); + m_startedThreadsMask = 0; + + m_mainSemaphore = createSem("main"); + //checkPThreadFunction(sem_wait(mainSemaphore)); + + for (int i = 0; i < m_numThreads; i++) + { + btThreadStatus& threadStatus = m_activeThreadStatus[i]; + threadStatus.startSemaphore = createSem("threadLocal"); + threadStatus.m_userPtr = 0; + threadStatus.m_cs = m_cs; + threadStatus.m_taskId = i; + threadStatus.m_commandId = 0; + threadStatus.m_status = 0; + threadStatus.m_mainSemaphore = m_mainSemaphore; + threadStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc; + threadStatus.threadUsed = 0; + checkPThreadFunction(pthread_create(&threadStatus.thread, NULL, &threadFunction, (void*)&threadStatus)); + + } } ///tell the task scheduler we are done with the SPU tasks void btThreadSupportPosix::stopThreads() { - for ( size_t t = 0; t < size_t( m_activeThreadStatus.size() ); ++t ) - { - btThreadStatus& threadStatus = m_activeThreadStatus[ t ]; - printf( "%s: Thread %i used: %ld\n", __FUNCTION__, int( t ), threadStatus.threadUsed ); - - threadStatus.m_userPtr = 0; - 
checkPThreadFunction( sem_post( threadStatus.startSemaphore ) ); - checkPThreadFunction( sem_wait( m_mainSemaphore ) ); - - printf( "destroy semaphore\n" ); - destroySem( threadStatus.startSemaphore ); - printf( "semaphore destroyed\n" ); - checkPThreadFunction( pthread_join( threadStatus.thread, 0 ) ); - - } - printf( "destroy main semaphore\n" ); - destroySem( m_mainSemaphore ); - printf( "main semaphore destroyed\n" ); - m_activeThreadStatus.clear(); + for (size_t t = 0; t < size_t(m_activeThreadStatus.size()); ++t) + { + btThreadStatus& threadStatus = m_activeThreadStatus[t]; + + threadStatus.m_userPtr = 0; + checkPThreadFunction(sem_post(threadStatus.startSemaphore)); + checkPThreadFunction(sem_wait(m_mainSemaphore)); + + destroySem(threadStatus.startSemaphore); + checkPThreadFunction(pthread_join(threadStatus.thread, 0)); + } + destroySem(m_mainSemaphore); + m_activeThreadStatus.clear(); } class btCriticalSectionPosix : public btCriticalSection { - pthread_mutex_t m_mutex; + pthread_mutex_t m_mutex; public: - btCriticalSectionPosix() - { - pthread_mutex_init( &m_mutex, NULL ); - } - virtual ~btCriticalSectionPosix() - { - pthread_mutex_destroy( &m_mutex ); - } - - virtual void lock() - { - pthread_mutex_lock( &m_mutex ); - } - virtual void unlock() - { - pthread_mutex_unlock( &m_mutex ); - } + btCriticalSectionPosix() + { + pthread_mutex_init(&m_mutex, NULL); + } + virtual ~btCriticalSectionPosix() + { + pthread_mutex_destroy(&m_mutex); + } + + virtual void lock() + { + pthread_mutex_lock(&m_mutex); + } + virtual void unlock() + { + pthread_mutex_unlock(&m_mutex); + } }; - btCriticalSection* btThreadSupportPosix::createCriticalSection() { - return new btCriticalSectionPosix(); + return new btCriticalSectionPosix(); } -void btThreadSupportPosix::deleteCriticalSection( btCriticalSection* cs ) +void btThreadSupportPosix::deleteCriticalSection(btCriticalSection* cs) { - delete cs; + delete cs; } - -btThreadSupportInterface* btThreadSupportInterface::create( const ConstructionInfo& info ) +btThreadSupportInterface* btThreadSupportInterface::create(const ConstructionInfo& info) { - return new btThreadSupportPosix( info ); + return new btThreadSupportPosix(info); } -#endif // BT_THREADSAFE && !defined( _WIN32 ) - +#endif // BT_THREADSAFE && !defined( _WIN32 ) diff --git a/thirdparty/bullet/LinearMath/TaskScheduler/btThreadSupportWin32.cpp b/thirdparty/bullet/LinearMath/TaskScheduler/btThreadSupportWin32.cpp index 00edac650b..922e449cce 100644 --- a/thirdparty/bullet/LinearMath/TaskScheduler/btThreadSupportWin32.cpp +++ b/thirdparty/bullet/LinearMath/TaskScheduler/btThreadSupportWin32.cpp @@ -13,7 +13,7 @@ subject to the following restrictions: 3. This notice may not be removed or altered from any source distribution. 
*/ -#if defined( _WIN32 ) && BT_THREADSAFE +#if defined(_WIN32) && BT_THREADSAFE #include "LinearMath/btScalar.h" #include "LinearMath/btMinMax.h" @@ -23,450 +23,430 @@ subject to the following restrictions: #include <windows.h> #include <stdio.h> - struct btProcessorInfo { - int numLogicalProcessors; - int numCores; - int numNumaNodes; - int numL1Cache; - int numL2Cache; - int numL3Cache; - int numPhysicalPackages; - static const int maxNumTeamMasks = 32; - int numTeamMasks; - UINT64 processorTeamMasks[ maxNumTeamMasks ]; + int numLogicalProcessors; + int numCores; + int numNumaNodes; + int numL1Cache; + int numL2Cache; + int numL3Cache; + int numPhysicalPackages; + static const int maxNumTeamMasks = 32; + int numTeamMasks; + UINT64 processorTeamMasks[maxNumTeamMasks]; }; -UINT64 getProcessorTeamMask( const btProcessorInfo& procInfo, int procId ) +UINT64 getProcessorTeamMask(const btProcessorInfo& procInfo, int procId) { - UINT64 procMask = UINT64( 1 ) << procId; - for ( int i = 0; i < procInfo.numTeamMasks; ++i ) - { - if ( procMask & procInfo.processorTeamMasks[ i ] ) - { - return procInfo.processorTeamMasks[ i ]; - } - } - return 0; + UINT64 procMask = UINT64(1) << procId; + for (int i = 0; i < procInfo.numTeamMasks; ++i) + { + if (procMask & procInfo.processorTeamMasks[i]) + { + return procInfo.processorTeamMasks[i]; + } + } + return 0; } -int getProcessorTeamIndex( const btProcessorInfo& procInfo, int procId ) +int getProcessorTeamIndex(const btProcessorInfo& procInfo, int procId) { - UINT64 procMask = UINT64( 1 ) << procId; - for ( int i = 0; i < procInfo.numTeamMasks; ++i ) - { - if ( procMask & procInfo.processorTeamMasks[ i ] ) - { - return i; - } - } - return -1; + UINT64 procMask = UINT64(1) << procId; + for (int i = 0; i < procInfo.numTeamMasks; ++i) + { + if (procMask & procInfo.processorTeamMasks[i]) + { + return i; + } + } + return -1; } -int countSetBits( ULONG64 bits ) +int countSetBits(ULONG64 bits) { - int count = 0; - while ( bits ) - { - if ( bits & 1 ) - { - count++; - } - bits >>= 1; - } - return count; + int count = 0; + while (bits) + { + if (bits & 1) + { + count++; + } + bits >>= 1; + } + return count; } +typedef BOOL(WINAPI* Pfn_GetLogicalProcessorInformation)(PSYSTEM_LOGICAL_PROCESSOR_INFORMATION, PDWORD); -typedef BOOL( WINAPI *Pfn_GetLogicalProcessorInformation )( PSYSTEM_LOGICAL_PROCESSOR_INFORMATION, PDWORD ); - - -void getProcessorInformation( btProcessorInfo* procInfo ) +void getProcessorInformation(btProcessorInfo* procInfo) { - memset( procInfo, 0, sizeof( *procInfo ) ); - Pfn_GetLogicalProcessorInformation getLogicalProcInfo = - (Pfn_GetLogicalProcessorInformation) GetProcAddress( GetModuleHandle( TEXT( "kernel32" ) ), "GetLogicalProcessorInformation" ); - if ( getLogicalProcInfo == NULL ) - { - // no info - return; - } - PSYSTEM_LOGICAL_PROCESSOR_INFORMATION buf = NULL; - DWORD bufSize = 0; - while ( true ) - { - if ( getLogicalProcInfo( buf, &bufSize ) ) - { - break; - } - else - { - if ( GetLastError() == ERROR_INSUFFICIENT_BUFFER ) - { - if ( buf ) - { - free( buf ); - } - buf = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION) malloc( bufSize ); - } - } - } - - int len = bufSize / sizeof( *buf ); - for ( int i = 0; i < len; ++i ) - { - PSYSTEM_LOGICAL_PROCESSOR_INFORMATION info = buf + i; - switch ( info->Relationship ) - { - case RelationNumaNode: - procInfo->numNumaNodes++; - break; - - case RelationProcessorCore: - procInfo->numCores++; - procInfo->numLogicalProcessors += countSetBits( info->ProcessorMask ); - break; - - case RelationCache: - if ( 
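// Illustration: the while loop above is the standard two-call pattern for
// GetLogicalProcessorInformation -- probe with a NULL (or too-small) buffer,
// get ERROR_INSUFFICIENT_BUFFER plus the required byte count, grow, retry.
// Condensed sketch of just that pattern (bail-out added for clarity):
//
//   DWORD bytes = 0;
//   PSYSTEM_LOGICAL_PROCESSOR_INFORMATION buf = NULL;
//   while (!getLogicalProcInfo(buf, &bytes))
//   {
//       if (GetLastError() != ERROR_INSUFFICIENT_BUFFER)
//           return;            // unexpected failure: leave procInfo zeroed
//       free(buf);             // free(NULL) is a harmless no-op
//       buf = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION)malloc(bytes);
//   }
//   // on success: bytes / sizeof(*buf) records are valid; free(buf) when done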
info->Cache.Level == 1 ) - { - procInfo->numL1Cache++; - } - else if ( info->Cache.Level == 2 ) - { - procInfo->numL2Cache++; - } - else if ( info->Cache.Level == 3 ) - { - procInfo->numL3Cache++; - // processors that share L3 cache are considered to be on the same team - // because they can more easily work together on the same data. - // Large performance penalties will occur if 2 or more threads from different - // teams attempt to frequently read and modify the same cache lines. - // - // On the AMD Ryzen 7 CPU for example, the 8 cores on the CPU are split into - // 2 CCX units of 4 cores each. Each CCX has a separate L3 cache, so if both - // CCXs are operating on the same data, many cycles will be spent keeping the - // two caches coherent. - if ( procInfo->numTeamMasks < btProcessorInfo::maxNumTeamMasks ) - { - procInfo->processorTeamMasks[ procInfo->numTeamMasks ] = info->ProcessorMask; - procInfo->numTeamMasks++; - } - } - break; - - case RelationProcessorPackage: - procInfo->numPhysicalPackages++; - break; - } - } - free( buf ); + memset(procInfo, 0, sizeof(*procInfo)); + Pfn_GetLogicalProcessorInformation getLogicalProcInfo = + (Pfn_GetLogicalProcessorInformation)GetProcAddress(GetModuleHandle(TEXT("kernel32")), "GetLogicalProcessorInformation"); + if (getLogicalProcInfo == NULL) + { + // no info + return; + } + PSYSTEM_LOGICAL_PROCESSOR_INFORMATION buf = NULL; + DWORD bufSize = 0; + while (true) + { + if (getLogicalProcInfo(buf, &bufSize)) + { + break; + } + else + { + if (GetLastError() == ERROR_INSUFFICIENT_BUFFER) + { + if (buf) + { + free(buf); + } + buf = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION)malloc(bufSize); + } + } + } + + int len = bufSize / sizeof(*buf); + for (int i = 0; i < len; ++i) + { + PSYSTEM_LOGICAL_PROCESSOR_INFORMATION info = buf + i; + switch (info->Relationship) + { + case RelationNumaNode: + procInfo->numNumaNodes++; + break; + + case RelationProcessorCore: + procInfo->numCores++; + procInfo->numLogicalProcessors += countSetBits(info->ProcessorMask); + break; + + case RelationCache: + if (info->Cache.Level == 1) + { + procInfo->numL1Cache++; + } + else if (info->Cache.Level == 2) + { + procInfo->numL2Cache++; + } + else if (info->Cache.Level == 3) + { + procInfo->numL3Cache++; + // processors that share L3 cache are considered to be on the same team + // because they can more easily work together on the same data. + // Large performance penalties will occur if 2 or more threads from different + // teams attempt to frequently read and modify the same cache lines. + // + // On the AMD Ryzen 7 CPU for example, the 8 cores on the CPU are split into + // 2 CCX units of 4 cores each. Each CCX has a separate L3 cache, so if both + // CCXs are operating on the same data, many cycles will be spent keeping the + // two caches coherent. 
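// Illustration: each recorded L3 mask defines one "team". On the Ryzen 7
// example above (8 cores with SMT, 2 CCXs) this would typically yield two
// team masks over 16 logical processors, e.g. 0x00FF and 0xFF00 -- the exact
// values depend on how Windows numbers the processors. With those masks, the
// helpers defined earlier in this file resolve as:
//
//   getProcessorTeamMask(procInfo, 3);             // -> 0x00FF (CCX 0)
//   getProcessorTeamIndex(procInfo, 12);           // -> 1      (CCX 1)
//   countSetBits(procInfo.processorTeamMasks[0]);  // -> 8 logical processors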
+ if (procInfo->numTeamMasks < btProcessorInfo::maxNumTeamMasks) + { + procInfo->processorTeamMasks[procInfo->numTeamMasks] = info->ProcessorMask; + procInfo->numTeamMasks++; + } + } + break; + + case RelationProcessorPackage: + procInfo->numPhysicalPackages++; + break; + } + } + free(buf); } - - ///btThreadSupportWin32 helps to initialize/shutdown libspe2, start/stop SPU tasks and communication class btThreadSupportWin32 : public btThreadSupportInterface { public: - struct btThreadStatus - { - int m_taskId; - int m_commandId; - int m_status; + struct btThreadStatus + { + int m_taskId; + int m_commandId; + int m_status; - ThreadFunc m_userThreadFunc; - void* m_userPtr; //for taskDesc etc + ThreadFunc m_userThreadFunc; + void* m_userPtr; //for taskDesc etc - void* m_threadHandle; //this one is calling 'Win32ThreadFunc' + void* m_threadHandle; //this one is calling 'Win32ThreadFunc' - void* m_eventStartHandle; - char m_eventStartHandleName[ 32 ]; + void* m_eventStartHandle; + char m_eventStartHandleName[32]; - void* m_eventCompleteHandle; - char m_eventCompleteHandleName[ 32 ]; - }; + void* m_eventCompleteHandle; + char m_eventCompleteHandleName[32]; + }; private: - btAlignedObjectArray<btThreadStatus> m_activeThreadStatus; - btAlignedObjectArray<void*> m_completeHandles; - int m_numThreads; - DWORD_PTR m_startedThreadMask; - btProcessorInfo m_processorInfo; + btAlignedObjectArray<btThreadStatus> m_activeThreadStatus; + btAlignedObjectArray<void*> m_completeHandles; + int m_numThreads; + DWORD_PTR m_startedThreadMask; + btProcessorInfo m_processorInfo; - void startThreads( const ConstructionInfo& threadInfo ); - void stopThreads(); - int waitForResponse(); + void startThreads(const ConstructionInfo& threadInfo); + void stopThreads(); + int waitForResponse(); public: + btThreadSupportWin32(const ConstructionInfo& threadConstructionInfo); + virtual ~btThreadSupportWin32(); - btThreadSupportWin32( const ConstructionInfo& threadConstructionInfo ); - virtual ~btThreadSupportWin32(); + virtual int getNumWorkerThreads() const BT_OVERRIDE { return m_numThreads; } + virtual int getCacheFriendlyNumThreads() const BT_OVERRIDE { return countSetBits(m_processorInfo.processorTeamMasks[0]); } + virtual int getLogicalToPhysicalCoreRatio() const BT_OVERRIDE { return m_processorInfo.numLogicalProcessors / m_processorInfo.numCores; } - virtual int getNumWorkerThreads() const BT_OVERRIDE { return m_numThreads; } - virtual int getCacheFriendlyNumThreads() const BT_OVERRIDE { return countSetBits(m_processorInfo.processorTeamMasks[0]); } - virtual int getLogicalToPhysicalCoreRatio() const BT_OVERRIDE { return m_processorInfo.numLogicalProcessors / m_processorInfo.numCores; } + virtual void runTask(int threadIndex, void* userData) BT_OVERRIDE; + virtual void waitForAllTasks() BT_OVERRIDE; - virtual void runTask( int threadIndex, void* userData ) BT_OVERRIDE; - virtual void waitForAllTasks() BT_OVERRIDE; - - virtual btCriticalSection* createCriticalSection() BT_OVERRIDE; - virtual void deleteCriticalSection( btCriticalSection* criticalSection ) BT_OVERRIDE; + virtual btCriticalSection* createCriticalSection() BT_OVERRIDE; + virtual void deleteCriticalSection(btCriticalSection* criticalSection) BT_OVERRIDE; }; - -btThreadSupportWin32::btThreadSupportWin32( const ConstructionInfo & threadConstructionInfo ) +btThreadSupportWin32::btThreadSupportWin32(const ConstructionInfo& threadConstructionInfo) { - startThreads( threadConstructionInfo ); + startThreads(threadConstructionInfo); } - 
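// Illustration: the accessors declared above expose the detected topology to
// the task scheduler. getCacheFriendlyNumThreads() is the population count of
// team 0 -- one L3 cache's worth of logical processors -- so a caller can
// trade raw parallelism for cache locality. Hedged usage sketch (the
// preferLocality policy flag is hypothetical):
//
//   btThreadSupportInterface* ts = btThreadSupportInterface::create(info);
//   int nAll   = ts->getNumWorkerThreads();         // all spawned workers
//   int nLocal = ts->getCacheFriendlyNumThreads();  // fits on one L3/CCX
//   int nUsed  = preferLocality ? btMin(nAll, nLocal) : nAll;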
btThreadSupportWin32::~btThreadSupportWin32() { - stopThreads(); + stopThreads(); } - -DWORD WINAPI win32threadStartFunc( LPVOID lpParam ) +DWORD WINAPI win32threadStartFunc(LPVOID lpParam) { - btThreadSupportWin32::btThreadStatus* status = ( btThreadSupportWin32::btThreadStatus* )lpParam; - - while ( 1 ) - { - WaitForSingleObject( status->m_eventStartHandle, INFINITE ); - void* userPtr = status->m_userPtr; - - if ( userPtr ) - { - btAssert( status->m_status ); - status->m_userThreadFunc( userPtr ); - status->m_status = 2; - SetEvent( status->m_eventCompleteHandle ); - } - else - { - //exit Thread - status->m_status = 3; - printf( "Thread with taskId %i with handle %p exiting\n", status->m_taskId, status->m_threadHandle ); - SetEvent( status->m_eventCompleteHandle ); - break; - } - } - printf( "Thread TERMINATED\n" ); - return 0; + btThreadSupportWin32::btThreadStatus* status = (btThreadSupportWin32::btThreadStatus*)lpParam; + + while (1) + { + WaitForSingleObject(status->m_eventStartHandle, INFINITE); + void* userPtr = status->m_userPtr; + + if (userPtr) + { + btAssert(status->m_status); + status->m_userThreadFunc(userPtr); + status->m_status = 2; + SetEvent(status->m_eventCompleteHandle); + } + else + { + //exit Thread + status->m_status = 3; + printf("Thread with taskId %i with handle %p exiting\n", status->m_taskId, status->m_threadHandle); + SetEvent(status->m_eventCompleteHandle); + break; + } + } + printf("Thread TERMINATED\n"); + return 0; } - -void btThreadSupportWin32::runTask( int threadIndex, void* userData ) +void btThreadSupportWin32::runTask(int threadIndex, void* userData) { - btThreadStatus& threadStatus = m_activeThreadStatus[ threadIndex ]; - btAssert( threadIndex >= 0 ); - btAssert( int( threadIndex ) < m_activeThreadStatus.size() ); + btThreadStatus& threadStatus = m_activeThreadStatus[threadIndex]; + btAssert(threadIndex >= 0); + btAssert(int(threadIndex) < m_activeThreadStatus.size()); - threadStatus.m_commandId = 1; - threadStatus.m_status = 1; - threadStatus.m_userPtr = userData; - m_startedThreadMask |= DWORD_PTR( 1 ) << threadIndex; + threadStatus.m_commandId = 1; + threadStatus.m_status = 1; + threadStatus.m_userPtr = userData; + m_startedThreadMask |= DWORD_PTR(1) << threadIndex; - ///fire event to start new task - SetEvent( threadStatus.m_eventStartHandle ); + ///fire event to start new task + SetEvent(threadStatus.m_eventStartHandle); } - int btThreadSupportWin32::waitForResponse() { - btAssert( m_activeThreadStatus.size() ); + btAssert(m_activeThreadStatus.size()); - int last = -1; - DWORD res = WaitForMultipleObjects( m_completeHandles.size(), &m_completeHandles[ 0 ], FALSE, INFINITE ); - btAssert( res != WAIT_FAILED ); - last = res - WAIT_OBJECT_0; + int last = -1; + DWORD res = WaitForMultipleObjects(m_completeHandles.size(), &m_completeHandles[0], FALSE, INFINITE); + btAssert(res != WAIT_FAILED); + last = res - WAIT_OBJECT_0; - btThreadStatus& threadStatus = m_activeThreadStatus[ last ]; - btAssert( threadStatus.m_threadHandle ); - btAssert( threadStatus.m_eventCompleteHandle ); + btThreadStatus& threadStatus = m_activeThreadStatus[last]; + btAssert(threadStatus.m_threadHandle); + btAssert(threadStatus.m_eventCompleteHandle); - //WaitForSingleObject(threadStatus.m_eventCompleteHandle, INFINITE); - btAssert( threadStatus.m_status > 1 ); - threadStatus.m_status = 0; + //WaitForSingleObject(threadStatus.m_eventCompleteHandle, INFINITE); + btAssert(threadStatus.m_status > 1); + threadStatus.m_status = 0; - ///need to find an active spu - btAssert( last >= 0 
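// Illustration: with bWaitAll = FALSE, WaitForMultipleObjects returns
// WAIT_OBJECT_0 + i, where i is the smallest index of a signaled handle,
// which is why m_completeHandles must stay index-parallel with
// m_activeThreadStatus. Sketch of the decode used above:
//
//   DWORD res = WaitForMultipleObjects(count, handles, FALSE, INFINITE);
//   if (res >= WAIT_OBJECT_0 && res < WAIT_OBJECT_0 + count)
//   {
//       int finished = int(res - WAIT_OBJECT_0);  // index of a finished worker
//   }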
); - m_startedThreadMask &= ~( DWORD_PTR( 1 ) << last ); + ///need to find an active spu + btAssert(last >= 0); + m_startedThreadMask &= ~(DWORD_PTR(1) << last); - return last; + return last; } - void btThreadSupportWin32::waitForAllTasks() { - while ( m_startedThreadMask ) - { - waitForResponse(); - } + while (m_startedThreadMask) + { + waitForResponse(); + } } - -void btThreadSupportWin32::startThreads( const ConstructionInfo& threadConstructionInfo ) +void btThreadSupportWin32::startThreads(const ConstructionInfo& threadConstructionInfo) { - static int uniqueId = 0; - uniqueId++; - btProcessorInfo& procInfo = m_processorInfo; - getProcessorInformation( &procInfo ); - DWORD_PTR dwProcessAffinityMask = 0; - DWORD_PTR dwSystemAffinityMask = 0; - if ( !GetProcessAffinityMask( GetCurrentProcess(), &dwProcessAffinityMask, &dwSystemAffinityMask ) ) - { - dwProcessAffinityMask = 0; - } - ///The number of threads should be equal to the number of available cores - 1 - m_numThreads = btMin(procInfo.numLogicalProcessors, int(BT_MAX_THREAD_COUNT)) - 1; // cap to max thread count (-1 because main thread already exists) - - m_activeThreadStatus.resize( m_numThreads ); - m_completeHandles.resize( m_numThreads ); - m_startedThreadMask = 0; - - // set main thread affinity - if ( DWORD_PTR mask = dwProcessAffinityMask & getProcessorTeamMask( procInfo, 0 )) - { - SetThreadAffinityMask( GetCurrentThread(), mask ); - SetThreadIdealProcessor( GetCurrentThread(), 0 ); - } - - for ( int i = 0; i < m_numThreads; i++ ) - { - printf( "starting thread %d\n", i ); - - btThreadStatus& threadStatus = m_activeThreadStatus[ i ]; - - LPSECURITY_ATTRIBUTES lpThreadAttributes = NULL; - SIZE_T dwStackSize = threadConstructionInfo.m_threadStackSize; - LPTHREAD_START_ROUTINE lpStartAddress = &win32threadStartFunc; - LPVOID lpParameter = &threadStatus; - DWORD dwCreationFlags = 0; - LPDWORD lpThreadId = 0; - - threadStatus.m_userPtr = 0; - - sprintf( threadStatus.m_eventStartHandleName, "es%.8s%d%d", threadConstructionInfo.m_uniqueName, uniqueId, i ); - threadStatus.m_eventStartHandle = CreateEventA( 0, false, false, threadStatus.m_eventStartHandleName ); - - sprintf( threadStatus.m_eventCompleteHandleName, "ec%.8s%d%d", threadConstructionInfo.m_uniqueName, uniqueId, i ); - threadStatus.m_eventCompleteHandle = CreateEventA( 0, false, false, threadStatus.m_eventCompleteHandleName ); - - m_completeHandles[ i ] = threadStatus.m_eventCompleteHandle; - - HANDLE handle = CreateThread( lpThreadAttributes, dwStackSize, lpStartAddress, lpParameter, dwCreationFlags, lpThreadId ); - //SetThreadPriority( handle, THREAD_PRIORITY_HIGHEST ); - // highest priority -- can cause erratic performance when numThreads > numCores - // we don't want worker threads to be higher priority than the main thread or the main thread could get - // totally shut out and unable to tell the workers to stop - //SetThreadPriority( handle, THREAD_PRIORITY_BELOW_NORMAL ); - - { - int processorId = i + 1; // leave processor 0 for main thread - DWORD_PTR teamMask = getProcessorTeamMask( procInfo, processorId ); - if ( teamMask ) - { - // bind each thread to only execute on processors of it's assigned team - // - for single-socket Intel x86 CPUs this has no effect (only a single, shared L3 cache so there is only 1 team) - // - for multi-socket Intel this will keep threads from migrating from one socket to another - // - for AMD Ryzen this will keep threads from migrating from one CCX to another - DWORD_PTR mask = teamMask & dwProcessAffinityMask; - if ( mask ) - { - 
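// Illustration: intersecting the team mask with the process affinity mask
// matters when the process itself is restricted, e.g. by a job object or
// "start /affinity 0xF game.exe". In that example dwProcessAffinityMask is
// 0xF, so a CCX-1 team mask of 0xF0 intersects to 0 and the bind is skipped
// rather than calling SetThreadAffinityMask with an empty (invalid) mask:
//
//   DWORD_PTR mask = teamMask & dwProcessAffinityMask;   // may legitimately be 0
//   if (mask) { SetThreadAffinityMask(handle, mask); }   // skip on empty overlap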
SetThreadAffinityMask( handle, mask );
-				}
-			}
-			SetThreadIdealProcessor( handle, processorId );
-		}
-
-		threadStatus.m_taskId = i;
-		threadStatus.m_commandId = 0;
-		threadStatus.m_status = 0;
-		threadStatus.m_threadHandle = handle;
-		threadStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
-
-		printf( "started %s thread %d with threadHandle %p\n", threadConstructionInfo.m_uniqueName, i, handle );
-	}
+	static int uniqueId = 0;
+	uniqueId++;
+	btProcessorInfo& procInfo = m_processorInfo;
+	getProcessorInformation(&procInfo);
+	DWORD_PTR dwProcessAffinityMask = 0;
+	DWORD_PTR dwSystemAffinityMask = 0;
+	if (!GetProcessAffinityMask(GetCurrentProcess(), &dwProcessAffinityMask, &dwSystemAffinityMask))
+	{
+		dwProcessAffinityMask = 0;
+	}
+	///The number of threads should be equal to the number of available cores - 1
+	m_numThreads = btMin(procInfo.numLogicalProcessors, int(BT_MAX_THREAD_COUNT)) - 1;  // cap to max thread count (-1 because main thread already exists)
+
+	m_activeThreadStatus.resize(m_numThreads);
+	m_completeHandles.resize(m_numThreads);
+	m_startedThreadMask = 0;
+
+	// set main thread affinity
+	if (DWORD_PTR mask = dwProcessAffinityMask & getProcessorTeamMask(procInfo, 0))
+	{
+		SetThreadAffinityMask(GetCurrentThread(), mask);
+		SetThreadIdealProcessor(GetCurrentThread(), 0);
+	}
+
+	for (int i = 0; i < m_numThreads; i++)
+	{
+		printf("starting thread %d\n", i);
+
+		btThreadStatus& threadStatus = m_activeThreadStatus[i];
+
+		LPSECURITY_ATTRIBUTES lpThreadAttributes = NULL;
+		SIZE_T dwStackSize = threadConstructionInfo.m_threadStackSize;
+		LPTHREAD_START_ROUTINE lpStartAddress = &win32threadStartFunc;
+		LPVOID lpParameter = &threadStatus;
+		DWORD dwCreationFlags = 0;
+		LPDWORD lpThreadId = 0;
+
+		threadStatus.m_userPtr = 0;
+
+		sprintf(threadStatus.m_eventStartHandleName, "es%.8s%d%d", threadConstructionInfo.m_uniqueName, uniqueId, i);
+		threadStatus.m_eventStartHandle = CreateEventA(0, false, false, threadStatus.m_eventStartHandleName);
+
+		sprintf(threadStatus.m_eventCompleteHandleName, "ec%.8s%d%d", threadConstructionInfo.m_uniqueName, uniqueId, i);
+		threadStatus.m_eventCompleteHandle = CreateEventA(0, false, false, threadStatus.m_eventCompleteHandleName);
+
+		m_completeHandles[i] = threadStatus.m_eventCompleteHandle;
+
+		HANDLE handle = CreateThread(lpThreadAttributes, dwStackSize, lpStartAddress, lpParameter, dwCreationFlags, lpThreadId);
+		//SetThreadPriority( handle, THREAD_PRIORITY_HIGHEST );
+		// highest priority -- can cause erratic performance when numThreads > numCores
+		// we don't want worker threads to be higher priority than the main thread or the main thread could get
+		// totally shut out and unable to tell the workers to stop
+		//SetThreadPriority( handle, THREAD_PRIORITY_BELOW_NORMAL );
+
+		{
+			int processorId = i + 1;  // leave processor 0 for main thread
+			DWORD_PTR teamMask = getProcessorTeamMask(procInfo, processorId);
+			if (teamMask)
+			{
+				// bind each thread to only execute on processors of its assigned team
+				// - for single-socket Intel x86 CPUs this has no effect (only a single, shared L3 cache so there is only 1 team)
+				// - for multi-socket Intel this will keep threads from migrating from one socket to another
+				// - for AMD Ryzen this will keep threads from migrating from one CCX to another
+				DWORD_PTR mask = teamMask & dwProcessAffinityMask;
+				if (mask)
+				{
+					SetThreadAffinityMask(handle, mask);
+				}
+			}
+			SetThreadIdealProcessor(handle, processorId);
+		}
+
+		threadStatus.m_taskId = i;
+		threadStatus.m_commandId = 0;
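// Illustration: SetThreadIdealProcessor (used just above) is only a
// scheduling hint naming the preferred processor, while SetThreadAffinityMask
// is a hard constraint; combining them biases worker i toward processor
// i + 1 without forbidding the rest of its team. The hint can be checked --
// it returns the previous ideal processor, or (DWORD)-1 on failure:
//
//   DWORD prev = SetThreadIdealProcessor(handle, processorId);
//   btAssert(prev != (DWORD)-1);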
+		threadStatus.m_status = 0;
+		threadStatus.m_threadHandle = handle;
+		threadStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
+
+		printf("started %s thread %d with threadHandle %p\n", threadConstructionInfo.m_uniqueName, i, handle);
+	}
}

///tell the task scheduler we are done with the SPU tasks
void btThreadSupportWin32::stopThreads()
{
-	for ( int i = 0; i < m_activeThreadStatus.size(); i++ )
-	{
-		btThreadStatus& threadStatus = m_activeThreadStatus[ i ];
-		if ( threadStatus.m_status > 0 )
-		{
-			WaitForSingleObject( threadStatus.m_eventCompleteHandle, INFINITE );
-		}
-
-		threadStatus.m_userPtr = NULL;
-		SetEvent( threadStatus.m_eventStartHandle );
-		WaitForSingleObject( threadStatus.m_eventCompleteHandle, INFINITE );
-
-		CloseHandle( threadStatus.m_eventCompleteHandle );
-		CloseHandle( threadStatus.m_eventStartHandle );
-		CloseHandle( threadStatus.m_threadHandle );
-
-	}
-
-	m_activeThreadStatus.clear();
-	m_completeHandles.clear();
+	for (int i = 0; i < m_activeThreadStatus.size(); i++)
+	{
+		btThreadStatus& threadStatus = m_activeThreadStatus[i];
+		if (threadStatus.m_status > 0)
+		{
+			WaitForSingleObject(threadStatus.m_eventCompleteHandle, INFINITE);
+		}
+
+		threadStatus.m_userPtr = NULL;
+		SetEvent(threadStatus.m_eventStartHandle);
+		WaitForSingleObject(threadStatus.m_eventCompleteHandle, INFINITE);
+
+		CloseHandle(threadStatus.m_eventCompleteHandle);
+		CloseHandle(threadStatus.m_eventStartHandle);
+		CloseHandle(threadStatus.m_threadHandle);
+	}
+
+	m_activeThreadStatus.clear();
+	m_completeHandles.clear();
}

-
class btWin32CriticalSection : public btCriticalSection
{
private:
-	CRITICAL_SECTION mCriticalSection;
+	CRITICAL_SECTION mCriticalSection;

public:
-	btWin32CriticalSection()
-	{
-		InitializeCriticalSection( &mCriticalSection );
-	}
-
-	~btWin32CriticalSection()
-	{
-		DeleteCriticalSection( &mCriticalSection );
-	}
-
-	void lock()
-	{
-		EnterCriticalSection( &mCriticalSection );
-	}
-
-	void unlock()
-	{
-		LeaveCriticalSection( &mCriticalSection );
-	}
+	btWin32CriticalSection()
+	{
+		InitializeCriticalSection(&mCriticalSection);
+	}
+
+	~btWin32CriticalSection()
+	{
+		DeleteCriticalSection(&mCriticalSection);
+	}
+
+	void lock()
+	{
+		EnterCriticalSection(&mCriticalSection);
+	}
+
+	void unlock()
+	{
+		LeaveCriticalSection(&mCriticalSection);
+	}
};

-
btCriticalSection* btThreadSupportWin32::createCriticalSection()
{
-	unsigned char* mem = (unsigned char*) btAlignedAlloc( sizeof( btWin32CriticalSection ), 16 );
-	btWin32CriticalSection* cs = new( mem ) btWin32CriticalSection();
-	return cs;
+	unsigned char* mem = (unsigned char*)btAlignedAlloc(sizeof(btWin32CriticalSection), 16);
+	btWin32CriticalSection* cs = new (mem) btWin32CriticalSection();
+	return cs;
}

-void btThreadSupportWin32::deleteCriticalSection( btCriticalSection* criticalSection )
+void btThreadSupportWin32::deleteCriticalSection(btCriticalSection* criticalSection)
{
-	criticalSection->~btCriticalSection();
-	btAlignedFree( criticalSection );
+	criticalSection->~btCriticalSection();
+	btAlignedFree(criticalSection);
}

-
-btThreadSupportInterface* btThreadSupportInterface::create( const ConstructionInfo& info )
+btThreadSupportInterface* btThreadSupportInterface::create(const ConstructionInfo& info)
{
-	return new btThreadSupportWin32( info );
+	return new btThreadSupportWin32(info);
}

-
-
-#endif //defined(_WIN32) && BT_THREADSAFE
-
+#endif  //defined(_WIN32) && BT_THREADSAFE
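A minimal end-to-end sketch of the interface both backends implement, for
reference. It assumes ConstructionInfo can be built from a unique name and a
ThreadFunc (as its m_uniqueName and m_userThreadFunc members suggest) and that
the caller supplies the worker function and per-task data; the names below are
hypothetical, not part of this commit:

    static void processChunk(void* userPtr)  // matches the ThreadFunc signature
    {
        // ... perform the work described by userPtr ...
    }

    btThreadSupportInterface* ts = btThreadSupportInterface::create(
        btThreadSupportInterface::ConstructionInfo("sched", processChunk));

    for (int i = 0; i < ts->getNumWorkerThreads(); ++i)
    {
        ts->runTask(i, &taskData[i]);  // wakes worker i via its semaphore/event
    }
    ts->waitForAllTasks();             // drains the started-threads mask

    btCriticalSection* cs = ts->createCriticalSection();
    // ... workers share state under cs->lock() / cs->unlock() ...
    ts->deleteCriticalSection(cs);
    delete ts;                         // runs stopThreads() via the destructor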