diff options
Diffstat (limited to 'src/liblzma/common/stream_decoder_mt.c')
| -rw-r--r-- | src/liblzma/common/stream_decoder_mt.c | 140 |
1 files changed, 69 insertions, 71 deletions
diff --git a/src/liblzma/common/stream_decoder_mt.c b/src/liblzma/common/stream_decoder_mt.c index 244624a47900..271f9b07c4b8 100644 --- a/src/liblzma/common/stream_decoder_mt.c +++ b/src/liblzma/common/stream_decoder_mt.c @@ -23,15 +23,10 @@ typedef enum { THR_IDLE, /// Decoding is in progress. - /// Main thread may change this to THR_STOP or THR_EXIT. + /// Main thread may change this to THR_IDLE or THR_EXIT. /// The worker thread may change this to THR_IDLE. THR_RUN, - /// The main thread wants the thread to stop whatever it was doing - /// but not exit. Main thread may change this to THR_EXIT. - /// The worker thread may change this to THR_IDLE. - THR_STOP, - /// The main thread wants the thread to exit. THR_EXIT, @@ -346,27 +341,6 @@ worker_enable_partial_update(void *thr_ptr) } -/// Things do to at THR_STOP or when finishing a Block. -/// This is called with thr->mutex locked. -static void -worker_stop(struct worker_thread *thr) -{ - // Update memory usage counters. - thr->coder->mem_in_use -= thr->in_size; - thr->in_size = 0; // thr->in was freed above. - - thr->coder->mem_in_use -= thr->mem_filters; - thr->coder->mem_cached += thr->mem_filters; - - // Put this thread to the stack of free threads. - thr->next = thr->coder->threads_free; - thr->coder->threads_free = thr; - - mythread_cond_signal(&thr->coder->cond); - return; -} - - static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr) { @@ -397,17 +371,6 @@ next_loop_unlocked: return MYTHREAD_RET_VALUE; } - if (thr->state == THR_STOP) { - thr->state = THR_IDLE; - mythread_mutex_unlock(&thr->mutex); - - mythread_sync(thr->coder->mutex) { - worker_stop(thr); - } - - goto next_loop_lock; - } - assert(thr->state == THR_RUN); // Update progress info for get_progress(). @@ -472,8 +435,7 @@ next_loop_unlocked: } // Either we finished successfully (LZMA_STREAM_END) or an error - // occurred. Both cases are handled almost identically. The error - // case requires updating thr->coder->thread_error. + // occurred. // // The sizes are in the Block Header and the Block decoder // checks that they match, thus we know these: @@ -481,16 +443,30 @@ next_loop_unlocked: assert(ret != LZMA_STREAM_END || thr->out_pos == thr->block_options.uncompressed_size); - // Free the input buffer. Don't update in_size as we need - // it later to update thr->coder->mem_in_use. - lzma_free(thr->in, thr->allocator); - thr->in = NULL; - mythread_sync(thr->mutex) { + // Block decoder ensures this, but do a sanity check anyway + // because thr->in_filled < thr->in_size means that the main + // thread is still writing to thr->in. + if (ret == LZMA_STREAM_END && thr->in_filled != thr->in_size) { + assert(0); + ret = LZMA_PROG_ERROR; + } + if (thr->state != THR_EXIT) thr->state = THR_IDLE; } + // Free the input buffer. Don't update in_size as we need + // it later to update thr->coder->mem_in_use. + // + // This step is skipped if an error occurred because the main thread + // might still be writing to thr->in. The memory will be freed after + // threads_end() sets thr->state = THR_EXIT. + if (ret == LZMA_STREAM_END) { + lzma_free(thr->in, thr->allocator); + thr->in = NULL; + } + mythread_sync(thr->coder->mutex) { // Move our progress info to the main thread. thr->coder->progress_in += thr->in_pos; @@ -510,7 +486,20 @@ next_loop_unlocked: && thr->coder->thread_error == LZMA_OK) thr->coder->thread_error = ret; - worker_stop(thr); + // Return the worker thread to the stack of available + // threads only if no errors occurred. + if (ret == LZMA_STREAM_END) { + // Update memory usage counters. + thr->coder->mem_in_use -= thr->in_size; + thr->coder->mem_in_use -= thr->mem_filters; + thr->coder->mem_cached += thr->mem_filters; + + // Put this thread to the stack of free threads. + thr->next = thr->coder->threads_free; + thr->coder->threads_free = thr; + } + + mythread_cond_signal(&thr->coder->cond); } goto next_loop_lock; @@ -544,17 +533,22 @@ threads_end(struct lzma_stream_coder *coder, const lzma_allocator *allocator) } +/// Tell worker threads to stop without doing any cleaning up. +/// The clean up will be done when threads_exit() is called; +/// it's not possible to reuse the threads after threads_stop(). +/// +/// This is called before returning an unrecoverable error code +/// to the application. It would be waste of processor time +/// to keep the threads running in such a situation. static void threads_stop(struct lzma_stream_coder *coder) { for (uint32_t i = 0; i < coder->threads_initialized; ++i) { + // The threads that are in the THR_RUN state will stop + // when they check the state the next time. There's no + // need to signal coder->threads[i].cond. mythread_sync(coder->threads[i].mutex) { - // The state must be changed conditionally because - // THR_IDLE -> THR_STOP is not a valid state change. - if (coder->threads[i].state != THR_IDLE) { - coder->threads[i].state = THR_STOP; - mythread_cond_signal(&coder->threads[i].cond); - } + coder->threads[i].state = THR_IDLE; } } @@ -1077,9 +1071,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, if (coder->tell_any_check) return LZMA_GET_CHECK; - } - // Fall through + FALLTHROUGH; + } case SEQ_BLOCK_HEADER: { const size_t in_old = *in_pos; @@ -1214,10 +1208,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, } coder->sequence = SEQ_BLOCK_INIT; + FALLTHROUGH; } - // Fall through - case SEQ_BLOCK_INIT: { // Check if decoding is possible at all with the current // memlimit_stop which we must never exceed. @@ -1303,10 +1296,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, } coder->sequence = SEQ_BLOCK_THR_INIT; + FALLTHROUGH; } - // Fall through - case SEQ_BLOCK_THR_INIT: { // We need to wait for a multiple conditions to become true // until we can initialize the Block decoder and let a worker @@ -1508,10 +1500,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, } coder->sequence = SEQ_BLOCK_THR_RUN; + FALLTHROUGH; } - // Fall through - case SEQ_BLOCK_THR_RUN: { if (action == LZMA_FINISH && coder->fail_fast) { // We know that we won't get more input and that @@ -1549,10 +1540,17 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, // Read output from the output queue. Just like in // SEQ_BLOCK_HEADER, we wait to fill the output buffer // only if waiting_allowed was set to true in the beginning - // of this function (see the comment there). + // of this function (see the comment there) and there is + // no input available. In SEQ_BLOCK_HEADER, there is never + // input available when read_output_and_wait() is called, + // but here there can be when LZMA_FINISH is used, thus we + // need to check if *in_pos == in_size. Otherwise we would + // wait here instead of using the available input to start + // a new thread. return_if_error(read_output_and_wait(coder, allocator, out, out_pos, out_size, - NULL, waiting_allowed, + NULL, + waiting_allowed && *in_pos == in_size, &wait_abs, &has_blocked)); if (coder->pending_error != LZMA_OK) { @@ -1561,6 +1559,10 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, } // Return if the input didn't contain the whole Block. + // + // NOTE: When we updated coder->thr->in_filled a few lines + // above, the worker thread might by now have finished its + // work and returned itself back to the stack of free threads. if (coder->thr->in_filled < coder->thr->in_size) { assert(*in_pos == in_size); return LZMA_OK; @@ -1613,10 +1615,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, coder->mem_direct_mode = coder->mem_next_filters; coder->sequence = SEQ_BLOCK_DIRECT_RUN; + FALLTHROUGH; } - // Fall through - case SEQ_BLOCK_DIRECT_RUN: { const size_t in_old = *in_pos; const size_t out_old = *out_pos; @@ -1652,8 +1653,7 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, return LZMA_OK; coder->sequence = SEQ_INDEX_DECODE; - - // Fall through + FALLTHROUGH; case SEQ_INDEX_DECODE: { // If we don't have any input, don't call @@ -1672,10 +1672,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, return ret; coder->sequence = SEQ_STREAM_FOOTER; + FALLTHROUGH; } - // Fall through - case SEQ_STREAM_FOOTER: { // Copy the Stream Footer to the internal buffer. const size_t in_old = *in_pos; @@ -1714,10 +1713,9 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, return LZMA_STREAM_END; coder->sequence = SEQ_STREAM_PADDING; + FALLTHROUGH; } - // Fall through - case SEQ_STREAM_PADDING: assert(coder->concatenated); @@ -1948,7 +1946,7 @@ stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, // accounting from scratch, too. Changes in filter and block sizes may // affect number of threads. // - // FIXME? Reusing should be easy but unlike the single-threaded + // Reusing threads doesn't seem worth it. Unlike the single-threaded // decoder, with some types of input file combinations reusing // could leave quite a lot of memory allocated but unused (first // file could allocate a lot, the next files could use fewer |
