blob: 67b9419914928c7446e1c564931c73a222c8dbcc [file] [log] [blame]
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 */
#include "platform.h"   /* Large Files support, SET_BINARY_MODE */
#include "Pzstd.h"
#include "SkippableFrame.h"
#include "utils/FileSystem.h"
#include "utils/Portability.h"
#include "utils/Range.h"
#include "utils/ScopeGuard.h"
#include "utils/ThreadPool.h"
#include "utils/WorkQueue.h"

#include <algorithm>
#include <cassert>
#include <chrono>
#include <cinttypes>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <string>
#include <system_error>
26
Nick Terrell254c5b12016-09-21 14:29:47 -070027
Nick Terrellc9325202016-09-01 15:22:19 -070028namespace pzstd {
29
namespace {
/// Path of the platform's null device. `openOutputFile()` never asks for
/// overwrite confirmation when the output is this path.
const std::string nullOutput =
#ifdef _WIN32
    "nul";
#else
    "/dev/null";
#endif
} // anonymous namespace
37
38using std::size_t;
39
Nick Terrell254c5b12016-09-21 14:29:47 -070040static std::uintmax_t fileSizeOrZero(const std::string &file) {
41 if (file == "-") {
42 return 0;
Nick Terrellc9325202016-09-01 15:22:19 -070043 }
Nick Terrell254c5b12016-09-21 14:29:47 -070044 std::error_code ec;
45 auto size = file_size(file, ec);
46 if (ec) {
47 size = 0;
Nick Terrellc9325202016-09-01 15:22:19 -070048 }
Nick Terrell254c5b12016-09-21 14:29:47 -070049 return size;
50}
Nick Terrellc9325202016-09-01 15:22:19 -070051
Nick Terrelld2498892016-09-23 12:55:21 -070052static std::uint64_t handleOneInput(const Options &options,
Nick Terrell254c5b12016-09-21 14:29:47 -070053 const std::string &inputFile,
54 FILE* inputFd,
Nick Terrelld2498892016-09-23 12:55:21 -070055 const std::string &outputFile,
Nick Terrell254c5b12016-09-21 14:29:47 -070056 FILE* outputFd,
Nick Terrell48294b52016-10-12 15:18:16 -070057 SharedState& state) {
Nick Terrell254c5b12016-09-21 14:29:47 -070058 auto inputSize = fileSizeOrZero(inputFile);
Nick Terrellc9325202016-09-01 15:22:19 -070059 // WorkQueue outlives ThreadPool so in the case of error we are certain
Josh Sorefa880ca22019-04-12 14:18:11 -040060 // we don't accidentally try to call push() on it after it is destroyed
Nick Terrell1c209a42016-09-21 15:12:23 -070061 WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
Nick Terrelld2498892016-09-23 12:55:21 -070062 std::uint64_t bytesRead;
63 std::uint64_t bytesWritten;
Nick Terrellc9325202016-09-01 15:22:19 -070064 {
Nick Terrell9b603ee2016-10-07 15:04:34 -070065 // Initialize the (de)compression thread pool with numThreads
66 ThreadPool executor(options.numThreads);
67 // Run the reader thread on an extra thread
68 ThreadPool readExecutor(1);
Nick Terrellc9325202016-09-01 15:22:19 -070069 if (!options.decompress) {
70 // Add a job that reads the input and starts all the compression jobs
Nick Terrell9b603ee2016-10-07 15:04:34 -070071 readExecutor.add(
Nick Terrell48294b52016-10-12 15:18:16 -070072 [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
Nick Terrelld2498892016-09-23 12:55:21 -070073 bytesRead = asyncCompressChunks(
Nick Terrell48294b52016-10-12 15:18:16 -070074 state,
Nick Terrellc9325202016-09-01 15:22:19 -070075 outs,
76 executor,
77 inputFd,
78 inputSize,
79 options.numThreads,
80 options.determineParameters());
81 });
82 // Start writing
Nick Terrellbaa152e2016-10-12 19:02:27 -070083 bytesWritten = writeFile(state, outs, outputFd, options.decompress);
Nick Terrellc9325202016-09-01 15:22:19 -070084 } else {
85 // Add a job that reads the input and starts all the decompression jobs
Nick Terrell48294b52016-10-12 15:18:16 -070086 readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
87 bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
Nick Terrellc9325202016-09-01 15:22:19 -070088 });
89 // Start writing
Nick Terrellbaa152e2016-10-12 19:02:27 -070090 bytesWritten = writeFile(state, outs, outputFd, options.decompress);
Nick Terrellc9325202016-09-01 15:22:19 -070091 }
92 }
Nick Terrellbaa152e2016-10-12 19:02:27 -070093 if (!state.errorHolder.hasError()) {
Nick Terrelld2498892016-09-23 12:55:21 -070094 std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
95 std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
96 if (!options.decompress) {
97 double ratio = static_cast<double>(bytesWritten) /
98 static_cast<double>(bytesRead + !bytesRead);
Nick Terrell5aa5aa42020-05-22 22:26:02 -070099 state.log(kLogInfo, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64
Nick Terrell96e07022016-10-06 21:31:16 -0700100 " bytes, %s)\n",
Nick Terrelld2498892016-09-23 12:55:21 -0700101 inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
102 outputFileName.c_str());
103 } else {
Nick Terrell5aa5aa42020-05-22 22:26:02 -0700104 state.log(kLogInfo, "%-20s: %" PRIu64 " bytes \n",
Nick Terrelld2498892016-09-23 12:55:21 -0700105 inputFileName.c_str(),bytesWritten);
106 }
107 }
Nick Terrellc9325202016-09-01 15:22:19 -0700108 return bytesWritten;
109}
110
Nick Terrell254c5b12016-09-21 14:29:47 -0700111static FILE *openInputFile(const std::string &inputFile,
112 ErrorHolder &errorHolder) {
113 if (inputFile == "-") {
114 SET_BINARY_MODE(stdin);
115 return stdin;
116 }
Nick Terrell5c9adff2016-09-21 16:25:08 -0700117 // Check if input file is a directory
118 {
119 std::error_code ec;
120 if (is_directory(inputFile, ec)) {
121 errorHolder.setError("Output file is a directory -- ignored");
122 return nullptr;
123 }
124 }
Nick Terrell254c5b12016-09-21 14:29:47 -0700125 auto inputFd = std::fopen(inputFile.c_str(), "rb");
126 if (!errorHolder.check(inputFd != nullptr, "Failed to open input file")) {
127 return nullptr;
128 }
129 return inputFd;
130}
131
132static FILE *openOutputFile(const Options &options,
133 const std::string &outputFile,
Nick Terrellbaa152e2016-10-12 19:02:27 -0700134 SharedState& state) {
Nick Terrell254c5b12016-09-21 14:29:47 -0700135 if (outputFile == "-") {
136 SET_BINARY_MODE(stdout);
137 return stdout;
138 }
139 // Check if the output file exists and then open it
140 if (!options.overwrite && outputFile != nullOutput) {
141 auto outputFd = std::fopen(outputFile.c_str(), "rb");
142 if (outputFd != nullptr) {
143 std::fclose(outputFd);
Nick Terrell5aa5aa42020-05-22 22:26:02 -0700144 if (!state.log.logsAt(kLogInfo)) {
Nick Terrellbaa152e2016-10-12 19:02:27 -0700145 state.errorHolder.setError("Output file exists");
Nick Terrell254c5b12016-09-21 14:29:47 -0700146 return nullptr;
147 }
Nick Terrellbaa152e2016-10-12 19:02:27 -0700148 state.log(
Nick Terrell5aa5aa42020-05-22 22:26:02 -0700149 kLogInfo,
Nick Terrell254c5b12016-09-21 14:29:47 -0700150 "pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
151 outputFile.c_str());
152 int c = getchar();
153 if (c != 'y' && c != 'Y') {
Nick Terrellbaa152e2016-10-12 19:02:27 -0700154 state.errorHolder.setError("Not overwritten");
Nick Terrell254c5b12016-09-21 14:29:47 -0700155 return nullptr;
156 }
157 }
158 }
159 auto outputFd = std::fopen(outputFile.c_str(), "wb");
Nick Terrellbaa152e2016-10-12 19:02:27 -0700160 if (!state.errorHolder.check(
Nick Terrell254c5b12016-09-21 14:29:47 -0700161 outputFd != nullptr, "Failed to open output file")) {
Nick Terrellbaa152e2016-10-12 19:02:27 -0700162 return nullptr;
Nick Terrell254c5b12016-09-21 14:29:47 -0700163 }
164 return outputFd;
165}
166
167int pzstdMain(const Options &options) {
168 int returnCode = 0;
Nick Terrellbaa152e2016-10-12 19:02:27 -0700169 SharedState state(options);
Nick Terrell254c5b12016-09-21 14:29:47 -0700170 for (const auto& input : options.inputFiles) {
Nick Terrelle9e151c2016-10-12 17:23:38 -0700171 // Setup the shared state
Nick Terrell254c5b12016-09-21 14:29:47 -0700172 auto printErrorGuard = makeScopeGuard([&] {
Nick Terrell48294b52016-10-12 15:18:16 -0700173 if (state.errorHolder.hasError()) {
Nick Terrell254c5b12016-09-21 14:29:47 -0700174 returnCode = 1;
Nick Terrell5aa5aa42020-05-22 22:26:02 -0700175 state.log(kLogError, "pzstd: %s: %s.\n", input.c_str(),
Nick Terrellbaa152e2016-10-12 19:02:27 -0700176 state.errorHolder.getError().c_str());
Nick Terrell254c5b12016-09-21 14:29:47 -0700177 }
178 });
179 // Open the input file
Nick Terrell48294b52016-10-12 15:18:16 -0700180 auto inputFd = openInputFile(input, state.errorHolder);
Nick Terrell254c5b12016-09-21 14:29:47 -0700181 if (inputFd == nullptr) {
182 continue;
183 }
184 auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
185 // Open the output file
186 auto outputFile = options.getOutputFile(input);
Nick Terrell48294b52016-10-12 15:18:16 -0700187 if (!state.errorHolder.check(outputFile != "",
Nick Terrell254c5b12016-09-21 14:29:47 -0700188 "Input file does not have extension .zst")) {
189 continue;
190 }
Nick Terrellbaa152e2016-10-12 19:02:27 -0700191 auto outputFd = openOutputFile(options, outputFile, state);
Nick Terrell254c5b12016-09-21 14:29:47 -0700192 if (outputFd == nullptr) {
193 continue;
194 }
195 auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
196 // (de)compress the file
Nick Terrell48294b52016-10-12 15:18:16 -0700197 handleOneInput(options, input, inputFd, outputFile, outputFd, state);
198 if (state.errorHolder.hasError()) {
Nick Terrell254c5b12016-09-21 14:29:47 -0700199 continue;
200 }
201 // Delete the input file if necessary
202 if (!options.keepSource) {
203 // Be sure that we are done and have written everything before we delete
Nick Terrell48294b52016-10-12 15:18:16 -0700204 if (!state.errorHolder.check(std::fclose(inputFd) == 0,
Nick Terrell254c5b12016-09-21 14:29:47 -0700205 "Failed to close input file")) {
206 continue;
207 }
208 closeInputGuard.dismiss();
Nick Terrell48294b52016-10-12 15:18:16 -0700209 if (!state.errorHolder.check(std::fclose(outputFd) == 0,
Nick Terrell254c5b12016-09-21 14:29:47 -0700210 "Failed to close output file")) {
211 continue;
212 }
213 closeOutputGuard.dismiss();
214 if (std::remove(input.c_str()) != 0) {
Nick Terrell48294b52016-10-12 15:18:16 -0700215 state.errorHolder.setError("Failed to remove input file");
Nick Terrell254c5b12016-09-21 14:29:47 -0700216 continue;
217 }
218 }
219 }
220 // Returns 1 if any of the files failed to (de)compress.
221 return returnCode;
222}
223
Nick Terrellc9325202016-09-01 15:22:19 -0700224/// Construct a `ZSTD_inBuffer` that points to the data in `buffer`.
225static ZSTD_inBuffer makeZstdInBuffer(const Buffer& buffer) {
226 return ZSTD_inBuffer{buffer.data(), buffer.size(), 0};
227}
228
229/**
230 * Advance `buffer` and `inBuffer` by the amount of data read, as indicated by
231 * `inBuffer.pos`.
232 */
233void advance(Buffer& buffer, ZSTD_inBuffer& inBuffer) {
234 auto pos = inBuffer.pos;
235 inBuffer.src = static_cast<const unsigned char*>(inBuffer.src) + pos;
236 inBuffer.size -= pos;
237 inBuffer.pos = 0;
238 return buffer.advance(pos);
239}
240
241/// Construct a `ZSTD_outBuffer` that points to the data in `buffer`.
242static ZSTD_outBuffer makeZstdOutBuffer(Buffer& buffer) {
243 return ZSTD_outBuffer{buffer.data(), buffer.size(), 0};
244}
245
246/**
247 * Split `buffer` and advance `outBuffer` by the amount of data written, as
248 * indicated by `outBuffer.pos`.
249 */
250Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) {
251 auto pos = outBuffer.pos;
252 outBuffer.dst = static_cast<unsigned char*>(outBuffer.dst) + pos;
253 outBuffer.size -= pos;
254 outBuffer.pos = 0;
255 return buffer.splitAt(pos);
256}
257
258/**
259 * Stream chunks of input from `in`, compress it, and stream it out to `out`.
260 *
Nick Terrell48294b52016-10-12 15:18:16 -0700261 * @param state The shared state
Nick Terrellc9325202016-09-01 15:22:19 -0700262 * @param in Queue that we `pop()` input buffers from
263 * @param out Queue that we `push()` compressed output buffers to
264 * @param maxInputSize An upper bound on the size of the input
Nick Terrellc9325202016-09-01 15:22:19 -0700265 */
266static void compress(
Nick Terrell48294b52016-10-12 15:18:16 -0700267 SharedState& state,
Nick Terrellc9325202016-09-01 15:22:19 -0700268 std::shared_ptr<BufferWorkQueue> in,
269 std::shared_ptr<BufferWorkQueue> out,
Nick Terrelle9e151c2016-10-12 17:23:38 -0700270 size_t maxInputSize) {
Nick Terrell48294b52016-10-12 15:18:16 -0700271 auto& errorHolder = state.errorHolder;
Nick Terrellc9325202016-09-01 15:22:19 -0700272 auto guard = makeScopeGuard([&] { out->finish(); });
273 // Initialize the CCtx
Nick Terrelle9e151c2016-10-12 17:23:38 -0700274 auto ctx = state.cStreamPool->get();
Nick Terrellc9325202016-09-01 15:22:19 -0700275 if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
276 return;
277 }
278 {
Stephen Kittadb54292021-02-20 17:28:19 +0100279 auto err = ZSTD_CCtx_reset(ctx.get(), ZSTD_reset_session_only);
Nick Terrellc9325202016-09-01 15:22:19 -0700280 if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
281 return;
282 }
283 }
284
285 // Allocate space for the result
286 auto outBuffer = Buffer(ZSTD_compressBound(maxInputSize));
287 auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
288 {
289 Buffer inBuffer;
290 // Read a buffer in from the input queue
291 while (in->pop(inBuffer) && !errorHolder.hasError()) {
292 auto zstdInBuffer = makeZstdInBuffer(inBuffer);
293 // Compress the whole buffer and send it to the output queue
294 while (!inBuffer.empty() && !errorHolder.hasError()) {
295 if (!errorHolder.check(
296 !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
297 return;
298 }
299 // Compress
300 auto err =
301 ZSTD_compressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
302 if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
303 return;
304 }
305 // Split the compressed data off outBuffer and pass to the output queue
306 out->push(split(outBuffer, zstdOutBuffer));
307 // Forget about the data we already compressed
308 advance(inBuffer, zstdInBuffer);
309 }
310 }
311 }
312 // Write the epilog
313 size_t bytesLeft;
314 do {
315 if (!errorHolder.check(
316 !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
317 return;
318 }
319 bytesLeft = ZSTD_endStream(ctx.get(), &zstdOutBuffer);
320 if (!errorHolder.check(
321 !ZSTD_isError(bytesLeft), ZSTD_getErrorName(bytesLeft))) {
322 return;
323 }
324 out->push(split(outBuffer, zstdOutBuffer));
325 } while (bytesLeft != 0 && !errorHolder.hasError());
326}
327
328/**
329 * Calculates how large each independently compressed frame should be.
330 *
331 * @param size The size of the source if known, 0 otherwise
332 * @param numThreads The number of threads available to run compression jobs on
333 * @param params The zstd parameters to be used for compression
334 */
Nick Terrell823bf3d2016-09-06 20:11:02 -0700335static size_t calculateStep(
336 std::uintmax_t size,
337 size_t numThreads,
338 const ZSTD_parameters &params) {
Nick Terrellbcd61582016-11-15 17:46:28 -0800339 (void)size;
340 (void)numThreads;
Elliott Hughes44aba642023-09-12 20:18:59 +0000341 // Not validated to work correctly for window logs > 23.
342 // It will definitely fail if windowLog + 2 is >= 4GB because
343 // the skippable frame can only store sizes up to 4GB.
344 assert(params.cParams.windowLog <= 23);
Nick Terrellf147fcc2016-11-15 16:39:09 -0800345 return size_t{1} << (params.cParams.windowLog + 2);
Nick Terrellc9325202016-09-01 15:22:19 -0700346}
347
namespace {
/// State of an input stream after a read: more data may follow (`Continue`),
/// end-of-file was reached (`Done`), or the error indicator is set (`Error`).
enum class FileStatus { Continue, Done, Error };

/// Determines the status of the stream `fd`. The end-of-file indicator is
/// checked before the error indicator.
FileStatus fileStatus(FILE* fd) {
  const bool atEnd = std::feof(fd) != 0;
  if (atEnd) {
    return FileStatus::Done;
  }
  const bool failed = std::ferror(fd) != 0;
  return failed ? FileStatus::Error : FileStatus::Continue;
}
} // anonymous namespace
360
361/**
362 * Reads `size` data in chunks of `chunkSize` and puts it into `queue`.
363 * Will read less if an error or EOF occurs.
364 * Returns the status of the file after all of the reads have occurred.
365 */
366static FileStatus
Nick Terrelld2498892016-09-23 12:55:21 -0700367readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
368 std::uint64_t *totalBytesRead) {
Nick Terrellc9325202016-09-01 15:22:19 -0700369 Buffer buffer(size);
370 while (!buffer.empty()) {
371 auto bytesRead =
372 std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
Nick Terrelld2498892016-09-23 12:55:21 -0700373 *totalBytesRead += bytesRead;
Nick Terrellc9325202016-09-01 15:22:19 -0700374 queue.push(buffer.splitAt(bytesRead));
Nick Terrell9622fe42016-09-02 20:11:22 -0700375 auto status = fileStatus(fd);
376 if (status != FileStatus::Continue) {
377 return status;
Nick Terrellc9325202016-09-01 15:22:19 -0700378 }
379 }
380 return FileStatus::Continue;
381}
382
Nick Terrelld2498892016-09-23 12:55:21 -0700383std::uint64_t asyncCompressChunks(
Nick Terrell48294b52016-10-12 15:18:16 -0700384 SharedState& state,
Nick Terrellc9325202016-09-01 15:22:19 -0700385 WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
386 ThreadPool& executor,
387 FILE* fd,
Nick Terrell823bf3d2016-09-06 20:11:02 -0700388 std::uintmax_t size,
Nick Terrellc9325202016-09-01 15:22:19 -0700389 size_t numThreads,
390 ZSTD_parameters params) {
391 auto chunksGuard = makeScopeGuard([&] { chunks.finish(); });
Nick Terrelld2498892016-09-23 12:55:21 -0700392 std::uint64_t bytesRead = 0;
Nick Terrellc9325202016-09-01 15:22:19 -0700393
394 // Break the input up into chunks of size `step` and compress each chunk
395 // independently.
396 size_t step = calculateStep(size, numThreads, params);
Nick Terrell5aa5aa42020-05-22 22:26:02 -0700397 state.log(kLogDebug, "Chosen frame size: %zu\n", step);
Nick Terrellc9325202016-09-01 15:22:19 -0700398 auto status = FileStatus::Continue;
Nick Terrell48294b52016-10-12 15:18:16 -0700399 while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
Nick Terrellc9325202016-09-01 15:22:19 -0700400 // Make a new input queue that we will put the chunk's input data into.
401 auto in = std::make_shared<BufferWorkQueue>();
402 auto inGuard = makeScopeGuard([&] { in->finish(); });
403 // Make a new output queue that compress will put the compressed data into.
404 auto out = std::make_shared<BufferWorkQueue>();
405 // Start compression in the thread pool
Nick Terrelle9e151c2016-10-12 17:23:38 -0700406 executor.add([&state, in, out, step] {
Nick Terrellc9325202016-09-01 15:22:19 -0700407 return compress(
Nick Terrelle9e151c2016-10-12 17:23:38 -0700408 state, std::move(in), std::move(out), step);
Nick Terrellc9325202016-09-01 15:22:19 -0700409 });
410 // Pass the output queue to the writer thread.
411 chunks.push(std::move(out));
Nick Terrell5aa5aa42020-05-22 22:26:02 -0700412 state.log(kLogVerbose, "%s\n", "Starting a new frame");
Nick Terrellc9325202016-09-01 15:22:19 -0700413 // Fill the input queue for the compression job we just started
Nick Terrelld2498892016-09-23 12:55:21 -0700414 status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
Nick Terrellc9325202016-09-01 15:22:19 -0700415 }
Nick Terrell48294b52016-10-12 15:18:16 -0700416 state.errorHolder.check(status != FileStatus::Error, "Error reading input");
Nick Terrelld2498892016-09-23 12:55:21 -0700417 return bytesRead;
Nick Terrellc9325202016-09-01 15:22:19 -0700418}
419
420/**
421 * Decompress a frame, whose data is streamed into `in`, and stream the output
422 * to `out`.
423 *
Nick Terrell48294b52016-10-12 15:18:16 -0700424 * @param state The shared state
Nick Terrellc9325202016-09-01 15:22:19 -0700425 * @param in Queue that we `pop()` input buffers from. It contains
426 * exactly one compressed frame.
427 * @param out Queue that we `push()` decompressed output buffers to
428 */
429static void decompress(
Nick Terrell48294b52016-10-12 15:18:16 -0700430 SharedState& state,
Nick Terrellc9325202016-09-01 15:22:19 -0700431 std::shared_ptr<BufferWorkQueue> in,
432 std::shared_ptr<BufferWorkQueue> out) {
Nick Terrell48294b52016-10-12 15:18:16 -0700433 auto& errorHolder = state.errorHolder;
Nick Terrellc9325202016-09-01 15:22:19 -0700434 auto guard = makeScopeGuard([&] { out->finish(); });
435 // Initialize the DCtx
Nick Terrelle9e151c2016-10-12 17:23:38 -0700436 auto ctx = state.dStreamPool->get();
Nick Terrellc9325202016-09-01 15:22:19 -0700437 if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
438 return;
439 }
440 {
Stephen Kittadb54292021-02-20 17:28:19 +0100441 auto err = ZSTD_DCtx_reset(ctx.get(), ZSTD_reset_session_only);
Nick Terrellc9325202016-09-01 15:22:19 -0700442 if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
443 return;
444 }
445 }
446
447 const size_t outSize = ZSTD_DStreamOutSize();
448 Buffer inBuffer;
449 size_t returnCode = 0;
450 // Read a buffer in from the input queue
451 while (in->pop(inBuffer) && !errorHolder.hasError()) {
452 auto zstdInBuffer = makeZstdInBuffer(inBuffer);
453 // Decompress the whole buffer and send it to the output queue
454 while (!inBuffer.empty() && !errorHolder.hasError()) {
455 // Allocate a buffer with at least outSize bytes.
456 Buffer outBuffer(outSize);
457 auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
458 // Decompress
459 returnCode =
460 ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
461 if (!errorHolder.check(
462 !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
463 return;
464 }
465 // Pass the buffer with the decompressed data to the output queue
466 out->push(split(outBuffer, zstdOutBuffer));
467 // Advance past the input we already read
468 advance(inBuffer, zstdInBuffer);
469 if (returnCode == 0) {
470 // The frame is over, prepare to (maybe) start a new frame
471 ZSTD_initDStream(ctx.get());
472 }
473 }
474 }
475 if (!errorHolder.check(returnCode <= 1, "Incomplete block")) {
476 return;
477 }
478 // We've given ZSTD_decompressStream all of our data, but there may still
479 // be data to read.
480 while (returnCode == 1) {
481 // Allocate a buffer with at least outSize bytes.
482 Buffer outBuffer(outSize);
483 auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
484 // Pass in no input.
485 ZSTD_inBuffer zstdInBuffer{nullptr, 0, 0};
486 // Decompress
487 returnCode =
488 ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
489 if (!errorHolder.check(
490 !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
491 return;
492 }
493 // Pass the buffer with the decompressed data to the output queue
494 out->push(split(outBuffer, zstdOutBuffer));
495 }
496}
497
Nick Terrelld2498892016-09-23 12:55:21 -0700498std::uint64_t asyncDecompressFrames(
Nick Terrell48294b52016-10-12 15:18:16 -0700499 SharedState& state,
Nick Terrellc9325202016-09-01 15:22:19 -0700500 WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
501 ThreadPool& executor,
502 FILE* fd) {
503 auto framesGuard = makeScopeGuard([&] { frames.finish(); });
Nick Terrelld2498892016-09-23 12:55:21 -0700504 std::uint64_t totalBytesRead = 0;
505
Nick Terrellc9325202016-09-01 15:22:19 -0700506 // Split the source up into its component frames.
507 // If we find our recognized skippable frame we know the next frames size
508 // which means that we can decompress each standard frame in independently.
509 // Otherwise, we will decompress using only one decompression task.
510 const size_t chunkSize = ZSTD_DStreamInSize();
511 auto status = FileStatus::Continue;
Nick Terrell48294b52016-10-12 15:18:16 -0700512 while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
Nick Terrellc9325202016-09-01 15:22:19 -0700513 // Make a new input queue that we will put the frames's bytes into.
514 auto in = std::make_shared<BufferWorkQueue>();
515 auto inGuard = makeScopeGuard([&] { in->finish(); });
516 // Make a output queue that decompress will put the decompressed data into
517 auto out = std::make_shared<BufferWorkQueue>();
518
519 size_t frameSize;
520 {
521 // Calculate the size of the next frame.
522 // frameSize is 0 if the frame info can't be decoded.
523 Buffer buffer(SkippableFrame::kSize);
524 auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
Nick Terrelld2498892016-09-23 12:55:21 -0700525 totalBytesRead += bytesRead;
Nick Terrell9622fe42016-09-02 20:11:22 -0700526 status = fileStatus(fd);
Nick Terrellc9325202016-09-01 15:22:19 -0700527 if (bytesRead == 0 && status != FileStatus::Continue) {
528 break;
529 }
530 buffer.subtract(buffer.size() - bytesRead);
531 frameSize = SkippableFrame::tryRead(buffer.range());
532 in->push(std::move(buffer));
533 }
Nick Terrell9622fe42016-09-02 20:11:22 -0700534 if (frameSize == 0) {
535 // We hit a non SkippableFrame, so this will be the last job.
536 // Make sure that we don't use too much memory
537 in->setMaxSize(64);
538 out->setMaxSize(64);
539 }
Nick Terrellc9325202016-09-01 15:22:19 -0700540 // Start decompression in the thread pool
Nick Terrell48294b52016-10-12 15:18:16 -0700541 executor.add([&state, in, out] {
542 return decompress(state, std::move(in), std::move(out));
Nick Terrellc9325202016-09-01 15:22:19 -0700543 });
544 // Pass the output queue to the writer thread
545 frames.push(std::move(out));
546 if (frameSize == 0) {
547 // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
548 // Pass the rest of the source to this decompression task
Nick Terrell5aa5aa42020-05-22 22:26:02 -0700549 state.log(kLogVerbose, "%s\n",
Pádraig Brady38a34282017-03-05 19:36:56 -0800550 "Input not in pzstd format, falling back to serial decompression");
Nick Terrell48294b52016-10-12 15:18:16 -0700551 while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
Nick Terrelld2498892016-09-23 12:55:21 -0700552 status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
Nick Terrellc9325202016-09-01 15:22:19 -0700553 }
554 break;
555 }
Nick Terrell5aa5aa42020-05-22 22:26:02 -0700556 state.log(kLogVerbose, "Decompressing a frame of size %zu", frameSize);
Nick Terrellc9325202016-09-01 15:22:19 -0700557 // Fill the input queue for the decompression job we just started
Nick Terrelld2498892016-09-23 12:55:21 -0700558 status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
Nick Terrellc9325202016-09-01 15:22:19 -0700559 }
Nick Terrell48294b52016-10-12 15:18:16 -0700560 state.errorHolder.check(status != FileStatus::Error, "Error reading input");
Nick Terrelld2498892016-09-23 12:55:21 -0700561 return totalBytesRead;
Nick Terrellc9325202016-09-01 15:22:19 -0700562}
563
564/// Write `data` to `fd`, returns true iff success.
565static bool writeData(ByteRange data, FILE* fd) {
566 while (!data.empty()) {
567 data.advance(std::fwrite(data.begin(), 1, data.size(), fd));
568 if (std::ferror(fd)) {
569 return false;
570 }
571 }
572 return true;
573}
574
Nick Terrelld2498892016-09-23 12:55:21 -0700575std::uint64_t writeFile(
Nick Terrell48294b52016-10-12 15:18:16 -0700576 SharedState& state,
Nick Terrellc9325202016-09-01 15:22:19 -0700577 WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
578 FILE* outputFd,
Nick Terrellbaa152e2016-10-12 19:02:27 -0700579 bool decompress) {
Nick Terrell48294b52016-10-12 15:18:16 -0700580 auto& errorHolder = state.errorHolder;
Nick Terrellbaa152e2016-10-12 19:02:27 -0700581 auto lineClearGuard = makeScopeGuard([&state] {
Nick Terrell5aa5aa42020-05-22 22:26:02 -0700582 state.log.clear(kLogInfo);
Nick Terrell39801672016-09-23 15:47:26 -0700583 });
Nick Terrelld2498892016-09-23 12:55:21 -0700584 std::uint64_t bytesWritten = 0;
Nick Terrellc9325202016-09-01 15:22:19 -0700585 std::shared_ptr<BufferWorkQueue> out;
586 // Grab the output queue for each decompression job (in order).
Nick Terrellf5706092017-06-22 18:09:42 -0700587 while (outs.pop(out)) {
588 if (errorHolder.hasError()) {
589 continue;
590 }
Nick Terrell254c5b12016-09-21 14:29:47 -0700591 if (!decompress) {
Nick Terrellc9325202016-09-01 15:22:19 -0700592 // If we are compressing and want to write skippable frames we can't
593 // start writing before compression is done because we need to know the
594 // compressed size.
595 // Wait for the compressed size to be available and write skippable frame
Elliott Hughes44aba642023-09-12 20:18:59 +0000596 assert(uint64_t(out->size()) < uint64_t(1) << 32);
597 SkippableFrame frame(uint32_t(out->size()));
Nick Terrellc9325202016-09-01 15:22:19 -0700598 if (!writeData(frame.data(), outputFd)) {
599 errorHolder.setError("Failed to write output");
600 return bytesWritten;
601 }
602 bytesWritten += frame.kSize;
603 }
604 // For each chunk of the frame: Pop it from the queue and write it
605 Buffer buffer;
606 while (out->pop(buffer) && !errorHolder.hasError()) {
607 if (!writeData(buffer.range(), outputFd)) {
608 errorHolder.setError("Failed to write output");
609 return bytesWritten;
610 }
611 bytesWritten += buffer.size();
Nick Terrell5aa5aa42020-05-22 22:26:02 -0700612 state.log.update(kLogInfo, "Written: %u MB ",
Nick Terrellbaa152e2016-10-12 19:02:27 -0700613 static_cast<std::uint32_t>(bytesWritten >> 20));
Nick Terrellc9325202016-09-01 15:22:19 -0700614 }
615 }
616 return bytesWritten;
617}
618}