summaryrefslogtreecommitdiff
path: root/thirdparty/zstd/compress/zstdmt_compress.c
diff options
context:
space:
mode:
Diffstat (limited to 'thirdparty/zstd/compress/zstdmt_compress.c')
-rw-r--r--thirdparty/zstd/compress/zstdmt_compress.c114
1 files changed, 76 insertions, 38 deletions
diff --git a/thirdparty/zstd/compress/zstdmt_compress.c b/thirdparty/zstd/compress/zstdmt_compress.c
index 22aa3e1245..6bc14b035e 100644
--- a/thirdparty/zstd/compress/zstdmt_compress.c
+++ b/thirdparty/zstd/compress/zstdmt_compress.c
@@ -102,9 +102,8 @@ typedef struct ZSTDMT_bufferPool_s {
buffer_t bTable[1]; /* variable size */
} ZSTDMT_bufferPool;
-static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbWorkers, ZSTD_customMem cMem)
+static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned maxNbBuffers, ZSTD_customMem cMem)
{
- unsigned const maxNbBuffers = 2*nbWorkers + 3;
ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_customCalloc(
sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
if (bufPool==NULL) return NULL;
@@ -160,9 +159,8 @@ static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const
}
-static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, U32 nbWorkers)
+static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, unsigned maxNbBuffers)
{
- unsigned const maxNbBuffers = 2*nbWorkers + 3;
if (srcBufPool==NULL) return NULL;
if (srcBufPool->totalBuffers >= maxNbBuffers) /* good enough */
return srcBufPool;
@@ -171,7 +169,7 @@ static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool,
size_t const bSize = srcBufPool->bufferSize; /* forward parameters */
ZSTDMT_bufferPool* newBufPool;
ZSTDMT_freeBufferPool(srcBufPool);
- newBufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+ newBufPool = ZSTDMT_createBufferPool(maxNbBuffers, cMem);
if (newBufPool==NULL) return newBufPool;
ZSTDMT_setBufferSize(newBufPool, bSize);
return newBufPool;
@@ -263,6 +261,16 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
ZSTD_customFree(buf.start, bufPool->cMem);
}
+/* We need 2 output buffers per worker since each dstBuff must be flushed after it is released.
+ * The 3 additional buffers are as follows:
+ * 1 buffer for input loading
+ * 1 buffer for "next input" when submitting current one
+ * 1 buffer stuck in queue */
+#define BUF_POOL_MAX_NB_BUFFERS(nbWorkers) 2*nbWorkers + 3
+
+/* After a worker releases its rawSeqStore, it is immediately ready for reuse.
+ * So we only need one seq buffer per worker. */
+#define SEQ_POOL_MAX_NB_BUFFERS(nbWorkers) nbWorkers
/* ===== Seq Pool Wrapper ====== */
@@ -316,7 +324,7 @@ static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq)
static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
{
- ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+ ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(SEQ_POOL_MAX_NB_BUFFERS(nbWorkers), cMem);
if (seqPool == NULL) return NULL;
ZSTDMT_setNbSeq(seqPool, 0);
return seqPool;
@@ -329,7 +337,7 @@ static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)
static ZSTDMT_seqPool* ZSTDMT_expandSeqPool(ZSTDMT_seqPool* pool, U32 nbWorkers)
{
- return ZSTDMT_expandBufferPool(pool, nbWorkers);
+ return ZSTDMT_expandBufferPool(pool, SEQ_POOL_MAX_NB_BUFFERS(nbWorkers));
}
@@ -467,7 +475,7 @@ ZSTDMT_serialState_reset(serialState_t* serialState,
ZSTD_dictContentType_e dictContentType)
{
/* Adjust parameters */
- if (params.ldmParams.enableLdm) {
+ if (params.ldmParams.enableLdm == ZSTD_ps_enable) {
DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10);
ZSTD_ldm_adjustParameters(&params.ldmParams, &params.cParams);
assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog);
@@ -478,7 +486,7 @@ ZSTDMT_serialState_reset(serialState_t* serialState,
serialState->nextJobID = 0;
if (params.fParams.checksumFlag)
XXH64_reset(&serialState->xxhState, 0);
- if (params.ldmParams.enableLdm) {
+ if (params.ldmParams.enableLdm == ZSTD_ps_enable) {
ZSTD_customMem cMem = params.customMem;
unsigned const hashLog = params.ldmParams.hashLog;
size_t const hashSize = ((size_t)1 << hashLog) * sizeof(ldmEntry_t);
@@ -564,7 +572,7 @@ static void ZSTDMT_serialState_update(serialState_t* serialState,
/* A future job may error and skip our job */
if (serialState->nextJobID == jobID) {
/* It is now our turn, do any processing necessary */
- if (serialState->params.ldmParams.enableLdm) {
+ if (serialState->params.ldmParams.enableLdm == ZSTD_ps_enable) {
size_t error;
assert(seqStore.seq != NULL && seqStore.pos == 0 &&
seqStore.size == 0 && seqStore.capacity > 0);
@@ -594,7 +602,7 @@ static void ZSTDMT_serialState_update(serialState_t* serialState,
if (seqStore.size > 0) {
size_t const err = ZSTD_referenceExternalSequences(
jobCCtx, seqStore.seq, seqStore.size);
- assert(serialState->params.ldmParams.enableLdm);
+ assert(serialState->params.ldmParams.enableLdm == ZSTD_ps_enable);
assert(!ZSTD_isError(err));
(void)err;
}
@@ -672,7 +680,7 @@ static void ZSTDMT_compressionJob(void* jobDescription)
if (dstBuff.start==NULL) JOB_ERROR(ERROR(memory_allocation));
job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */
}
- if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL)
+ if (jobParams.ldmParams.enableLdm == ZSTD_ps_enable && rawSeqStore.seq == NULL)
JOB_ERROR(ERROR(memory_allocation));
/* Don't compute the checksum for chunks, since we compute it externally,
@@ -680,7 +688,7 @@ static void ZSTDMT_compressionJob(void* jobDescription)
*/
if (job->jobID != 0) jobParams.fParams.checksumFlag = 0;
/* Don't run LDM for the chunks, since we handle it externally */
- jobParams.ldmParams.enableLdm = 0;
+ jobParams.ldmParams.enableLdm = ZSTD_ps_disable;
/* Correct nbWorkers to 0. */
jobParams.nbWorkers = 0;
@@ -807,6 +815,15 @@ typedef struct {
static const roundBuff_t kNullRoundBuff = {NULL, 0, 0};
#define RSYNC_LENGTH 32
+/* Don't create chunks smaller than the zstd block size.
+ * This stops us from regressing compression ratio too much,
+ * and ensures our output fits in ZSTD_compressBound().
+ *
+ * If this is shrunk < ZSTD_BLOCKSIZELOG_MIN then
+ * ZSTD_COMPRESSBOUND() will need to be updated.
+ */
+#define RSYNC_MIN_BLOCK_LOG ZSTD_BLOCKSIZELOG_MAX
+#define RSYNC_MIN_BLOCK_SIZE (1<<RSYNC_MIN_BLOCK_LOG)
typedef struct {
U64 hash;
@@ -927,7 +944,7 @@ MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers,
mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */
mtctx->jobIDMask = nbJobs - 1;
- mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+ mtctx->bufPool = ZSTDMT_createBufferPool(BUF_POOL_MAX_NB_BUFFERS(nbWorkers), cMem);
mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);
initError = ZSTDMT_serialState_init(&mtctx->serial);
@@ -1030,7 +1047,7 @@ static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
{
if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation);
FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) , "");
- mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
+ mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, BUF_POOL_MAX_NB_BUFFERS(nbWorkers));
if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers);
if (mtctx->cctxPool == NULL) return ERROR(memory_allocation);
@@ -1135,7 +1152,7 @@ size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)
static unsigned ZSTDMT_computeTargetJobLog(const ZSTD_CCtx_params* params)
{
unsigned jobLog;
- if (params->ldmParams.enableLdm) {
+ if (params->ldmParams.enableLdm == ZSTD_ps_enable) {
/* In Long Range Mode, the windowLog is typically oversized.
* In which case, it's preferable to determine the jobSize
* based on cycleLog instead. */
@@ -1179,7 +1196,7 @@ static size_t ZSTDMT_computeOverlapSize(const ZSTD_CCtx_params* params)
int const overlapRLog = 9 - ZSTDMT_overlapLog(params->overlapLog, params->cParams.strategy);
int ovLog = (overlapRLog >= 8) ? 0 : (params->cParams.windowLog - overlapRLog);
assert(0 <= overlapRLog && overlapRLog <= 8);
- if (params->ldmParams.enableLdm) {
+ if (params->ldmParams.enableLdm == ZSTD_ps_enable) {
/* In Long Range Mode, the windowLog is typically oversized.
* In which case, it's preferable to determine the jobSize
* based on chainLog instead.
@@ -1252,6 +1269,9 @@ size_t ZSTDMT_initCStream_internal(
/* Aim for the targetsectionSize as the average job size. */
U32 const jobSizeKB = (U32)(mtctx->targetSectionSize >> 10);
U32 const rsyncBits = (assert(jobSizeKB >= 1), ZSTD_highbit32(jobSizeKB) + 10);
+ /* We refuse to create jobs < RSYNC_MIN_BLOCK_SIZE bytes, so make sure our
+ * expected job size is at least 4x larger. */
+ assert(rsyncBits >= RSYNC_MIN_BLOCK_LOG + 2);
DEBUGLOG(4, "rsyncLog = %u", rsyncBits);
mtctx->rsync.hash = 0;
mtctx->rsync.hitMask = (1ULL << rsyncBits) - 1;
@@ -1263,7 +1283,7 @@ size_t ZSTDMT_initCStream_internal(
ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize));
{
/* If ldm is enabled we need windowSize space. */
- size_t const windowSize = mtctx->params.ldmParams.enableLdm ? (1U << mtctx->params.cParams.windowLog) : 0;
+ size_t const windowSize = mtctx->params.ldmParams.enableLdm == ZSTD_ps_enable ? (1U << mtctx->params.cParams.windowLog) : 0;
/* Two buffers of slack, plus extra space for the overlap
* This is the minimum slack that LDM works with. One extra because
* flush might waste up to targetSectionSize-1 bytes. Another extra
@@ -1538,17 +1558,21 @@ static range_t ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx)
static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range)
{
BYTE const* const bufferStart = (BYTE const*)buffer.start;
- BYTE const* const bufferEnd = bufferStart + buffer.capacity;
BYTE const* const rangeStart = (BYTE const*)range.start;
- BYTE const* const rangeEnd = range.size != 0 ? rangeStart + range.size : rangeStart;
if (rangeStart == NULL || bufferStart == NULL)
return 0;
- /* Empty ranges cannot overlap */
- if (bufferStart == bufferEnd || rangeStart == rangeEnd)
- return 0;
- return bufferStart < rangeEnd && rangeStart < bufferEnd;
+ {
+ BYTE const* const bufferEnd = bufferStart + buffer.capacity;
+ BYTE const* const rangeEnd = rangeStart + range.size;
+
+ /* Empty ranges cannot overlap */
+ if (bufferStart == bufferEnd || rangeStart == rangeEnd)
+ return 0;
+
+ return bufferStart < rangeEnd && rangeStart < bufferEnd;
+ }
}
static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)
@@ -1575,7 +1599,7 @@ static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)
static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer)
{
- if (mtctx->params.ldmParams.enableLdm) {
+ if (mtctx->params.ldmParams.enableLdm == ZSTD_ps_enable) {
ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex;
DEBUGLOG(5, "ZSTDMT_waitForLdmComplete");
DEBUGLOG(5, "source [0x%zx, 0x%zx)",
@@ -1678,6 +1702,11 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
if (!mtctx->params.rsyncable)
/* Rsync is disabled. */
return syncPoint;
+ if (mtctx->inBuff.filled + input.size - input.pos < RSYNC_MIN_BLOCK_SIZE)
+ /* We don't emit synchronization points if it would produce too small blocks.
+ * We don't have enough input to find a synchronization point, so don't look.
+ */
+ return syncPoint;
if (mtctx->inBuff.filled + syncPoint.toLoad < RSYNC_LENGTH)
/* Not enough to compute the hash.
* We will miss any synchronization points in this RSYNC_LENGTH byte
@@ -1688,10 +1717,28 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
*/
return syncPoint;
/* Initialize the loop variables. */
- if (mtctx->inBuff.filled >= RSYNC_LENGTH) {
- /* We have enough bytes buffered to initialize the hash.
+ if (mtctx->inBuff.filled < RSYNC_MIN_BLOCK_SIZE) {
+ /* We don't need to scan the first RSYNC_MIN_BLOCK_SIZE positions
+ * because they can't possibly be a sync point. So we can start
+ * part way through the input buffer.
+ */
+ pos = RSYNC_MIN_BLOCK_SIZE - mtctx->inBuff.filled;
+ if (pos >= RSYNC_LENGTH) {
+ prev = istart + pos - RSYNC_LENGTH;
+ hash = ZSTD_rollingHash_compute(prev, RSYNC_LENGTH);
+ } else {
+ assert(mtctx->inBuff.filled >= RSYNC_LENGTH);
+ prev = (BYTE const*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled - RSYNC_LENGTH;
+ hash = ZSTD_rollingHash_compute(prev + pos, (RSYNC_LENGTH - pos));
+ hash = ZSTD_rollingHash_append(hash, istart, pos);
+ }
+ } else {
+ /* We have enough bytes buffered to initialize the hash,
+ * and are have processed enough bytes to find a sync point.
* Start scanning at the beginning of the input.
*/
+ assert(mtctx->inBuff.filled >= RSYNC_MIN_BLOCK_SIZE);
+ assert(RSYNC_MIN_BLOCK_SIZE >= RSYNC_LENGTH);
pos = 0;
prev = (BYTE const*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled - RSYNC_LENGTH;
hash = ZSTD_rollingHash_compute(prev, RSYNC_LENGTH);
@@ -1705,16 +1752,6 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
syncPoint.flush = 1;
return syncPoint;
}
- } else {
- /* We don't have enough bytes buffered to initialize the hash, but
- * we know we have at least RSYNC_LENGTH bytes total.
- * Start scanning after the first RSYNC_LENGTH bytes less the bytes
- * already buffered.
- */
- pos = RSYNC_LENGTH - mtctx->inBuff.filled;
- prev = (BYTE const*)mtctx->inBuff.buffer.start - pos;
- hash = ZSTD_rollingHash_compute(mtctx->inBuff.buffer.start, mtctx->inBuff.filled);
- hash = ZSTD_rollingHash_append(hash, istart, pos);
}
/* Starting with the hash of the previous RSYNC_LENGTH bytes, roll
* through the input. If we hit a synchronization point, then cut the
@@ -1726,8 +1763,9 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
*/
for (; pos < syncPoint.toLoad; ++pos) {
BYTE const toRemove = pos < RSYNC_LENGTH ? prev[pos] : istart[pos - RSYNC_LENGTH];
- /* if (pos >= RSYNC_LENGTH) assert(ZSTD_rollingHash_compute(istart + pos - RSYNC_LENGTH, RSYNC_LENGTH) == hash); */
+ assert(pos < RSYNC_LENGTH || ZSTD_rollingHash_compute(istart + pos - RSYNC_LENGTH, RSYNC_LENGTH) == hash);
hash = ZSTD_rollingHash_rotate(hash, toRemove, istart[pos], primePower);
+ assert(mtctx->inBuff.filled + pos >= RSYNC_MIN_BLOCK_SIZE);
if ((hash & hitMask) == hitMask) {
syncPoint.toLoad = pos + 1;
syncPoint.flush = 1;