lease_management/
sequence_client.rs1use futures::lock::Mutex;
5use {fidl_fuchsia_example_power as fexample, fuchsia_trace as ftrace};
6
7pub struct SequenceClient {
13 baton_source: fexample::MessageSourceProxy,
14 baton: Mutex<Option<LeaseBaton>>,
15 msg_index: Mutex<u64>,
16}
17
18pub enum BatonError {
19 InvalidBatonMessage,
20 Internal,
21}
22
23pub struct LeaseBaton {
24 pub msg_index: u64,
25 pub lease: zx::EventPair,
26}
27
28impl TryFrom<fexample::LeaseBaton> for LeaseBaton {
29 type Error = BatonError;
30
31 fn try_from(value: fexample::LeaseBaton) -> Result<Self, Self::Error> {
32 match (value.lease, value.msg_index) {
33 (Some(lease), Some(msg_index)) => Ok(Self { msg_index, lease }),
34 _ => Err(BatonError::InvalidBatonMessage),
35 }
36 }
37}
38
39impl SequenceClient {
40 pub fn new(message_source: fexample::MessageSourceProxy) -> Self {
43 Self { baton_source: message_source, baton: Mutex::new(None), msg_index: Mutex::new(0) }
44 }
45
46 pub async fn run(&self) -> Result<(), BatonError> {
50 loop {
51 let baton_result = self.baton_source.receive_baton().await;
52
53 if let Err(e) = baton_result {
54 if e.is_closed() {
55 break;
56 } else {
57 return Err(BatonError::Internal);
58 }
59 }
60
61 ftrace::instant!(crate::TRACE_CATEGORY, c"receive-baton", ftrace::Scope::Process);
62 let baton = baton_result.unwrap();
63 {
64 let new_baton: LeaseBaton = baton.try_into()?;
65
66 let current_index = self.msg_index.lock().await;
67 if *current_index < new_baton.msg_index {
68 let mut current_baton = self.baton.lock().await;
69 *current_baton = Some(new_baton);
70 }
71 }
72 }
73 Ok(())
74 }
75
76 pub async fn process_message(&self) -> Option<LeaseBaton> {
81 self.process_messages(1).await
82 }
83
84 pub async fn process_messages(&self, message_count: u64) -> Option<LeaseBaton> {
89 ftrace::duration!(crate::TRACE_CATEGORY, c"process-message", "count" => message_count);
90 let mut current_index = self.msg_index.lock().await;
91 *current_index += message_count;
92 let mut baton_ref = self.baton.lock().await;
93
94 if let Some(baton) = baton_ref.take() {
96 let baton_index = baton.msg_index;
97
98 if baton_index <= *current_index {
101 ftrace::instant!(crate::TRACE_CATEGORY, c"dropping-baton", ftrace::Scope::Process);
102 return Some(baton);
103 }
104 *baton_ref = Some(baton);
105 }
106 None
107 }
108
109 pub async fn get_receieved_count(&self) -> u64 {
110 *self.msg_index.lock().await
111 }
112}