1use crate::gpt::GptManager;
6use anyhow::{Context as _, Error};
7use block_client::RemoteBlockClient;
8use fidl::endpoints::{DiscoverableProtocolMarker as _, RequestStream as _, ServiceMarker as _};
9use futures::lock::Mutex as AsyncMutex;
10use futures::stream::TryStreamExt as _;
11use std::sync::Arc;
12use vfs::directory::helper::DirectlyMutable as _;
13use vfs::execution_scope::ExecutionScope;
14use {
15 fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
16 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_io as fio,
17 fidl_fuchsia_process_lifecycle as flifecycle, fidl_fuchsia_storage_partitions as fpartitions,
18 fuchsia_async as fasync,
19};
20
/// Service object that owns the component's outgoing directory and tracks the state of the
/// block device it has been asked to manage (see `State`).
pub struct StorageHostService {
    /// Current device state. Guarded by an async mutex; several handlers hold the guard across
    /// `.await` points.
    state: AsyncMutex<State>,

    /// Execution scope on which the outgoing directory is served and the lifecycle handler is
    /// spawned. `run` waits on it and `shutdown` shuts it down.
    scope: ExecutionScope,

    /// Root of the outgoing directory. `run` adds the `svc` subdirectory to it and serves it.
    export_dir: Arc<vfs::directory::immutable::Simple>,

    /// Directory published in `svc` under the partition service name and handed to
    /// `GptManager::new`.
    partitions_dir: Arc<vfs::directory::immutable::Simple>,
}
33
/// State of the service with respect to its block device.
#[derive(Default)]
enum State {
    /// No device is bound. This is the initial state and the state `shutdown` leaves behind.
    #[default]
    Stopped,
    /// A device was supplied to `start` but `GptManager::new` failed on it. The block proxy is
    /// kept so that `reset_partition_table` can format the device.
    NeedsFormatting(fblock::BlockProxy),
    /// A `GptManager` was created successfully for the device.
    Running(Arc<GptManager>),
}
43
44impl State {
45 fn is_stopped(&self) -> bool {
46 if let Self::Stopped = self {
47 true
48 } else {
49 false
50 }
51 }
52}
53
54impl StorageHostService {
55 pub fn new() -> Arc<Self> {
56 let export_dir = vfs::directory::immutable::simple();
57 let partitions_dir = vfs::directory::immutable::simple();
58 Arc::new(Self {
59 state: Default::default(),
60 scope: ExecutionScope::new(),
61 export_dir,
62 partitions_dir,
63 })
64 }
65
66 pub async fn run(
67 self: Arc<Self>,
68 outgoing_dir: zx::Channel,
69 lifecycle_channel: Option<zx::Channel>,
70 ) -> Result<(), Error> {
71 let svc_dir = vfs::directory::immutable::simple();
72 self.export_dir.add_entry("svc", svc_dir.clone()).expect("Unable to create svc dir");
73
74 svc_dir
75 .add_entry(
76 fpartitions::PartitionServiceMarker::SERVICE_NAME,
77 self.partitions_dir.clone(),
78 )
79 .unwrap();
80 let weak = Arc::downgrade(&self);
81 svc_dir
82 .add_entry(
83 fstartup::StartupMarker::PROTOCOL_NAME,
84 vfs::service::host(move |requests| {
85 let weak = weak.clone();
86 async move {
87 if let Some(me) = weak.upgrade() {
88 let _ = me.handle_start_requests(requests).await;
89 }
90 }
91 }),
92 )
93 .unwrap();
94 let weak = Arc::downgrade(&self);
95 svc_dir
96 .add_entry(
97 fpartitions::PartitionsAdminMarker::PROTOCOL_NAME,
98 vfs::service::host(move |requests| {
99 let weak = weak.clone();
100 async move {
101 if let Some(me) = weak.upgrade() {
102 let _ = me.handle_partitions_admin_requests(requests).await;
103 }
104 }
105 }),
106 )
107 .unwrap();
108 let weak = Arc::downgrade(&self);
109 svc_dir
110 .add_entry(
111 fpartitions::PartitionsManagerMarker::PROTOCOL_NAME,
112 vfs::service::host(move |requests| {
113 let weak = weak.clone();
114 async move {
115 if let Some(me) = weak.upgrade() {
116 let _ = me.handle_partitions_manager_requests(requests).await;
117 }
118 }
119 }),
120 )
121 .unwrap();
122 let weak = Arc::downgrade(&self);
123 svc_dir
124 .add_entry(
125 ffs::AdminMarker::PROTOCOL_NAME,
126 vfs::service::host(move |requests| {
127 let weak = weak.clone();
128 async move {
129 if let Some(me) = weak.upgrade() {
130 let _ = me.handle_admin_requests(requests).await;
131 }
132 }
133 }),
134 )
135 .unwrap();
136
137 vfs::directory::serve_on(
138 self.export_dir.clone(),
139 fio::PERM_READABLE | fio::PERM_WRITABLE | fio::PERM_EXECUTABLE,
140 self.scope.clone(),
141 fidl::endpoints::ServerEnd::new(outgoing_dir),
142 );
143
144 if let Some(channel) = lifecycle_channel {
145 let me = self.clone();
146 self.scope.spawn(async move {
147 if let Err(e) = me.handle_lifecycle_requests(channel).await {
148 log::warn!(error:? = e; "handle_lifecycle_requests");
149 }
150 });
151 }
152
153 self.scope.wait().await;
154
155 Ok(())
156 }
157
158 async fn handle_start_requests(
159 self: Arc<Self>,
160 mut stream: fstartup::StartupRequestStream,
161 ) -> Result<(), Error> {
162 while let Some(request) = stream.try_next().await.context("Reading request")? {
163 log::debug!(request:?; "");
164 match request {
165 fstartup::StartupRequest::Start { device, options: _, responder } => {
166 responder
167 .send(
168 self.start(device.into_proxy())
169 .await
170 .map_err(|status| status.into_raw()),
171 )
172 .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
173 }
174 fstartup::StartupRequest::Format { responder, .. } => {
175 responder
176 .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
177 .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
178 }
179 fstartup::StartupRequest::Check { responder, .. } => {
180 responder
181 .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
182 .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
183 }
184 }
185 }
186 Ok(())
187 }
188
189 async fn start(self: &Arc<Self>, device: fblock::BlockProxy) -> Result<(), zx::Status> {
190 let mut state = self.state.lock().await;
191 if !state.is_stopped() {
192 log::warn!("Device already bound");
193 return Err(zx::Status::ALREADY_BOUND);
194 }
195
196 *state = match GptManager::new(device.clone(), self.partitions_dir.clone()).await {
205 Ok(runner) => State::Running(runner),
206 Err(err) => {
207 log::warn!(err:?; "Failed to load GPT. Reformatting may be required.");
211 State::NeedsFormatting(device)
212 }
213 };
214 Ok(())
215 }
216
217 async fn handle_partitions_manager_requests(
218 self: Arc<Self>,
219 mut stream: fpartitions::PartitionsManagerRequestStream,
220 ) -> Result<(), Error> {
221 while let Some(request) = stream.try_next().await.context("Reading request")? {
222 log::debug!(request:?; "");
223 match request {
224 fpartitions::PartitionsManagerRequest::GetBlockInfo { responder } => {
225 responder
226 .send(self.get_block_info().await.map_err(|status| status.into_raw()))
227 .unwrap_or_else(
228 |e| log::error!(e:?; "Failed to send GetBlockInfo response"),
229 );
230 }
231 fpartitions::PartitionsManagerRequest::CreateTransaction { responder } => {
232 responder
233 .send(self.create_transaction().await.map_err(|status| status.into_raw()))
234 .unwrap_or_else(
235 |e| log::error!(e:?; "Failed to send CreateTransaction response"),
236 );
237 }
238 fpartitions::PartitionsManagerRequest::CommitTransaction {
239 transaction,
240 responder,
241 } => {
242 responder
243 .send(
244 self.commit_transaction(transaction)
245 .await
246 .map_err(|status| status.into_raw()),
247 )
248 .unwrap_or_else(
249 |e| log::error!(e:?; "Failed to send CommitTransaction response"),
250 );
251 }
252 fpartitions::PartitionsManagerRequest::AddPartition { payload, responder } => {
253 responder
254 .send(self.add_partition(payload).await.map_err(|status| status.into_raw()))
255 .unwrap_or_else(
256 |e| log::error!(e:?; "Failed to send AddPartition response"),
257 );
258 }
259 }
260 }
261 Ok(())
262 }
263
264 async fn get_block_info(&self) -> Result<(u64, u32), zx::Status> {
265 let state = self.state.lock().await;
266 match &*state {
267 State::Stopped => return Err(zx::Status::BAD_STATE),
268 State::NeedsFormatting(block) => {
269 let info = block
270 .get_info()
271 .await
272 .map_err(|err| {
273 log::error!(err:?; "get_block_info: failed to query block info");
274 zx::Status::IO
275 })?
276 .map_err(zx::Status::from_raw)?;
277 Ok((info.block_count, info.block_size))
278 }
279 State::Running(gpt) => Ok((gpt.block_count(), gpt.block_size())),
280 }
281 }
282
283 async fn create_transaction(&self) -> Result<zx::EventPair, zx::Status> {
284 let gpt_manager = self.gpt_manager().await?;
285 gpt_manager.create_transaction().await
286 }
287
288 async fn commit_transaction(&self, transaction: zx::EventPair) -> Result<(), zx::Status> {
289 let gpt_manager = self.gpt_manager().await?;
290 gpt_manager.commit_transaction(transaction).await
291 }
292
293 async fn add_partition(
294 &self,
295 request: fpartitions::PartitionsManagerAddPartitionRequest,
296 ) -> Result<(), zx::Status> {
297 let gpt_manager = self.gpt_manager().await?;
298 gpt_manager.add_partition(request).await
299 }
300
301 async fn handle_partitions_admin_requests(
302 self: Arc<Self>,
303 mut stream: fpartitions::PartitionsAdminRequestStream,
304 ) -> Result<(), Error> {
305 while let Some(request) = stream.try_next().await.context("Reading request")? {
306 log::debug!(request:?; "");
307 match request {
308 fpartitions::PartitionsAdminRequest::ResetPartitionTable {
309 partitions,
310 responder,
311 } => {
312 responder
313 .send(
314 self.reset_partition_table(partitions)
315 .await
316 .map_err(|status| status.into_raw()),
317 )
318 .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
319 }
320 }
321 }
322 Ok(())
323 }
324
325 async fn reset_partition_table(
326 &self,
327 partitions: Vec<fpartitions::PartitionInfo>,
328 ) -> Result<(), zx::Status> {
329 fn convert_partition_info(info: fpartitions::PartitionInfo) -> gpt::PartitionInfo {
330 gpt::PartitionInfo {
331 label: info.name,
332 type_guid: gpt::Guid::from_bytes(info.type_guid.value),
333 instance_guid: gpt::Guid::from_bytes(info.instance_guid.value),
334 start_block: info.start_block,
335 num_blocks: info.num_blocks,
336 flags: info.flags,
337 }
338 }
339 let partitions = partitions.into_iter().map(convert_partition_info).collect::<Vec<_>>();
340
341 let mut state = self.state.lock().await;
342 match &mut *state {
343 State::Stopped => return Err(zx::Status::BAD_STATE),
344 State::NeedsFormatting(block) => {
345 log::info!("reset_partition_table: Reformatting GPT.");
346 let client = Arc::new(RemoteBlockClient::new(&*block).await?);
347
348 log::info!("reset_partition_table: Reformatting GPT...");
349 gpt::Gpt::format(client, partitions).await.map_err(|err| {
350 log::error!(err:?; "reset_partition_table: failed to init GPT");
351 zx::Status::IO
352 })?;
353 *state = State::Running(
354 GptManager::new(block.clone(), self.partitions_dir.clone()).await.map_err(
355 |err| {
356 log::error!(err:?; "reset_partition_table: failed to re-launch GPT");
357 zx::Status::BAD_STATE
358 },
359 )?,
360 );
361 }
362 State::Running(gpt) => {
363 log::info!("reset_partition_table: Updating GPT.");
364 gpt.reset_partition_table(partitions).await?;
365 }
366 }
367 Ok(())
368 }
369
370 async fn handle_admin_requests(
371 &self,
372 mut stream: ffs::AdminRequestStream,
373 ) -> Result<(), Error> {
374 if let Some(request) = stream.try_next().await.context("Reading request")? {
375 match request {
376 ffs::AdminRequest::Shutdown { responder } => {
377 log::info!("Received Admin::Shutdown request");
378 self.shutdown().await;
379 responder
380 .send()
381 .unwrap_or_else(|e| log::error!(e:?; "Failed to send shutdown response"));
382 log::info!("Admin shutdown complete");
383 }
384 }
385 }
386 Ok(())
387 }
388
389 async fn handle_lifecycle_requests(&self, lifecycle_channel: zx::Channel) -> Result<(), Error> {
390 let mut stream = flifecycle::LifecycleRequestStream::from_channel(
391 fasync::Channel::from_channel(lifecycle_channel),
392 );
393 match stream.try_next().await.context("Reading request")? {
394 Some(flifecycle::LifecycleRequest::Stop { .. }) => {
395 log::info!("Received Lifecycle::Stop request");
396 self.shutdown().await;
397 log::info!("Lifecycle shutdown complete");
398 }
399 None => {}
400 }
401 Ok(())
402 }
403
404 async fn gpt_manager(&self) -> Result<Arc<GptManager>, zx::Status> {
405 match &*self.state.lock().await {
406 State::Stopped | State::NeedsFormatting(_) => Err(zx::Status::BAD_STATE),
407 State::Running(gpt) => Ok(gpt.clone()),
408 }
409 }
410
411 async fn shutdown(&self) {
412 let mut state = self.state.lock().await;
413 if let State::Running(gpt) = std::mem::take(&mut *state) {
414 gpt.shutdown().await;
415 }
416 self.scope.shutdown();
417 }
418}
419
420#[cfg(test)]
421mod tests {
422 use super::StorageHostService;
423 use block_client::RemoteBlockClient;
424 use fake_block_server::FakeServer;
425 use fidl::endpoints::Proxy as _;
426 use fidl_fuchsia_process_lifecycle::LifecycleMarker;
427 use fuchsia_component::client::connect_to_protocol_at_dir_svc;
428 use futures::FutureExt as _;
429 use gpt::{Gpt, Guid, PartitionInfo};
430 use std::sync::Arc;
431 use {
432 fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
433 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
434 fidl_fuchsia_io as fio, fidl_fuchsia_storage_partitions as fpartitions,
435 fuchsia_async as fasync,
436 };
437
438 async fn setup_server(
439 block_size: u32,
440 block_count: u64,
441 partitions: Vec<PartitionInfo>,
442 ) -> Arc<FakeServer> {
443 let vmo = zx::Vmo::create(block_size as u64 * block_count).unwrap();
444 let server = Arc::new(FakeServer::from_vmo(512, vmo));
445 {
446 let (block_client, block_server) =
447 fidl::endpoints::create_proxy::<fblock::BlockMarker>();
448 let volume_stream = fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(
449 block_server.into_channel(),
450 )
451 .into_stream();
452 let server_clone = server.clone();
453 let _task = fasync::Task::spawn(async move { server_clone.serve(volume_stream).await });
454 let client = Arc::new(RemoteBlockClient::new(block_client).await.unwrap());
455 Gpt::format(client, partitions).await.unwrap();
456 }
457 server
458 }
459
460 #[fuchsia::test]
461 async fn lifecycle() {
462 let (outgoing_dir, outgoing_dir_server) =
463 fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
464 let (lifecycle_client, lifecycle_server) =
465 fidl::endpoints::create_proxy::<LifecycleMarker>();
466 let (block_client, block_server) =
467 fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
468 let volume_stream =
469 fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
470 .into_stream();
471
472 futures::join!(
473 async {
474 let client =
476 connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
477 .unwrap();
478 client
479 .start(block_client, &fstartup::StartOptions::default())
480 .await
481 .expect("FIDL error")
482 .expect("Start failed");
483 lifecycle_client.stop().expect("Stop failed");
484 fasync::OnSignals::new(
485 &lifecycle_client.into_channel().expect("into_channel failed"),
486 zx::Signals::CHANNEL_PEER_CLOSED,
487 )
488 .await
489 .expect("OnSignals failed");
490 },
491 async {
492 let service = StorageHostService::new();
494 service
495 .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
496 .await
497 .expect("Run failed");
498 },
499 async {
500 let server = setup_server(
502 512,
503 8,
504 vec![PartitionInfo {
505 label: "part".to_string(),
506 type_guid: Guid::from_bytes([0xabu8; 16]),
507 instance_guid: Guid::from_bytes([0xcdu8; 16]),
508 start_block: 4,
509 num_blocks: 1,
510 flags: 0,
511 }],
512 )
513 .await;
514 let _ = server.serve(volume_stream).await;
515 }
516 .fuse(),
517 );
518 }
519
520 #[fuchsia::test]
521 async fn admin_shutdown() {
522 let (outgoing_dir, outgoing_dir_server) =
523 fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
524 let (block_client, block_server) =
525 fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
526 let volume_stream =
527 fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
528 .into_stream();
529
530 futures::join!(
531 async {
532 let client =
534 connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
535 .unwrap();
536 let admin_client =
537 connect_to_protocol_at_dir_svc::<ffs::AdminMarker>(&outgoing_dir).unwrap();
538 client
539 .start(block_client, &fstartup::StartOptions::default())
540 .await
541 .expect("FIDL error")
542 .expect("Start failed");
543 admin_client.shutdown().await.expect("admin shutdown failed");
544 fasync::OnSignals::new(
545 &admin_client.into_channel().expect("into_channel failed"),
546 zx::Signals::CHANNEL_PEER_CLOSED,
547 )
548 .await
549 .expect("OnSignals failed");
550 },
551 async {
552 let service = StorageHostService::new();
554 service.run(outgoing_dir_server.into_channel(), None).await.expect("Run failed");
555 },
556 async {
557 let server = setup_server(
559 512,
560 8,
561 vec![PartitionInfo {
562 label: "part".to_string(),
563 type_guid: Guid::from_bytes([0xabu8; 16]),
564 instance_guid: Guid::from_bytes([0xcdu8; 16]),
565 start_block: 4,
566 num_blocks: 1,
567 flags: 0,
568 }],
569 )
570 .await;
571 let _ = server.serve(volume_stream).await;
572 }
573 .fuse(),
574 );
575 }
576
577 #[fuchsia::test]
578 async fn transaction_lifecycle() {
579 let (outgoing_dir, outgoing_dir_server) =
580 fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
581 let (lifecycle_client, lifecycle_server) =
582 fidl::endpoints::create_proxy::<LifecycleMarker>();
583 let (block_client, block_server) =
584 fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
585 let volume_stream =
586 fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
587 .into_stream();
588
589 futures::join!(
590 async {
591 connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
593 .unwrap()
594 .start(block_client, &fstartup::StartOptions::default())
595 .await
596 .expect("FIDL error")
597 .expect("Start failed");
598
599 let pm_client = connect_to_protocol_at_dir_svc::<
600 fpartitions::PartitionsManagerMarker,
601 >(&outgoing_dir)
602 .unwrap();
603 let transaction = pm_client
604 .create_transaction()
605 .await
606 .expect("FIDL error")
607 .expect("create_transaction failed");
608
609 pm_client
610 .create_transaction()
611 .await
612 .expect("FIDL error")
613 .expect_err("create_transaction should fail while other txn exists");
614
615 pm_client
616 .commit_transaction(transaction)
617 .await
618 .expect("FIDL error")
619 .expect("commit_transaction failed");
620
621 {
622 let _transaction = pm_client
623 .create_transaction()
624 .await
625 .expect("FIDL error")
626 .expect("create_transaction should succeed after committing txn");
627 }
628
629 pm_client
630 .create_transaction()
631 .await
632 .expect("FIDL error")
633 .expect("create_transaction should succeed after dropping txn");
634
635 lifecycle_client.stop().expect("Stop failed");
636 fasync::OnSignals::new(
637 &lifecycle_client.into_channel().expect("into_channel failed"),
638 zx::Signals::CHANNEL_PEER_CLOSED,
639 )
640 .await
641 .expect("OnSignals failed");
642 },
643 async {
644 let service = StorageHostService::new();
646 service
647 .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
648 .await
649 .expect("Run failed");
650 },
651 async {
652 let server = setup_server(
654 512,
655 16,
656 vec![PartitionInfo {
657 label: "part".to_string(),
658 type_guid: Guid::from_bytes([0xabu8; 16]),
659 instance_guid: Guid::from_bytes([0xcdu8; 16]),
660 start_block: 4,
661 num_blocks: 1,
662 flags: 0,
663 }],
664 )
665 .await;
666 let _ = server.serve(volume_stream).await;
667 }
668 .fuse(),
669 );
670 }
671}