1use crate::gpt::GptManager;
6use anyhow::{Context as _, Error};
7use block_client::RemoteBlockClient;
8use fidl::endpoints::{DiscoverableProtocolMarker as _, RequestStream as _, ServiceMarker as _};
9use futures::lock::Mutex as AsyncMutex;
10use futures::stream::TryStreamExt as _;
11use std::sync::Arc;
12use vfs::directory::entry_container::Directory as _;
13use vfs::directory::helper::DirectlyMutable as _;
14use vfs::execution_scope::ExecutionScope;
15use vfs::path::Path;
16use {
17 fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
18 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_io as fio,
19 fidl_fuchsia_process_lifecycle as flifecycle, fidl_fuchsia_storage_partitions as fpartitions,
20 fuchsia_async as fasync,
21};
22
23pub struct StorageHostService {
24 state: AsyncMutex<State>,
25
26 scope: ExecutionScope,
28
29 export_dir: Arc<vfs::directory::immutable::Simple>,
31
32 partitions_dir: Arc<vfs::directory::immutable::Simple>,
34}
35
36#[derive(Default)]
37enum State {
38 #[default]
39 Stopped,
40 NeedsFormatting(fblock::BlockProxy),
43 Running(Arc<GptManager>),
44}
45
46impl State {
47 fn is_stopped(&self) -> bool {
48 if let Self::Stopped = self {
49 true
50 } else {
51 false
52 }
53 }
54}
55
56impl StorageHostService {
57 pub fn new() -> Arc<Self> {
58 let export_dir = vfs::directory::immutable::simple();
59 let partitions_dir = vfs::directory::immutable::simple();
60 Arc::new(Self {
61 state: Default::default(),
62 scope: ExecutionScope::new(),
63 export_dir,
64 partitions_dir,
65 })
66 }
67
68 pub async fn run(
69 self: Arc<Self>,
70 outgoing_dir: zx::Channel,
71 lifecycle_channel: Option<zx::Channel>,
72 ) -> Result<(), Error> {
73 let svc_dir = vfs::directory::immutable::simple();
74 self.export_dir.add_entry("svc", svc_dir.clone()).expect("Unable to create svc dir");
75
76 svc_dir
77 .add_entry(
78 fpartitions::PartitionServiceMarker::SERVICE_NAME,
79 self.partitions_dir.clone(),
80 )
81 .unwrap();
82 let weak = Arc::downgrade(&self);
83 svc_dir
84 .add_entry(
85 fstartup::StartupMarker::PROTOCOL_NAME,
86 vfs::service::host(move |requests| {
87 let weak = weak.clone();
88 async move {
89 if let Some(me) = weak.upgrade() {
90 let _ = me.handle_start_requests(requests).await;
91 }
92 }
93 }),
94 )
95 .unwrap();
96 let weak = Arc::downgrade(&self);
97 svc_dir
98 .add_entry(
99 fpartitions::PartitionsAdminMarker::PROTOCOL_NAME,
100 vfs::service::host(move |requests| {
101 let weak = weak.clone();
102 async move {
103 if let Some(me) = weak.upgrade() {
104 let _ = me.handle_partitions_admin_requests(requests).await;
105 }
106 }
107 }),
108 )
109 .unwrap();
110 let weak = Arc::downgrade(&self);
111 svc_dir
112 .add_entry(
113 fpartitions::PartitionsManagerMarker::PROTOCOL_NAME,
114 vfs::service::host(move |requests| {
115 let weak = weak.clone();
116 async move {
117 if let Some(me) = weak.upgrade() {
118 let _ = me.handle_partitions_manager_requests(requests).await;
119 }
120 }
121 }),
122 )
123 .unwrap();
124 let weak = Arc::downgrade(&self);
125 svc_dir
126 .add_entry(
127 ffs::AdminMarker::PROTOCOL_NAME,
128 vfs::service::host(move |requests| {
129 let weak = weak.clone();
130 async move {
131 if let Some(me) = weak.upgrade() {
132 let _ = me.handle_admin_requests(requests).await;
133 }
134 }
135 }),
136 )
137 .unwrap();
138
139 self.export_dir.clone().open(
140 self.scope.clone(),
141 fio::OpenFlags::RIGHT_READABLE
142 | fio::OpenFlags::RIGHT_WRITABLE
143 | fio::OpenFlags::DIRECTORY
144 | fio::OpenFlags::RIGHT_EXECUTABLE,
145 Path::dot(),
146 outgoing_dir.into(),
147 );
148
149 if let Some(channel) = lifecycle_channel {
150 let me = self.clone();
151 self.scope.spawn(async move {
152 if let Err(e) = me.handle_lifecycle_requests(channel).await {
153 log::warn!(error:? = e; "handle_lifecycle_requests");
154 }
155 });
156 }
157
158 self.scope.wait().await;
159
160 Ok(())
161 }
162
163 async fn handle_start_requests(
164 self: Arc<Self>,
165 mut stream: fstartup::StartupRequestStream,
166 ) -> Result<(), Error> {
167 while let Some(request) = stream.try_next().await.context("Reading request")? {
168 log::debug!(request:?; "");
169 match request {
170 fstartup::StartupRequest::Start { device, options: _, responder } => {
171 responder
172 .send(
173 self.start(device.into_proxy())
174 .await
175 .map_err(|status| status.into_raw()),
176 )
177 .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
178 }
179 fstartup::StartupRequest::Format { responder, .. } => {
180 responder
181 .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
182 .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
183 }
184 fstartup::StartupRequest::Check { responder, .. } => {
185 responder
186 .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
187 .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
188 }
189 }
190 }
191 Ok(())
192 }
193
194 async fn start(self: &Arc<Self>, device: fblock::BlockProxy) -> Result<(), zx::Status> {
195 let mut state = self.state.lock().await;
196 if !state.is_stopped() {
197 log::warn!("Device already bound");
198 return Err(zx::Status::ALREADY_BOUND);
199 }
200
201 *state = match GptManager::new(device.clone(), self.partitions_dir.clone()).await {
210 Ok(runner) => State::Running(runner),
211 Err(err) => {
212 log::error!(err:?; "Failed to load GPT. Reformatting may be required.");
213 State::NeedsFormatting(device)
214 }
215 };
216 Ok(())
217 }
218
219 async fn handle_partitions_manager_requests(
220 self: Arc<Self>,
221 mut stream: fpartitions::PartitionsManagerRequestStream,
222 ) -> Result<(), Error> {
223 while let Some(request) = stream.try_next().await.context("Reading request")? {
224 log::debug!(request:?; "");
225 match request {
226 fpartitions::PartitionsManagerRequest::GetBlockInfo { responder } => {
227 responder
228 .send(self.get_block_info().await.map_err(|status| status.into_raw()))
229 .unwrap_or_else(
230 |e| log::error!(e:?; "Failed to send GetBlockInfo response"),
231 );
232 }
233 fpartitions::PartitionsManagerRequest::CreateTransaction { responder } => {
234 responder
235 .send(self.create_transaction().await.map_err(|status| status.into_raw()))
236 .unwrap_or_else(
237 |e| log::error!(e:?; "Failed to send CreateTransaction response"),
238 );
239 }
240 fpartitions::PartitionsManagerRequest::CommitTransaction {
241 transaction,
242 responder,
243 } => {
244 responder
245 .send(
246 self.commit_transaction(transaction)
247 .await
248 .map_err(|status| status.into_raw()),
249 )
250 .unwrap_or_else(
251 |e| log::error!(e:?; "Failed to send CommitTransaction response"),
252 );
253 }
254 fpartitions::PartitionsManagerRequest::AddPartition { payload, responder } => {
255 responder
256 .send(self.add_partition(payload).await.map_err(|status| status.into_raw()))
257 .unwrap_or_else(
258 |e| log::error!(e:?; "Failed to send AddPartition response"),
259 );
260 }
261 }
262 }
263 Ok(())
264 }
265
266 async fn get_block_info(&self) -> Result<(u64, u32), zx::Status> {
267 let state = self.state.lock().await;
268 match &*state {
269 State::Stopped => return Err(zx::Status::BAD_STATE),
270 State::NeedsFormatting(block) => {
271 let info = block
272 .get_info()
273 .await
274 .map_err(|err| {
275 log::error!(err:?; "get_block_info: failed to query block info");
276 zx::Status::IO
277 })?
278 .map_err(zx::Status::from_raw)?;
279 Ok((info.block_count, info.block_size))
280 }
281 State::Running(gpt) => Ok((gpt.block_count(), gpt.block_size())),
282 }
283 }
284
285 async fn create_transaction(&self) -> Result<zx::EventPair, zx::Status> {
286 let gpt_manager = self.gpt_manager().await?;
287 gpt_manager.create_transaction().await
288 }
289
290 async fn commit_transaction(&self, transaction: zx::EventPair) -> Result<(), zx::Status> {
291 let gpt_manager = self.gpt_manager().await?;
292 gpt_manager.commit_transaction(transaction).await
293 }
294
295 async fn add_partition(
296 &self,
297 request: fpartitions::PartitionsManagerAddPartitionRequest,
298 ) -> Result<(), zx::Status> {
299 let gpt_manager = self.gpt_manager().await?;
300 gpt_manager.add_partition(request).await
301 }
302
303 async fn handle_partitions_admin_requests(
304 self: Arc<Self>,
305 mut stream: fpartitions::PartitionsAdminRequestStream,
306 ) -> Result<(), Error> {
307 while let Some(request) = stream.try_next().await.context("Reading request")? {
308 log::debug!(request:?; "");
309 match request {
310 fpartitions::PartitionsAdminRequest::ResetPartitionTable {
311 partitions,
312 responder,
313 } => {
314 responder
315 .send(
316 self.reset_partition_table(partitions)
317 .await
318 .map_err(|status| status.into_raw()),
319 )
320 .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
321 }
322 }
323 }
324 Ok(())
325 }
326
327 async fn reset_partition_table(
328 &self,
329 partitions: Vec<fpartitions::PartitionInfo>,
330 ) -> Result<(), zx::Status> {
331 fn convert_partition_info(info: fpartitions::PartitionInfo) -> gpt::PartitionInfo {
332 gpt::PartitionInfo {
333 label: info.name,
334 type_guid: gpt::Guid::from_bytes(info.type_guid.value),
335 instance_guid: gpt::Guid::from_bytes(info.instance_guid.value),
336 start_block: info.start_block,
337 num_blocks: info.num_blocks,
338 flags: info.flags,
339 }
340 }
341 let partitions = partitions.into_iter().map(convert_partition_info).collect::<Vec<_>>();
342
343 let mut state = self.state.lock().await;
344 match &mut *state {
345 State::Stopped => return Err(zx::Status::BAD_STATE),
346 State::NeedsFormatting(block) => {
347 log::info!("reset_partition_table: Reformatting GPT.");
348 let client = Arc::new(RemoteBlockClient::new(&*block).await?);
349
350 log::info!("reset_partition_table: Reformatting GPT...");
351 gpt::Gpt::format(client, partitions).await.map_err(|err| {
352 log::error!(err:?; "reset_partition_table: failed to init GPT");
353 zx::Status::IO
354 })?;
355 *state = State::Running(
356 GptManager::new(block.clone(), self.partitions_dir.clone()).await.map_err(
357 |err| {
358 log::error!(err:?; "reset_partition_table: failed to re-launch GPT");
359 zx::Status::BAD_STATE
360 },
361 )?,
362 );
363 }
364 State::Running(gpt) => {
365 log::info!("reset_partition_table: Updating GPT.");
366 gpt.reset_partition_table(partitions).await?;
367 }
368 }
369 Ok(())
370 }
371
372 async fn handle_admin_requests(
373 &self,
374 mut stream: ffs::AdminRequestStream,
375 ) -> Result<(), Error> {
376 if let Some(request) = stream.try_next().await.context("Reading request")? {
377 match request {
378 ffs::AdminRequest::Shutdown { responder } => {
379 log::info!("Received Admin::Shutdown request");
380 self.shutdown().await;
381 responder
382 .send()
383 .unwrap_or_else(|e| log::error!(e:?; "Failed to send shutdown response"));
384 log::info!("Admin shutdown complete");
385 }
386 }
387 }
388 Ok(())
389 }
390
391 async fn handle_lifecycle_requests(&self, lifecycle_channel: zx::Channel) -> Result<(), Error> {
392 let mut stream = flifecycle::LifecycleRequestStream::from_channel(
393 fasync::Channel::from_channel(lifecycle_channel),
394 );
395 match stream.try_next().await.context("Reading request")? {
396 Some(flifecycle::LifecycleRequest::Stop { .. }) => {
397 log::info!("Received Lifecycle::Stop request");
398 self.shutdown().await;
399 log::info!("Lifecycle shutdown complete");
400 }
401 None => {}
402 }
403 Ok(())
404 }
405
406 async fn gpt_manager(&self) -> Result<Arc<GptManager>, zx::Status> {
407 match &*self.state.lock().await {
408 State::Stopped | State::NeedsFormatting(_) => Err(zx::Status::BAD_STATE),
409 State::Running(gpt) => Ok(gpt.clone()),
410 }
411 }
412
413 async fn shutdown(&self) {
414 let mut state = self.state.lock().await;
415 if let State::Running(gpt) = std::mem::take(&mut *state) {
416 gpt.shutdown().await;
417 }
418 self.scope.shutdown();
419 }
420}
421
422#[cfg(test)]
423mod tests {
424 use super::StorageHostService;
425 use block_client::RemoteBlockClient;
426 use fake_block_server::FakeServer;
427 use fidl::endpoints::Proxy as _;
428 use fidl_fuchsia_process_lifecycle::LifecycleMarker;
429 use fuchsia_component::client::connect_to_protocol_at_dir_svc;
430 use futures::FutureExt as _;
431 use gpt::{Gpt, Guid, PartitionInfo};
432 use std::sync::Arc;
433 use {
434 fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
435 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
436 fidl_fuchsia_io as fio, fidl_fuchsia_storage_partitions as fpartitions,
437 fuchsia_async as fasync,
438 };
439
440 async fn setup_server(
441 block_size: u32,
442 block_count: u64,
443 partitions: Vec<PartitionInfo>,
444 ) -> Arc<FakeServer> {
445 let vmo = zx::Vmo::create(block_size as u64 * block_count).unwrap();
446 let server = Arc::new(FakeServer::from_vmo(512, vmo));
447 {
448 let (block_client, block_server) =
449 fidl::endpoints::create_proxy::<fblock::BlockMarker>();
450 let volume_stream = fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(
451 block_server.into_channel(),
452 )
453 .into_stream();
454 let server_clone = server.clone();
455 let _task = fasync::Task::spawn(async move { server_clone.serve(volume_stream).await });
456 let client = Arc::new(RemoteBlockClient::new(block_client).await.unwrap());
457 Gpt::format(client, partitions).await.unwrap();
458 }
459 server
460 }
461
462 #[fuchsia::test]
463 async fn lifecycle() {
464 let (outgoing_dir, outgoing_dir_server) =
465 fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
466 let (lifecycle_client, lifecycle_server) =
467 fidl::endpoints::create_proxy::<LifecycleMarker>();
468 let (block_client, block_server) =
469 fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
470 let volume_stream =
471 fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
472 .into_stream();
473
474 futures::join!(
475 async {
476 let client =
478 connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
479 .unwrap();
480 client
481 .start(
482 block_client,
483 fstartup::StartOptions {
484 read_only: false,
485 verbose: false,
486 fsck_after_every_transaction: false,
487 write_compression_algorithm:
488 fstartup::CompressionAlgorithm::ZstdChunked,
489 write_compression_level: 0,
490 cache_eviction_policy_override: fstartup::EvictionPolicyOverride::None,
491 startup_profiling_seconds: 0,
492 },
493 )
494 .await
495 .expect("FIDL error")
496 .expect("Start failed");
497 lifecycle_client.stop().expect("Stop failed");
498 fasync::OnSignals::new(
499 &lifecycle_client.into_channel().expect("into_channel failed"),
500 zx::Signals::CHANNEL_PEER_CLOSED,
501 )
502 .await
503 .expect("OnSignals failed");
504 },
505 async {
506 let service = StorageHostService::new();
508 service
509 .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
510 .await
511 .expect("Run failed");
512 },
513 async {
514 let server = setup_server(
516 512,
517 8,
518 vec![PartitionInfo {
519 label: "part".to_string(),
520 type_guid: Guid::from_bytes([0xabu8; 16]),
521 instance_guid: Guid::from_bytes([0xcdu8; 16]),
522 start_block: 4,
523 num_blocks: 1,
524 flags: 0,
525 }],
526 )
527 .await;
528 let _ = server.serve(volume_stream).await;
529 }
530 .fuse(),
531 );
532 }
533
534 #[fuchsia::test]
535 async fn admin_shutdown() {
536 let (outgoing_dir, outgoing_dir_server) =
537 fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
538 let (block_client, block_server) =
539 fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
540 let volume_stream =
541 fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
542 .into_stream();
543
544 futures::join!(
545 async {
546 let client =
548 connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
549 .unwrap();
550 let admin_client =
551 connect_to_protocol_at_dir_svc::<ffs::AdminMarker>(&outgoing_dir).unwrap();
552 client
553 .start(
554 block_client,
555 fstartup::StartOptions {
556 read_only: false,
557 verbose: false,
558 fsck_after_every_transaction: false,
559 write_compression_algorithm:
560 fstartup::CompressionAlgorithm::ZstdChunked,
561 write_compression_level: 0,
562 cache_eviction_policy_override: fstartup::EvictionPolicyOverride::None,
563 startup_profiling_seconds: 0,
564 },
565 )
566 .await
567 .expect("FIDL error")
568 .expect("Start failed");
569 admin_client.shutdown().await.expect("admin shutdown failed");
570 fasync::OnSignals::new(
571 &admin_client.into_channel().expect("into_channel failed"),
572 zx::Signals::CHANNEL_PEER_CLOSED,
573 )
574 .await
575 .expect("OnSignals failed");
576 },
577 async {
578 let service = StorageHostService::new();
580 service.run(outgoing_dir_server.into_channel(), None).await.expect("Run failed");
581 },
582 async {
583 let server = setup_server(
585 512,
586 8,
587 vec![PartitionInfo {
588 label: "part".to_string(),
589 type_guid: Guid::from_bytes([0xabu8; 16]),
590 instance_guid: Guid::from_bytes([0xcdu8; 16]),
591 start_block: 4,
592 num_blocks: 1,
593 flags: 0,
594 }],
595 )
596 .await;
597 let _ = server.serve(volume_stream).await;
598 }
599 .fuse(),
600 );
601 }
602
603 #[fuchsia::test]
604 async fn transaction_lifecycle() {
605 let (outgoing_dir, outgoing_dir_server) =
606 fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
607 let (lifecycle_client, lifecycle_server) =
608 fidl::endpoints::create_proxy::<LifecycleMarker>();
609 let (block_client, block_server) =
610 fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
611 let volume_stream =
612 fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
613 .into_stream();
614
615 futures::join!(
616 async {
617 connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
619 .unwrap()
620 .start(
621 block_client,
622 fstartup::StartOptions {
623 read_only: false,
624 verbose: false,
625 fsck_after_every_transaction: false,
626 write_compression_algorithm:
627 fstartup::CompressionAlgorithm::ZstdChunked,
628 write_compression_level: 0,
629 cache_eviction_policy_override: fstartup::EvictionPolicyOverride::None,
630 startup_profiling_seconds: 0,
631 },
632 )
633 .await
634 .expect("FIDL error")
635 .expect("Start failed");
636
637 let pm_client = connect_to_protocol_at_dir_svc::<
638 fpartitions::PartitionsManagerMarker,
639 >(&outgoing_dir)
640 .unwrap();
641 let transaction = pm_client
642 .create_transaction()
643 .await
644 .expect("FIDL error")
645 .expect("create_transaction failed");
646
647 pm_client
648 .create_transaction()
649 .await
650 .expect("FIDL error")
651 .expect_err("create_transaction should fail while other txn exists");
652
653 pm_client
654 .commit_transaction(transaction)
655 .await
656 .expect("FIDL error")
657 .expect("commit_transaction failed");
658
659 {
660 let _transaction = pm_client
661 .create_transaction()
662 .await
663 .expect("FIDL error")
664 .expect("create_transaction should succeed after committing txn");
665 }
666
667 pm_client
668 .create_transaction()
669 .await
670 .expect("FIDL error")
671 .expect("create_transaction should succeed after dropping txn");
672
673 lifecycle_client.stop().expect("Stop failed");
674 fasync::OnSignals::new(
675 &lifecycle_client.into_channel().expect("into_channel failed"),
676 zx::Signals::CHANNEL_PEER_CLOSED,
677 )
678 .await
679 .expect("OnSignals failed");
680 },
681 async {
682 let service = StorageHostService::new();
684 service
685 .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
686 .await
687 .expect("Run failed");
688 },
689 async {
690 let server = setup_server(
692 512,
693 16,
694 vec![PartitionInfo {
695 label: "part".to_string(),
696 type_guid: Guid::from_bytes([0xabu8; 16]),
697 instance_guid: Guid::from_bytes([0xcdu8; 16]),
698 start_block: 4,
699 num_blocks: 1,
700 flags: 0,
701 }],
702 )
703 .await;
704 let _ = server.serve(volume_stream).await;
705 }
706 .fuse(),
707 );
708 }
709}