use crate::buffer::{BufferFuture, BufferRef, MutableBufferRef};
use crate::buffer_allocator::{BufferAllocator, BufferSource};
use crate::{Device, DeviceHolder};
use anyhow::{ensure, Error};
use async_trait::async_trait;
use block_protocol::WriteOptions;
use fuchsia_sync::Mutex;
use rand::Rng;
use std::ops::Range;
use std::sync::atomic::{AtomicBool, Ordering};

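/// The kind of operation being issued to the fake device.  Passed to the callback installed via
/// [`FakeDevice::set_op_callback`] so tests can observe or fail specific operation types.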
pub enum Op {
    Read,
    Write,
    Flush,
}

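/// Receives notifications of device events.  Only barriers are currently observable; the default
/// implementation ignores them.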
pub trait Observer: Send + Sync {
    fn barrier(&self) {}
}

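/// State protected by the device's mutex: the raw backing bytes, the blocks written since the
/// last barrier or flush (the candidates for `discard_random_since_last_flush`), and a flag set
/// by `barrier` that is consumed by the next write.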
#[derive(Debug, Default, Clone)]
struct Inner {
    data: Vec<u8>,
    blocks_written_since_last_barrier: Vec<usize>,
    attach_barrier: bool,
}

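/// An in-memory implementation of [`Device`] for use in tests.  All contents live in a `Vec<u8>`
/// behind a mutex, so reads and writes are plain memory copies.  Writes are tracked per block so
/// that tests can simulate an unclean shutdown with `discard_random_since_last_flush`.
///
/// A minimal usage sketch (illustrative only, not compiled as a doctest; assumes an async test
/// context):
///
/// ```ignore
/// let device = FakeDevice::new(2048, 512);
/// let mut buf = device.allocate_buffer(512).await;
/// buf.as_mut_slice().fill(0xaa);
/// device.write(0, buf.as_ref()).await.expect("write failed");
/// device.flush().await.expect("flush failed");
/// ```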
pub struct FakeDevice {
    allocator: BufferAllocator,
    inner: Mutex<Inner>,
    closed: AtomicBool,
    operation_closure: Box<dyn Fn(Op) -> Result<(), Error> + Send + Sync>,
    read_only: AtomicBool,
    poisoned: AtomicBool,
    observer: Option<Box<dyn Observer>>,
}

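/// Size of the in-memory [`BufferSource`] backing the transfer buffers handed out by the
/// allocator.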
const TRANSFER_HEAP_SIZE: usize = 64 * 1024 * 1024;

impl FakeDevice {
    pub fn new(block_count: u64, block_size: u32) -> Self {
        let allocator =
            BufferAllocator::new(block_size as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
        Self {
            allocator,
            inner: Mutex::new(Inner {
                data: vec![0u8; block_count as usize * block_size as usize],
                blocks_written_since_last_barrier: Vec::new(),
                attach_barrier: false,
            }),
            closed: AtomicBool::new(false),
            operation_closure: Box::new(|_: Op| Ok(())),
            read_only: AtomicBool::new(false),
            poisoned: AtomicBool::new(false),
            observer: None,
        }
    }

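    /// Installs an observer that is notified whenever `barrier` is called on this device.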
    pub fn set_observer(&mut self, observer: Box<dyn Observer>) {
        self.observer = Some(observer);
    }

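    /// Sets a callback that is invoked for every read, write and flush; if the callback returns
    /// an error, the corresponding operation fails with that error.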
    pub fn set_op_callback(
        &mut self,
        cb: impl Fn(Op) -> Result<(), Error> + Send + Sync + 'static,
    ) {
        self.operation_closure = Box::new(cb);
    }

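    /// Creates a device whose initial contents are read from `reader`.  The block count is
    /// derived from the image length, so the image should contain a whole number of blocks.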
    pub fn from_image(
        mut reader: impl std::io::Read,
        block_size: u32,
    ) -> Result<Self, std::io::Error> {
        let allocator =
            BufferAllocator::new(block_size as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
        let mut data = Vec::new();
        reader.read_to_end(&mut data)?;
        Ok(Self {
            allocator,
            inner: Mutex::new(Inner {
                data,
                blocks_written_since_last_barrier: Vec::new(),
                attach_barrier: false,
            }),
            closed: AtomicBool::new(false),
            operation_closure: Box::new(|_| Ok(())),
            read_only: AtomicBool::new(false),
            poisoned: AtomicBool::new(false),
            observer: None,
        })
    }
}

#[async_trait]
impl Device for FakeDevice {
    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        assert!(!self.closed.load(Ordering::Relaxed));
        self.allocator.allocate_buffer(size)
    }

    fn block_size(&self) -> u32 {
        self.allocator.block_size() as u32
    }

    fn block_count(&self) -> u64 {
        self.inner.lock().data.len() as u64 / self.block_size() as u64
    }

    async fn read(&self, offset: u64, mut buffer: MutableBufferRef<'_>) -> Result<(), Error> {
        ensure!(!self.closed.load(Ordering::Relaxed));
        (self.operation_closure)(Op::Read)?;
        let offset = offset as usize;
        assert_eq!(offset % self.allocator.block_size(), 0);
        let inner = self.inner.lock();
        let size = buffer.len();
        assert!(
            offset + size <= inner.data.len(),
            "offset: {} len: {} data.len: {}",
            offset,
            size,
            inner.data.len()
        );
        buffer.as_mut_slice().copy_from_slice(&inner.data[offset..offset + size]);
        Ok(())
    }

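    // Writes copy into the in-memory image and record the touched blocks so that
    // `discard_random_since_last_flush` can later throw them away.  If a barrier was requested,
    // the record of blocks written before it is cleared first, so those earlier writes can no
    // longer be discarded.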
    async fn write_with_opts(
        &self,
        offset: u64,
        buffer: BufferRef<'_>,
        _opts: WriteOptions,
    ) -> Result<(), Error> {
        ensure!(!self.closed.load(Ordering::Relaxed));
        ensure!(!self.read_only.load(Ordering::Relaxed));
        let mut inner = self.inner.lock();

        if inner.attach_barrier {
            inner.blocks_written_since_last_barrier.clear();
            inner.attach_barrier = false;
        }

        (self.operation_closure)(Op::Write)?;
        let offset = offset as usize;
        assert_eq!(offset % self.allocator.block_size(), 0);

        let size = buffer.len();
        assert!(
            offset + size <= inner.data.len(),
            "offset: {} len: {} data.len: {}",
            offset,
            size,
            inner.data.len()
        );
        inner.data[offset..offset + size].copy_from_slice(buffer.as_slice());
        let first_block = offset / self.allocator.block_size();
        for block in first_block..first_block + size / self.allocator.block_size() {
            inner.blocks_written_since_last_barrier.push(block)
        }
        Ok(())
    }

    async fn trim(&self, range: Range<u64>) -> Result<(), Error> {
        ensure!(!self.closed.load(Ordering::Relaxed));
        ensure!(!self.read_only.load(Ordering::Relaxed));
        assert_eq!(range.start % self.block_size() as u64, 0);
        assert_eq!(range.end % self.block_size() as u64, 0);
        let mut inner = self.inner.lock();
        // Fill the trimmed range with a pattern so that stale data is easy to spot in tests.
        inner.data[range.start as usize..range.end as usize].fill(0xab);
        Ok(())
    }

    async fn close(&self) -> Result<(), Error> {
        self.closed.store(true, Ordering::Relaxed);
        Ok(())
    }

    async fn flush(&self) -> Result<(), Error> {
        // Once flushed, previously written blocks are considered durable and are no longer
        // candidates for discard.
        self.inner.lock().blocks_written_since_last_barrier.clear();
        (self.operation_closure)(Op::Flush)
    }

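    // Requests a barrier: the flag is consumed by the next write, which clears the set of
    // previously written (and hence discardable) blocks.  The observer, if any, is notified.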
    fn barrier(&self) {
        if let Some(observer) = &self.observer {
            observer.barrier();
        }
        self.inner.lock().attach_barrier = true;
    }

    fn reopen(&self, read_only: bool) {
        self.closed.store(false, Ordering::Relaxed);
        self.read_only.store(read_only, Ordering::Relaxed);
    }

    fn is_read_only(&self) -> bool {
        self.read_only.load(Ordering::Relaxed)
    }

    fn supports_trim(&self) -> bool {
        true
    }

    fn snapshot(&self) -> Result<DeviceHolder, Error> {
        let allocator =
            BufferAllocator::new(self.block_size() as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
        Ok(DeviceHolder::new(Self {
            allocator,
            inner: Mutex::new(self.inner.lock().clone()),
            closed: AtomicBool::new(false),
            operation_closure: Box::new(|_: Op| Ok(())),
            read_only: AtomicBool::new(false),
            poisoned: AtomicBool::new(false),
            observer: None,
        }))
    }

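    // Simulates an unclean shutdown: every block written since the last barrier or flush is,
    // with 50% probability, overwritten with a fill pattern as if the write never reached
    // stable storage.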
    fn discard_random_since_last_flush(&self) -> Result<(), Error> {
        let bs = self.allocator.block_size();
        let mut rng = rand::thread_rng();
        let mut guard = self.inner.lock();
        let Inner { data, blocks_written_since_last_barrier, .. } = &mut *guard;
        log::info!("Discarding from {blocks_written_since_last_barrier:?}");
        let mut discarded = Vec::new();
        for block in blocks_written_since_last_barrier.drain(..) {
            if rng.gen() {
                data[block * bs..(block + 1) * bs].fill(0xaf);
                discarded.push(block);
            }
        }
        log::info!("Discarded {discarded:?}");
        Ok(())
    }

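    // Marks the device so that whoever ends up dropping it panics (see the `Drop` impl below).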
    fn poison(&self) -> Result<(), Error> {
        self.poisoned.store(true, Ordering::Relaxed);
        Ok(())
    }
}

impl Drop for FakeDevice {
    fn drop(&mut self) {
        if self.poisoned.load(Ordering::Relaxed) {
            panic!("This device was poisoned to crash whoever is holding a reference here.");
        }
    }
}

#[cfg(test)]
mod tests {
    use super::FakeDevice;
    use crate::Device;
    use block_protocol::WriteOptions;
    use rand::{thread_rng, Rng};

    const TEST_DEVICE_BLOCK_SIZE: usize = 512;

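    // Writes seven blocks of random data with barriers before the third and sixth writes, then
    // randomly discards post-barrier writes and verifies the ordering guarantee: if a block from
    // an earlier group was lost, every block written after the following barrier must also be
    // lost.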
    #[fuchsia::test(threads = 10)]
    async fn test_discard_random_with_barriers() {
        let device = FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE as u32);
        for _ in 0..1000 {
            let mut data = vec![0; 7 * TEST_DEVICE_BLOCK_SIZE];
            thread_rng().fill(&mut data[..]);
            let indices = [1, 2, 3, 4, 3, 5, 6];
            for i in 0..indices.len() {
                let mut buffer = device.allocate_buffer(TEST_DEVICE_BLOCK_SIZE).await;
                if i == 2 || i == 5 {
                    buffer.as_mut_slice().copy_from_slice(
                        &data[indices[i] * TEST_DEVICE_BLOCK_SIZE
                            ..indices[i] * TEST_DEVICE_BLOCK_SIZE + TEST_DEVICE_BLOCK_SIZE],
                    );
                    device.barrier();
                    device
                        .write(i as u64 * TEST_DEVICE_BLOCK_SIZE as u64, buffer.as_ref())
                        .await
                        .expect("Failed to write to FakeDevice");
                } else {
                    buffer.as_mut_slice().copy_from_slice(
                        &data[indices[i] * TEST_DEVICE_BLOCK_SIZE
                            ..indices[i] * TEST_DEVICE_BLOCK_SIZE + TEST_DEVICE_BLOCK_SIZE],
                    );
                    device
                        .write_with_opts(
                            i as u64 * TEST_DEVICE_BLOCK_SIZE as u64,
                            buffer.as_ref(),
                            WriteOptions::empty(),
                        )
                        .await
                        .expect("Failed to write to FakeDevice");
                }
            }
            device.discard_random_since_last_flush().expect("failed to randomly discard writes");
            let mut discard = false;
            let mut discard_2 = false;
            for i in 0..7 {
                let mut read_buffer = device.allocate_buffer(TEST_DEVICE_BLOCK_SIZE).await;
                device
                    .read(i as u64 * TEST_DEVICE_BLOCK_SIZE as u64, read_buffer.as_mut())
                    .await
                    .expect("failed to read from FakeDevice");
                let expected_data = &data[indices[i] * TEST_DEVICE_BLOCK_SIZE
                    ..indices[i] * TEST_DEVICE_BLOCK_SIZE + TEST_DEVICE_BLOCK_SIZE];
                if i < 2 {
                    if expected_data != read_buffer.as_slice() {
                        discard = true;
                    }
                } else if i < 5 {
                    if discard {
                        assert_ne!(expected_data, read_buffer.as_slice());
                        discard_2 = true;
                    } else if expected_data != read_buffer.as_slice() {
                        discard_2 = true;
                    }
                } else if discard_2 {
                    assert_ne!(expected_data, read_buffer.as_slice());
                }
            }
        }
    }
}