blob: 3645e594268ec28bebdacfe31f089e986cdce279 [file] [log] [blame]
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 */
9#pragma once
10
11#include "ErrorHolder.h"
Nick Terrellbaa152e2016-10-12 19:02:27 -070012#include "Logging.h"
Nick Terrellc9325202016-09-01 15:22:19 -070013#include "Options.h"
14#include "utils/Buffer.h"
15#include "utils/Range.h"
Nick Terrell48294b52016-10-12 15:18:16 -070016#include "utils/ResourcePool.h"
Nick Terrellc9325202016-09-01 15:22:19 -070017#include "utils/ThreadPool.h"
18#include "utils/WorkQueue.h"
19#define ZSTD_STATIC_LINKING_ONLY
sen40def702021-05-13 14:41:21 -040020#define ZSTD_DISABLE_DEPRECATE_WARNINGS /* No deprecation warnings, pzstd itself is deprecated
21 * and uses deprecated functions
22 */
Nick Terrellc9325202016-09-01 15:22:19 -070023#include "zstd.h"
24#undef ZSTD_STATIC_LINKING_ONLY
25
26#include <cstddef>
Nick Terrell823bf3d2016-09-06 20:11:02 -070027#include <cstdint>
Nick Terrellc9325202016-09-01 15:22:19 -070028#include <memory>
29
30namespace pzstd {
31/**
32 * Runs pzstd with `options` and returns the number of bytes written.
33 * An error occurred if `errorHandler.hasError()`.
34 *
35 * @param options The pzstd options to use for (de)compression
Nick Terrell254c5b12016-09-21 14:29:47 -070036 * @returns 0 upon success and non-zero on failure.
Nick Terrellc9325202016-09-01 15:22:19 -070037 */
Nick Terrell254c5b12016-09-21 14:29:47 -070038int pzstdMain(const Options& options);
Nick Terrellc9325202016-09-01 15:22:19 -070039
Nick Terrell48294b52016-10-12 15:18:16 -070040class SharedState {
41 public:
Nick Terrellbaa152e2016-10-12 19:02:27 -070042 SharedState(const Options& options) : log(options.verbosity) {
43 if (!options.decompress) {
44 auto parameters = options.determineParameters();
Nick Terrelle9e151c2016-10-12 17:23:38 -070045 cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
Nick Terrellf147fcc2016-11-15 16:39:09 -080046 [this, parameters]() -> ZSTD_CStream* {
Nick Terrell5aa5aa42020-05-22 22:26:02 -070047 this->log(kLogVerbose, "%s\n", "Creating new ZSTD_CStream");
Nick Terrelle9e151c2016-10-12 17:23:38 -070048 auto zcs = ZSTD_createCStream();
49 if (zcs) {
50 auto err = ZSTD_initCStream_advanced(
51 zcs, nullptr, 0, parameters, 0);
52 if (ZSTD_isError(err)) {
53 ZSTD_freeCStream(zcs);
54 return nullptr;
55 }
56 }
57 return zcs;
58 },
59 [](ZSTD_CStream *zcs) {
60 ZSTD_freeCStream(zcs);
61 }});
62 } else {
63 dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
Nick Terrellf147fcc2016-11-15 16:39:09 -080064 [this]() -> ZSTD_DStream* {
Nick Terrell5aa5aa42020-05-22 22:26:02 -070065 this->log(kLogVerbose, "%s\n", "Creating new ZSTD_DStream");
Nick Terrelle9e151c2016-10-12 17:23:38 -070066 auto zds = ZSTD_createDStream();
67 if (zds) {
68 auto err = ZSTD_initDStream(zds);
69 if (ZSTD_isError(err)) {
70 ZSTD_freeDStream(zds);
71 return nullptr;
72 }
73 }
74 return zds;
75 },
76 [](ZSTD_DStream *zds) {
77 ZSTD_freeDStream(zds);
78 }});
79 }
80 }
81
Nick Terrellf147fcc2016-11-15 16:39:09 -080082 ~SharedState() {
83 // The resource pools have references to this, so destroy them first.
84 cStreamPool.reset();
85 dStreamPool.reset();
86 }
87
Nick Terrellbaa152e2016-10-12 19:02:27 -070088 Logger log;
Nick Terrell48294b52016-10-12 15:18:16 -070089 ErrorHolder errorHolder;
Nick Terrelle9e151c2016-10-12 17:23:38 -070090 std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;
91 std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool;
Nick Terrell48294b52016-10-12 15:18:16 -070092};
93
Nick Terrellc9325202016-09-01 15:22:19 -070094/**
95 * Streams input from `fd`, breaks input up into chunks, and compresses each
96 * chunk independently. Output of each chunk gets streamed to a queue, and
97 * the output queues get put into `chunks` in order.
98 *
Nick Terrell48294b52016-10-12 15:18:16 -070099 * @param state The shared state
Nick Terrellc9325202016-09-01 15:22:19 -0700100 * @param chunks Each compression jobs output queue gets `pushed()` here
101 * as soon as it is available
102 * @param executor The thread pool to run compression jobs in
103 * @param fd The input file descriptor
104 * @param size The size of the input file if known, 0 otherwise
105 * @param numThreads The number of threads in the thread pool
106 * @param parameters The zstd parameters to use for compression
Nick Terrelld2498892016-09-23 12:55:21 -0700107 * @returns The number of bytes read from the file
Nick Terrellc9325202016-09-01 15:22:19 -0700108 */
Nick Terrelld2498892016-09-23 12:55:21 -0700109std::uint64_t asyncCompressChunks(
Nick Terrell48294b52016-10-12 15:18:16 -0700110 SharedState& state,
Nick Terrellc9325202016-09-01 15:22:19 -0700111 WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
112 ThreadPool& executor,
113 FILE* fd,
Nick Terrell823bf3d2016-09-06 20:11:02 -0700114 std::uintmax_t size,
Nick Terrellc9325202016-09-01 15:22:19 -0700115 std::size_t numThreads,
116 ZSTD_parameters parameters);
117
118/**
119 * Streams input from `fd`. If pzstd headers are available it breaks the input
120 * up into independent frames. It sends each frame to an independent
121 * decompression job. Output of each frame gets streamed to a queue, and
122 * the output queues get put into `frames` in order.
123 *
Nick Terrell48294b52016-10-12 15:18:16 -0700124 * @param state The shared state
Nick Terrellc9325202016-09-01 15:22:19 -0700125 * @param frames Each decompression jobs output queue gets `pushed()` here
126 * as soon as it is available
127 * @param executor The thread pool to run compression jobs in
128 * @param fd The input file descriptor
Nick Terrelld2498892016-09-23 12:55:21 -0700129 * @returns The number of bytes read from the file
Nick Terrellc9325202016-09-01 15:22:19 -0700130 */
Nick Terrelld2498892016-09-23 12:55:21 -0700131std::uint64_t asyncDecompressFrames(
Nick Terrell48294b52016-10-12 15:18:16 -0700132 SharedState& state,
Nick Terrellc9325202016-09-01 15:22:19 -0700133 WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
134 ThreadPool& executor,
135 FILE* fd);
136
137/**
138 * Streams input in from each queue in `outs` in order, and writes the data to
139 * `outputFd`.
140 *
Nick Terrell48294b52016-10-12 15:18:16 -0700141 * @param state The shared state
Nick Terrell254c5b12016-09-21 14:29:47 -0700142 * @param outs A queue of output queues, one for each
143 * (de)compression job.
144 * @param outputFd The file descriptor to write to
145 * @param decompress Are we decompressing?
146 * @returns The number of bytes written
Nick Terrellc9325202016-09-01 15:22:19 -0700147 */
Nick Terrelld2498892016-09-23 12:55:21 -0700148std::uint64_t writeFile(
Nick Terrell48294b52016-10-12 15:18:16 -0700149 SharedState& state,
Nick Terrellc9325202016-09-01 15:22:19 -0700150 WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
151 FILE* outputFd,
Nick Terrellbaa152e2016-10-12 19:02:27 -0700152 bool decompress);
Nick Terrellc9325202016-09-01 15:22:19 -0700153}