diff options
Diffstat (limited to 'thirdparty/zstd/common/pool.c')
-rw-r--r-- | thirdparty/zstd/common/pool.c | 107 |
1 files changed, 61 insertions, 46 deletions
diff --git a/thirdparty/zstd/common/pool.c b/thirdparty/zstd/common/pool.c index a227044f7f..1b0fe1035d 100644 --- a/thirdparty/zstd/common/pool.c +++ b/thirdparty/zstd/common/pool.c @@ -5,6 +5,7 @@ * This source code is licensed under both the BSD-style license (found in the * LICENSE file in the root directory of this source tree) and the GPLv2 (found * in the COPYING file in the root directory of this source tree). + * You may select, at your option, one of the above-listed licenses. */ @@ -25,13 +26,14 @@ /* A job is a function and an opaque argument */ typedef struct POOL_job_s { - POOL_function function; - void *opaque; + POOL_function function; + void *opaque; } POOL_job; struct POOL_ctx_s { + ZSTD_customMem customMem; /* Keep track of the threads */ - pthread_t *threads; + ZSTD_pthread_t *threads; size_t numThreads; /* The queue is a circular buffer */ @@ -46,11 +48,11 @@ struct POOL_ctx_s { int queueEmpty; /* The mutex protects the queue */ - pthread_mutex_t queueMutex; + ZSTD_pthread_mutex_t queueMutex; /* Condition variable for pushers to wait on when the queue is full */ - pthread_cond_t queuePushCond; + ZSTD_pthread_cond_t queuePushCond; /* Condition variables for poppers to wait on when the queue is empty */ - pthread_cond_t queuePopCond; + ZSTD_pthread_cond_t queuePopCond; /* Indicates if the queue is shutting down */ int shutdown; }; @@ -65,14 +67,14 @@ static void* POOL_thread(void* opaque) { if (!ctx) { return NULL; } for (;;) { /* Lock the mutex and wait for a non-empty queue or until shutdown */ - pthread_mutex_lock(&ctx->queueMutex); + ZSTD_pthread_mutex_lock(&ctx->queueMutex); while (ctx->queueEmpty && !ctx->shutdown) { - pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); + ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); } /* empty => shutting down: so stop */ if (ctx->queueEmpty) { - pthread_mutex_unlock(&ctx->queueMutex); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); return opaque; } /* Pop a job off the queue */ @@ -81,28 +83,32 @@ static void* POOL_thread(void* opaque) { ctx->numThreadsBusy++; ctx->queueEmpty = ctx->queueHead == ctx->queueTail; /* Unlock the mutex, signal a pusher, and run the job */ - pthread_mutex_unlock(&ctx->queueMutex); - pthread_cond_signal(&ctx->queuePushCond); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + ZSTD_pthread_cond_signal(&ctx->queuePushCond); job.function(job.opaque); /* If the intended queue size was 0, signal after finishing job */ if (ctx->queueSize == 1) { - pthread_mutex_lock(&ctx->queueMutex); + ZSTD_pthread_mutex_lock(&ctx->queueMutex); ctx->numThreadsBusy--; - pthread_mutex_unlock(&ctx->queueMutex); - pthread_cond_signal(&ctx->queuePushCond); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + ZSTD_pthread_cond_signal(&ctx->queuePushCond); } } } /* for (;;) */ /* Unreachable */ } -POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { - POOL_ctx *ctx; +POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { + return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); +} + +POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) { + POOL_ctx* ctx; /* Check the parameters */ if (!numThreads) { return NULL; } /* Allocate the context and zero initialize */ - ctx = (POOL_ctx *)calloc(1, sizeof(POOL_ctx)); + ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem); if (!ctx) { return NULL; } /* Initialize the job queue. * It needs one extra space since one space is wasted to differentiate empty @@ -114,19 +120,20 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { ctx->queueTail = 0; ctx->numThreadsBusy = 0; ctx->queueEmpty = 1; - (void)pthread_mutex_init(&ctx->queueMutex, NULL); - (void)pthread_cond_init(&ctx->queuePushCond, NULL); - (void)pthread_cond_init(&ctx->queuePopCond, NULL); + (void)ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL); + (void)ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL); + (void)ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL); ctx->shutdown = 0; /* Allocate space for the thread handles */ - ctx->threads = (pthread_t*)malloc(numThreads * sizeof(pthread_t)); + ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem); ctx->numThreads = 0; + ctx->customMem = customMem; /* Check for errors */ if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; } /* Initialize the threads */ { size_t i; for (i = 0; i < numThreads; ++i) { - if (pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) { + if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) { ctx->numThreads = i; POOL_free(ctx); return NULL; @@ -139,37 +146,37 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { /*! POOL_join() : Shutdown the queue, wake any sleeping threads, and join all of the threads. */ -static void POOL_join(POOL_ctx *ctx) { +static void POOL_join(POOL_ctx* ctx) { /* Shut down the queue */ - pthread_mutex_lock(&ctx->queueMutex); + ZSTD_pthread_mutex_lock(&ctx->queueMutex); ctx->shutdown = 1; - pthread_mutex_unlock(&ctx->queueMutex); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); /* Wake up sleeping threads */ - pthread_cond_broadcast(&ctx->queuePushCond); - pthread_cond_broadcast(&ctx->queuePopCond); + ZSTD_pthread_cond_broadcast(&ctx->queuePushCond); + ZSTD_pthread_cond_broadcast(&ctx->queuePopCond); /* Join all of the threads */ { size_t i; for (i = 0; i < ctx->numThreads; ++i) { - pthread_join(ctx->threads[i], NULL); + ZSTD_pthread_join(ctx->threads[i], NULL); } } } void POOL_free(POOL_ctx *ctx) { if (!ctx) { return; } POOL_join(ctx); - pthread_mutex_destroy(&ctx->queueMutex); - pthread_cond_destroy(&ctx->queuePushCond); - pthread_cond_destroy(&ctx->queuePopCond); - if (ctx->queue) free(ctx->queue); - if (ctx->threads) free(ctx->threads); - free(ctx); + ZSTD_pthread_mutex_destroy(&ctx->queueMutex); + ZSTD_pthread_cond_destroy(&ctx->queuePushCond); + ZSTD_pthread_cond_destroy(&ctx->queuePopCond); + ZSTD_free(ctx->queue, ctx->customMem); + ZSTD_free(ctx->threads, ctx->customMem); + ZSTD_free(ctx, ctx->customMem); } size_t POOL_sizeof(POOL_ctx *ctx) { if (ctx==NULL) return 0; /* supports sizeof NULL */ return sizeof(*ctx) + ctx->queueSize * sizeof(POOL_job) - + ctx->numThreads * sizeof(pthread_t); + + ctx->numThreads * sizeof(ZSTD_pthread_t); } /** @@ -191,12 +198,12 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { POOL_ctx* const ctx = (POOL_ctx*)ctxVoid; if (!ctx) { return; } - pthread_mutex_lock(&ctx->queueMutex); + ZSTD_pthread_mutex_lock(&ctx->queueMutex); { POOL_job const job = {function, opaque}; /* Wait until there is space in the queue for the new job */ while (isQueueFull(ctx) && !ctx->shutdown) { - pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); + ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); } /* The queue is still going => there is space */ if (!ctx->shutdown) { @@ -205,8 +212,8 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize; } } - pthread_mutex_unlock(&ctx->queueMutex); - pthread_cond_signal(&ctx->queuePopCond); + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + ZSTD_pthread_cond_signal(&ctx->queuePopCond); } #else /* ZSTD_MULTITHREAD not defined */ @@ -214,26 +221,34 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { /* We don't need any data, but if it is empty malloc() might return NULL. */ struct POOL_ctx_s { - int data; + int dummy; }; +static POOL_ctx g_ctx; POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { - (void)numThreads; - (void)queueSize; - return (POOL_ctx*)malloc(sizeof(POOL_ctx)); + return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); +} + +POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) { + (void)numThreads; + (void)queueSize; + (void)customMem; + return &g_ctx; } void POOL_free(POOL_ctx* ctx) { - free(ctx); + assert(!ctx || ctx == &g_ctx); + (void)ctx; } void POOL_add(void* ctx, POOL_function function, void* opaque) { - (void)ctx; - function(opaque); + (void)ctx; + function(opaque); } size_t POOL_sizeof(POOL_ctx* ctx) { if (ctx==NULL) return 0; /* supports sizeof NULL */ + assert(ctx == &g_ctx); return sizeof(*ctx); } |