fxfs/object_store/journal/
writer.rs1use crate::checksum::{fletcher64, Checksum};
6use crate::log::*;
7use crate::metrics;
8use crate::object_store::journal::JournalCheckpoint;
9use crate::serialized_types::{Versioned, LATEST_VERSION};
10use anyhow::{anyhow, Error};
11use byteorder::{LittleEndian, WriteBytesExt};
12use fuchsia_inspect::{Property as _, UintProperty};
13use std::cmp::min;
14use std::io::Write;
15use storage_device::buffer::MutableBufferRef;
16
17pub struct JournalWriter {
21 block_size: usize,
23
24 checkpoint: JournalCheckpoint,
26
27 last_checksum: Checksum,
29
30 buf: Vec<u8>,
32
33 journal_checkpoint_offset: UintProperty,
35}
36
37impl JournalWriter {
38 pub fn new(block_size: usize, last_checksum: u64) -> Self {
39 let checkpoint =
42 JournalCheckpoint { version: LATEST_VERSION, ..JournalCheckpoint::default() };
43 JournalWriter {
44 block_size,
45 checkpoint,
46 last_checksum,
47 buf: Vec::new(),
48 journal_checkpoint_offset: metrics::detail()
49 .create_uint("journal_checkpoint_offset", 0),
50 }
51 }
52
53 pub fn write_record<T: Versioned + std::fmt::Debug>(
55 &mut self,
56 record: &T,
57 ) -> Result<(), Error> {
58 let buf_len = self.buf.len();
59 record.serialize_into(&mut *self).unwrap(); if self.buf.len() - buf_len <= self.block_size {
64 Ok(())
65 } else {
66 Err(anyhow!(
67 "Serialized record too big ({} bytes): {:?}",
68 self.buf.len() - buf_len,
69 record
70 ))
71 }
72 }
73
74 pub fn pad_to_block(&mut self) -> std::io::Result<()> {
76 let align = self.buf.len() % self.block_size;
77 if align > 0 {
78 self.write_all(&vec![0; self.block_size - std::mem::size_of::<Checksum>() - align])?;
79 }
80 Ok(())
81 }
82
83 pub(super) fn journal_file_checkpoint(&self) -> JournalCheckpoint {
86 JournalCheckpoint {
87 file_offset: self.checkpoint.file_offset + self.buf.len() as u64,
88 checksum: self.last_checksum,
89 version: self.checkpoint.version,
90 }
91 }
92
93 pub fn flushable_bytes(&self) -> usize {
95 self.buf.len() - self.buf.len() % self.block_size
96 }
97
98 pub fn take_flushable<'a>(&mut self, mut buf: MutableBufferRef<'a>) -> u64 {
104 assert!(self.flushable_bytes() >= buf.len());
106 let len = buf.len();
107 debug_assert!(len % self.block_size == 0);
108 buf.as_mut_slice().copy_from_slice(&self.buf[..len]);
109 let offset = self.checkpoint.file_offset;
110 self.journal_checkpoint_offset.set(offset);
111 self.buf.drain(..len);
112 self.checkpoint.file_offset += len as u64;
113 self.checkpoint.checksum = self.last_checksum;
114 offset
115 }
116
117 pub fn seek(&mut self, checkpoint: JournalCheckpoint) {
127 assert!(self.buf.is_empty());
128 assert!(checkpoint.file_offset % self.block_size as u64 == 0);
129 self.checkpoint = checkpoint;
130 self.last_checksum = self.checkpoint.checksum;
131 self.journal_checkpoint_offset.set(self.checkpoint.file_offset);
132 }
133}
134
135impl std::io::Write for JournalWriter {
136 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
137 let mut offset = 0;
138 while offset < buf.len() {
139 let space = self.block_size
140 - std::mem::size_of::<Checksum>()
141 - self.buf.len() % self.block_size;
142 let to_copy = min(space, buf.len() - offset);
143 self.buf.write_all(&buf[offset..offset + to_copy])?;
144 if to_copy == space {
145 let end = self.buf.len();
146 let start = end + std::mem::size_of::<Checksum>() - self.block_size;
147 self.last_checksum = fletcher64(&self.buf[start..end], self.last_checksum);
148 self.buf.write_u64::<LittleEndian>(self.last_checksum)?;
149 }
150 offset += to_copy;
151 }
152 Ok(buf.len())
153 }
154
155 fn flush(&mut self) -> std::io::Result<()> {
158 Ok(())
159 }
160}
161
162impl Drop for JournalWriter {
163 fn drop(&mut self) {
164 if self.buf.len() > 0 {
167 warn!("journal data dropped!");
168 }
169 }
170}
171
172#[cfg(test)]
173mod tests {
174 use super::JournalWriter;
175 use crate::checksum::{fletcher64, Checksum};
176 use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
177 use crate::object_store::journal::JournalCheckpoint;
178 use crate::serialized_types::*;
179 use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
180 use byteorder::{ByteOrder, LittleEndian};
181 use std::sync::Arc;
182
183 const TEST_BLOCK_SIZE: usize = 512;
184
185 #[fuchsia::test]
186 async fn test_write_single_record_and_pad() {
187 let object = Arc::new(FakeObject::new());
188 let mut writer = JournalWriter::new(TEST_BLOCK_SIZE, 0);
189 writer.write_record(&4u32).unwrap();
190 writer.pad_to_block().expect("pad_to_block failed");
191 let handle = FakeObjectHandle::new(object.clone());
192 let mut buf = handle.allocate_buffer(writer.flushable_bytes()).await;
193 let offset = writer.take_flushable(buf.as_mut());
194 handle.write_or_append(Some(offset), buf.as_ref()).await.expect("overwrite failed");
195
196 let handle = FakeObjectHandle::new(object.clone());
197 let mut buf = handle.allocate_buffer(object.get_size() as usize).await;
198 assert_eq!(buf.len(), TEST_BLOCK_SIZE);
199 handle.read(0, buf.as_mut()).await.expect("read failed");
200 let mut cursor = std::io::Cursor::new(buf.as_slice());
201 let value: u32 =
202 u32::deserialize_from(&mut cursor, LATEST_VERSION).expect("deserialize_from failed");
203 assert_eq!(value, 4u32);
204 let (payload, checksum_slice) =
205 buf.as_slice().split_at(buf.len() - std::mem::size_of::<Checksum>());
206 let checksum = LittleEndian::read_u64(checksum_slice);
207 assert_eq!(checksum, fletcher64(payload, 0));
208 assert_eq!(
209 writer.journal_file_checkpoint(),
210 JournalCheckpoint {
211 file_offset: TEST_BLOCK_SIZE as u64,
212 checksum,
213 version: LATEST_VERSION,
214 }
215 );
216 }
217
218 #[fuchsia::test]
219 async fn test_journal_file_checkpoint() {
220 let object = Arc::new(FakeObject::new());
221 let mut writer = JournalWriter::new(TEST_BLOCK_SIZE, 0);
222 writer.write_record(&4u32).unwrap();
223 let checkpoint = writer.journal_file_checkpoint();
224 assert_eq!(checkpoint.checksum, 0);
225 writer.write_record(&17u64).unwrap();
226 writer.pad_to_block().expect("pad_to_block failed");
227 let handle = FakeObjectHandle::new(object.clone());
228 let mut buf = handle.allocate_buffer(writer.flushable_bytes()).await;
229 let offset = writer.take_flushable(buf.as_mut());
230 handle.write_or_append(Some(offset), buf.as_ref()).await.expect("overwrite failed");
231
232 let handle = FakeObjectHandle::new(object.clone());
233 let mut buf = handle.allocate_buffer(object.get_size() as usize).await;
234 assert_eq!(buf.len(), TEST_BLOCK_SIZE);
235 handle.read(0, buf.as_mut()).await.expect("read failed");
236 let mut cursor = std::io::Cursor::new(&buf.as_slice()[checkpoint.file_offset as usize..]);
237 let value: u64 =
238 u64::deserialize_from(&mut cursor, LATEST_VERSION).expect("deserialize_from failed");
239 assert_eq!(value, 17);
240 }
241}