gpt_component/service.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::gpt::GptManager;
use anyhow::{Context as _, Error};
use block_client::RemoteBlockClient;
use fidl::endpoints::{DiscoverableProtocolMarker as _, RequestStream as _, ServiceMarker as _};
use futures::lock::Mutex as AsyncMutex;
use futures::stream::TryStreamExt as _;
use std::sync::Arc;
use vfs::directory::entry_container::Directory as _;
use vfs::directory::helper::DirectlyMutable as _;
use vfs::execution_scope::ExecutionScope;
use vfs::path::Path;
use {
    fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_io as fio,
    fidl_fuchsia_process_lifecycle as flifecycle, fidl_fuchsia_storage_partitions as fpartitions,
    fuchsia_async as fasync,
};

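/// Serves the GPT component's outgoing directory, routing fuchsia.fs.startup.Startup,
/// fuchsia.storage.partitions, and fuchsia.fs.Admin requests to the underlying `GptManager`.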
pub struct StorageHostService {
    state: AsyncMutex<State>,

    // The execution scope of the pseudo filesystem.
    scope: ExecutionScope,

    // The root of the pseudo filesystem for the component.
    export_dir: Arc<vfs::directory::immutable::Simple>,

    // A directory where partitions are published.
    partitions_dir: Arc<vfs::directory::immutable::Simple>,
}

#[derive(Default)]
enum State {
    #[default]
    Stopped,
    /// The GPT is malformed and needs to be reformatted with ResetPartitionTable before it can be
    /// used.  The component will publish an empty partitions directory.
    NeedsFormatting(fblock::BlockProxy),
    Running(Arc<GptManager>),
}

impl State {
    fn is_stopped(&self) -> bool {
        matches!(self, Self::Stopped)
    }
}

impl StorageHostService {
    pub fn new() -> Arc<Self> {
        let export_dir = vfs::directory::immutable::simple();
        let partitions_dir = vfs::directory::immutable::simple();
        Arc::new(Self {
            state: Default::default(),
            scope: ExecutionScope::new(),
            export_dir,
            partitions_dir,
        })
    }

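    /// Runs the service: publishes the partition service and the Startup, PartitionsAdmin,
    /// PartitionsManager, and Admin protocols into `outgoing_dir`, optionally serves the process
    /// lifecycle channel, and then blocks until the execution scope is shut down.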
    pub async fn run(
        self: Arc<Self>,
        outgoing_dir: zx::Channel,
        lifecycle_channel: Option<zx::Channel>,
    ) -> Result<(), Error> {
        let svc_dir = vfs::directory::immutable::simple();
        self.export_dir.add_entry("svc", svc_dir.clone()).expect("Unable to create svc dir");

        svc_dir
            .add_entry(
                fpartitions::PartitionServiceMarker::SERVICE_NAME,
                self.partitions_dir.clone(),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fstartup::StartupMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_start_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fpartitions::PartitionsAdminMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_partitions_admin_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fpartitions::PartitionsManagerMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_partitions_manager_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                ffs::AdminMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_admin_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();

        self.export_dir.clone().open(
            self.scope.clone(),
            fio::OpenFlags::RIGHT_READABLE
                | fio::OpenFlags::RIGHT_WRITABLE
                | fio::OpenFlags::DIRECTORY
                | fio::OpenFlags::RIGHT_EXECUTABLE,
            Path::dot(),
            outgoing_dir.into(),
        );

        if let Some(channel) = lifecycle_channel {
            let me = self.clone();
            self.scope.spawn(async move {
                if let Err(e) = me.handle_lifecycle_requests(channel).await {
                    log::warn!(error:? = e; "handle_lifecycle_requests");
                }
            });
        }

        self.scope.wait().await;

        Ok(())
    }

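    /// Handles fuchsia.fs.startup.Startup requests.  Start binds the provided block device;
    /// Format and Check are not supported for GPT.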
    async fn handle_start_requests(
        self: Arc<Self>,
        mut stream: fstartup::StartupRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fstartup::StartupRequest::Start { device, options: _, responder } => {
                    responder
                        .send(
                            self.start(device.into_proxy())
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
                }
                fstartup::StartupRequest::Format { responder, .. } => {
                    responder
                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Format response"));
                }
                fstartup::StartupRequest::Check { responder, .. } => {
                    responder
                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
                }
            }
        }
        Ok(())
    }

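    /// Attempts to load the GPT from `device`.  On failure the component transitions to
    /// NeedsFormatting and waits for ResetPartitionTable rather than returning an error.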
    async fn start(self: &Arc<Self>, device: fblock::BlockProxy) -> Result<(), zx::Status> {
        let mut state = self.state.lock().await;
        if !state.is_stopped() {
            log::warn!("Device already bound");
            return Err(zx::Status::ALREADY_BOUND);
        }

        // TODO(https://fxbug.dev/339491886): It would be better if `start` failed on a malformed
        // device, rather than hiding this state from fshost.  However, fs_management isn't well set
        // up to deal with this, because it ties the outgoing directory to a successful return from
        // `start` (see `ServingMultiVolumeFilesystem`), and fshost resolves the queued requests for
        // the filesystem only after `start` is successful.  We should refactor fs_management and
        // fshost to better support this case, which might require changing how queueing works so
        // there's more flexibility to either resolve queueing when the component starts up (what we
        // need here), or when `Start` is successful (what a filesystem like Fxfs needs).
        *state = match GptManager::new(device.clone(), self.partitions_dir.clone()).await {
            Ok(runner) => State::Running(runner),
            Err(err) => {
                log::error!(err:?; "Failed to load GPT.  Reformatting may be required.");
                State::NeedsFormatting(device)
            }
        };
        Ok(())
    }

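    /// Handles fuchsia.storage.partitions.PartitionsManager requests: block info queries,
    /// partition-table transactions, and adding partitions.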
    async fn handle_partitions_manager_requests(
        self: Arc<Self>,
        mut stream: fpartitions::PartitionsManagerRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fpartitions::PartitionsManagerRequest::GetBlockInfo { responder } => {
                    responder
                        .send(self.get_block_info().await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send GetBlockInfo response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::CreateTransaction { responder } => {
                    responder
                        .send(self.create_transaction().await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send CreateTransaction response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::CommitTransaction {
                    transaction,
                    responder,
                } => {
                    responder
                        .send(
                            self.commit_transaction(transaction)
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send CommitTransaction response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::AddPartition { payload, responder } => {
                    responder
                        .send(self.add_partition(payload).await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send AddPartition response"),
                        );
                }
            }
        }
        Ok(())
    }

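    /// Returns the device's (block_count, block_size).  Works in both the NeedsFormatting and
    /// Running states; fails with BAD_STATE when stopped.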
    async fn get_block_info(&self) -> Result<(u64, u32), zx::Status> {
        let state = self.state.lock().await;
        match &*state {
            State::Stopped => return Err(zx::Status::BAD_STATE),
            State::NeedsFormatting(block) => {
                let info = block
                    .get_info()
                    .await
                    .map_err(|err| {
                        log::error!(err:?; "get_block_info: failed to query block info");
                        zx::Status::IO
                    })?
                    .map_err(zx::Status::from_raw)?;
                Ok((info.block_count, info.block_size))
            }
            State::Running(gpt) => Ok((gpt.block_count(), gpt.block_size())),
        }
    }

    async fn create_transaction(&self) -> Result<zx::EventPair, zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.create_transaction().await
    }

    async fn commit_transaction(&self, transaction: zx::EventPair) -> Result<(), zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.commit_transaction(transaction).await
    }

    async fn add_partition(
        &self,
        request: fpartitions::PartitionsManagerAddPartitionRequest,
    ) -> Result<(), zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.add_partition(request).await
    }

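    /// Handles fuchsia.storage.partitions.PartitionsAdmin requests (currently just
    /// ResetPartitionTable).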
    async fn handle_partitions_admin_requests(
        self: Arc<Self>,
        mut stream: fpartitions::PartitionsAdminRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fpartitions::PartitionsAdminRequest::ResetPartitionTable {
                    partitions,
                    responder,
                } => {
                    responder
                        .send(
                            self.reset_partition_table(partitions)
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send ResetPartitionTable response"),
                        );
                }
            }
        }
        Ok(())
    }

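    /// Replaces the partition table with `partitions`.  If the GPT was unusable, the device is
    /// reformatted and the component transitions to Running.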
    async fn reset_partition_table(
        &self,
        partitions: Vec<fpartitions::PartitionInfo>,
    ) -> Result<(), zx::Status> {
        fn convert_partition_info(info: fpartitions::PartitionInfo) -> gpt::PartitionInfo {
            gpt::PartitionInfo {
                label: info.name,
                type_guid: gpt::Guid::from_bytes(info.type_guid.value),
                instance_guid: gpt::Guid::from_bytes(info.instance_guid.value),
                start_block: info.start_block,
                num_blocks: info.num_blocks,
                flags: info.flags,
            }
        }
        let partitions = partitions.into_iter().map(convert_partition_info).collect::<Vec<_>>();

        let mut state = self.state.lock().await;
        match &mut *state {
            State::Stopped => return Err(zx::Status::BAD_STATE),
            State::NeedsFormatting(block) => {
                log::info!("reset_partition_table: Reformatting GPT.");
                let client = Arc::new(RemoteBlockClient::new(&*block).await?);
                gpt::Gpt::format(client, partitions).await.map_err(|err| {
                    log::error!(err:?; "reset_partition_table: failed to init GPT");
                    zx::Status::IO
                })?;
                *state = State::Running(
                    GptManager::new(block.clone(), self.partitions_dir.clone()).await.map_err(
                        |err| {
                            log::error!(err:?; "reset_partition_table: failed to re-launch GPT");
                            zx::Status::BAD_STATE
                        },
                    )?,
                );
            }
            State::Running(gpt) => {
                log::info!("reset_partition_table: Updating GPT.");
                gpt.reset_partition_table(partitions).await?;
            }
        }
        Ok(())
    }

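    /// Handles fuchsia.fs.Admin requests.  Shutdown tears down the GPT and stops the component.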
    async fn handle_admin_requests(
        &self,
        mut stream: ffs::AdminRequestStream,
    ) -> Result<(), Error> {
        if let Some(request) = stream.try_next().await.context("Reading request")? {
            match request {
                ffs::AdminRequest::Shutdown { responder } => {
                    log::info!("Received Admin::Shutdown request");
                    self.shutdown().await;
                    responder
                        .send()
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send shutdown response"));
                    log::info!("Admin shutdown complete");
                }
            }
        }
        Ok(())
    }

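    /// Handles fuchsia.process.lifecycle.Lifecycle requests; Stop shuts the component down.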
    async fn handle_lifecycle_requests(&self, lifecycle_channel: zx::Channel) -> Result<(), Error> {
        let mut stream = flifecycle::LifecycleRequestStream::from_channel(
            fasync::Channel::from_channel(lifecycle_channel),
        );
        match stream.try_next().await.context("Reading request")? {
            Some(flifecycle::LifecycleRequest::Stop { .. }) => {
                log::info!("Received Lifecycle::Stop request");
                self.shutdown().await;
                log::info!("Lifecycle shutdown complete");
            }
            None => {}
        }
        Ok(())
    }

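    /// Returns the running `GptManager`, or BAD_STATE if no GPT is currently loaded.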
    async fn gpt_manager(&self) -> Result<Arc<GptManager>, zx::Status> {
        match &*self.state.lock().await {
            State::Stopped | State::NeedsFormatting(_) => Err(zx::Status::BAD_STATE),
            State::Running(gpt) => Ok(gpt.clone()),
        }
    }

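    /// Shuts down the GPT (if running) and the execution scope, which unblocks `run`.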
    async fn shutdown(&self) {
        let mut state = self.state.lock().await;
        if let State::Running(gpt) = std::mem::take(&mut *state) {
            gpt.shutdown().await;
        }
        self.scope.shutdown();
    }
}

#[cfg(test)]
mod tests {
    use super::StorageHostService;
    use block_client::RemoteBlockClient;
    use fake_block_server::FakeServer;
    use fidl::endpoints::Proxy as _;
    use fidl_fuchsia_process_lifecycle::LifecycleMarker;
    use fuchsia_component::client::connect_to_protocol_at_dir_svc;
    use futures::FutureExt as _;
    use gpt::{Gpt, Guid, PartitionInfo};
    use std::sync::Arc;
    use {
        fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
        fidl_fuchsia_io as fio, fidl_fuchsia_storage_partitions as fpartitions,
        fuchsia_async as fasync,
    };

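    /// Creates a `FakeServer` backed by a fresh VMO and formats it with a GPT containing
    /// `partitions`.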
    async fn setup_server(
        block_size: u32,
        block_count: u64,
        partitions: Vec<PartitionInfo>,
    ) -> Arc<FakeServer> {
        let vmo = zx::Vmo::create(block_size as u64 * block_count).unwrap();
        let server = Arc::new(FakeServer::from_vmo(block_size, vmo));
        {
            let (block_client, block_server) =
                fidl::endpoints::create_proxy::<fblock::BlockMarker>();
            let volume_stream = fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(
                block_server.into_channel(),
            )
            .into_stream();
            let server_clone = server.clone();
            let _task = fasync::Task::spawn(async move { server_clone.serve(volume_stream).await });
            let client = Arc::new(RemoteBlockClient::new(block_client).await.unwrap());
            Gpt::format(client, partitions).await.unwrap();
        }
        server
    }

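    /// Verifies that the component starts against a formatted block device and exits cleanly in
    /// response to Lifecycle::Stop.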
    #[fuchsia::test]
    async fn lifecycle() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (lifecycle_client, lifecycle_server) =
            fidl::endpoints::create_proxy::<LifecycleMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                // Client
                let client =
                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                        .unwrap();
                client
                    .start(
                        block_client,
                        fstartup::StartOptions {
                            read_only: false,
                            verbose: false,
                            fsck_after_every_transaction: false,
                            write_compression_algorithm:
                                fstartup::CompressionAlgorithm::ZstdChunked,
                            write_compression_level: 0,
                            cache_eviction_policy_override: fstartup::EvictionPolicyOverride::None,
                            startup_profiling_seconds: 0,
                        },
                    )
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");
                lifecycle_client.stop().expect("Stop failed");
                fasync::OnSignals::new(
                    &lifecycle_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                // Server
                let service = StorageHostService::new();
                service
                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
                    .await
                    .expect("Run failed");
            },
            async {
                // Block device
                let server = setup_server(
                    512,
                    8,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }

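    /// Verifies that fuchsia.fs.Admin/Shutdown stops the component even without a lifecycle
    /// channel.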
    #[fuchsia::test]
    async fn admin_shutdown() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                // Client
                let client =
                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                        .unwrap();
                let admin_client =
                    connect_to_protocol_at_dir_svc::<ffs::AdminMarker>(&outgoing_dir).unwrap();
                client
                    .start(
                        block_client,
                        fstartup::StartOptions {
                            read_only: false,
                            verbose: false,
                            fsck_after_every_transaction: false,
                            write_compression_algorithm:
                                fstartup::CompressionAlgorithm::ZstdChunked,
                            write_compression_level: 0,
                            cache_eviction_policy_override: fstartup::EvictionPolicyOverride::None,
                            startup_profiling_seconds: 0,
                        },
                    )
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");
                admin_client.shutdown().await.expect("admin shutdown failed");
                fasync::OnSignals::new(
                    &admin_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                // Server
                let service = StorageHostService::new();
                service.run(outgoing_dir_server.into_channel(), None).await.expect("Run failed");
            },
            async {
                // Block device
                let server = setup_server(
                    512,
                    8,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }

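    /// Verifies that only one partition-table transaction can be live at a time, and that
    /// committing or dropping a transaction allows a new one to be created.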
    #[fuchsia::test]
    async fn transaction_lifecycle() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (lifecycle_client, lifecycle_server) =
            fidl::endpoints::create_proxy::<LifecycleMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                // Client
                connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                    .unwrap()
                    .start(
                        block_client,
                        fstartup::StartOptions {
                            read_only: false,
                            verbose: false,
                            fsck_after_every_transaction: false,
                            write_compression_algorithm:
                                fstartup::CompressionAlgorithm::ZstdChunked,
                            write_compression_level: 0,
                            cache_eviction_policy_override: fstartup::EvictionPolicyOverride::None,
                            startup_profiling_seconds: 0,
                        },
                    )
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");

                let pm_client = connect_to_protocol_at_dir_svc::<
                    fpartitions::PartitionsManagerMarker,
                >(&outgoing_dir)
                .unwrap();
                let transaction = pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect("create_transaction failed");

                pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect_err("create_transaction should fail while other txn exists");

                pm_client
                    .commit_transaction(transaction)
                    .await
                    .expect("FIDL error")
                    .expect("commit_transaction failed");

                {
                    let _transaction = pm_client
                        .create_transaction()
                        .await
                        .expect("FIDL error")
                        .expect("create_transaction should succeed after committing txn");
                }

                pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect("create_transaction should succeed after dropping txn");

                lifecycle_client.stop().expect("Stop failed");
                fasync::OnSignals::new(
                    &lifecycle_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                // Server
                let service = StorageHostService::new();
                service
                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
                    .await
                    .expect("Run failed");
            },
            async {
                // Block device
                let server = setup_server(
                    512,
                    16,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }
}