diff options
Diffstat (limited to 'thirdparty/zstd/compress/zstdmt_compress.c')
| -rw-r--r-- | thirdparty/zstd/compress/zstdmt_compress.c | 480 | 
1 files changed, 74 insertions, 406 deletions
| diff --git a/thirdparty/zstd/compress/zstdmt_compress.c b/thirdparty/zstd/compress/zstdmt_compress.c index 1e3c8fdbee..50454a50b9 100644 --- a/thirdparty/zstd/compress/zstdmt_compress.c +++ b/thirdparty/zstd/compress/zstdmt_compress.c @@ -20,8 +20,7 @@  /* ======   Dependencies   ====== */ -#include <string.h>      /* memcpy, memset */ -#include <limits.h>      /* INT_MAX, UINT_MAX */ +#include "../common/zstd_deps.h"   /* ZSTD_memcpy, ZSTD_memset, INT_MAX, UINT_MAX */  #include "../common/mem.h"         /* MEM_STATIC */  #include "../common/pool.h"        /* threadpool */  #include "../common/threading.h"   /* mutex */ @@ -106,11 +105,11 @@ typedef struct ZSTDMT_bufferPool_s {  static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbWorkers, ZSTD_customMem cMem)  {      unsigned const maxNbBuffers = 2*nbWorkers + 3; -    ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc( +    ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_customCalloc(          sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);      if (bufPool==NULL) return NULL;      if (ZSTD_pthread_mutex_init(&bufPool->poolMutex, NULL)) { -        ZSTD_free(bufPool, cMem); +        ZSTD_customFree(bufPool, cMem);          return NULL;      }      bufPool->bufferSize = 64 KB; @@ -127,10 +126,10 @@ static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)      if (!bufPool) return;   /* compatibility with free on NULL */      for (u=0; u<bufPool->totalBuffers; u++) {          DEBUGLOG(4, "free buffer %2u (address:%08X)", u, (U32)(size_t)bufPool->bTable[u].start); -        ZSTD_free(bufPool->bTable[u].start, bufPool->cMem); +        ZSTD_customFree(bufPool->bTable[u].start, bufPool->cMem);      }      ZSTD_pthread_mutex_destroy(&bufPool->poolMutex); -    ZSTD_free(bufPool, bufPool->cMem); +    ZSTD_customFree(bufPool, bufPool->cMem);  }  /* only works at initialization, not during compression */ @@ -201,13 +200,13 @@ static buffer_t 
ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)          }          /* size conditions not respected : scratch this buffer, create new one */          DEBUGLOG(5, "ZSTDMT_getBuffer: existing buffer does not meet size conditions => freeing"); -        ZSTD_free(buf.start, bufPool->cMem); +        ZSTD_customFree(buf.start, bufPool->cMem);      }      ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);      /* create new buffer */      DEBUGLOG(5, "ZSTDMT_getBuffer: create a new buffer");      {   buffer_t buffer; -        void* const start = ZSTD_malloc(bSize, bufPool->cMem); +        void* const start = ZSTD_customMalloc(bSize, bufPool->cMem);          buffer.start = start;   /* note : start can be NULL if malloc fails ! */          buffer.capacity = (start==NULL) ? 0 : bSize;          if (start==NULL) { @@ -229,13 +228,13 @@ static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer)  {      size_t const bSize = bufPool->bufferSize;      if (buffer.capacity < bSize) { -        void* const start = ZSTD_malloc(bSize, bufPool->cMem); +        void* const start = ZSTD_customMalloc(bSize, bufPool->cMem);          buffer_t newBuffer;          newBuffer.start = start;          newBuffer.capacity = start == NULL ? 
0 : bSize;          if (start != NULL) {              assert(newBuffer.capacity >= buffer.capacity); -            memcpy(newBuffer.start, buffer.start, buffer.capacity); +            ZSTD_memcpy(newBuffer.start, buffer.start, buffer.capacity);              DEBUGLOG(5, "ZSTDMT_resizeBuffer: created buffer of size %u", (U32)bSize);              return newBuffer;          } @@ -261,14 +260,12 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)      ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);      /* Reached bufferPool capacity (should not happen) */      DEBUGLOG(5, "ZSTDMT_releaseBuffer: pool capacity reached => freeing "); -    ZSTD_free(buf.start, bufPool->cMem); +    ZSTD_customFree(buf.start, bufPool->cMem);  }  /* =====   Seq Pool Wrapper   ====== */ -static rawSeqStore_t kNullRawSeqStore = {NULL, 0, 0, 0}; -  typedef ZSTDMT_bufferPool ZSTDMT_seqPool;  static size_t ZSTDMT_sizeof_seqPool(ZSTDMT_seqPool* seqPool) @@ -278,7 +275,7 @@ static size_t ZSTDMT_sizeof_seqPool(ZSTDMT_seqPool* seqPool)  static rawSeqStore_t bufferToSeq(buffer_t buffer)  { -    rawSeqStore_t seq = {NULL, 0, 0, 0}; +    rawSeqStore_t seq = kNullRawSeqStore;      seq.seq = (rawSeq*)buffer.start;      seq.capacity = buffer.capacity / sizeof(rawSeq);      return seq; @@ -354,7 +351,7 @@ static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)      for (cid=0; cid<pool->totalCCtx; cid++)          ZSTD_freeCCtx(pool->cctx[cid]);  /* note : compatible with free on NULL */      ZSTD_pthread_mutex_destroy(&pool->poolMutex); -    ZSTD_free(pool, pool->cMem); +    ZSTD_customFree(pool, pool->cMem);  }  /* ZSTDMT_createCCtxPool() : @@ -362,12 +359,12 @@ static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)  static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(int nbWorkers,                                                ZSTD_customMem cMem)  { -    ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc( +    ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) 
ZSTD_customCalloc(          sizeof(ZSTDMT_CCtxPool) + (nbWorkers-1)*sizeof(ZSTD_CCtx*), cMem);      assert(nbWorkers > 0);      if (!cctxPool) return NULL;      if (ZSTD_pthread_mutex_init(&cctxPool->poolMutex, NULL)) { -        ZSTD_free(cctxPool, cMem); +        ZSTD_customFree(cctxPool, cMem);          return NULL;      }      cctxPool->cMem = cMem; @@ -478,7 +475,7 @@ ZSTDMT_serialState_reset(serialState_t* serialState,          serialState->ldmState.hashPower =                  ZSTD_rollingHash_primePower(params.ldmParams.minMatchLength);      } else { -        memset(&params.ldmParams, 0, sizeof(params.ldmParams)); +        ZSTD_memset(&params.ldmParams, 0, sizeof(params.ldmParams));      }      serialState->nextJobID = 0;      if (params.fParams.checksumFlag) @@ -499,18 +496,18 @@ ZSTDMT_serialState_reset(serialState_t* serialState,          ZSTD_window_init(&serialState->ldmState.window);          /* Resize tables and output space if necessary. */          if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) { -            ZSTD_free(serialState->ldmState.hashTable, cMem); -            serialState->ldmState.hashTable = (ldmEntry_t*)ZSTD_malloc(hashSize, cMem); +            ZSTD_customFree(serialState->ldmState.hashTable, cMem); +            serialState->ldmState.hashTable = (ldmEntry_t*)ZSTD_customMalloc(hashSize, cMem);          }          if (serialState->ldmState.bucketOffsets == NULL || prevBucketLog < bucketLog) { -            ZSTD_free(serialState->ldmState.bucketOffsets, cMem); -            serialState->ldmState.bucketOffsets = (BYTE*)ZSTD_malloc(bucketSize, cMem); +            ZSTD_customFree(serialState->ldmState.bucketOffsets, cMem); +            serialState->ldmState.bucketOffsets = (BYTE*)ZSTD_customMalloc(bucketSize, cMem);          }          if (!serialState->ldmState.hashTable || !serialState->ldmState.bucketOffsets)              return 1;          /* Zero the tables */ -        
memset(serialState->ldmState.hashTable, 0, hashSize); -        memset(serialState->ldmState.bucketOffsets, 0, bucketSize); +        ZSTD_memset(serialState->ldmState.hashTable, 0, hashSize); +        ZSTD_memset(serialState->ldmState.bucketOffsets, 0, bucketSize);          /* Update window state and fill hash table with dict */          serialState->ldmState.loadedDictEnd = 0; @@ -537,7 +534,7 @@ ZSTDMT_serialState_reset(serialState_t* serialState,  static int ZSTDMT_serialState_init(serialState_t* serialState)  {      int initError = 0; -    memset(serialState, 0, sizeof(*serialState)); +    ZSTD_memset(serialState, 0, sizeof(*serialState));      initError |= ZSTD_pthread_mutex_init(&serialState->mutex, NULL);      initError |= ZSTD_pthread_cond_init(&serialState->cond, NULL);      initError |= ZSTD_pthread_mutex_init(&serialState->ldmWindowMutex, NULL); @@ -552,8 +549,8 @@ static void ZSTDMT_serialState_free(serialState_t* serialState)      ZSTD_pthread_cond_destroy(&serialState->cond);      ZSTD_pthread_mutex_destroy(&serialState->ldmWindowMutex);      ZSTD_pthread_cond_destroy(&serialState->ldmWindowCond); -    ZSTD_free(serialState->ldmState.hashTable, cMem); -    ZSTD_free(serialState->ldmState.bucketOffsets, cMem); +    ZSTD_customFree(serialState->ldmState.hashTable, cMem); +    ZSTD_customFree(serialState->ldmState.bucketOffsets, cMem);  }  static void ZSTDMT_serialState_update(serialState_t* serialState, @@ -820,7 +817,6 @@ struct ZSTDMT_CCtx_s {      roundBuff_t roundBuff;      serialState_t serial;      rsyncState_t rsync; -    unsigned singleBlockingThread;      unsigned jobIDMask;      unsigned doneJobID;      unsigned nextJobID; @@ -832,6 +828,7 @@ struct ZSTDMT_CCtx_s {      ZSTD_customMem cMem;      ZSTD_CDict* cdictLocal;      const ZSTD_CDict* cdict; +    unsigned providedFactory: 1;  };  static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZSTD_customMem cMem) @@ -842,7 +839,7 @@ static void 
ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZS          ZSTD_pthread_mutex_destroy(&jobTable[jobNb].job_mutex);          ZSTD_pthread_cond_destroy(&jobTable[jobNb].job_cond);      } -    ZSTD_free(jobTable, cMem); +    ZSTD_customFree(jobTable, cMem);  }  /* ZSTDMT_allocJobsTable() @@ -854,7 +851,7 @@ static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_custom      U32 const nbJobs = 1 << nbJobsLog2;      U32 jobNb;      ZSTDMT_jobDescription* const jobTable = (ZSTDMT_jobDescription*) -                ZSTD_calloc(nbJobs * sizeof(ZSTDMT_jobDescription), cMem); +                ZSTD_customCalloc(nbJobs * sizeof(ZSTDMT_jobDescription), cMem);      int initError = 0;      if (jobTable==NULL) return NULL;      *nbJobsPtr = nbJobs; @@ -885,12 +882,12 @@ static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) {  /* ZSTDMT_CCtxParam_setNbWorkers():   * Internal use only */ -size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers) +static size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)  {      return ZSTD_CCtxParams_setParameter(params, ZSTD_c_nbWorkers, (int)nbWorkers);  } -MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem) +MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)  {      ZSTDMT_CCtx* mtctx;      U32 nbJobs = nbWorkers + 2; @@ -903,12 +900,19 @@ MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers,          /* invalid custom allocator */          return NULL; -    mtctx = (ZSTDMT_CCtx*) ZSTD_calloc(sizeof(ZSTDMT_CCtx), cMem); +    mtctx = (ZSTDMT_CCtx*) ZSTD_customCalloc(sizeof(ZSTDMT_CCtx), cMem);      if (!mtctx) return NULL;      ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);      mtctx->cMem = cMem;      mtctx->allJobsCompleted = 1; -    mtctx->factory = 
POOL_create_advanced(nbWorkers, 0, cMem); +    if (pool != NULL) { +      mtctx->factory = pool; +      mtctx->providedFactory = 1; +    } +    else { +      mtctx->factory = POOL_create_advanced(nbWorkers, 0, cMem); +      mtctx->providedFactory = 0; +    }      mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);      assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0);  /* ensure nbJobs is a power of 2 */      mtctx->jobIDMask = nbJobs - 1; @@ -925,22 +929,18 @@ MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers,      return mtctx;  } -ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem) +ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)  {  #ifdef ZSTD_MULTITHREAD -    return ZSTDMT_createCCtx_advanced_internal(nbWorkers, cMem); +    return ZSTDMT_createCCtx_advanced_internal(nbWorkers, cMem, pool);  #else      (void)nbWorkers;      (void)cMem; +    (void)pool;      return NULL;  #endif  } -ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers) -{ -    return ZSTDMT_createCCtx_advanced(nbWorkers, ZSTD_defaultCMem); -} -  /* ZSTDMT_releaseAllJobResources() :   * note : ensure all workers are killed first ! 
*/ @@ -957,7 +957,7 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)          ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);          /* Clear the job description, but keep the mutex/cond */ -        memset(&mtctx->jobs[jobID], 0, sizeof(mtctx->jobs[jobID])); +        ZSTD_memset(&mtctx->jobs[jobID], 0, sizeof(mtctx->jobs[jobID]));          mtctx->jobs[jobID].job_mutex = mutex;          mtctx->jobs[jobID].job_cond = cond;      } @@ -984,7 +984,8 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)  size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)  {      if (mtctx==NULL) return 0;   /* compatible with free on NULL */ -    POOL_free(mtctx->factory);   /* stop and free worker threads */ +    if (!mtctx->providedFactory) +        POOL_free(mtctx->factory);   /* stop and free worker threads */      ZSTDMT_releaseAllJobResources(mtctx);  /* release job resources into pools first */      ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);      ZSTDMT_freeBufferPool(mtctx->bufPool); @@ -993,8 +994,8 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)      ZSTDMT_serialState_free(&mtctx->serial);      ZSTD_freeCDict(mtctx->cdictLocal);      if (mtctx->roundBuff.buffer) -        ZSTD_free(mtctx->roundBuff.buffer, mtctx->cMem); -    ZSTD_free(mtctx, mtctx->cMem); +        ZSTD_customFree(mtctx->roundBuff.buffer, mtctx->cMem); +    ZSTD_customFree(mtctx, mtctx->cMem);      return 0;  } @@ -1011,65 +1012,6 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)              + mtctx->roundBuff.capacity;  } -/* Internal only */ -size_t -ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, -                                   ZSTDMT_parameter parameter, -                                   int value) -{ -    DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter"); -    switch(parameter) -    { -    case ZSTDMT_p_jobSize : -        DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter : set jobSize to %i", value); -        return 
ZSTD_CCtxParams_setParameter(params, ZSTD_c_jobSize, value); -    case ZSTDMT_p_overlapLog : -        DEBUGLOG(4, "ZSTDMT_p_overlapLog : %i", value); -        return ZSTD_CCtxParams_setParameter(params, ZSTD_c_overlapLog, value); -    case ZSTDMT_p_rsyncable : -        DEBUGLOG(4, "ZSTD_p_rsyncable : %i", value); -        return ZSTD_CCtxParams_setParameter(params, ZSTD_c_rsyncable, value); -    default : -        return ERROR(parameter_unsupported); -    } -} - -size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int value) -{ -    DEBUGLOG(4, "ZSTDMT_setMTCtxParameter"); -    return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value); -} - -size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int* value) -{ -    switch (parameter) { -    case ZSTDMT_p_jobSize: -        return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_jobSize, value); -    case ZSTDMT_p_overlapLog: -        return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_overlapLog, value); -    case ZSTDMT_p_rsyncable: -        return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_rsyncable, value); -    default: -        return ERROR(parameter_unsupported); -    } -} - -/* Sets parameters relevant to the compression job, - * initializing others to default values. 
*/ -static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(const ZSTD_CCtx_params* params) -{ -    ZSTD_CCtx_params jobParams = *params; -    /* Clear parameters related to multithreading */ -    jobParams.forceWindow = 0; -    jobParams.nbWorkers = 0; -    jobParams.jobSize = 0; -    jobParams.overlapLog = 0; -    jobParams.rsyncable = 0; -    memset(&jobParams.ldmParams, 0, sizeof(ldmParams_t)); -    memset(&jobParams.customMem, 0, sizeof(ZSTD_customMem)); -    return jobParams; -} -  /* ZSTDMT_resize() :   * @return : error code if fails, 0 on success */ @@ -1098,7 +1040,7 @@ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_p      DEBUGLOG(5, "ZSTDMT_updateCParams_whileCompressing (level:%i)",                  compressionLevel);      mtctx->params.compressionLevel = compressionLevel; -    {   ZSTD_compressionParameters cParams = ZSTD_getCParamsFromCCtxParams(cctxParams, ZSTD_CONTENTSIZE_UNKNOWN, 0); +    {   ZSTD_compressionParameters cParams = ZSTD_getCParamsFromCCtxParams(cctxParams, ZSTD_CONTENTSIZE_UNKNOWN, 0, ZSTD_cpm_noAttachDict);          cParams.windowLog = saved_wlog;          mtctx->params.cParams = cParams;      } @@ -1185,8 +1127,8 @@ static unsigned ZSTDMT_computeTargetJobLog(const ZSTD_CCtx_params* params)      if (params->ldmParams.enableLdm) {          /* In Long Range Mode, the windowLog is typically oversized.           * In which case, it's preferable to determine the jobSize -         * based on chainLog instead. */ -        jobLog = MAX(21, params->cParams.chainLog + 4); +         * based on cycleLog instead. */ +        jobLog = MAX(21, ZSTD_cycleLog(params->cParams.chainLog, params->cParams.strategy) + 3);      } else {          jobLog = MAX(20, params->cParams.windowLog + 2);      } @@ -1240,174 +1182,6 @@ static size_t ZSTDMT_computeOverlapSize(const ZSTD_CCtx_params* params)      return (ovLog==0) ? 
0 : (size_t)1 << ovLog;  } -static unsigned -ZSTDMT_computeNbJobs(const ZSTD_CCtx_params* params, size_t srcSize, unsigned nbWorkers) -{ -    assert(nbWorkers>0); -    {   size_t const jobSizeTarget = (size_t)1 << ZSTDMT_computeTargetJobLog(params); -        size_t const jobMaxSize = jobSizeTarget << 2; -        size_t const passSizeMax = jobMaxSize * nbWorkers; -        unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1; -        unsigned const nbJobsLarge = multiplier * nbWorkers; -        unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1; -        unsigned const nbJobsSmall = MIN(nbJobsMax, nbWorkers); -        return (multiplier>1) ? nbJobsLarge : nbJobsSmall; -}   } - -/* ZSTDMT_compress_advanced_internal() : - * This is a blocking function : it will only give back control to caller after finishing its compression job. - */ -static size_t -ZSTDMT_compress_advanced_internal( -                ZSTDMT_CCtx* mtctx, -                void* dst, size_t dstCapacity, -          const void* src, size_t srcSize, -          const ZSTD_CDict* cdict, -                ZSTD_CCtx_params params) -{ -    ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(&params); -    size_t const overlapSize = ZSTDMT_computeOverlapSize(&params); -    unsigned const nbJobs = ZSTDMT_computeNbJobs(&params, srcSize, params.nbWorkers); -    size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs; -    size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize;   /* avoid too small last block */ -    const char* const srcStart = (const char*)src; -    size_t remainingSrcSize = srcSize; -    unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? 
nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize));  /* presumes avgJobSize >= 256 KB, which should be the case */ -    size_t frameStartPos = 0, dstBufferPos = 0; -    assert(jobParams.nbWorkers == 0); -    assert(mtctx->cctxPool->totalCCtx == params.nbWorkers); - -    params.jobSize = (U32)avgJobSize; -    DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ", -                nbJobs, (U32)proposedJobSize, (U32)avgJobSize); - -    if ((nbJobs==1) | (params.nbWorkers<=1)) {   /* fallback to single-thread mode : this is a blocking invocation anyway */ -        ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; -        DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: fallback to single-thread mode"); -        if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams); -        return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, &jobParams); -    } - -    assert(avgJobSize >= 256 KB);  /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ -    ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) ); -    /* LDM doesn't even try to load the dictionary in single-ingestion mode */ -    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize, NULL, 0, ZSTD_dct_auto)) -        return ERROR(memory_allocation); - -    FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbJobs) , "");  /* only expands if necessary */ - -    {   unsigned u; -        for (u=0; u<nbJobs; u++) { -            size_t const jobSize = MIN(remainingSrcSize, avgJobSize); -            size_t const dstBufferCapacity = ZSTD_compressBound(jobSize); -            buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity }; -            buffer_t const dstBuffer = u < compressWithinDst ? 
dstAsBuffer : g_nullBuffer; -            size_t dictSize = u ? overlapSize : 0; - -            mtctx->jobs[u].prefix.start = srcStart + frameStartPos - dictSize; -            mtctx->jobs[u].prefix.size = dictSize; -            mtctx->jobs[u].src.start = srcStart + frameStartPos; -            mtctx->jobs[u].src.size = jobSize; assert(jobSize > 0);  /* avoid job.src.size == 0 */ -            mtctx->jobs[u].consumed = 0; -            mtctx->jobs[u].cSize = 0; -            mtctx->jobs[u].cdict = (u==0) ? cdict : NULL; -            mtctx->jobs[u].fullFrameSize = srcSize; -            mtctx->jobs[u].params = jobParams; -            /* do not calculate checksum within sections, but write it in header for first section */ -            mtctx->jobs[u].dstBuff = dstBuffer; -            mtctx->jobs[u].cctxPool = mtctx->cctxPool; -            mtctx->jobs[u].bufPool = mtctx->bufPool; -            mtctx->jobs[u].seqPool = mtctx->seqPool; -            mtctx->jobs[u].serial = &mtctx->serial; -            mtctx->jobs[u].jobID = u; -            mtctx->jobs[u].firstJob = (u==0); -            mtctx->jobs[u].lastJob = (u==nbJobs-1); - -            DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u  (%u bytes)", u, (U32)jobSize); -            DEBUG_PRINTHEX(6, mtctx->jobs[u].prefix.start, 12); -            POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[u]); - -            frameStartPos += jobSize; -            dstBufferPos += dstBufferCapacity; -            remainingSrcSize -= jobSize; -    }   } - -    /* collect result */ -    {   size_t error = 0, dstPos = 0; -        unsigned jobID; -        for (jobID=0; jobID<nbJobs; jobID++) { -            DEBUGLOG(5, "waiting for job %u ", jobID); -            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex); -            while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) { -                DEBUGLOG(5, "waiting for jobCompleted signal from job %u", jobID); -                
ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex); -            } -            ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex); -            DEBUGLOG(5, "ready to write job %u ", jobID); - -            {   size_t const cSize = mtctx->jobs[jobID].cSize; -                if (ZSTD_isError(cSize)) error = cSize; -                if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall); -                if (jobID) {   /* note : job 0 is written directly at dst, which is correct position */ -                    if (!error) -                        memmove((char*)dst + dstPos, mtctx->jobs[jobID].dstBuff.start, cSize);  /* may overlap when job compressed within dst */ -                    if (jobID >= compressWithinDst) {  /* job compressed into its own buffer, which must be released */ -                        DEBUGLOG(5, "releasing buffer %u>=%u", jobID, compressWithinDst); -                        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff); -                }   } -                mtctx->jobs[jobID].dstBuff = g_nullBuffer; -                mtctx->jobs[jobID].cSize = 0; -                dstPos += cSize ; -            } -        }  /* for (jobID=0; jobID<nbJobs; jobID++) */ - -        DEBUGLOG(4, "checksumFlag : %u ", params.fParams.checksumFlag); -        if (params.fParams.checksumFlag) { -            U32 const checksum = (U32)XXH64_digest(&mtctx->serial.xxhState); -            if (dstPos + 4 > dstCapacity) { -                error = ERROR(dstSize_tooSmall); -            } else { -                DEBUGLOG(4, "writing checksum : %08X \n", checksum); -                MEM_writeLE32((char*)dst + dstPos, checksum); -                dstPos += 4; -        }   } - -        if (!error) DEBUGLOG(4, "compressed size : %u  ", (U32)dstPos); -        return error ? 
error : dstPos; -    } -} - -size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, -                                void* dst, size_t dstCapacity, -                          const void* src, size_t srcSize, -                          const ZSTD_CDict* cdict, -                                ZSTD_parameters params, -                                int overlapLog) -{ -    ZSTD_CCtx_params cctxParams = mtctx->params; -    cctxParams.cParams = params.cParams; -    cctxParams.fParams = params.fParams; -    assert(ZSTD_OVERLAPLOG_MIN <= overlapLog && overlapLog <= ZSTD_OVERLAPLOG_MAX); -    cctxParams.overlapLog = overlapLog; -    return ZSTDMT_compress_advanced_internal(mtctx, -                                             dst, dstCapacity, -                                             src, srcSize, -                                             cdict, cctxParams); -} - - -size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, -                           void* dst, size_t dstCapacity, -                     const void* src, size_t srcSize, -                           int compressionLevel) -{ -    ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0); -    int const overlapLog = ZSTDMT_overlapLog_default(params.cParams.strategy); -    params.fParams.contentSizeFlag = 1; -    return ZSTDMT_compress_advanced(mtctx, dst, dstCapacity, src, srcSize, NULL, params, overlapLog); -} - -  /* ====================================== */  /* =======      Streaming API     ======= */  /* ====================================== */ @@ -1432,16 +1206,6 @@ size_t ZSTDMT_initCStream_internal(      if (params.jobSize != 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN;      if (params.jobSize > (size_t)ZSTDMT_JOBSIZE_MAX) params.jobSize = (size_t)ZSTDMT_JOBSIZE_MAX; -    mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN);  /* do not trigger multi-threading when srcSize is too small */ -    if (mtctx->singleBlockingThread) { -        
ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(&params); -        DEBUGLOG(5, "ZSTDMT_initCStream_internal: switch to single blocking thread mode"); -        assert(singleThreadParams.nbWorkers == 0); -        return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0], -                                         dict, dictSize, cdict, -                                         &singleThreadParams, pledgedSrcSize); -    } -      DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers);      if (mtctx->allJobsCompleted == 0) {   /* previous compression not correctly finished */ @@ -1504,8 +1268,8 @@ size_t ZSTDMT_initCStream_internal(          size_t const capacity = MAX(windowSize, sectionsSize) + slackSize;          if (mtctx->roundBuff.capacity < capacity) {              if (mtctx->roundBuff.buffer) -                ZSTD_free(mtctx->roundBuff.buffer, mtctx->cMem); -            mtctx->roundBuff.buffer = (BYTE*)ZSTD_malloc(capacity, mtctx->cMem); +                ZSTD_customFree(mtctx->roundBuff.buffer, mtctx->cMem); +            mtctx->roundBuff.buffer = (BYTE*)ZSTD_customMalloc(capacity, mtctx->cMem);              if (mtctx->roundBuff.buffer == NULL) {                  mtctx->roundBuff.capacity = 0;                  return ERROR(memory_allocation); @@ -1530,53 +1294,6 @@ size_t ZSTDMT_initCStream_internal(      return 0;  } -size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, -                             const void* dict, size_t dictSize, -                                   ZSTD_parameters params, -                                   unsigned long long pledgedSrcSize) -{ -    ZSTD_CCtx_params cctxParams = mtctx->params;  /* retrieve sticky params */ -    DEBUGLOG(4, "ZSTDMT_initCStream_advanced (pledgedSrcSize=%u)", (U32)pledgedSrcSize); -    cctxParams.cParams = params.cParams; -    cctxParams.fParams = params.fParams; -    return ZSTDMT_initCStream_internal(mtctx, dict, dictSize, ZSTD_dct_auto, NULL, -                             
          cctxParams, pledgedSrcSize); -} - -size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx, -                               const ZSTD_CDict* cdict, -                                     ZSTD_frameParameters fParams, -                                     unsigned long long pledgedSrcSize) -{ -    ZSTD_CCtx_params cctxParams = mtctx->params; -    if (cdict==NULL) return ERROR(dictionary_wrong);   /* method incompatible with NULL cdict */ -    cctxParams.cParams = ZSTD_getCParamsFromCDict(cdict); -    cctxParams.fParams = fParams; -    return ZSTDMT_initCStream_internal(mtctx, NULL, 0 /*dictSize*/, ZSTD_dct_auto, cdict, -                                       cctxParams, pledgedSrcSize); -} - - -/* ZSTDMT_resetCStream() : - * pledgedSrcSize can be zero == unknown (for the time being) - * prefer using ZSTD_CONTENTSIZE_UNKNOWN, - * as `0` might mean "empty" in the future */ -size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize) -{ -    if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; -    return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dct_auto, 0, mtctx->params, -                                       pledgedSrcSize); -} - -size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) { -    ZSTD_parameters const params = ZSTD_getParams(compressionLevel, ZSTD_CONTENTSIZE_UNKNOWN, 0); -    ZSTD_CCtx_params cctxParams = mtctx->params;   /* retrieve sticky params */ -    DEBUGLOG(4, "ZSTDMT_initCStream (cLevel=%i)", compressionLevel); -    cctxParams.cParams = params.cParams; -    cctxParams.fParams = params.fParams; -    return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dct_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN); -} -  /* ZSTDMT_writeLastEmptyBlock()   * Write a single empty block with an end-of-frame to finish a frame. 
@@ -1740,7 +1457,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u              assert(cSize >= mtctx->jobs[wJobID].dstFlushed);              assert(mtctx->jobs[wJobID].dstBuff.start != NULL);              if (toFlush > 0) { -                memcpy((char*)output->dst + output->pos, +                ZSTD_memcpy((char*)output->dst + output->pos,                      (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed,                      toFlush);              } @@ -1894,7 +1611,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)              return 0;          }          ZSTDMT_waitForLdmComplete(mtctx, buffer); -        memmove(start, mtctx->inBuff.prefix.start, prefixSize); +        ZSTD_memmove(start, mtctx->inBuff.prefix.start, prefixSize);          mtctx->inBuff.prefix.start = start;          mtctx->roundBuff.pos = prefixSize;      } @@ -1968,6 +1685,16 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)          pos = 0;          prev = (BYTE const*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled - RSYNC_LENGTH;          hash = ZSTD_rollingHash_compute(prev, RSYNC_LENGTH); +        if ((hash & hitMask) == hitMask) { +            /* We're already at a sync point so don't load any more until +             * we're able to flush this sync point. +             * This likely happened because the job table was full so we +             * couldn't add our job. +             */ +            syncPoint.toLoad = 0; +            syncPoint.flush = 1; +            return syncPoint; +        }      } else {          /* We don't have enough bytes buffered to initialize the hash, but           * we know we have at least RSYNC_LENGTH bytes total. 
@@ -2022,34 +1749,11 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,      assert(output->pos <= output->size);      assert(input->pos  <= input->size); -    if (mtctx->singleBlockingThread) {  /* delegate to single-thread (synchronous) */ -        return ZSTD_compressStream2(mtctx->cctxPool->cctx[0], output, input, endOp); -    } -      if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) {          /* current frame being ended. Only flush/end are allowed */          return ERROR(stage_wrong);      } -    /* single-pass shortcut (note : synchronous-mode) */ -    if ( (!mtctx->params.rsyncable)   /* rsyncable mode is disabled */ -      && (mtctx->nextJobID == 0)      /* just started */ -      && (mtctx->inBuff.filled == 0)  /* nothing buffered */ -      && (!mtctx->jobReady)           /* no job already created */ -      && (endOp == ZSTD_e_end)        /* end order */ -      && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */ -        size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx, -                (char*)output->dst + output->pos, output->size - output->pos, -                (const char*)input->src + input->pos, input->size - input->pos, -                mtctx->cdict, mtctx->params); -        if (ZSTD_isError(cSize)) return cSize; -        input->pos = input->size; -        output->pos += cSize; -        mtctx->allJobsCompleted = 1; -        mtctx->frameEnded = 1; -        return 0; -    } -      /* fill input buffer */      if ( (!mtctx->jobReady)        && (input->size > input->pos) ) {   /* support NULL input */ @@ -2072,13 +1776,21 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,              assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize);              DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",                          (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, 
(U32)mtctx->targetSectionSize); -            memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, syncPoint.toLoad); +            ZSTD_memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, syncPoint.toLoad);              input->pos += syncPoint.toLoad;              mtctx->inBuff.filled += syncPoint.toLoad;              forwardInputProgress = syncPoint.toLoad>0;          } -        if ((input->pos < input->size) && (endOp == ZSTD_e_end)) -            endOp = ZSTD_e_flush;   /* can't end now : not all input consumed */ +    } +    if ((input->pos < input->size) && (endOp == ZSTD_e_end)) { +        /* Can't end yet because the input is not fully consumed. +            * We are in one of these cases: +            * - mtctx->inBuff is NULL & empty: we couldn't get an input buffer so don't create a new job. +            * - We filled the input buffer: flush this job but don't end the frame. +            * - We hit a synchronization point: flush this job but don't end the frame. 
+            */ +        assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetSectionSize || mtctx->params.rsyncable); +        endOp = ZSTD_e_flush;      }      if ( (mtctx->jobReady) @@ -2097,47 +1809,3 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,          return remainingToFlush;      }  } - - -size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input) -{ -    FORWARD_IF_ERROR( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) , ""); - -    /* recommended next input size : fill current input buffer */ -    return mtctx->targetSectionSize - mtctx->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */ -} - - -static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_EndDirective endFrame) -{ -    size_t const srcSize = mtctx->inBuff.filled; -    DEBUGLOG(5, "ZSTDMT_flushStream_internal"); - -    if ( mtctx->jobReady     /* one job ready for a worker to pick up */ -      || (srcSize > 0)       /* still some data within input buffer */ -      || ((endFrame==ZSTD_e_end) && !mtctx->frameEnded)) {  /* need a last 0-size block to end frame */ -           DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)", -                        (U32)srcSize, (U32)endFrame); -        FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) , ""); -    } - -    /* check if there is any data available to flush */ -    return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */, endFrame); -} - - -size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) -{ -    DEBUGLOG(5, "ZSTDMT_flushStream"); -    if (mtctx->singleBlockingThread) -        return ZSTD_flushStream(mtctx->cctxPool->cctx[0], output); -    return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_flush); -} - -size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) 
-{ -    DEBUGLOG(4, "ZSTDMT_endStream"); -    if (mtctx->singleBlockingThread) -        return ZSTD_endStream(mtctx->cctxPool->cctx[0], output); -    return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_end); -} |