settings/message/
receptor.rs1use crate::message::action_fuse::ActionFuseHandle;
6use crate::message::base::{MessageEvent, Signature, Status};
7use crate::message::message_client::MessageClient;
8use anyhow::{format_err, Error};
9use futures::channel::mpsc::UnboundedReceiver;
10use futures::task::{Context, Poll};
11use futures::{Stream, StreamExt};
12use std::pin::Pin;
13
14type EventReceiver = UnboundedReceiver<MessageEvent>;
15
16pub struct Receptor {
24 signature: Signature,
25 event_rx: EventReceiver,
26 _fuse: ActionFuseHandle,
28 _chained_fuse: Option<ActionFuseHandle>,
29}
30
31impl Stream for Receptor {
32 type Item = MessageEvent;
33
34 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
35 self.event_rx.poll_next_unpin(cx)
36 }
37}
38
39impl Receptor {
40 pub(super) fn new(
41 signature: Signature,
42 event_rx: EventReceiver,
43 fuse: ActionFuseHandle,
44 chained_fuse: Option<ActionFuseHandle>,
45 ) -> Self {
46 Self { signature, event_rx, _fuse: fuse, _chained_fuse: chained_fuse }
47 }
48
49 pub(crate) fn get_signature(&self) -> Signature {
52 self.signature
53 }
54
55 pub(crate) async fn next_payload(&mut self) -> Result<(crate::Payload, MessageClient), Error> {
58 while let Some(event) = self.next().await {
59 match event {
60 MessageEvent::Message(payload, client) => {
61 return Ok((payload, client));
62 }
63 MessageEvent::Status(Status::Undeliverable) => {
64 return Err(format_err!("origin message not delivered"));
65 }
66 _ => {}
67 }
68 }
69
70 Err(format_err!("could not retrieve payload"))
71 }
72
73 pub(crate) async fn next_of<T: TryFrom<crate::Payload>>(
74 &mut self,
75 ) -> Result<(T, MessageClient), Error>
76 where
77 <T as std::convert::TryFrom<crate::Payload>>::Error: std::fmt::Debug,
78 {
79 let (payload, client) = self.next_payload().await?;
80
81 let converted_payload = T::try_from(payload.clone())
82 .map(move |converted_payload| (converted_payload, client))
83 .map_err(|err| format_err!("conversion failed: {:?}", err));
84
85 if converted_payload.is_err() {
87 panic!("did not receive payload of expected type {payload:?}");
88 }
89
90 converted_payload
91 }
92
93 #[cfg(test)]
95 pub(crate) async fn next_of_type<T: TryFrom<crate::Payload>>(
96 &mut self,
97 ) -> Result<(T, MessageClient), Error>
98 where
99 <T as std::convert::TryFrom<crate::Payload>>::Error: std::fmt::Debug,
100 {
101 loop {
102 let (payload, client) = self.next_payload().await?;
103 let converted_payload = T::try_from(payload.clone())
104 .map(move |converted_payload| (converted_payload, client))
105 .map_err(|err| format_err!("conversion failed: {:?}", err));
106
107 if converted_payload.is_ok() {
108 return converted_payload;
109 }
110
111 }
113 }
114
115 #[cfg(test)]
116 pub(crate) async fn wait_for_acknowledge(&mut self) -> Result<(), Error> {
117 while let Some(event) = self.next().await {
118 match event {
119 MessageEvent::Status(Status::Acknowledged) => {
120 return Ok(());
121 }
122 MessageEvent::Status(Status::Undeliverable) => {
123 return Err(format_err!("origin message not delivered"));
124 }
125 _ => {}
126 }
127 }
128
129 Err(format_err!("did not encounter acknowledged status"))
130 }
131}
132
133pub(crate) fn extract_payload(event: Option<MessageEvent>) -> Option<crate::Payload> {
136 if let Some(MessageEvent::Message(payload, _)) = event {
137 Some(payload)
138 } else {
139 None
140 }
141}