storage_device/
fake_device.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::buffer::{BufferFuture, BufferRef, MutableBufferRef};
6use crate::buffer_allocator::{BufferAllocator, BufferSource};
7use crate::{Device, DeviceHolder};
8use anyhow::{ensure, Error};
9use async_trait::async_trait;
10use block_protocol::WriteOptions;
11use fuchsia_sync::Mutex;
12use rand::Rng;
13use std::ops::Range;
14use std::sync::atomic::{AtomicBool, Ordering};
15
/// The kind of device operation reported to the callback installed with
/// `FakeDevice::set_op_callback`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Op {
    /// A call to `read`.
    Read,
    /// A call to `write_with_opts`.
    Write,
    /// A call to `flush`.
    Flush,
}
21
/// Receives notifications about events on a `FakeDevice`.
///
/// Every method has a no-op default, so implementors only override what they care about.
pub trait Observer: Send + Sync {
    /// Called each time a barrier is issued on the device. Does nothing by default.
    fn barrier(&self) {
    }
}
25
/// Mutable state of a `FakeDevice`; the device keeps it behind a mutex.
#[derive(Debug, Default, Clone)]
struct Inner {
    /// The raw contents of the device.
    data: Vec<u8>,

    /// Indices of the blocks written since the tracking list was last cleared (by a flush or by
    /// a write that consumed a pending barrier).  These are the candidates for random discard.
    blocks_written_since_last_barrier: Vec<usize>,

    /// Set when a barrier has been requested; the next write consumes it.
    attach_barrier: bool,
}
32
33/// A Device backed by a memory buffer.
34pub struct FakeDevice {
35    allocator: BufferAllocator,
36    inner: Mutex<Inner>,
37    closed: AtomicBool,
38    operation_closure: Box<dyn Fn(Op) -> Result<(), Error> + Send + Sync>,
39    read_only: AtomicBool,
40    poisoned: AtomicBool,
41    observer: Option<Box<dyn Observer>>,
42}
43
/// Size in bytes (64 MiB) of the buffer source backing each device's transfer-buffer allocator.
const TRANSFER_HEAP_SIZE: usize = 64 << 20;
45
46impl FakeDevice {
47    pub fn new(block_count: u64, block_size: u32) -> Self {
48        let allocator =
49            BufferAllocator::new(block_size as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
50        Self {
51            allocator,
52            inner: Mutex::new(Inner {
53                data: vec![0 as u8; block_count as usize * block_size as usize],
54                blocks_written_since_last_barrier: Vec::new(),
55                attach_barrier: false,
56            }),
57            closed: AtomicBool::new(false),
58            operation_closure: Box::new(|_: Op| Ok(())),
59            read_only: AtomicBool::new(false),
60            poisoned: AtomicBool::new(false),
61            observer: None,
62        }
63    }
64
65    pub fn set_observer(&mut self, observer: Box<dyn Observer>) {
66        self.observer = Some(observer);
67    }
68
69    /// Sets a callback that will run at the beginning of read, write, and flush which will forward
70    /// any errors, and proceed on Ok().
71    pub fn set_op_callback(
72        &mut self,
73        cb: impl Fn(Op) -> Result<(), Error> + Send + Sync + 'static,
74    ) {
75        self.operation_closure = Box::new(cb);
76    }
77
78    /// Creates a fake block device from an image (which can be anything that implements
79    /// std::io::Read).  The size of the device is determined by how much data is read.
80    pub fn from_image(
81        mut reader: impl std::io::Read,
82        block_size: u32,
83    ) -> Result<Self, std::io::Error> {
84        let allocator =
85            BufferAllocator::new(block_size as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
86        let mut data = Vec::new();
87        reader.read_to_end(&mut data)?;
88        Ok(Self {
89            allocator,
90            inner: Mutex::new(Inner {
91                data: data,
92                blocks_written_since_last_barrier: Vec::new(),
93                attach_barrier: false,
94            }),
95            closed: AtomicBool::new(false),
96            operation_closure: Box::new(|_| Ok(())),
97            read_only: AtomicBool::new(false),
98            poisoned: AtomicBool::new(false),
99            observer: None,
100        })
101    }
102}
103
104#[async_trait]
105impl Device for FakeDevice {
106    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
107        assert!(!self.closed.load(Ordering::Relaxed));
108        self.allocator.allocate_buffer(size)
109    }
110
111    fn block_size(&self) -> u32 {
112        self.allocator.block_size() as u32
113    }
114
115    fn block_count(&self) -> u64 {
116        self.inner.lock().data.len() as u64 / self.block_size() as u64
117    }
118
119    async fn read(&self, offset: u64, mut buffer: MutableBufferRef<'_>) -> Result<(), Error> {
120        ensure!(!self.closed.load(Ordering::Relaxed));
121        (self.operation_closure)(Op::Read)?;
122        let offset = offset as usize;
123        assert_eq!(offset % self.allocator.block_size(), 0);
124        let inner = self.inner.lock();
125        let size = buffer.len();
126        assert!(
127            offset + size <= inner.data.len(),
128            "offset: {} len: {} data.len: {}",
129            offset,
130            size,
131            inner.data.len()
132        );
133        buffer.as_mut_slice().copy_from_slice(&inner.data[offset..offset + size]);
134        Ok(())
135    }
136
137    async fn write_with_opts(
138        &self,
139        offset: u64,
140        buffer: BufferRef<'_>,
141        _opts: WriteOptions,
142    ) -> Result<(), Error> {
143        ensure!(!self.closed.load(Ordering::Relaxed));
144        ensure!(!self.read_only.load(Ordering::Relaxed));
145        let mut inner = self.inner.lock();
146
147        if inner.attach_barrier {
148            inner.blocks_written_since_last_barrier.clear();
149            inner.attach_barrier = false;
150        }
151
152        (self.operation_closure)(Op::Write)?;
153        let offset = offset as usize;
154        assert_eq!(offset % self.allocator.block_size(), 0);
155
156        let size = buffer.len();
157        assert!(
158            offset + size <= inner.data.len(),
159            "offset: {} len: {} data.len: {}",
160            offset,
161            size,
162            inner.data.len()
163        );
164        inner.data[offset..offset + size].copy_from_slice(buffer.as_slice());
165        let first_block = offset / self.allocator.block_size();
166        for block in first_block..first_block + size / self.allocator.block_size() {
167            inner.blocks_written_since_last_barrier.push(block)
168        }
169        Ok(())
170    }
171
172    async fn trim(&self, range: Range<u64>) -> Result<(), Error> {
173        ensure!(!self.closed.load(Ordering::Relaxed));
174        ensure!(!self.read_only.load(Ordering::Relaxed));
175        assert_eq!(range.start % self.block_size() as u64, 0);
176        assert_eq!(range.end % self.block_size() as u64, 0);
177        // Blast over the range to simulate it being used for something else.
178        let mut inner = self.inner.lock();
179        inner.data[range.start as usize..range.end as usize].fill(0xab);
180        Ok(())
181    }
182
183    async fn close(&self) -> Result<(), Error> {
184        self.closed.store(true, Ordering::Relaxed);
185        Ok(())
186    }
187
188    async fn flush(&self) -> Result<(), Error> {
189        self.inner.lock().blocks_written_since_last_barrier.clear();
190        (self.operation_closure)(Op::Flush)
191    }
192
193    fn barrier(&self) {
194        if let Some(observer) = &self.observer {
195            observer.barrier();
196        }
197        self.inner.lock().attach_barrier = true;
198    }
199
200    fn reopen(&self, read_only: bool) {
201        self.closed.store(false, Ordering::Relaxed);
202        self.read_only.store(read_only, Ordering::Relaxed);
203    }
204
205    fn is_read_only(&self) -> bool {
206        self.read_only.load(Ordering::Relaxed)
207    }
208
209    fn supports_trim(&self) -> bool {
210        true
211    }
212
213    fn snapshot(&self) -> Result<DeviceHolder, Error> {
214        let allocator =
215            BufferAllocator::new(self.block_size() as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
216        Ok(DeviceHolder::new(Self {
217            allocator,
218            inner: Mutex::new(self.inner.lock().clone()),
219            closed: AtomicBool::new(false),
220            operation_closure: Box::new(|_: Op| Ok(())),
221            read_only: AtomicBool::new(false),
222            poisoned: AtomicBool::new(false),
223            observer: None,
224        }))
225    }
226
227    fn discard_random_since_last_flush(&self) -> Result<(), Error> {
228        let bs = self.allocator.block_size();
229        let mut rng = rand::thread_rng();
230        let mut guard = self.inner.lock();
231        let Inner { ref mut data, ref mut blocks_written_since_last_barrier, .. } = &mut *guard;
232        log::info!("Discarding from {blocks_written_since_last_barrier:?}");
233        let mut discarded = Vec::new();
234        for block in blocks_written_since_last_barrier.drain(..) {
235            if rng.gen() {
236                data[block * bs..(block + 1) * bs].fill(0xaf);
237                discarded.push(block);
238            }
239        }
240        log::info!("Discarded {discarded:?}");
241        Ok(())
242    }
243
244    /// Sets the poisoned state for the device. A poisoned device will panic the thread that
245    /// performs Drop on it.
246    fn poison(&self) -> Result<(), Error> {
247        self.poisoned.store(true, Ordering::Relaxed);
248        Ok(())
249    }
250}
251
252impl Drop for FakeDevice {
253    fn drop(&mut self) {
254        if self.poisoned.load(Ordering::Relaxed) {
255            panic!("This device was poisoned to crash whomever is holding a reference here.");
256        }
257    }
258}
259
#[cfg(test)]
mod tests {
    use super::FakeDevice;
    use crate::Device;
    use block_protocol::WriteOptions;
    use rand::{thread_rng, Rng};

    const TEST_DEVICE_BLOCK_SIZE: usize = 512;

    /// Writes seven blocks with barriers attached to the third and sixth writes, randomly
    /// discards unflushed writes, and checks that a discard in an earlier barrier epoch implies
    /// that every block of the later epochs was discarded as well.
    #[fuchsia::test(threads = 10)]
    async fn test_discard_random_with_barriers() {
        let device = FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE as u32);
        // Byte range, within the source data, of the given source block.
        let source_range = |block: usize| {
            block * TEST_DEVICE_BLOCK_SIZE..block * TEST_DEVICE_BLOCK_SIZE + TEST_DEVICE_BLOCK_SIZE
        };
        // Discards are random, so repeat many times to catch errors.
        for _ in 0..1000 {
            let mut data = vec![0u8; 7 * TEST_DEVICE_BLOCK_SIZE];
            thread_rng().fill(&mut data[..]);
            // Ensure that barriers work with overwrites.
            let indices: [usize; 7] = [1, 2, 3, 4, 3, 5, 6];
            for (i, &source_block) in indices.iter().enumerate() {
                let device_offset = i as u64 * TEST_DEVICE_BLOCK_SIZE as u64;
                let mut buffer = device.allocate_buffer(TEST_DEVICE_BLOCK_SIZE).await;
                buffer.as_mut_slice().copy_from_slice(&data[source_range(source_block)]);
                let starts_new_epoch = matches!(i, 2 | 5);
                if starts_new_epoch {
                    device.barrier();
                    device
                        .write(device_offset, buffer.as_ref())
                        .await
                        .expect("Failed to write to FakeDevice");
                } else {
                    device
                        .write_with_opts(device_offset, buffer.as_ref(), WriteOptions::empty())
                        .await
                        .expect("Failed to write to FakeDevice");
                }
            }
            device.discard_random_since_last_flush().expect("failed to randomly discard writes");

            // Whether a discarded block has been seen in the first epoch, and in the first two.
            let mut discard = false;
            let mut discard_2 = false;
            for (i, &source_block) in indices.iter().enumerate() {
                let mut read_buffer = device.allocate_buffer(TEST_DEVICE_BLOCK_SIZE).await;
                device
                    .read(i as u64 * TEST_DEVICE_BLOCK_SIZE as u64, read_buffer.as_mut())
                    .await
                    .expect("failed to read from FakeDevice");
                let expected_data = &data[source_range(source_block)];
                let intact = expected_data == read_buffer.as_slice();
                match i {
                    // First epoch: just note whether anything was lost.
                    0..=1 => {
                        if !intact {
                            discard = true;
                        }
                    }
                    // Second epoch: must be lost entirely if the first epoch lost anything.
                    2..=4 => {
                        if discard {
                            assert_ne!(expected_data, read_buffer.as_slice());
                            discard_2 = true;
                        } else if !intact {
                            discard_2 = true;
                        }
                    }
                    // Third epoch: must be lost entirely if an earlier epoch lost anything.
                    _ => {
                        if discard_2 {
                            assert_ne!(expected_data, read_buffer.as_slice());
                        }
                    }
                }
            }
        }
    }
}