summaryrefslogtreecommitdiff
path: root/thirdparty/zstd/compress/zstdmt_compress.c
diff options
context:
space:
mode:
Diffstat (limited to 'thirdparty/zstd/compress/zstdmt_compress.c')
-rw-r--r--thirdparty/zstd/compress/zstdmt_compress.c90
1 files changed, 41 insertions, 49 deletions
diff --git a/thirdparty/zstd/compress/zstdmt_compress.c b/thirdparty/zstd/compress/zstdmt_compress.c
index 2cbd6ffade..38fbb90768 100644
--- a/thirdparty/zstd/compress/zstdmt_compress.c
+++ b/thirdparty/zstd/compress/zstdmt_compress.c
@@ -22,6 +22,7 @@
/* ====== Dependencies ====== */
#include <string.h> /* memcpy, memset */
#include <limits.h> /* INT_MAX, UINT_MAX */
+#include "mem.h" /* MEM_STATIC */
#include "pool.h" /* threadpool */
#include "threading.h" /* mutex */
#include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
@@ -456,7 +457,7 @@ typedef struct {
* Must be acquired after the main mutex when acquiring both.
*/
ZSTD_pthread_mutex_t ldmWindowMutex;
- ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is udpated */
+ ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is updated */
ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */
} serialState_t;
@@ -647,7 +648,7 @@ static void ZSTDMT_compressionJob(void* jobDescription)
buffer_t dstBuff = job->dstBuff;
size_t lastCBlockSize = 0;
- /* ressources */
+ /* resources */
if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation));
if (dstBuff.start == NULL) { /* streaming job : doesn't provide a dstBuffer */
dstBuff = ZSTDMT_getBuffer(job->bufPool);
@@ -672,7 +673,7 @@ static void ZSTDMT_compressionJob(void* jobDescription)
if (ZSTD_isError(initError)) JOB_ERROR(initError);
} else { /* srcStart points at reloaded section */
U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size;
- { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_c_forceMaxWindow, !job->firstJob);
+ { size_t const forceWindowError = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_forceMaxWindow, !job->firstJob);
if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError);
}
{ size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
@@ -864,14 +865,10 @@ static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) {
* Internal use only */
size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)
{
- if (nbWorkers > ZSTDMT_NBWORKERS_MAX) nbWorkers = ZSTDMT_NBWORKERS_MAX;
- params->nbWorkers = nbWorkers;
- params->overlapLog = ZSTDMT_OVERLAPLOG_DEFAULT;
- params->jobSize = 0;
- return nbWorkers;
+ return ZSTD_CCtxParams_setParameter(params, ZSTD_c_nbWorkers, (int)nbWorkers);
}
-ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem)
+MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem)
{
ZSTDMT_CCtx* mtctx;
U32 nbJobs = nbWorkers + 2;
@@ -906,6 +903,17 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem)
return mtctx;
}
+ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem)
+{
+#ifdef ZSTD_MULTITHREAD
+ return ZSTDMT_createCCtx_advanced_internal(nbWorkers, cMem);
+#else
+ (void)nbWorkers;
+ (void)cMem;
+ return NULL;
+#endif
+}
+
ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers)
{
return ZSTDMT_createCCtx_advanced(nbWorkers, ZSTD_defaultCMem);
@@ -986,26 +994,13 @@ ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params,
{
case ZSTDMT_p_jobSize :
DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter : set jobSize to %i", value);
- if ( value != 0 /* default */
- && value < ZSTDMT_JOBSIZE_MIN)
- value = ZSTDMT_JOBSIZE_MIN;
- assert(value >= 0);
- if (value > ZSTDMT_JOBSIZE_MAX) value = ZSTDMT_JOBSIZE_MAX;
- params->jobSize = value;
- return value;
-
+ return ZSTD_CCtxParams_setParameter(params, ZSTD_c_jobSize, value);
case ZSTDMT_p_overlapLog :
DEBUGLOG(4, "ZSTDMT_p_overlapLog : %i", value);
- if (value < ZSTD_OVERLAPLOG_MIN) value = ZSTD_OVERLAPLOG_MIN;
- if (value > ZSTD_OVERLAPLOG_MAX) value = ZSTD_OVERLAPLOG_MAX;
- params->overlapLog = value;
- return value;
-
+ return ZSTD_CCtxParams_setParameter(params, ZSTD_c_overlapLog, value);
case ZSTDMT_p_rsyncable :
- value = (value != 0);
- params->rsyncable = value;
- return value;
-
+ DEBUGLOG(4, "ZSTD_p_rsyncable : %i", value);
+ return ZSTD_CCtxParams_setParameter(params, ZSTD_c_rsyncable, value);
default :
return ERROR(parameter_unsupported);
}
@@ -1021,32 +1016,29 @@ size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter,
{
switch (parameter) {
case ZSTDMT_p_jobSize:
- assert(mtctx->params.jobSize <= INT_MAX);
- *value = (int)(mtctx->params.jobSize);
- break;
+ return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_jobSize, value);
case ZSTDMT_p_overlapLog:
- *value = mtctx->params.overlapLog;
- break;
+ return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_overlapLog, value);
case ZSTDMT_p_rsyncable:
- *value = mtctx->params.rsyncable;
- break;
+ return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_rsyncable, value);
default:
return ERROR(parameter_unsupported);
}
- return 0;
}
/* Sets parameters relevant to the compression job,
* initializing others to default values. */
static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
{
- ZSTD_CCtx_params jobParams;
- memset(&jobParams, 0, sizeof(jobParams));
-
- jobParams.cParams = params.cParams;
- jobParams.fParams = params.fParams;
- jobParams.compressionLevel = params.compressionLevel;
-
+ ZSTD_CCtx_params jobParams = params;
+ /* Clear parameters related to multithreading */
+ jobParams.forceWindow = 0;
+ jobParams.nbWorkers = 0;
+ jobParams.jobSize = 0;
+ jobParams.overlapLog = 0;
+ jobParams.rsyncable = 0;
+ memset(&jobParams.ldmParams, 0, sizeof(ldmParams_t));
+ memset(&jobParams.customMem, 0, sizeof(ZSTD_customMem));
return jobParams;
}
@@ -1056,7 +1048,7 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
{
if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation);
- CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbWorkers) );
+ FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) );
mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers);
@@ -1263,7 +1255,7 @@ static size_t ZSTDMT_compress_advanced_internal(
if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize))
return ERROR(memory_allocation);
- CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbJobs) ); /* only expands if necessary */
+ FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbJobs) ); /* only expands if necessary */
{ unsigned u;
for (u=0; u<nbJobs; u++) {
@@ -1396,7 +1388,7 @@ size_t ZSTDMT_initCStream_internal(
/* init */
if (params.nbWorkers != mtctx->params.nbWorkers)
- CHECK_F( ZSTDMT_resize(mtctx, params.nbWorkers) );
+ FORWARD_IF_ERROR( ZSTDMT_resize(mtctx, params.nbWorkers) );
if (params.jobSize != 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN;
if (params.jobSize > (size_t)ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
@@ -1547,7 +1539,7 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) {
/* ZSTDMT_writeLastEmptyBlock()
* Write a single empty block with an end-of-frame to finish a frame.
* Job must be created from streaming variant.
- * This function is always successfull if expected conditions are fulfilled.
+ * This function is always successful if expected conditions are fulfilled.
*/
static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
{
@@ -1987,7 +1979,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
assert(input->pos <= input->size);
if (mtctx->singleBlockingThread) { /* delegate to single-thread (synchronous) */
- return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp);
+ return ZSTD_compressStream2(mtctx->cctxPool->cctx[0], output, input, endOp);
}
if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) {
@@ -2051,7 +2043,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|| ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) { /* must finish the frame with a zero-size block */
size_t const jobSize = mtctx->inBuff.filled;
assert(mtctx->inBuff.filled <= mtctx->targetSectionSize);
- CHECK_F( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) );
+ FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) );
}
/* check for potential compressed data ready to be flushed */
@@ -2065,7 +2057,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
- CHECK_F( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) );
+ FORWARD_IF_ERROR( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) );
/* recommended next input size : fill current input buffer */
return mtctx->targetSectionSize - mtctx->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */
@@ -2082,7 +2074,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* ou
|| ((endFrame==ZSTD_e_end) && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */
DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)",
(U32)srcSize, (U32)endFrame);
- CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
+ FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
}
/* check if there is any data available to flush */