1use anyhow::{anyhow, Error};
6use block_server::async_interface::{Interface, SessionManager};
7use block_server::{BlockInfo, BlockServer, DeviceInfo, WriteOptions};
8use fidl::endpoints::{create_endpoints, ClientEnd, FromClient, RequestStream, ServerEnd};
9use fs_management::filesystem::BlockConnector;
10use std::borrow::Cow;
11use std::num::NonZero;
12use std::sync::Arc;
13use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
14
/// The decision returned by [`Observer::write`] that tells the server how to handle a write
/// request.
pub enum WriteAction {
    /// Continue with the normal write handling.
    Write,
    /// Skip the write, but report success to the caller.
    Discard,
    /// Skip the write and fail the request with `zx::Status::IO`.
    Fail,
}
21
/// Hooks invoked by the VMO-backed server as it handles block requests.
///
/// Every method has a default implementation, so implementors only need to override the hooks
/// they are interested in.
pub trait Observer: Send + Sync {
    /// Called for each read request with the request's arguments, before the server checks the
    /// range or touches any data.  The default implementation does nothing.
    fn read(
        &self,
        _device_block_offset: u64,
        _block_count: u32,
        _vmo: &Arc<zx::Vmo>,
        _vmo_offset: u64,
    ) {
    }

    /// Called for each write request with the request's arguments, before the server checks the
    /// range or touches any data.  The returned [`WriteAction`] decides whether the write
    /// proceeds, is silently dropped, or fails.  The default implementation returns
    /// `WriteAction::Write`.
    fn write(
        &self,
        _device_block_offset: u64,
        _block_count: u32,
        _vmo: &Arc<zx::Vmo>,
        _vmo_offset: u64,
        _opts: WriteOptions,
    ) -> WriteAction {
        WriteAction::Write
    }

    /// Called for each barrier request.  The default implementation does nothing.
    fn barrier(&self) {}

    /// Called for each flush request.  The default implementation does nothing.
    fn flush(&self) {}

    /// Called for each trim request, before the server checks the range.  The default
    /// implementation does nothing.
    fn trim(&self, _device_block_offset: u64, _block_count: u32) {}
}
49
/// A block server whose contents are stored in a `zx::Vmo`.
///
/// Instances are created with [`VmoBackedServerOptions::build`].
pub struct VmoBackedServer {
    // The underlying block server; requests are dispatched to the private `Data` type.
    server: BlockServer<SessionManager<Data>>,
}
54
/// Describes how the backing VMO of a [`VmoBackedServer`] is obtained and sized.
pub enum InitialContents<'a> {
    /// Create a new VMO large enough for the given number of blocks.
    FromCapacity(u64),
    /// Create a new VMO large enough for the given number of blocks and write the buffer at
    /// offset 0.  Building fails if the buffer needs more blocks than the given capacity.
    FromBufferAndCapactity(u64, &'a [u8]),
    /// Create a new VMO sized to the buffer length rounded up to a whole number of blocks and
    /// write the buffer at offset 0.
    FromBuffer(&'a [u8]),
    /// Use the given VMO as the backing store.  The block count is the VMO size divided by the
    /// block size, rounded down.
    FromVmo(zx::Vmo),
}
68
/// Options used to construct a [`VmoBackedServer`]; see [`VmoBackedServerOptions::build`].
pub struct VmoBackedServerOptions<'a> {
    /// Device information reported by the server.  `build` overwrites the block count (for
    /// `DeviceInfo::Block`) or the block range (for `DeviceInfo::Partition`) with the value
    /// derived from `initial_contents`.
    pub info: DeviceInfo,
    /// Size of a single block, in bytes.
    pub block_size: u32,
    /// How the backing VMO is created and what it initially contains.
    pub initial_contents: InitialContents<'a>,
    /// Optional observer that is notified of requests handled by the server.
    pub observer: Option<Box<dyn Observer>>,
}
76
77impl Default for VmoBackedServerOptions<'_> {
78 fn default() -> Self {
79 VmoBackedServerOptions {
80 info: DeviceInfo::Block(BlockInfo {
81 device_flags: fblock::Flag::empty(),
82 block_count: 0,
83 max_transfer_blocks: None,
84 }),
85 block_size: 512,
86 initial_contents: InitialContents::FromCapacity(0),
87 observer: None,
88 }
89 }
90}
91
92impl VmoBackedServerOptions<'_> {
93 pub fn build(self) -> Result<VmoBackedServer, Error> {
94 let (data, block_count) = match self.initial_contents {
95 InitialContents::FromCapacity(block_count) => {
96 (zx::Vmo::create(block_count * self.block_size as u64)?, block_count)
97 }
98 InitialContents::FromBufferAndCapactity(block_count, buf) => {
99 let needed =
100 buf.len()
101 .checked_next_multiple_of(self.block_size as usize)
102 .ok_or_else(|| anyhow!("Invalid buffer size"))? as u64
103 / self.block_size as u64;
104 if needed > block_count {
105 return Err(anyhow!("Not enough capacity: {needed} vs {block_count}"));
106 }
107 let vmo = zx::Vmo::create(block_count * self.block_size as u64)?;
108 vmo.write(buf, 0)?;
109 (vmo, block_count)
110 }
111 InitialContents::FromBuffer(buf) => {
112 let block_count =
113 buf.len()
114 .checked_next_multiple_of(self.block_size as usize)
115 .ok_or_else(|| anyhow!("Invalid buffer size"))? as u64
116 / self.block_size as u64;
117 let vmo = zx::Vmo::create(block_count * self.block_size as u64)?;
118 vmo.write(buf, 0)?;
119 (vmo, block_count)
120 }
121 InitialContents::FromVmo(vmo) => {
122 let size = vmo.get_size()?;
123 let block_count = size / self.block_size as u64;
124 (vmo, block_count)
125 }
126 };
127
128 let info = match self.info {
129 DeviceInfo::Block(mut info) => {
130 info.block_count = block_count;
131 DeviceInfo::Block(info)
132 }
133 DeviceInfo::Partition(mut info) => {
134 info.block_range = Some(0..block_count);
135 DeviceInfo::Partition(info)
136 }
137 };
138 Ok(VmoBackedServer {
139 server: BlockServer::new(
140 self.block_size,
141 Arc::new(Data { info, block_size: self.block_size, data, observer: self.observer }),
142 ),
143 })
144 }
145}
146
147impl VmoBackedServer {
148 pub async fn serve(&self, requests: fvolume::VolumeRequestStream) -> Result<(), Error> {
150 self.server.handle_requests(requests).await
151 }
152}
153
/// A [`BlockConnector`] that connects new channels to a shared [`VmoBackedServer`].
pub struct VmoBackedServerConnector {
    // Scope on which the task serving each connection is spawned.
    scope: fuchsia_async::Scope,
    // The server that handles every connection made through this connector.
    server: Arc<VmoBackedServer>,
}
159
160impl VmoBackedServerConnector {
161 pub fn new(scope: fuchsia_async::Scope, server: Arc<VmoBackedServer>) -> Self {
162 Self { scope, server }
163 }
164}
165
166impl BlockConnector for VmoBackedServerConnector {
167 fn connect_channel_to_volume(
168 &self,
169 server_end: ServerEnd<fvolume::VolumeMarker>,
170 ) -> Result<(), Error> {
171 let server = self.server.clone();
172 let _ = self.scope.spawn(async move {
173 let _ = server.serve(server_end.into_stream()).await;
174 });
175 Ok(())
176 }
177}
178
/// Convenience constructors and connection helpers, implemented in this file for
/// [`VmoBackedServer`].
pub trait VmoBackedServerTestingExt {
    /// Creates a server with `block_count` blocks of `block_size` bytes, initialised with
    /// `initial_content`.  The implementation for `VmoBackedServer` panics if the server cannot
    /// be built.
    fn new(block_count: u64, block_size: u32, initial_content: &[u8]) -> Self;
    /// Creates a server with the given block size that is backed by `vmo`.  The implementation
    /// for `VmoBackedServer` panics if the server cannot be built.
    fn from_vmo(block_size: u32, vmo: zx::Vmo) -> Self;
    /// Serves the given volume server end with this server.
    fn connect_server(self: &Arc<Self>, server: ServerEnd<fvolume::VolumeMarker>);
    /// Creates a new connection to this server and returns its client side as an `R`.
    fn connect<R: BlockClient>(self: &Arc<Self>) -> R;
}
187
/// Marker trait for the client types that [`VmoBackedServerTestingExt::connect`] can return.
pub trait BlockClient: FromClient {}

// Asynchronous proxies, synchronous proxies and raw client ends for both the Block and Volume
// protocols are accepted.
impl BlockClient for fblock::BlockProxy {}
impl BlockClient for fvolume::VolumeProxy {}
impl BlockClient for fblock::BlockSynchronousProxy {}
impl BlockClient for fvolume::VolumeSynchronousProxy {}
impl BlockClient for ClientEnd<fblock::BlockMarker> {}
impl BlockClient for ClientEnd<fvolume::VolumeMarker> {}
196
197impl VmoBackedServerTestingExt for VmoBackedServer {
198 fn new(block_count: u64, block_size: u32, initial_content: &[u8]) -> Self {
199 VmoBackedServerOptions {
200 block_size,
201 initial_contents: InitialContents::FromBufferAndCapactity(block_count, initial_content),
202 ..Default::default()
203 }
204 .build()
205 .unwrap()
206 }
207 fn from_vmo(block_size: u32, vmo: zx::Vmo) -> Self {
208 VmoBackedServerOptions {
209 block_size,
210 initial_contents: InitialContents::FromVmo(vmo),
211 ..Default::default()
212 }
213 .build()
214 .unwrap()
215 }
216
217 fn connect<R: BlockClient>(self: &Arc<Self>) -> R {
218 let (client, server) = create_endpoints::<R::Protocol>();
219 let this = self.clone();
220 fuchsia_async::Task::spawn(async move {
221 let _ = this.serve(server.into_stream().cast_stream()).await;
222 })
223 .detach();
224 R::from_client(client)
225 }
226
227 fn connect_server(self: &Arc<Self>, server: ServerEnd<fvolume::VolumeMarker>) {
228 let this = self.clone();
229 fuchsia_async::Task::spawn(async move {
230 let _ = this.serve(server.into_stream()).await;
231 })
232 .detach();
233 }
234}
235
/// State shared by all sessions of a [`VmoBackedServer`]; implements the block server
/// `Interface`.
struct Data {
    // Device information returned by `get_info` and used for range checks.
    info: DeviceInfo,
    // Size of a block in bytes; used to turn block offsets and counts into byte ranges.
    block_size: u32,
    // The VMO holding the device contents.
    data: zx::Vmo,
    // Optional observer notified of each request before it is handled.
    observer: Option<Box<dyn Observer>>,
}
242
243impl Interface for Data {
244 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
245 Ok(Cow::Borrowed(&self.info))
246 }
247
248 async fn read(
249 &self,
250 device_block_offset: u64,
251 block_count: u32,
252 vmo: &Arc<zx::Vmo>,
253 vmo_offset: u64,
254 _trace_flow_id: Option<NonZero<u64>>,
255 ) -> Result<(), zx::Status> {
256 if let Some(observer) = self.observer.as_ref() {
257 observer.read(device_block_offset, block_count, vmo, vmo_offset);
258 }
259 if let Some(max) = self.info.max_transfer_blocks().as_ref() {
260 assert!(block_count <= max.get());
262 }
263 if device_block_offset + block_count as u64 > self.info.block_count().unwrap() {
264 Err(zx::Status::OUT_OF_RANGE)
265 } else {
266 vmo.write(
267 &self.data.read_to_vec(
268 device_block_offset * self.block_size as u64,
269 block_count as u64 * self.block_size as u64,
270 )?,
271 vmo_offset,
272 )
273 }
274 }
275
276 async fn write(
277 &self,
278 device_block_offset: u64,
279 block_count: u32,
280 vmo: &Arc<zx::Vmo>,
281 vmo_offset: u64,
282 opts: WriteOptions,
283 _trace_flow_id: Option<NonZero<u64>>,
284 ) -> Result<(), zx::Status> {
285 if let Some(observer) = self.observer.as_ref() {
286 match observer.write(device_block_offset, block_count, vmo, vmo_offset, opts) {
287 WriteAction::Write => {}
288 WriteAction::Discard => return Ok(()),
289 WriteAction::Fail => return Err(zx::Status::IO),
290 }
291 }
292 if let Some(max) = self.info.max_transfer_blocks().as_ref() {
293 assert!(block_count <= max.get());
295 }
296 if device_block_offset + block_count as u64 > self.info.block_count().unwrap() {
297 Err(zx::Status::OUT_OF_RANGE)
298 } else {
299 self.data.write(
300 &vmo.read_to_vec(vmo_offset, block_count as u64 * self.block_size as u64)?,
301 device_block_offset * self.block_size as u64,
302 )
303 }
304 }
305
306 fn barrier(&self) -> Result<(), zx::Status> {
307 if let Some(observer) = self.observer.as_ref() {
308 observer.barrier();
309 }
310 Ok(())
311 }
312
313 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
314 if let Some(observer) = self.observer.as_ref() {
315 observer.flush();
316 }
317 Ok(())
318 }
319
320 async fn trim(
321 &self,
322 device_block_offset: u64,
323 block_count: u32,
324 _trace_flow_id: Option<NonZero<u64>>,
325 ) -> Result<(), zx::Status> {
326 if let Some(observer) = self.observer.as_ref() {
327 observer.trim(device_block_offset, block_count);
328 }
329 if device_block_offset + block_count as u64 > self.info.block_count().unwrap() {
330 Err(zx::Status::OUT_OF_RANGE)
331 } else {
332 Ok(())
333 }
334 }
335}