crossbeam_channel/flavors/
at.rs1use std::sync::atomic::{AtomicBool, Ordering};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use crate::context::Context;
10use crate::err::{RecvTimeoutError, TryRecvError};
11use crate::select::{Operation, SelectHandle, Token};
12use crate::utils;
13
14pub(crate) type AtToken = Option<Instant>;
16
17pub(crate) struct Channel {
19 delivery_time: Instant,
21
22 received: AtomicBool,
24}
25
26impl Channel {
27 #[inline]
29 pub(crate) fn new_deadline(when: Instant) -> Self {
30 Channel {
31 delivery_time: when,
32 received: AtomicBool::new(false),
33 }
34 }
35 #[inline]
37 pub(crate) fn new_timeout(dur: Duration) -> Self {
38 Self::new_deadline(Instant::now() + dur)
39 }
40
41 #[inline]
43 pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
44 if self.received.load(Ordering::Relaxed) {
46 return Err(TryRecvError::Empty);
48 }
49
50 if Instant::now() < self.delivery_time {
51 return Err(TryRecvError::Empty);
53 }
54
55 if !self.received.swap(true, Ordering::SeqCst) {
57 Ok(self.delivery_time)
59 } else {
60 Err(TryRecvError::Empty)
62 }
63 }
64
65 #[inline]
67 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
68 if self.received.load(Ordering::Relaxed) {
70 utils::sleep_until(deadline);
72 return Err(RecvTimeoutError::Timeout);
73 }
74
75 loop {
77 let now = Instant::now();
78
79 let deadline = match deadline {
80 _ if now >= self.delivery_time => break,
82 Some(d) if now >= d => return Err(RecvTimeoutError::Timeout),
84
85 Some(d) if d < self.delivery_time => d,
87 _ => self.delivery_time,
88 };
89
90 thread::sleep(deadline - now);
91 }
92
93 if !self.received.swap(true, Ordering::SeqCst) {
95 Ok(self.delivery_time)
97 } else {
98 utils::sleep_until(None);
100 unreachable!()
101 }
102 }
103
104 #[inline]
106 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
107 token.at.ok_or(())
108 }
109
110 #[inline]
112 pub(crate) fn is_empty(&self) -> bool {
113 if self.received.load(Ordering::Relaxed) {
115 return true;
116 }
117
118 if Instant::now() < self.delivery_time {
120 return true;
121 }
122
123 self.received.load(Ordering::SeqCst)
126 }
127
128 #[inline]
130 pub(crate) fn is_full(&self) -> bool {
131 !self.is_empty()
132 }
133
134 #[inline]
136 pub(crate) fn len(&self) -> usize {
137 if self.is_empty() {
138 0
139 } else {
140 1
141 }
142 }
143
144 #[allow(clippy::unnecessary_wraps)] #[inline]
147 pub(crate) fn capacity(&self) -> Option<usize> {
148 Some(1)
149 }
150}
151
152impl SelectHandle for Channel {
153 #[inline]
154 fn try_select(&self, token: &mut Token) -> bool {
155 match self.try_recv() {
156 Ok(msg) => {
157 token.at = Some(msg);
158 true
159 }
160 Err(TryRecvError::Disconnected) => {
161 token.at = None;
162 true
163 }
164 Err(TryRecvError::Empty) => false,
165 }
166 }
167
168 #[inline]
169 fn deadline(&self) -> Option<Instant> {
170 if self.received.load(Ordering::Relaxed) {
172 None
173 } else {
174 Some(self.delivery_time)
175 }
176 }
177
178 #[inline]
179 fn register(&self, _oper: Operation, _cx: &Context) -> bool {
180 self.is_ready()
181 }
182
183 #[inline]
184 fn unregister(&self, _oper: Operation) {}
185
186 #[inline]
187 fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
188 self.try_select(token)
189 }
190
191 #[inline]
192 fn is_ready(&self) -> bool {
193 !self.is_empty()
194 }
195
196 #[inline]
197 fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
198 self.is_ready()
199 }
200
201 #[inline]
202 fn unwatch(&self, _oper: Operation) {}
203}