settings/agent/
authority.rs1use crate::agent::{AgentCreator, AgentError, Context, Invocation, Lifespan, Payload};
6use crate::base::SettingType;
7use crate::message::base::{Audience, MessengerType};
8use crate::service;
9use crate::service_context::ServiceContext;
10use anyhow::{format_err, Context as _, Error};
11use std::collections::HashSet;
12use std::rc::Rc;
13
14pub(crate) struct Authority {
17 agent_signatures: Vec<(&'static str, service::message::Signature)>,
19 delegate: service::message::Delegate,
21 messenger: service::message::Messenger,
23 available_components: HashSet<SettingType>,
25}
26
27impl Authority {
28 pub(crate) async fn create(
29 delegate: service::message::Delegate,
30 available_components: HashSet<SettingType>,
31 ) -> Result<Authority, Error> {
32 let (client, _) = delegate
33 .create(MessengerType::Unbound)
34 .await
35 .map_err(|_| anyhow::format_err!("could not create agent messenger for authority"))?;
36
37 Ok(Authority {
38 agent_signatures: Vec::new(),
39 delegate,
40 messenger: client,
41 available_components,
42 })
43 }
44
45 pub(crate) async fn register(&mut self, creator: AgentCreator) {
46 let agent_receptor = self
47 .delegate
48 .create(MessengerType::Unbound)
49 .await
50 .expect("agent receptor should be created")
51 .1;
52 let signature = agent_receptor.get_signature();
53 let context =
54 Context::new(agent_receptor, self.delegate.clone(), self.available_components.clone())
55 .await;
56
57 creator.create(context).await;
58
59 self.agent_signatures.push((creator.debug_id, signature));
60 }
61
62 pub(crate) async fn execute_lifespan(
69 &self,
70 lifespan: Lifespan,
71 service_context: Rc<ServiceContext>,
72 sequential: bool,
73 ) -> Result<(), Error> {
74 let mut pending_receptors = Vec::new();
75
76 for &(debug_id, signature) in &self.agent_signatures {
77 let mut receptor = self.messenger.message(
78 Payload::Invocation(Invocation {
79 lifespan,
80 service_context: Rc::clone(&service_context),
81 })
82 .into(),
83 Audience::Messenger(signature),
84 );
85
86 if sequential {
87 let result = process_payload(debug_id, receptor.next_of::<Payload>().await);
88 #[allow(clippy::question_mark)]
89 if result.is_err() {
90 return result;
91 }
92 } else {
93 pending_receptors.push((debug_id, receptor));
94 }
95 }
96
97 for (debug_id, mut receptor) in pending_receptors {
100 let result = process_payload(debug_id, receptor.next_of::<Payload>().await);
101 #[allow(clippy::question_mark)]
102 if result.is_err() {
103 return result;
104 }
105 }
106
107 Ok(())
108 }
109}
110
111fn process_payload(
112 debug_id: &str,
113 payload: Result<(Payload, service::message::MessageClient), Error>,
114) -> Result<(), Error> {
115 match payload {
116 Ok((Payload::Complete(Ok(_) | Err(AgentError::UnhandledLifespan)), _)) => Ok(()),
117 Ok((Payload::Complete(result), _)) => {
118 result.with_context(|| format!("Invocation failed for {debug_id:?}"))
119 }
120 Ok(_) => Err(format_err!("Unexpected result for {:?}", debug_id)),
121 Err(e) => Err(e).with_context(|| format!("Invocation failed {debug_id:?}")),
122 }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::message_hub::MessageHub;
    use assert_matches::assert_matches;
    use fuchsia_async as fasync;

    /// Agent factory for the test: spawns a detached local task that answers
    /// every invocation it receives with `AgentError::UnexpectedError`.
    fn create(context: Context) -> futures::future::LocalBoxFuture<'static, ()> {
        Box::pin(async move {
            // NOTE(review): presumably kept so the async block captures all of
            // `context` rather than only `context.receptor` — confirm before removing.
            let _ = &context;
            let mut receptor = context.receptor;

            let responder = async move {
                loop {
                    match receptor.next_of::<Payload>().await {
                        Ok((Payload::Invocation(_), client)) => {
                            // The result of sending the reply is intentionally ignored.
                            let _ = client
                                .reply(Payload::Complete(Err(AgentError::UnexpectedError)).into());
                        }
                        // Stop on a receive error or on any non-invocation payload.
                        _ => break,
                    }
                }
            };

            fasync::Task::local(responder).detach();
        })
    }

    /// Checks that the error returned for a failing agent mentions the
    /// `test_agent` identifier given to `create_agent!`.
    #[fasync::run_until_stalled(test)]
    async fn test_log() {
        let delegate = MessageHub::create();
        let mut authority = Authority::create(delegate, HashSet::new())
            .await
            .expect("Should be able to create authority");

        let agent = crate::create_agent!(test_agent, create);
        authority.register(agent).await;

        let service_context = Rc::new(ServiceContext::new(None, None));
        let result =
            authority.execute_lifespan(Lifespan::Initialization, service_context, false).await;

        assert_matches!(result, Err(e) if format!("{e:?}").contains("test_agent"));
    }
}