routing/bedrock/
use_dictionary_router.rs1use crate::capability_source::CapabilitySource;
6use crate::error::RoutingError;
7use async_trait::async_trait;
8use futures::StreamExt;
9use futures::channel::oneshot;
10use futures::stream::FuturesUnordered;
11use router_error::RouterError;
12use sandbox::{
13 Capability, Dict, EntryUpdate, Request, Routable, Router, RouterResponse,
14 UpdateNotifierRetention,
15};
16
17pub struct UseDictionaryRouter {
21 path: cm_types::Path,
22 moniker: moniker::Moniker,
23 original_dictionary: Dict,
24 dictionary_routers: Vec<Router<Dict>>,
25 capability_source: CapabilitySource,
26}
27
28impl UseDictionaryRouter {
29 pub fn new(
30 path: cm_types::Path,
31 moniker: moniker::Moniker,
32 original_dictionary: Dict,
33 dictionary_routers: Vec<Router<Dict>>,
34 capability_source: CapabilitySource,
35 ) -> Router<Dict> {
36 Router::new(Self {
37 path,
38 moniker,
39 original_dictionary,
40 dictionary_routers,
41 capability_source,
42 })
43 }
44
45 async fn dictionary_follow_updates_from(
52 &self,
53 self_dictionary: Dict,
54 other_dict: Dict,
55 ) -> Vec<(cm_types::Name, Capability, Capability)> {
56 let self_clone = self_dictionary;
57 let (sender, receiver) = oneshot::channel();
58 let mut sender = Some(sender);
59 let mut initial_conflicts = vec![];
60 other_dict.register_update_notifier(Box::new(move |entry_update| {
61 match entry_update {
62 EntryUpdate::Add(key, capability) => {
63 if let Some(preexisting_value) = self_clone.get(key).ok().flatten() {
64 initial_conflicts.push((
67 key.into(),
68 capability.try_clone().unwrap(),
69 preexisting_value,
70 ));
71 } else {
72 let _ = self_clone.insert(key.into(), capability.try_clone().unwrap());
73 }
74 }
75 EntryUpdate::Remove(key) => {
76 let _ = self_clone.remove(key);
77 }
78 EntryUpdate::Idle => {
79 if let Some(sender) = sender.take() {
80 let _ = sender.send(std::mem::take(&mut initial_conflicts));
81 }
82 }
83 }
84 UpdateNotifierRetention::Retain
85 }));
86
87 receiver.await.expect("sender was dropped unexpectedly")
88 }
89}
90
91#[async_trait]
92impl Routable<Dict> for UseDictionaryRouter {
93 async fn route(
94 &self,
95 request: Option<Request>,
96 debug: bool,
97 ) -> Result<RouterResponse<Dict>, RouterError> {
98 if debug {
99 return Ok(RouterResponse::Debug(
100 self.capability_source
101 .clone()
102 .try_into()
103 .expect("failed to serialize capability source"),
104 ));
105 }
106 let mut futures_unordered = FuturesUnordered::new();
107 for dictionary_router in self.dictionary_routers.iter() {
108 let request = request.as_ref().and_then(|r| r.try_clone().ok());
109 futures_unordered.push(dictionary_router.route(request, false));
110 }
111 let resulting_dictionary = self.original_dictionary.shallow_copy().unwrap();
112 while let Some(route_result) = futures_unordered.next().await {
113 match route_result {
114 Ok(RouterResponse::Capability(other_dictionary)) => {
115 let initial_conflicts = self
116 .dictionary_follow_updates_from(
117 resulting_dictionary.clone(),
118 other_dictionary,
119 )
120 .await;
121 let mut conflicting_names = vec![];
122 for (key, capability, preexisting_value) in initial_conflicts {
123 log::warn!(
124 "{}: unable to add {key} from source {} to merged dictionary for path \
125 {} because the dictionary already contains an item with the same name \
126 from source {}",
127 &self.moniker,
128 try_get_router_source(&capability)
129 .await
130 .unwrap_or_else(|| "<unknown>".to_string()),
131 &self.path,
132 try_get_router_source(&preexisting_value)
133 .await
134 .unwrap_or_else(|| "<unknown>".to_string()),
135 );
136 conflicting_names.push(key);
137 }
138 if !conflicting_names.is_empty() {
139 return Err(RoutingError::ConflictingDictionaryEntries {
140 moniker: self.capability_source.source_moniker(),
141 conflicting_names,
142 }
143 .into());
144 }
145 }
146 Ok(RouterResponse::Unavailable) => (),
147 Ok(RouterResponse::Debug(_)) => {
148 panic!("got debug response when we didn't request one")
149 }
150 Err(_e) => {
151 }
156 }
157 }
158 Ok(resulting_dictionary.into())
159 }
160}
161
162async fn try_get_router_source(capability: &Capability) -> Option<String> {
163 let source: crate::capability_source::CapabilitySource = match capability {
164 Capability::DictionaryRouter(router) => match router.route(None, true).await {
165 Ok(RouterResponse::Debug(data)) => data.try_into().ok()?,
166 _ => return None,
167 },
168 Capability::ConnectorRouter(router) => match router.route(None, true).await {
169 Ok(RouterResponse::Debug(data)) => data.try_into().ok()?,
170 _ => return None,
171 },
172 Capability::DirConnectorRouter(router) => match router.route(None, true).await {
173 Ok(RouterResponse::Debug(data)) => data.try_into().ok()?,
174 _ => return None,
175 },
176 Capability::DataRouter(router) => match router.route(None, true).await {
177 Ok(RouterResponse::Debug(data)) => data.try_into().ok()?,
178 _ => return None,
179 },
180 _ => return None,
181 };
182 Some(format!("{}", source))
183}