settings/agent/
authority.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::agent::{AgentCreator, AgentError, Context, Invocation, Lifespan, Payload};
6use crate::base::SettingType;
7use crate::message::base::{Audience, MessengerType};
8use crate::service;
9use crate::service_context::ServiceContext;
10use anyhow::{format_err, Context as _, Error};
11use std::collections::HashSet;
12use std::rc::Rc;
13
14/// Authority provides the ability to execute agents sequentially or simultaneously for a given
15/// stage.
16pub(crate) struct Authority {
17    // This is a list of pairs of debug ids and agent addresses.
18    agent_signatures: Vec<(&'static str, service::message::Signature)>,
19    // Factory passed to agents for communicating with the service.
20    delegate: service::message::Delegate,
21    // Messenger
22    messenger: service::message::Messenger,
23    // Available components
24    available_components: HashSet<SettingType>,
25}
26
27impl Authority {
28    pub(crate) async fn create(
29        delegate: service::message::Delegate,
30        available_components: HashSet<SettingType>,
31    ) -> Result<Authority, Error> {
32        let (client, _) = delegate
33            .create(MessengerType::Unbound)
34            .await
35            .map_err(|_| anyhow::format_err!("could not create agent messenger for authority"))?;
36
37        Ok(Authority {
38            agent_signatures: Vec::new(),
39            delegate,
40            messenger: client,
41            available_components,
42        })
43    }
44
45    pub(crate) async fn register(&mut self, creator: AgentCreator) {
46        let agent_receptor = self
47            .delegate
48            .create(MessengerType::Unbound)
49            .await
50            .expect("agent receptor should be created")
51            .1;
52        let signature = agent_receptor.get_signature();
53        let context =
54            Context::new(agent_receptor, self.delegate.clone(), self.available_components.clone())
55                .await;
56
57        creator.create(context).await;
58
59        self.agent_signatures.push((creator.debug_id, signature));
60    }
61
62    /// Invokes each registered agent for a given lifespan. If sequential is true,
63    /// invocations will only proceed to the next agent once the current
64    /// invocation has been successfully acknowledged. When sequential is false,
65    /// agents will receive their invocations without waiting. However, the
66    /// overall completion (signaled through the receiver returned by the method),
67    /// will not return until all invocations have been acknowledged.
68    pub(crate) async fn execute_lifespan(
69        &self,
70        lifespan: Lifespan,
71        service_context: Rc<ServiceContext>,
72        sequential: bool,
73    ) -> Result<(), Error> {
74        let mut pending_receptors = Vec::new();
75
76        for &(debug_id, signature) in &self.agent_signatures {
77            let mut receptor = self.messenger.message(
78                Payload::Invocation(Invocation {
79                    lifespan,
80                    service_context: Rc::clone(&service_context),
81                })
82                .into(),
83                Audience::Messenger(signature),
84            );
85
86            if sequential {
87                let result = process_payload(debug_id, receptor.next_of::<Payload>().await);
88                #[allow(clippy::question_mark)]
89                if result.is_err() {
90                    return result;
91                }
92            } else {
93                pending_receptors.push((debug_id, receptor));
94            }
95        }
96
97        // Pending acks should only be present for non sequential execution. In
98        // this case wait for each to complete.
99        for (debug_id, mut receptor) in pending_receptors {
100            let result = process_payload(debug_id, receptor.next_of::<Payload>().await);
101            #[allow(clippy::question_mark)]
102            if result.is_err() {
103                return result;
104            }
105        }
106
107        Ok(())
108    }
109}
110
111fn process_payload(
112    debug_id: &str,
113    payload: Result<(Payload, service::message::MessageClient), Error>,
114) -> Result<(), Error> {
115    match payload {
116        Ok((Payload::Complete(Ok(_) | Err(AgentError::UnhandledLifespan)), _)) => Ok(()),
117        Ok((Payload::Complete(result), _)) => {
118            result.with_context(|| format!("Invocation failed for {debug_id:?}"))
119        }
120        Ok(_) => Err(format_err!("Unexpected result for {:?}", debug_id)),
121        Err(e) => Err(e).with_context(|| format!("Invocation failed {debug_id:?}")),
122    }
123}
124
125#[cfg(test)]
126mod tests {
127    use super::*;
128    use crate::message::message_hub::MessageHub;
129    use assert_matches::assert_matches;
130    use fuchsia_async as fasync;
131
132    fn create(context: Context) -> futures::future::LocalBoxFuture<'static, ()> {
133        Box::pin(async move {
134            let _ = &context;
135            let mut receptor = context.receptor;
136            fasync::Task::local(async move {
137                while let Ok((Payload::Invocation(_), client)) = receptor.next_of::<Payload>().await
138                {
139                    let _ =
140                        client.reply(Payload::Complete(Err(AgentError::UnexpectedError)).into());
141                }
142            })
143            .detach();
144        })
145    }
146
147    #[fasync::run_until_stalled(test)]
148    async fn test_log() {
149        let delegate = MessageHub::create();
150        let mut authority = Authority::create(delegate, HashSet::new())
151            .await
152            .expect("Should be able to create authority");
153        authority.register(crate::create_agent!(test_agent, create)).await;
154        let result = authority
155            .execute_lifespan(
156                Lifespan::Initialization,
157                Rc::new(ServiceContext::new(None, None)),
158                false,
159            )
160            .await;
161        assert_matches!(result, Err(e) if format!("{e:?}").contains("test_agent"));
162    }
163}