test_diagnostics/
zstd_compress.rs1use futures::channel::mpsc;
6use futures::SinkExt;
7use std::cell::RefCell;
8use thiserror::Error;
9use zstd::stream::raw::Operation;
10
11const BUFFER_SIZE: usize = 1024 * 1024 * 4; const CHANNEL_SIZE: usize = 10; thread_local! {
15 static BUFFER: RefCell<Vec<u8>> = RefCell::new(vec![0; BUFFER_SIZE]);
16}
17
18#[derive(Debug, Error)]
20pub enum Error {
21 #[error("Error Decompressing bytes: pos: {1}, len: {2}, error: {0:?}")]
23 Decompress(#[source] std::io::Error, usize, usize),
24
25 #[error("Error compressing bytes: pos: {1}, len: {2}, error: {0:?}")]
27 Compress(#[source] std::io::Error, usize, usize),
28
29 #[error("Error Decompressing while flushing: {0:?}")]
31 DecompressFinish(#[source] std::io::Error),
32
33 #[error("Error compressing while flushing: {0:?}")]
35 CompressFinish(#[source] std::io::Error),
36
37 #[error("Error while sending on mpsc channel: {0:?}")]
39 Send(#[source] mpsc::SendError),
40}
41
42pub struct Decoder<'a> {
44 sender: mpsc::Sender<Vec<u8>>,
45 decoder: zstd::stream::raw::Decoder<'a>,
46}
47
48impl Decoder<'static> {
49 pub fn new() -> (Self, mpsc::Receiver<Vec<u8>>) {
54 let (sender, receiver) = mpsc::channel(CHANNEL_SIZE);
55 let decoder = Self { sender: sender, decoder: zstd::stream::raw::Decoder::new().unwrap() };
56 (decoder, receiver)
57 }
58
59 pub async fn decompress(&mut self, bytes: &[u8]) -> Result<(), Error> {
65 let len = bytes.len();
66 let mut pos = 0;
67 while pos != len {
68 let decoded_bytes = BUFFER.with_borrow_mut(|buf| {
69 let status = self
70 .decoder
71 .run_on_buffers(&bytes[pos..], buf.as_mut_slice())
72 .map_err(|e| Error::Decompress(e, pos, len))?;
73 pos += status.bytes_read;
74 Ok::<Vec<u8>, Error>(buf[..status.bytes_written].to_vec())
75 })?;
76 self.sender.send(decoded_bytes).await.map_err(Error::Send)?;
77 }
78 Ok(())
79 }
80
81 pub async fn finish(mut self) -> Result<(), Error> {
86 loop {
87 let (remaining_bytes, decoded_bytes) = BUFFER.with_borrow_mut(|buf| {
88 let mut out_buffer = zstd::stream::raw::OutBuffer::around(buf.as_mut_slice());
89 let remaining_bytes =
90 self.decoder.flush(&mut out_buffer).map_err(Error::DecompressFinish)?;
91 Ok::<(usize, Vec<u8>), Error>((remaining_bytes, out_buffer.as_slice().to_vec()))
92 })?;
93 if !decoded_bytes.is_empty() {
94 self.sender.send(decoded_bytes).await.map_err(Error::Send)?;
95 }
96 if remaining_bytes == 0 {
97 break;
98 }
99 }
100 Ok(())
101 }
102}
103
104pub struct Encoder<'a> {
106 sender: mpsc::Sender<Vec<u8>>,
107 encoder: zstd::stream::raw::Encoder<'a>,
108}
109
110impl Encoder<'static> {
111 pub fn new(level: i32) -> (Self, mpsc::Receiver<Vec<u8>>) {
117 let (sender, receiver) = mpsc::channel(CHANNEL_SIZE);
118 let decoder =
119 Self { sender: sender, encoder: zstd::stream::raw::Encoder::new(level).unwrap() };
120 (decoder, receiver)
121 }
122
123 pub async fn compress(&mut self, bytes: &[u8]) -> Result<(), Error> {
129 let len = bytes.len();
130 let mut pos = 0;
131 while pos != len {
132 let encoded_bytes = BUFFER.with_borrow_mut(|buf| {
133 let status = self
134 .encoder
135 .run_on_buffers(&bytes[pos..], buf.as_mut_slice())
136 .map_err(|e| Error::Compress(e, pos, len))?;
137 pos += status.bytes_read;
138 Ok::<Vec<u8>, Error>(buf[..status.bytes_written].to_vec())
139 })?;
140 self.sender.send(encoded_bytes).await.map_err(Error::Send)?;
141 }
142 Ok(())
143 }
144
145 pub async fn finish(mut self) -> Result<(), Error> {
150 loop {
151 let (remaining_bytes, encoded_bytes) = BUFFER.with_borrow_mut(|buf| {
152 let mut out_buffer = zstd::stream::raw::OutBuffer::around(buf.as_mut_slice());
153 let remaining_bytes =
154 self.encoder.finish(&mut out_buffer, true).map_err(Error::CompressFinish)?;
155 Ok::<(usize, Vec<u8>), Error>((remaining_bytes, out_buffer.as_slice().to_vec()))
156 })?;
157 if !encoded_bytes.is_empty() {
158 self.sender.send(encoded_bytes).await.map_err(Error::Send)?;
159 }
160 if remaining_bytes == 0 {
161 break;
162 }
163 }
164 Ok(())
165 }
166}
167
168#[cfg(test)]
169mod tests {
170 use super::*;
171 use assert_matches::assert_matches;
172 use futures::StreamExt;
173 use test_case::test_case;
174
175 #[test_case(Vec::from(b"This is a test string"); "normal test string")]
176 #[test_case(Vec::from(b""); "empty string")]
177 #[fuchsia::test]
178 async fn test_compress_decompress(original_data: Vec<u8>) {
179 let (mut encoder, mut rx) = Encoder::new(0);
180 let (mut decoder, mut drx) = Decoder::new();
181
182 encoder.compress(original_data.as_slice()).await.unwrap();
184 encoder.finish().await.unwrap();
185
186 let mut compressed_data = Vec::new();
188 while let Some(chunk) = rx.next().await {
189 compressed_data.extend_from_slice(&chunk);
190 }
191
192 assert_ne!(compressed_data.len(), original_data.len());
193
194 decoder.decompress(&compressed_data).await.unwrap();
196 decoder.finish().await.unwrap();
197
198 let mut decompressed_data = Vec::new();
200 while let Some(chunk) = drx.next().await {
201 decompressed_data.extend_from_slice(&chunk);
202 }
203
204 assert_eq!(original_data.as_slice(), &decompressed_data[..]);
206 }
207
208 #[fuchsia::test]
209 async fn test_compress_decompress_large_chunked() {
210 let (mut encoder, mut rx) = Encoder::new(0);
211 let (mut decoder, mut drx) = Decoder::new();
212
213 let original_data = vec![b'a'; BUFFER_SIZE * 10 + 100];
214 let chunk_size = 2 * 1024 * 1024; let compress_fut = async {
218 for i in (0..original_data.len()).step_by(chunk_size) {
219 encoder
220 .compress(&original_data[i..i + chunk_size.min(original_data.len() - i)])
221 .await
222 .unwrap();
223 }
224 encoder.finish().await.unwrap();
225 };
226 let mut compressed_len = 0;
227 let decompress_fut = async {
228 while let Some(compressed_chunk) = rx.next().await {
229 compressed_len += compressed_chunk.len();
230 decoder.decompress(&compressed_chunk).await.unwrap();
231 }
232 decoder.finish().await.unwrap();
233 };
234
235 let mut decompressed_data = Vec::new();
236 let collect_final_data = async {
237 while let Some(chunk) = drx.next().await {
239 decompressed_data.extend_from_slice(&chunk);
240 }
241 };
242
243 futures::join!(compress_fut, decompress_fut, collect_final_data);
244
245 assert!(compressed_len < original_data.len());
246 assert_eq!(original_data, decompressed_data);
247 }
248
249 #[fuchsia::test]
250 async fn test_compress_decompress_random_chunked() {
251 let (mut encoder, mut rx) = Encoder::new(0);
252 let (mut decoder, mut drx) = Decoder::new();
253
254 let mut original_data = vec![0u8; BUFFER_SIZE * 5 + 100];
255 rand::fill(&mut original_data[..]); let chunk_size = 2 * 1024 * 1024; let compress_fut = async {
260 for i in (0..original_data.len()).step_by(chunk_size) {
261 encoder
262 .compress(&original_data[i..i + chunk_size.min(original_data.len() - i)])
263 .await
264 .unwrap();
265 }
266 encoder.finish().await.unwrap();
267 };
268 let mut compressed_len = 0;
269 let decompress_fut = async {
270 while let Some(compressed_chunk) = rx.next().await {
271 compressed_len += compressed_chunk.len();
272 decoder.decompress(&compressed_chunk).await.unwrap();
273 }
274 decoder.finish().await.unwrap();
275 };
276
277 let mut decompressed_data = Vec::new();
278 let collect_final_data = async {
279 while let Some(chunk) = drx.next().await {
281 decompressed_data.extend_from_slice(&chunk);
282 }
283 };
284
285 futures::join!(compress_fut, decompress_fut, collect_final_data);
286
287 assert_ne!(compressed_len, original_data.len());
288 assert_eq!(original_data, decompressed_data);
289 }
290
291 #[fuchsia::test]
292 async fn test_invalid_input() {
293 let (mut decoder, _drx) = Decoder::new();
294
295 let invalid_data = vec![0xff; 1024];
296
297 let result = decoder.decompress(&invalid_data).await;
298
299 assert_matches!(result, Err(Error::Decompress(..)));
300 }
301
302 #[fuchsia::test]
303 async fn test_send_error() {
304 let (mut encoder, rx) = Encoder::new(0);
305
306 let data = b"some_text";
307 drop(rx);
308
309 let result = encoder.compress(data).await;
310
311 assert_matches!(result, Err(Error::Send(..)));
312
313 let (mut encoder, mut rx) = Encoder::new(0);
314 let (mut decoder, drx) = Decoder::new();
315 encoder.compress(data).await.unwrap();
316 encoder.finish().await.unwrap();
317 drop(drx);
318
319 let mut compressed_data = Vec::new();
320 while let Some(chunk) = rx.next().await {
321 compressed_data.extend_from_slice(&chunk);
322 }
323
324 let result = decoder.decompress(&compressed_data).await;
325 assert_matches!(result, Err(Error::Send(..)));
326 }
327}