summaryrefslogtreecommitdiff
path: root/src/liblzma/common/stream_decoder_mt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/liblzma/common/stream_decoder_mt.c')
-rw-r--r--src/liblzma/common/stream_decoder_mt.c140
1 files changed, 69 insertions, 71 deletions
diff --git a/src/liblzma/common/stream_decoder_mt.c b/src/liblzma/common/stream_decoder_mt.c
index 244624a47900..271f9b07c4b8 100644
--- a/src/liblzma/common/stream_decoder_mt.c
+++ b/src/liblzma/common/stream_decoder_mt.c
@@ -23,15 +23,10 @@ typedef enum {
THR_IDLE,
/// Decoding is in progress.
- /// Main thread may change this to THR_STOP or THR_EXIT.
+ /// Main thread may change this to THR_IDLE or THR_EXIT.
/// The worker thread may change this to THR_IDLE.
THR_RUN,
- /// The main thread wants the thread to stop whatever it was doing
- /// but not exit. Main thread may change this to THR_EXIT.
- /// The worker thread may change this to THR_IDLE.
- THR_STOP,
-
/// The main thread wants the thread to exit.
THR_EXIT,
@@ -346,27 +341,6 @@ worker_enable_partial_update(void *thr_ptr)
}
-/// Things do to at THR_STOP or when finishing a Block.
-/// This is called with thr->mutex locked.
-static void
-worker_stop(struct worker_thread *thr)
-{
- // Update memory usage counters.
- thr->coder->mem_in_use -= thr->in_size;
- thr->in_size = 0; // thr->in was freed above.
-
- thr->coder->mem_in_use -= thr->mem_filters;
- thr->coder->mem_cached += thr->mem_filters;
-
- // Put this thread to the stack of free threads.
- thr->next = thr->coder->threads_free;
- thr->coder->threads_free = thr;
-
- mythread_cond_signal(&thr->coder->cond);
- return;
-}
-
-
static MYTHREAD_RET_TYPE
worker_decoder(void *thr_ptr)
{
@@ -397,17 +371,6 @@ next_loop_unlocked:
return MYTHREAD_RET_VALUE;
}
- if (thr->state == THR_STOP) {
- thr->state = THR_IDLE;
- mythread_mutex_unlock(&thr->mutex);
-
- mythread_sync(thr->coder->mutex) {
- worker_stop(thr);
- }
-
- goto next_loop_lock;
- }
-
assert(thr->state == THR_RUN);
// Update progress info for get_progress().
@@ -472,8 +435,7 @@ next_loop_unlocked:
}
// Either we finished successfully (LZMA_STREAM_END) or an error
- // occurred. Both cases are handled almost identically. The error
- // case requires updating thr->coder->thread_error.
+ // occurred.
//
// The sizes are in the Block Header and the Block decoder
// checks that they match, thus we know these:
@@ -481,16 +443,30 @@ next_loop_unlocked:
assert(ret != LZMA_STREAM_END
|| thr->out_pos == thr->block_options.uncompressed_size);
- // Free the input buffer. Don't update in_size as we need
- // it later to update thr->coder->mem_in_use.
- lzma_free(thr->in, thr->allocator);
- thr->in = NULL;
-
mythread_sync(thr->mutex) {
+ // Block decoder ensures this, but do a sanity check anyway
+ // because thr->in_filled < thr->in_size means that the main
+ // thread is still writing to thr->in.
+ if (ret == LZMA_STREAM_END && thr->in_filled != thr->in_size) {
+ assert(0);
+ ret = LZMA_PROG_ERROR;
+ }
+
if (thr->state != THR_EXIT)
thr->state = THR_IDLE;
}
+ // Free the input buffer. Don't update in_size as we need
+ // it later to update thr->coder->mem_in_use.
+ //
+ // This step is skipped if an error occurred because the main thread
+ // might still be writing to thr->in. The memory will be freed after
+ // threads_end() sets thr->state = THR_EXIT.
+ if (ret == LZMA_STREAM_END) {
+ lzma_free(thr->in, thr->allocator);
+ thr->in = NULL;
+ }
+
mythread_sync(thr->coder->mutex) {
// Move our progress info to the main thread.
thr->coder->progress_in += thr->in_pos;
@@ -510,7 +486,20 @@ next_loop_unlocked:
&& thr->coder->thread_error == LZMA_OK)
thr->coder->thread_error = ret;
- worker_stop(thr);
+ // Return the worker thread to the stack of available
+ // threads only if no errors occurred.
+ if (ret == LZMA_STREAM_END) {
+ // Update memory usage counters.
+ thr->coder->mem_in_use -= thr->in_size;
+ thr->coder->mem_in_use -= thr->mem_filters;
+ thr->coder->mem_cached += thr->mem_filters;
+
+ // Put this thread to the stack of free threads.
+ thr->next = thr->coder->threads_free;
+ thr->coder->threads_free = thr;
+ }
+
+ mythread_cond_signal(&thr->coder->cond);
}
goto next_loop_lock;
@@ -544,17 +533,22 @@ threads_end(struct lzma_stream_coder *coder, const lzma_allocator *allocator)
}
+/// Tell worker threads to stop without doing any cleaning up.
+/// The clean up will be done when threads_exit() is called;
+/// it's not possible to reuse the threads after threads_stop().
+///
+/// This is called before returning an unrecoverable error code
+/// to the application. It would be waste of processor time
+/// to keep the threads running in such a situation.
static void
threads_stop(struct lzma_stream_coder *coder)
{
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
+ // The threads that are in the THR_RUN state will stop
+ // when they check the state the next time. There's no
+ // need to signal coder->threads[i].cond.
mythread_sync(coder->threads[i].mutex) {
- // The state must be changed conditionally because
- // THR_IDLE -> THR_STOP is not a valid state change.
- if (coder->threads[i].state != THR_IDLE) {
- coder->threads[i].state = THR_STOP;
- mythread_cond_signal(&coder->threads[i].cond);
- }
+ coder->threads[i].state = THR_IDLE;
}
}
@@ -1077,9 +1071,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
if (coder->tell_any_check)
return LZMA_GET_CHECK;
- }
- // Fall through
+ FALLTHROUGH;
+ }
case SEQ_BLOCK_HEADER: {
const size_t in_old = *in_pos;
@@ -1214,10 +1208,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
}
coder->sequence = SEQ_BLOCK_INIT;
+ FALLTHROUGH;
}
- // Fall through
-
case SEQ_BLOCK_INIT: {
// Check if decoding is possible at all with the current
// memlimit_stop which we must never exceed.
@@ -1303,10 +1296,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
}
coder->sequence = SEQ_BLOCK_THR_INIT;
+ FALLTHROUGH;
}
- // Fall through
-
case SEQ_BLOCK_THR_INIT: {
// We need to wait for a multiple conditions to become true
// until we can initialize the Block decoder and let a worker
@@ -1508,10 +1500,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
}
coder->sequence = SEQ_BLOCK_THR_RUN;
+ FALLTHROUGH;
}
- // Fall through
-
case SEQ_BLOCK_THR_RUN: {
if (action == LZMA_FINISH && coder->fail_fast) {
// We know that we won't get more input and that
@@ -1549,10 +1540,17 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
// Read output from the output queue. Just like in
// SEQ_BLOCK_HEADER, we wait to fill the output buffer
// only if waiting_allowed was set to true in the beginning
- // of this function (see the comment there).
+ // of this function (see the comment there) and there is
+ // no input available. In SEQ_BLOCK_HEADER, there is never
+ // input available when read_output_and_wait() is called,
+ // but here there can be when LZMA_FINISH is used, thus we
+ // need to check if *in_pos == in_size. Otherwise we would
+ // wait here instead of using the available input to start
+ // a new thread.
return_if_error(read_output_and_wait(coder, allocator,
out, out_pos, out_size,
- NULL, waiting_allowed,
+ NULL,
+ waiting_allowed && *in_pos == in_size,
&wait_abs, &has_blocked));
if (coder->pending_error != LZMA_OK) {
@@ -1561,6 +1559,10 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
}
// Return if the input didn't contain the whole Block.
+ //
+ // NOTE: When we updated coder->thr->in_filled a few lines
+ // above, the worker thread might by now have finished its
+ // work and returned itself back to the stack of free threads.
if (coder->thr->in_filled < coder->thr->in_size) {
assert(*in_pos == in_size);
return LZMA_OK;
@@ -1613,10 +1615,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
coder->mem_direct_mode = coder->mem_next_filters;
coder->sequence = SEQ_BLOCK_DIRECT_RUN;
+ FALLTHROUGH;
}
- // Fall through
-
case SEQ_BLOCK_DIRECT_RUN: {
const size_t in_old = *in_pos;
const size_t out_old = *out_pos;
@@ -1652,8 +1653,7 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
return LZMA_OK;
coder->sequence = SEQ_INDEX_DECODE;
-
- // Fall through
+ FALLTHROUGH;
case SEQ_INDEX_DECODE: {
// If we don't have any input, don't call
@@ -1672,10 +1672,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
return ret;
coder->sequence = SEQ_STREAM_FOOTER;
+ FALLTHROUGH;
}
- // Fall through
-
case SEQ_STREAM_FOOTER: {
// Copy the Stream Footer to the internal buffer.
const size_t in_old = *in_pos;
@@ -1714,10 +1713,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
return LZMA_STREAM_END;
coder->sequence = SEQ_STREAM_PADDING;
+ FALLTHROUGH;
}
- // Fall through
-
case SEQ_STREAM_PADDING:
assert(coder->concatenated);
@@ -1948,7 +1946,7 @@ stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// accounting from scratch, too. Changes in filter and block sizes may
// affect number of threads.
//
- // FIXME? Reusing should be easy but unlike the single-threaded
+ // Reusing threads doesn't seem worth it. Unlike the single-threaded
// decoder, with some types of input file combinations reusing
// could leave quite a lot of memory allocated but unused (first
// file could allocate a lot, the next files could use fewer