summaryrefslogtreecommitdiff
path: root/thirdparty/bullet/LinearMath/TaskScheduler/btTaskScheduler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'thirdparty/bullet/LinearMath/TaskScheduler/btTaskScheduler.cpp')
-rw-r--r--thirdparty/bullet/LinearMath/TaskScheduler/btTaskScheduler.cpp802
1 files changed, 802 insertions, 0 deletions
diff --git a/thirdparty/bullet/LinearMath/TaskScheduler/btTaskScheduler.cpp b/thirdparty/bullet/LinearMath/TaskScheduler/btTaskScheduler.cpp
new file mode 100644
index 0000000000..49510d1660
--- /dev/null
+++ b/thirdparty/bullet/LinearMath/TaskScheduler/btTaskScheduler.cpp
@@ -0,0 +1,802 @@
+
+#include "LinearMath/btMinMax.h"
+#include "LinearMath/btAlignedObjectArray.h"
+#include "LinearMath/btThreads.h"
+#include "LinearMath/btQuickprof.h"
+#include <stdio.h>
+#include <algorithm>
+
+
+
+#if BT_THREADSAFE
+
+#include "btThreadSupportInterface.h"
+
+#if defined( _WIN32 )
+
+#define WIN32_LEAN_AND_MEAN
+
+#include <windows.h>
+
+#endif
+
+
+typedef unsigned long long btU64;
+static const int kCacheLineSize = 64;
+
+void btSpinPause()
+{
+#if defined( _WIN32 )
+ YieldProcessor();
+#endif
+}
+
+
+struct WorkerThreadStatus
+{
+ enum Type
+ {
+ kInvalid,
+ kWaitingForWork,
+ kWorking,
+ kSleeping,
+ };
+};
+
+
+ATTRIBUTE_ALIGNED64(class) WorkerThreadDirectives
+{
+ static const int kMaxThreadCount = BT_MAX_THREAD_COUNT;
+ // directives for all worker threads packed into a single cacheline
+ char m_threadDirs[kMaxThreadCount];
+
+public:
+ enum Type
+ {
+ kInvalid,
+ kGoToSleep, // go to sleep
+ kStayAwakeButIdle, // wait for not checking job queue
+ kScanForJobs, // actively scan job queue for jobs
+ };
+ WorkerThreadDirectives()
+ {
+ for ( int i = 0; i < kMaxThreadCount; ++i )
+ {
+ m_threadDirs[ i ] = 0;
+ }
+ }
+
+ Type getDirective(int threadId)
+ {
+ btAssert(threadId < kMaxThreadCount);
+ return static_cast<Type>(m_threadDirs[threadId]);
+ }
+
+ void setDirectiveByRange(int threadBegin, int threadEnd, Type dir)
+ {
+ btAssert( threadBegin < threadEnd );
+ btAssert( threadEnd <= kMaxThreadCount );
+ char dirChar = static_cast<char>(dir);
+ for ( int i = threadBegin; i < threadEnd; ++i )
+ {
+ m_threadDirs[ i ] = dirChar;
+ }
+ }
+};
+
+class JobQueue;
+
+ATTRIBUTE_ALIGNED64(struct) ThreadLocalStorage
+{
+ int m_threadId;
+ WorkerThreadStatus::Type m_status;
+ int m_numJobsFinished;
+ btSpinMutex m_mutex;
+ btScalar m_sumResult;
+ WorkerThreadDirectives * m_directive;
+ JobQueue* m_queue;
+ btClock* m_clock;
+ unsigned int m_cooldownTime;
+};
+
+
+struct IJob
+{
+ virtual void executeJob(int threadId) = 0;
+};
+
+class ParallelForJob : public IJob
+{
+ const btIParallelForBody* m_body;
+ int m_begin;
+ int m_end;
+
+public:
+ ParallelForJob( int iBegin, int iEnd, const btIParallelForBody& body )
+ {
+ m_body = &body;
+ m_begin = iBegin;
+ m_end = iEnd;
+ }
+ virtual void executeJob(int threadId) BT_OVERRIDE
+ {
+ BT_PROFILE( "executeJob" );
+
+ // call the functor body to do the work
+ m_body->forLoop( m_begin, m_end );
+ }
+};
+
+
+class ParallelSumJob : public IJob
+{
+ const btIParallelSumBody* m_body;
+ ThreadLocalStorage* m_threadLocalStoreArray;
+ int m_begin;
+ int m_end;
+
+public:
+ ParallelSumJob( int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls )
+ {
+ m_body = &body;
+ m_threadLocalStoreArray = tls;
+ m_begin = iBegin;
+ m_end = iEnd;
+ }
+ virtual void executeJob( int threadId ) BT_OVERRIDE
+ {
+ BT_PROFILE( "executeJob" );
+
+ // call the functor body to do the work
+ btScalar val = m_body->sumLoop( m_begin, m_end );
+#if BT_PARALLEL_SUM_DETERMINISTISM
+ // by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision)
+ const float TRUNC_SCALE = float(1<<19);
+ val = floor(val*TRUNC_SCALE+0.5f)/TRUNC_SCALE; // truncate some bits
+#endif
+ m_threadLocalStoreArray[threadId].m_sumResult += val;
+ }
+};
+
+
+ATTRIBUTE_ALIGNED64(class) JobQueue
+{
+ btThreadSupportInterface* m_threadSupport;
+ btCriticalSection* m_queueLock;
+ btSpinMutex m_mutex;
+
+ btAlignedObjectArray<IJob*> m_jobQueue;
+ char* m_jobMem;
+ int m_jobMemSize;
+ bool m_queueIsEmpty;
+ int m_tailIndex;
+ int m_headIndex;
+ int m_allocSize;
+ bool m_useSpinMutex;
+ btAlignedObjectArray<JobQueue*> m_neighborContexts;
+ char m_cachePadding[kCacheLineSize]; // prevent false sharing
+
+ void freeJobMem()
+ {
+ if ( m_jobMem )
+ {
+ // free old
+ btAlignedFree(m_jobMem);
+ m_jobMem = NULL;
+ }
+ }
+ void resizeJobMem(int newSize)
+ {
+ if (newSize > m_jobMemSize)
+ {
+ freeJobMem();
+ m_jobMem = static_cast<char*>(btAlignedAlloc(newSize, kCacheLineSize));
+ m_jobMemSize = newSize;
+ }
+ }
+
+public:
+
+ JobQueue()
+ {
+ m_jobMem = NULL;
+ m_jobMemSize = 0;
+ m_threadSupport = NULL;
+ m_queueLock = NULL;
+ m_headIndex = 0;
+ m_tailIndex = 0;
+ m_useSpinMutex = false;
+ }
+ ~JobQueue()
+ {
+ exit();
+ }
+ void exit()
+ {
+ freeJobMem();
+ if (m_queueLock && m_threadSupport)
+ {
+ m_threadSupport->deleteCriticalSection(m_queueLock);
+ m_queueLock = NULL;
+ m_threadSupport = 0;
+ }
+ }
+
+ void init(btThreadSupportInterface* threadSup, btAlignedObjectArray<JobQueue>* contextArray)
+ {
+ m_threadSupport = threadSup;
+ if (threadSup)
+ {
+ m_queueLock = m_threadSupport->createCriticalSection();
+ }
+ setupJobStealing(contextArray, contextArray->size());
+ }
+ void setupJobStealing(btAlignedObjectArray<JobQueue>* contextArray, int numActiveContexts)
+ {
+ btAlignedObjectArray<JobQueue>& contexts = *contextArray;
+ int selfIndex = 0;
+ for (int i = 0; i < contexts.size(); ++i)
+ {
+ if ( this == &contexts[ i ] )
+ {
+ selfIndex = i;
+ break;
+ }
+ }
+ int numNeighbors = btMin(2, contexts.size() - 1);
+ int neighborOffsets[ ] = {-1, 1, -2, 2, -3, 3};
+ int numOffsets = sizeof(neighborOffsets)/sizeof(neighborOffsets[0]);
+ m_neighborContexts.reserve( numNeighbors );
+ m_neighborContexts.resizeNoInitialize(0);
+ for (int i = 0; i < numOffsets && m_neighborContexts.size() < numNeighbors; i++)
+ {
+ int neighborIndex = selfIndex + neighborOffsets[i];
+ if ( neighborIndex >= 0 && neighborIndex < numActiveContexts)
+ {
+ m_neighborContexts.push_back( &contexts[ neighborIndex ] );
+ }
+ }
+ }
+
+ bool isQueueEmpty() const {return m_queueIsEmpty;}
+ void lockQueue()
+ {
+ if ( m_useSpinMutex )
+ {
+ m_mutex.lock();
+ }
+ else
+ {
+ m_queueLock->lock();
+ }
+ }
+ void unlockQueue()
+ {
+ if ( m_useSpinMutex )
+ {
+ m_mutex.unlock();
+ }
+ else
+ {
+ m_queueLock->unlock();
+ }
+ }
+ void clearQueue(int jobCount, int jobSize)
+ {
+ lockQueue();
+ m_headIndex = 0;
+ m_tailIndex = 0;
+ m_allocSize = 0;
+ m_queueIsEmpty = true;
+ int jobBufSize = jobSize * jobCount;
+ // make sure we have enough memory allocated to store jobs
+ if ( jobBufSize > m_jobMemSize )
+ {
+ resizeJobMem( jobBufSize );
+ }
+ // make sure job queue is big enough
+ if ( jobCount > m_jobQueue.capacity() )
+ {
+ m_jobQueue.reserve( jobCount );
+ }
+ unlockQueue();
+ m_jobQueue.resizeNoInitialize( 0 );
+ }
+ void* allocJobMem(int jobSize)
+ {
+ btAssert(m_jobMemSize >= (m_allocSize + jobSize));
+ void* jobMem = &m_jobMem[m_allocSize];
+ m_allocSize += jobSize;
+ return jobMem;
+ }
+ void submitJob( IJob* job )
+ {
+ btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize );
+ m_jobQueue.push_back( job );
+ lockQueue();
+ m_tailIndex++;
+ m_queueIsEmpty = false;
+ unlockQueue();
+ }
+ IJob* consumeJobFromOwnQueue()
+ {
+ if ( m_queueIsEmpty )
+ {
+ // lock free path. even if this is taken erroneously it isn't harmful
+ return NULL;
+ }
+ IJob* job = NULL;
+ lockQueue();
+ if ( !m_queueIsEmpty )
+ {
+ job = m_jobQueue[ m_headIndex++ ];
+ btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize );
+ if ( m_headIndex == m_tailIndex )
+ {
+ m_queueIsEmpty = true;
+ }
+ }
+ unlockQueue();
+ return job;
+ }
+ IJob* consumeJob()
+ {
+ if (IJob* job = consumeJobFromOwnQueue())
+ {
+ return job;
+ }
+ // own queue is empty, try to steal from neighbor
+ for (int i = 0; i < m_neighborContexts.size(); ++i)
+ {
+ JobQueue* otherContext = m_neighborContexts[ i ];
+ if ( IJob* job = otherContext->consumeJobFromOwnQueue() )
+ {
+ return job;
+ }
+ }
+ return NULL;
+ }
+};
+
+
+static void WorkerThreadFunc( void* userPtr )
+{
+ BT_PROFILE( "WorkerThreadFunc" );
+ ThreadLocalStorage* localStorage = (ThreadLocalStorage*) userPtr;
+ JobQueue* jobQueue = localStorage->m_queue;
+
+ bool shouldSleep = false;
+ int threadId = localStorage->m_threadId;
+ while (! shouldSleep)
+ {
+ // do work
+ localStorage->m_mutex.lock();
+ while ( IJob* job = jobQueue->consumeJob() )
+ {
+ localStorage->m_status = WorkerThreadStatus::kWorking;
+ job->executeJob( threadId );
+ localStorage->m_numJobsFinished++;
+ }
+ localStorage->m_status = WorkerThreadStatus::kWaitingForWork;
+ localStorage->m_mutex.unlock();
+ btU64 clockStart = localStorage->m_clock->getTimeMicroseconds();
+ // while queue is empty,
+ while (jobQueue->isQueueEmpty())
+ {
+ // todo: spin wait a bit to avoid hammering the empty queue
+ btSpinPause();
+ if ( localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep )
+ {
+ shouldSleep = true;
+ break;
+ }
+ // if jobs are incoming,
+ if ( localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs )
+ {
+ clockStart = localStorage->m_clock->getTimeMicroseconds(); // reset clock
+ }
+ else
+ {
+ for ( int i = 0; i < 50; ++i )
+ {
+ btSpinPause();
+ btSpinPause();
+ btSpinPause();
+ btSpinPause();
+ if (localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty())
+ {
+ break;
+ }
+ }
+ // if no jobs incoming and queue has been empty for the cooldown time, sleep
+ btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart;
+ if (timeElapsed > localStorage->m_cooldownTime)
+ {
+ shouldSleep = true;
+ break;
+ }
+ }
+ }
+ }
+ {
+ BT_PROFILE("sleep");
+ // go sleep
+ localStorage->m_mutex.lock();
+ localStorage->m_status = WorkerThreadStatus::kSleeping;
+ localStorage->m_mutex.unlock();
+ }
+}
+
+
+class btTaskSchedulerDefault : public btITaskScheduler
+{
+ btThreadSupportInterface* m_threadSupport;
+ WorkerThreadDirectives* m_workerDirective;
+ btAlignedObjectArray<JobQueue> m_jobQueues;
+ btAlignedObjectArray<JobQueue*> m_perThreadJobQueues;
+ btAlignedObjectArray<ThreadLocalStorage> m_threadLocalStorage;
+ btSpinMutex m_antiNestingLock; // prevent nested parallel-for
+ btClock m_clock;
+ int m_numThreads;
+ int m_numWorkerThreads;
+ int m_numActiveJobQueues;
+ int m_maxNumThreads;
+ int m_numJobs;
+ static const int kFirstWorkerThreadId = 1;
+public:
+
+ btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport")
+ {
+ m_threadSupport = NULL;
+ m_workerDirective = NULL;
+ }
+
+ virtual ~btTaskSchedulerDefault()
+ {
+ waitForWorkersToSleep();
+
+ for ( int i = 0; i < m_jobQueues.size(); ++i )
+ {
+ m_jobQueues[i].exit();
+ }
+
+ if (m_threadSupport)
+ {
+ delete m_threadSupport;
+ m_threadSupport = NULL;
+ }
+ if (m_workerDirective)
+ {
+ btAlignedFree(m_workerDirective);
+ m_workerDirective = NULL;
+ }
+ }
+
+ void init()
+ {
+ btThreadSupportInterface::ConstructionInfo constructionInfo( "TaskScheduler", WorkerThreadFunc );
+ m_threadSupport = btThreadSupportInterface::create( constructionInfo );
+ m_workerDirective = static_cast<WorkerThreadDirectives*>(btAlignedAlloc(sizeof(*m_workerDirective), 64));
+
+ m_numWorkerThreads = m_threadSupport->getNumWorkerThreads();
+ m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1;
+ m_numThreads = m_maxNumThreads;
+ // ideal to have one job queue for each physical processor (except for the main thread which needs no queue)
+ int numThreadsPerQueue = m_threadSupport->getLogicalToPhysicalCoreRatio();
+ int numJobQueues = (numThreadsPerQueue == 1) ? (m_maxNumThreads-1) : (m_maxNumThreads / numThreadsPerQueue);
+ m_jobQueues.resize(numJobQueues);
+ m_numActiveJobQueues = numJobQueues;
+ for ( int i = 0; i < m_jobQueues.size(); ++i )
+ {
+ m_jobQueues[i].init( m_threadSupport, &m_jobQueues );
+ }
+ m_perThreadJobQueues.resize(m_numThreads);
+ for ( int i = 0; i < m_numThreads; i++ )
+ {
+ JobQueue* jq = NULL;
+ // only worker threads get a job queue
+ if (i > 0)
+ {
+ if (numThreadsPerQueue == 1)
+ {
+ // one queue per worker thread
+ jq = &m_jobQueues[ i - kFirstWorkerThreadId ];
+ }
+ else
+ {
+ // 2 threads share each queue
+ jq = &m_jobQueues[ i / numThreadsPerQueue ];
+ }
+ }
+ m_perThreadJobQueues[i] = jq;
+ }
+ m_threadLocalStorage.resize(m_numThreads);
+ for ( int i = 0; i < m_numThreads; i++ )
+ {
+ ThreadLocalStorage& storage = m_threadLocalStorage[i];
+ storage.m_threadId = i;
+ storage.m_directive = m_workerDirective;
+ storage.m_status = WorkerThreadStatus::kSleeping;
+ storage.m_cooldownTime = 100; // 100 microseconds, threads go to sleep after this long if they have nothing to do
+ storage.m_clock = &m_clock;
+ storage.m_queue = m_perThreadJobQueues[i];
+ }
+ setWorkerDirectives( WorkerThreadDirectives::kGoToSleep ); // no work for them yet
+ setNumThreads( m_threadSupport->getCacheFriendlyNumThreads() );
+ }
+
+ void setWorkerDirectives(WorkerThreadDirectives::Type dir)
+ {
+ m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir);
+ }
+
+ virtual int getMaxNumThreads() const BT_OVERRIDE
+ {
+ return m_maxNumThreads;
+ }
+
+ virtual int getNumThreads() const BT_OVERRIDE
+ {
+ return m_numThreads;
+ }
+
+ virtual void setNumThreads( int numThreads ) BT_OVERRIDE
+ {
+ m_numThreads = btMax( btMin(numThreads, int(m_maxNumThreads)), 1 );
+ m_numWorkerThreads = m_numThreads - 1;
+ m_numActiveJobQueues = 0;
+ // if there is at least 1 worker,
+ if ( m_numWorkerThreads > 0 )
+ {
+ // re-setup job stealing between queues to avoid attempting to steal from an inactive job queue
+ JobQueue* lastActiveContext = m_perThreadJobQueues[ m_numThreads - 1 ];
+ int iLastActiveContext = lastActiveContext - &m_jobQueues[0];
+ m_numActiveJobQueues = iLastActiveContext + 1;
+ for ( int i = 0; i < m_jobQueues.size(); ++i )
+ {
+ m_jobQueues[ i ].setupJobStealing( &m_jobQueues, m_numActiveJobQueues );
+ }
+ }
+ m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep);
+ }
+
+ void waitJobs()
+ {
+ BT_PROFILE( "waitJobs" );
+ // have the main thread work until the job queues are empty
+ int numMainThreadJobsFinished = 0;
+ for ( int i = 0; i < m_numActiveJobQueues; ++i )
+ {
+ while ( IJob* job = m_jobQueues[i].consumeJob() )
+ {
+ job->executeJob( 0 );
+ numMainThreadJobsFinished++;
+ }
+ }
+
+ // done with jobs for now, tell workers to rest (but not sleep)
+ setWorkerDirectives( WorkerThreadDirectives::kStayAwakeButIdle );
+
+ btU64 clockStart = m_clock.getTimeMicroseconds();
+ // wait for workers to finish any jobs in progress
+ while ( true )
+ {
+ int numWorkerJobsFinished = 0;
+ for ( int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread )
+ {
+ ThreadLocalStorage* storage = &m_threadLocalStorage[iThread];
+ storage->m_mutex.lock();
+ numWorkerJobsFinished += storage->m_numJobsFinished;
+ storage->m_mutex.unlock();
+ }
+ if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs)
+ {
+ break;
+ }
+ btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart;
+ btAssert(timeElapsed < 1000);
+ if (timeElapsed > 100000)
+ {
+ break;
+ }
+ btSpinPause();
+ }
+ }
+
+ void wakeWorkers(int numWorkersToWake)
+ {
+ BT_PROFILE( "wakeWorkers" );
+ btAssert( m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs );
+ int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads);
+ int numActiveWorkers = 0;
+ for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker )
+ {
+ // note this count of active workers is not necessarily totally reliable, because a worker thread could be
+ // just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare.
+ ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ];
+ if (storage.m_status != WorkerThreadStatus::kSleeping)
+ {
+ numActiveWorkers++;
+ }
+ }
+ for ( int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker )
+ {
+ ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ];
+ if (storage.m_status == WorkerThreadStatus::kSleeping)
+ {
+ m_threadSupport->runTask( iWorker, &storage );
+ numActiveWorkers++;
+ }
+ }
+ }
+
+ void waitForWorkersToSleep()
+ {
+ BT_PROFILE( "waitForWorkersToSleep" );
+ setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
+ m_threadSupport->waitForAllTasks();
+ for ( int i = kFirstWorkerThreadId; i < m_numThreads; i++ )
+ {
+ ThreadLocalStorage& storage = m_threadLocalStorage[i];
+ btAssert( storage.m_status == WorkerThreadStatus::kSleeping );
+ }
+ }
+
+ virtual void sleepWorkerThreadsHint() BT_OVERRIDE
+ {
+ BT_PROFILE( "sleepWorkerThreadsHint" );
+ // hint the task scheduler that we may not be using these threads for a little while
+ setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
+ }
+
+ void prepareWorkerThreads()
+ {
+ for ( int i = kFirstWorkerThreadId; i < m_numThreads; ++i )
+ {
+ ThreadLocalStorage& storage = m_threadLocalStorage[i];
+ storage.m_mutex.lock();
+ storage.m_numJobsFinished = 0;
+ storage.m_mutex.unlock();
+ }
+ setWorkerDirectives( WorkerThreadDirectives::kScanForJobs );
+ }
+
+ virtual void parallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBody& body ) BT_OVERRIDE
+ {
+ BT_PROFILE( "parallelFor_ThreadSupport" );
+ btAssert( iEnd >= iBegin );
+ btAssert( grainSize >= 1 );
+ int iterationCount = iEnd - iBegin;
+ if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() )
+ {
+ typedef ParallelForJob JobType;
+ int jobCount = ( iterationCount + grainSize - 1 ) / grainSize;
+ m_numJobs = jobCount;
+ btAssert( jobCount >= 2 ); // need more than one job for multithreading
+ int jobSize = sizeof( JobType );
+
+ for (int i = 0; i < m_numActiveJobQueues; ++i)
+ {
+ m_jobQueues[i].clearQueue( jobCount, jobSize );
+ }
+ // prepare worker threads for incoming work
+ prepareWorkerThreads();
+ // submit all of the jobs
+ int iJob = 0;
+ int iThread = kFirstWorkerThreadId; // first worker thread
+ for ( int i = iBegin; i < iEnd; i += grainSize )
+ {
+ btAssert( iJob < jobCount );
+ int iE = btMin( i + grainSize, iEnd );
+ JobQueue* jq = m_perThreadJobQueues[ iThread ];
+ btAssert(jq);
+ btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
+ void* jobMem = jq->allocJobMem(jobSize);
+ JobType* job = new ( jobMem ) ParallelForJob( i, iE, body ); // placement new
+ jq->submitJob( job );
+ iJob++;
+ iThread++;
+ if ( iThread >= m_numThreads )
+ {
+ iThread = kFirstWorkerThreadId; // first worker thread
+ }
+ }
+ wakeWorkers( jobCount - 1 );
+
+ // put the main thread to work on emptying the job queue and then wait for all workers to finish
+ waitJobs();
+ m_antiNestingLock.unlock();
+ }
+ else
+ {
+ BT_PROFILE( "parallelFor_mainThread" );
+ // just run on main thread
+ body.forLoop( iBegin, iEnd );
+ }
+ }
+ virtual btScalar parallelSum( int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body ) BT_OVERRIDE
+ {
+ BT_PROFILE( "parallelSum_ThreadSupport" );
+ btAssert( iEnd >= iBegin );
+ btAssert( grainSize >= 1 );
+ int iterationCount = iEnd - iBegin;
+ if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() )
+ {
+ typedef ParallelSumJob JobType;
+ int jobCount = ( iterationCount + grainSize - 1 ) / grainSize;
+ m_numJobs = jobCount;
+ btAssert( jobCount >= 2 ); // need more than one job for multithreading
+ int jobSize = sizeof( JobType );
+ for (int i = 0; i < m_numActiveJobQueues; ++i)
+ {
+ m_jobQueues[i].clearQueue( jobCount, jobSize );
+ }
+
+ // initialize summation
+ for ( int iThread = 0; iThread < m_numThreads; ++iThread )
+ {
+ m_threadLocalStorage[iThread].m_sumResult = btScalar(0);
+ }
+
+ // prepare worker threads for incoming work
+ prepareWorkerThreads();
+ // submit all of the jobs
+ int iJob = 0;
+ int iThread = kFirstWorkerThreadId; // first worker thread
+ for ( int i = iBegin; i < iEnd; i += grainSize )
+ {
+ btAssert( iJob < jobCount );
+ int iE = btMin( i + grainSize, iEnd );
+ JobQueue* jq = m_perThreadJobQueues[ iThread ];
+ btAssert(jq);
+ btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
+ void* jobMem = jq->allocJobMem(jobSize);
+ JobType* job = new ( jobMem ) ParallelSumJob( i, iE, body, &m_threadLocalStorage[0] ); // placement new
+ jq->submitJob( job );
+ iJob++;
+ iThread++;
+ if ( iThread >= m_numThreads )
+ {
+ iThread = kFirstWorkerThreadId; // first worker thread
+ }
+ }
+ wakeWorkers( jobCount - 1 );
+
+ // put the main thread to work on emptying the job queue and then wait for all workers to finish
+ waitJobs();
+
+ // add up all the thread sums
+ btScalar sum = btScalar(0);
+ for ( int iThread = 0; iThread < m_numThreads; ++iThread )
+ {
+ sum += m_threadLocalStorage[ iThread ].m_sumResult;
+ }
+ m_antiNestingLock.unlock();
+ return sum;
+ }
+ else
+ {
+ BT_PROFILE( "parallelSum_mainThread" );
+ // just run on main thread
+ return body.sumLoop( iBegin, iEnd );
+ }
+ }
+};
+
+
+
+btITaskScheduler* btCreateDefaultTaskScheduler()
+{
+ btTaskSchedulerDefault* ts = new btTaskSchedulerDefault();
+ ts->init();
+ return ts;
+}
+
+#else // #if BT_THREADSAFE
+
+btITaskScheduler* btCreateDefaultTaskScheduler()
+{
+ return NULL;
+}
+
+#endif // #else // #if BT_THREADSAFE