gpt_component/service.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::gpt::GptManager;
use anyhow::{Context as _, Error};
use block_client::RemoteBlockClient;
use fidl::endpoints::{DiscoverableProtocolMarker as _, RequestStream as _, ServiceMarker as _};
use futures::lock::Mutex as AsyncMutex;
use futures::stream::TryStreamExt as _;
use std::sync::Arc;
use vfs::directory::helper::DirectlyMutable as _;
use vfs::execution_scope::ExecutionScope;
use {
    fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_io as fio,
    fidl_fuchsia_process_lifecycle as flifecycle, fidl_fuchsia_storage_partitions as fpartitions,
    fuchsia_async as fasync,
};

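/// Serves the GPT component's outgoing directory: the published partition services, the
/// `fuchsia.fs.startup/Startup` and `fuchsia.storage.partitions` protocols, and `fuchsia.fs/Admin`
/// for shutdown.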
pub struct StorageHostService {
    state: AsyncMutex<State>,

    // The execution scope of the pseudo filesystem.
    scope: ExecutionScope,

    // The root of the pseudo filesystem for the component.
    export_dir: Arc<vfs::directory::immutable::Simple>,

    // A directory where partitions are published.
    partitions_dir: Arc<vfs::directory::immutable::Simple>,
}

#[derive(Default)]
enum State {
    #[default]
    Stopped,
    /// The GPT is malformed and needs to be reformatted with ResetPartitionTable before it can be
    /// used.  The component will publish an empty partitions directory.
    NeedsFormatting(fblock::BlockProxy),
    Running(Arc<GptManager>),
}

impl State {
    fn is_stopped(&self) -> bool {
        matches!(self, Self::Stopped)
    }
}

impl StorageHostService {
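    /// Creates a new instance of the service.  Nothing is served until [`Self::run`] is called.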
    pub fn new() -> Arc<Self> {
        let export_dir = vfs::directory::immutable::simple();
        let partitions_dir = vfs::directory::immutable::simple();
        Arc::new(Self {
            state: Default::default(),
            scope: ExecutionScope::new(),
            export_dir,
            partitions_dir,
        })
    }

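    /// Serves the component's outgoing directory on `outgoing_dir` and, if provided, handles
    /// `fuchsia.process.lifecycle/Lifecycle` requests on `lifecycle_channel`.  Does not return
    /// until the execution scope is shut down.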
    pub async fn run(
        self: Arc<Self>,
        outgoing_dir: zx::Channel,
        lifecycle_channel: Option<zx::Channel>,
    ) -> Result<(), Error> {
        let svc_dir = vfs::directory::immutable::simple();
        self.export_dir.add_entry("svc", svc_dir.clone()).expect("Unable to create svc dir");

        svc_dir
            .add_entry(
                fpartitions::PartitionServiceMarker::SERVICE_NAME,
                self.partitions_dir.clone(),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fstartup::StartupMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_start_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fpartitions::PartitionsAdminMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_partitions_admin_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fpartitions::PartitionsManagerMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_partitions_manager_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                ffs::AdminMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_admin_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();

        vfs::directory::serve_on(
            self.export_dir.clone(),
            fio::PERM_READABLE | fio::PERM_WRITABLE | fio::PERM_EXECUTABLE,
            self.scope.clone(),
            fidl::endpoints::ServerEnd::new(outgoing_dir),
        );

        if let Some(channel) = lifecycle_channel {
            let me = self.clone();
            self.scope.spawn(async move {
                if let Err(e) = me.handle_lifecycle_requests(channel).await {
                    log::warn!(error:? = e; "handle_lifecycle_requests");
                }
            });
        }

        self.scope.wait().await;

        Ok(())
    }

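    /// Handles `fuchsia.fs.startup/Startup` requests.  Only `Start` is supported; `Format` and
    /// `Check` return `NOT_SUPPORTED`.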
    async fn handle_start_requests(
        self: Arc<Self>,
        mut stream: fstartup::StartupRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fstartup::StartupRequest::Start { device, options: _, responder } => {
                    responder
                        .send(
                            self.start(device.into_proxy())
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
                }
                fstartup::StartupRequest::Format { responder, .. } => {
                    responder
                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Format response"));
                }
                fstartup::StartupRequest::Check { responder, .. } => {
                    responder
                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
                }
            }
        }
        Ok(())
    }

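    /// Binds to `device` and attempts to load its GPT.  If the GPT is malformed, the service
    /// enters the `NeedsFormatting` state rather than failing (see the TODO below).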
    async fn start(self: &Arc<Self>, device: fblock::BlockProxy) -> Result<(), zx::Status> {
        let mut state = self.state.lock().await;
        if !state.is_stopped() {
            log::warn!("Device already bound");
            return Err(zx::Status::ALREADY_BOUND);
        }

        // TODO(https://fxbug.dev/339491886): It would be better if `start` failed on a malformed
        // device, rather than hiding this state from fshost.  However, fs_management isn't well set
        // up to deal with this, because it ties the outgoing directory to a successful return from
        // `start` (see `ServingMultiVolumeFilesystem`), and fshost resolves the queued requests for
        // the filesystem only after `start` is successful.  We should refactor fs_management and
        // fshost to better support this case, which might require changing how queueing works so
        // there's more flexibility to either resolve queueing when the component starts up (what we
        // need here), or when `Start` is successful (what a filesystem like Fxfs needs).
        *state = match GptManager::new(device.clone(), self.partitions_dir.clone()).await {
            Ok(runner) => State::Running(runner),
            Err(err) => {
                // This is a warning because, as described above, we can't return an error from
                // here, so there are normal flows that can print this error message that don't
                // indicate a bug or incorrect behavior.
                log::warn!(err:?; "Failed to load GPT.  Reformatting may be required.");
                State::NeedsFormatting(device)
            }
        };
        Ok(())
    }

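    /// Handles `fuchsia.storage.partitions/PartitionsManager` requests: block info queries,
    /// transaction creation and commit, and partition addition.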
    async fn handle_partitions_manager_requests(
        self: Arc<Self>,
        mut stream: fpartitions::PartitionsManagerRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fpartitions::PartitionsManagerRequest::GetBlockInfo { responder } => {
                    responder
                        .send(self.get_block_info().await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send GetBlockInfo response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::CreateTransaction { responder } => {
                    responder
                        .send(self.create_transaction().await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send CreateTransaction response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::CommitTransaction {
                    transaction,
                    responder,
                } => {
                    responder
                        .send(
                            self.commit_transaction(transaction)
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send CommitTransaction response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::AddPartition { payload, responder } => {
                    responder
                        .send(self.add_partition(payload).await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send AddPartition response"),
                        );
                }
            }
        }
        Ok(())
    }

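    /// Returns the block count and block size of the underlying device.  Works in both the
    /// `NeedsFormatting` and `Running` states.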
    async fn get_block_info(&self) -> Result<(u64, u32), zx::Status> {
        let state = self.state.lock().await;
        match &*state {
            State::Stopped => return Err(zx::Status::BAD_STATE),
            State::NeedsFormatting(block) => {
                let info = block
                    .get_info()
                    .await
                    .map_err(|err| {
                        log::error!(err:?; "get_block_info: failed to query block info");
                        zx::Status::IO
                    })?
                    .map_err(zx::Status::from_raw)?;
                Ok((info.block_count, info.block_size))
            }
            State::Running(gpt) => Ok((gpt.block_count(), gpt.block_size())),
        }
    }

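    /// Creates a new transaction for batching partition table edits.  Fails unless the GPT is
    /// running.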
    async fn create_transaction(&self) -> Result<zx::EventPair, zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.create_transaction().await
    }

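    /// Commits the edits associated with `transaction` to the partition table.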
    async fn commit_transaction(&self, transaction: zx::EventPair) -> Result<(), zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.commit_transaction(transaction).await
    }

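    /// Adds a partition to the GPT as described by `request`.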
    async fn add_partition(
        &self,
        request: fpartitions::PartitionsManagerAddPartitionRequest,
    ) -> Result<(), zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.add_partition(request).await
    }

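    /// Handles `fuchsia.storage.partitions/PartitionsAdmin` requests.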
    async fn handle_partitions_admin_requests(
        self: Arc<Self>,
        mut stream: fpartitions::PartitionsAdminRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fpartitions::PartitionsAdminRequest::ResetPartitionTable {
                    partitions,
                    responder,
                } => {
                    responder
                        .send(
                            self.reset_partition_table(partitions)
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send ResetPartitionTable response"),
                        );
                }
            }
        }
        Ok(())
    }

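    /// Replaces the partition table with `partitions`.  In the `NeedsFormatting` state this
    /// writes a fresh GPT to the device and starts the `GptManager`; in the `Running` state it
    /// rewrites the table in place.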
    async fn reset_partition_table(
        &self,
        partitions: Vec<fpartitions::PartitionInfo>,
    ) -> Result<(), zx::Status> {
        fn convert_partition_info(info: fpartitions::PartitionInfo) -> gpt::PartitionInfo {
            gpt::PartitionInfo {
                label: info.name,
                type_guid: gpt::Guid::from_bytes(info.type_guid.value),
                instance_guid: gpt::Guid::from_bytes(info.instance_guid.value),
                start_block: info.start_block,
                num_blocks: info.num_blocks,
                flags: info.flags,
            }
        }
        let partitions = partitions.into_iter().map(convert_partition_info).collect::<Vec<_>>();

        let mut state = self.state.lock().await;
        match &mut *state {
            State::Stopped => return Err(zx::Status::BAD_STATE),
            State::NeedsFormatting(block) => {
                log::info!("reset_partition_table: Reformatting GPT...");
                let client = Arc::new(RemoteBlockClient::new(&*block).await?);
                gpt::Gpt::format(client, partitions).await.map_err(|err| {
                    log::error!(err:?; "reset_partition_table: failed to init GPT");
                    zx::Status::IO
                })?;
                *state = State::Running(
                    GptManager::new(block.clone(), self.partitions_dir.clone()).await.map_err(
                        |err| {
                            log::error!(err:?; "reset_partition_table: failed to re-launch GPT");
                            zx::Status::BAD_STATE
                        },
                    )?,
                );
            }
            State::Running(gpt) => {
                log::info!("reset_partition_table: Updating GPT.");
                gpt.reset_partition_table(partitions).await?;
            }
        }
        Ok(())
    }

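    /// Handles `fuchsia.fs/Admin`.  `Shutdown` stops the `GptManager` and tears down the
    /// component's execution scope, which causes [`Self::run`] to return.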
    async fn handle_admin_requests(
        &self,
        mut stream: ffs::AdminRequestStream,
    ) -> Result<(), Error> {
        if let Some(request) = stream.try_next().await.context("Reading request")? {
            match request {
                ffs::AdminRequest::Shutdown { responder } => {
                    log::info!("Received Admin::Shutdown request");
                    self.shutdown().await;
                    responder
                        .send()
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send shutdown response"));
                    log::info!("Admin shutdown complete");
                }
            }
        }
        Ok(())
    }

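    /// Handles `fuchsia.process.lifecycle/Lifecycle`.  A `Stop` request shuts the service down.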
    async fn handle_lifecycle_requests(&self, lifecycle_channel: zx::Channel) -> Result<(), Error> {
        let mut stream = flifecycle::LifecycleRequestStream::from_channel(
            fasync::Channel::from_channel(lifecycle_channel),
        );
        match stream.try_next().await.context("Reading request")? {
            Some(flifecycle::LifecycleRequest::Stop { .. }) => {
                log::info!("Received Lifecycle::Stop request");
                self.shutdown().await;
                log::info!("Lifecycle shutdown complete");
            }
            None => {}
        }
        Ok(())
    }

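    /// Returns the running `GptManager`, or `BAD_STATE` if the service is stopped or the GPT
    /// needs formatting.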
    async fn gpt_manager(&self) -> Result<Arc<GptManager>, zx::Status> {
        match &*self.state.lock().await {
            State::Stopped | State::NeedsFormatting(_) => Err(zx::Status::BAD_STATE),
            State::Running(gpt) => Ok(gpt.clone()),
        }
    }

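    /// Stops the `GptManager` (if running) and shuts down the pseudo filesystem's execution
    /// scope.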
    async fn shutdown(&self) {
        let mut state = self.state.lock().await;
        if let State::Running(gpt) = std::mem::take(&mut *state) {
            gpt.shutdown().await;
        }
        self.scope.shutdown();
    }
}

#[cfg(test)]
mod tests {
    use super::StorageHostService;
    use block_client::RemoteBlockClient;
    use fake_block_server::FakeServer;
    use fidl::endpoints::Proxy as _;
    use fidl_fuchsia_process_lifecycle::LifecycleMarker;
    use fuchsia_component::client::connect_to_protocol_at_dir_svc;
    use futures::FutureExt as _;
    use gpt::{Gpt, Guid, PartitionInfo};
    use std::sync::Arc;
    use {
        fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
        fidl_fuchsia_io as fio, fidl_fuchsia_storage_partitions as fpartitions,
        fuchsia_async as fasync,
    };

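    /// Creates a `FakeServer` backed by a VMO and formats it with the given GPT partitions.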
    async fn setup_server(
        block_size: u32,
        block_count: u64,
        partitions: Vec<PartitionInfo>,
    ) -> Arc<FakeServer> {
        let vmo = zx::Vmo::create(block_size as u64 * block_count).unwrap();
        let server = Arc::new(FakeServer::from_vmo(block_size, vmo));
        {
            let (block_client, block_server) =
                fidl::endpoints::create_proxy::<fblock::BlockMarker>();
            let volume_stream = fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(
                block_server.into_channel(),
            )
            .into_stream();
            let server_clone = server.clone();
            let _task = fasync::Task::spawn(async move { server_clone.serve(volume_stream).await });
            let client = Arc::new(RemoteBlockClient::new(block_client).await.unwrap());
            Gpt::format(client, partitions).await.unwrap();
        }
        server
    }

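    /// Verifies that the component starts against a formatted device and shuts down cleanly in
    /// response to `Lifecycle::Stop`.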
    #[fuchsia::test]
    async fn lifecycle() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (lifecycle_client, lifecycle_server) =
            fidl::endpoints::create_proxy::<LifecycleMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                // Client
                let client =
                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                        .unwrap();
                client
                    .start(block_client, &fstartup::StartOptions::default())
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");
                lifecycle_client.stop().expect("Stop failed");
                fasync::OnSignals::new(
                    &lifecycle_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                // Server
                let service = StorageHostService::new();
                service
                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
                    .await
                    .expect("Run failed");
            },
            async {
                // Block device
                let server = setup_server(
                    512,
                    8,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }

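    /// Verifies that `fuchsia.fs/Admin.Shutdown` stops the component even when no lifecycle
    /// channel is provided.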
    #[fuchsia::test]
    async fn admin_shutdown() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                // Client
                let client =
                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                        .unwrap();
                let admin_client =
                    connect_to_protocol_at_dir_svc::<ffs::AdminMarker>(&outgoing_dir).unwrap();
                client
                    .start(block_client, &fstartup::StartOptions::default())
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");
                admin_client.shutdown().await.expect("admin shutdown failed");
                fasync::OnSignals::new(
                    &admin_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                // Server
                let service = StorageHostService::new();
                service.run(outgoing_dir_server.into_channel(), None).await.expect("Run failed");
            },
            async {
                // Block device
                let server = setup_server(
                    512,
                    8,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }

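    /// Verifies the transaction lifecycle: only one transaction may exist at a time, and a
    /// transaction is released either by committing it or by dropping its handle.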
    #[fuchsia::test]
    async fn transaction_lifecycle() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (lifecycle_client, lifecycle_server) =
            fidl::endpoints::create_proxy::<LifecycleMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                // Client
                connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                    .unwrap()
                    .start(block_client, &fstartup::StartOptions::default())
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");

                let pm_client = connect_to_protocol_at_dir_svc::<
                    fpartitions::PartitionsManagerMarker,
                >(&outgoing_dir)
                .unwrap();
                let transaction = pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect("create_transaction failed");

                pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect_err("create_transaction should fail while other txn exists");

                pm_client
                    .commit_transaction(transaction)
                    .await
                    .expect("FIDL error")
                    .expect("commit_transaction failed");

                {
                    let _transaction = pm_client
                        .create_transaction()
                        .await
                        .expect("FIDL error")
                        .expect("create_transaction should succeed after committing txn");
                }

                pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect("create_transaction should succeed after dropping txn");

                lifecycle_client.stop().expect("Stop failed");
                fasync::OnSignals::new(
                    &lifecycle_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                // Server
                let service = StorageHostService::new();
                service
                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
                    .await
                    .expect("Run failed");
            },
            async {
                // Block device
                let server = setup_server(
                    512,
                    16,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }
}