1use crate::display::{Display, DISPLAY_SINGLETON_OBJECT_ID};
6use crate::object::{MessageReceiver, ObjectLookupError, ObjectMap, ObjectRef, RequestReceiver};
7use crate::seat::InputDispatcher;
8use crate::xdg_shell::XdgSurface;
9use anyhow::{anyhow, Error};
10use futures::channel::mpsc;
11use futures::prelude::*;
12use futures::select;
13use std::any::Any;
14use std::cell::{Cell, RefCell};
15use std::rc::Rc;
16use wayland_server_protocol::WlDisplayEvent;
17use {fuchsia_async as fasync, fuchsia_trace as ftrace, fuchsia_wayland_core as wl};
18
/// A unit of deferred work executed against a `Client`: a boxed closure that
/// receives `&mut Client` and reports failure through its `Result`.
type Task = Box<dyn FnMut(&mut Client) -> Result<(), Error> + 'static>;
20
/// The source of incoming messages for a `Client`.
#[derive(Clone)]
enum ClientChannel {
    /// Messages arrive through an in-process unbounded `mpsc` receiver.
    Local(Rc<RefCell<mpsc::UnboundedReceiver<zx::MessageBuf>>>),
    /// Messages are read from a shared `fasync::Channel`.
    Remote(Rc<fasync::Channel>),
}
26
27impl ClientChannel {
28 async fn recv_msg(&mut self, buffer: &mut zx::MessageBuf) -> Result<(), Error> {
29 match self {
30 ClientChannel::Local(receiver) => {
31 let buf = receiver
32 .borrow_mut()
33 .next()
34 .await
35 .ok_or_else(|| anyhow!("Error receiving message."))?;
36 *buffer = buf;
37 Ok(())
38 }
39 ClientChannel::Remote(chan) => {
40 chan.recv_msg(buffer).await.map_err(|e| anyhow!("Error receiving message: {:?}", e))
41 }
42 }
43 }
44}
45
/// Per-connection state: the message source, the registered protocol objects,
/// and the queues used to run deferred tasks and to post events.
pub struct Client {
    /// Where incoming messages are read from (see `start`).
    client_channel: ClientChannel,

    /// The `Display` supplied when this client was constructed.
    display: Display,

    /// The objects registered for this client, looked up by object id.
    objects: ObjectMap,

    /// Receiving end of the task queue; drained by the loop in `start`.
    tasks: mpsc::UnboundedReceiver<Task>,

    /// Sending end of the task queue; clones are handed out by `task_queue`.
    task_queue: TaskQueue,

    /// Queue used to post events for this client.
    event_queue: EventQueue,

    /// Protocol-logging flag. The same `Rc` is shared with `event_queue`, so
    /// toggling it here also affects event logging.
    protocol_logging: Rc<Cell<bool>>,

    /// Input dispatcher constructed with a clone of `event_queue`.
    pub input_dispatcher: InputDispatcher,

    /// XDG surfaces that are shut down when the loop in `start` exits.
    pub xdg_surfaces: Vec<ObjectRef<XdgSurface>>,
}
79
80impl Client {
81 pub fn new(chan: fasync::Channel, display: Display) -> Self {
83 let (sender, receiver) = mpsc::unbounded();
84 let log_flag = Rc::new(Cell::new(false));
85 let chan = Rc::new(chan);
86 let event_queue = EventQueue {
87 chan: EventQueueChannel::Remote(chan.clone()),
88 log_flag: log_flag.clone(),
89 next_serial: Rc::new(Cell::new(0)),
90 };
91 Client {
92 display,
93 client_channel: ClientChannel::Remote(chan),
94 objects: ObjectMap::new(),
95 tasks: receiver,
96 task_queue: TaskQueue(sender),
97 protocol_logging: log_flag,
98 input_dispatcher: InputDispatcher::new(event_queue.clone()),
99 event_queue,
100 xdg_surfaces: vec![],
101 }
102 }
103
104 pub fn new_local(
105 sender: mpsc::UnboundedSender<zx::MessageBuf>,
106 receiver: mpsc::UnboundedReceiver<zx::MessageBuf>,
107 display: Display,
108 ) -> Self {
109 let (task_sender, tasks) = mpsc::unbounded();
110 let log_flag = Rc::new(Cell::new(false));
111 let event_queue = EventQueue {
112 chan: EventQueueChannel::Local(Rc::new(RefCell::new(sender))),
113 log_flag: log_flag.clone(),
114 next_serial: Rc::new(Cell::new(0)),
115 };
116 Client {
117 display,
118 client_channel: ClientChannel::Local(Rc::new(RefCell::new(receiver))),
119 objects: ObjectMap::new(),
120 tasks,
121 task_queue: TaskQueue(task_sender),
122 protocol_logging: log_flag,
123 input_dispatcher: InputDispatcher::new(event_queue.clone()),
124 event_queue,
125 xdg_surfaces: vec![],
126 }
127 }
128
129 pub fn set_protocol_logging(&mut self, enabled: bool) {
131 self.protocol_logging.set(enabled);
132 }
133
134 pub(crate) fn protocol_logging(&self) -> bool {
136 self.protocol_logging.get()
137 }
138
139 pub fn task_queue(&self) -> TaskQueue {
141 self.task_queue.clone()
142 }
143
144 pub fn event_queue(&self) -> &EventQueue {
146 &self.event_queue
147 }
148
149 pub fn start(mut self) {
153 fasync::Task::local(async move {
154 let mut buffer = zx::MessageBuf::new();
155 loop {
156 select! {
157 message = self.client_channel.recv_msg(&mut buffer).fuse() => {
161 if let Err(e) = message {
163 println!("Failed to receive message on the channel: {}", e);
164 break;
165 }
166 if let Err(e) = self.handle_message(buffer.into()) {
168 println!("Failed to handle message on the channel: {}", e);
169 break;
170 }
171 buffer = zx::MessageBuf::new();
172 },
173 task = self.tasks.next() => {
178 if let Err(e) = self.handle_task(task.expect("Task stream has unexpectedly closed.")) {
184 println!("Failed to run wayland task: {}", e);
185 break;
186 }
187 },
188 }
189 }
190 self.xdg_surfaces.iter().for_each(|surface| {
193 if let Some(t) = surface.try_get(&self) {
194 t.shutdown(&self);
195 }
196 });
197 }).detach();
198 }
199
200 pub fn display(&self) -> &Display {
202 &self.display
203 }
204
205 pub fn get_object<T: Any>(&self, id: wl::ObjectId) -> Result<&T, ObjectLookupError> {
208 let result = self.objects.get(id);
209 #[cfg(feature = "fatal_object_lookup_failures")]
210 if !result.is_ok() {
211 panic!("Invalid object: {:?}", id);
212 }
213 result
214 }
215
216 pub fn try_get_object<T: Any>(&self, id: wl::ObjectId) -> Option<&T> {
219 self.objects.get(id).ok()
220 }
221
222 pub fn get_object_mut<T: Any>(
225 &mut self,
226 id: wl::ObjectId,
227 ) -> Result<&mut T, ObjectLookupError> {
228 let result = self.objects.get_mut(id);
229 #[cfg(feature = "fatal_object_lookup_failures")]
230 if !result.is_ok() {
231 panic!("Invalid object: {:?}", id);
232 }
233 result
234 }
235
236 pub fn try_get_object_mut<T: Any>(&mut self, id: wl::ObjectId) -> Option<&mut T> {
239 self.objects.get_mut(id).ok()
240 }
241
242 pub fn add_object<I: wl::Interface + 'static, R: RequestReceiver<I> + 'static>(
248 &mut self,
249 id: u32,
250 receiver: R,
251 ) -> Result<ObjectRef<R>, Error> {
252 self.objects.add_object(id, receiver)
253 }
254
255 pub fn add_object_raw(
259 &mut self,
260 id: wl::ObjectId,
261 receiver: Box<dyn MessageReceiver>,
262 request_spec: &'static wl::MessageGroupSpec,
263 ) -> Result<(), Error> {
264 self.objects.add_object_raw(id, receiver, request_spec)
265 }
266
267 pub fn delete_id(&mut self, id: wl::ObjectId) -> Result<(), Error> {
270 self.objects.delete(id)?;
271 self.event_queue().post(DISPLAY_SINGLETON_OBJECT_ID, WlDisplayEvent::DeleteId { id })
272 }
273
274 pub(crate) fn handle_message(&mut self, mut message: wl::Message) -> Result<(), Error> {
280 ftrace::duration!(c"wayland", c"Client::handle_message");
281 while !message.is_empty() {
282 let header = message.read_header()?;
283 let (receiver, spec) = self.objects.lookup_internal(&header)?;
286
287 let args = message.read_args(spec.0)?;
289 receiver(header.sender, header.opcode, args, self)?;
290 }
291 Ok(())
292 }
293
294 fn handle_task(&mut self, mut task: Task) -> Result<(), Error> {
295 ftrace::duration!(c"wayland", c"Client::handle_task");
296 task(self)
297 }
298
299 pub fn take_view_provider_request(&mut self) -> bool {
300 self.display.take_view_provider_requests()
301 }
302}
303
/// The destination that an `EventQueue` writes serialized events to.
#[derive(Clone)]
enum EventQueueChannel {
    /// Events are sent through an in-process unbounded `mpsc` sender.
    Local(Rc<RefCell<mpsc::UnboundedSender<zx::MessageBuf>>>),
    /// Events are written to a shared `fasync::Channel`.
    Remote(Rc<fasync::Channel>),
}
309
310impl EventQueueChannel {
311 fn write(&self, message: wl::Message) -> Result<(), Error> {
312 ftrace::duration!(c"wayland", c"EventQueue::write_to_chan");
313 let (bytes, mut handles) = message.take();
314 match self {
315 EventQueueChannel::Local(sender) => {
316 let buf = zx::MessageBuf::new_with(bytes, handles);
317 sender.borrow_mut().unbounded_send(buf)?;
318 Ok(())
319 }
320 EventQueueChannel::Remote(chan) => chan
321 .write(&bytes, &mut handles)
322 .map_err(|e| anyhow!("Error writing to channel {:?}", e)),
323 }
324 }
325}
326
/// A cloneable handle for posting events and allocating serial numbers.
///
/// All fields are reference counted, so clones share the same channel,
/// logging flag, and serial counter.
#[derive(Clone)]
pub struct EventQueue {
    /// Where serialized events are written.
    chan: EventQueueChannel,
    /// When set, each posted event is printed before it is sent.
    log_flag: Rc<Cell<bool>>,
    /// The next serial value handed out by `next_serial`.
    next_serial: Rc<Cell<u32>>,
}
334
335impl EventQueue {
336 pub fn post<E: wl::IntoMessage + std::marker::Send>(
341 &self,
342 sender: wl::ObjectId,
343 event: E,
344 ) -> Result<(), Error>
345 where
346 <E as wl::IntoMessage>::Error: std::marker::Send + 'static,
347 {
348 ftrace::duration!(c"wayland", c"EventQueue::post");
349 if self.log_flag.get() {
350 println!("<-e-- {}", event.log(sender));
351 }
352 let message = Self::serialize(sender, event)?;
353 self.chan.write(message)
354 }
355
356 fn serialize<E: wl::IntoMessage>(sender: wl::ObjectId, event: E) -> Result<wl::Message, Error>
357 where
358 <E as wl::IntoMessage>::Error: std::marker::Send + 'static,
359 {
360 ftrace::duration!(c"wayland", c"EventQueue::serialize");
361 Ok(event.into_message(sender).unwrap())
362 }
363
364 pub fn next_serial(&self) -> u32 {
367 let serial = self.next_serial.get();
368 self.next_serial.set(serial + 1);
369 serial
370 }
371}
372
/// A cloneable handle for posting `Task`s to a `Client`; wraps the sending
/// end of the client's unbounded task channel.
#[derive(Clone)]
pub struct TaskQueue(mpsc::UnboundedSender<Task>);
385
386impl TaskQueue {
387 pub fn post<F>(&self, f: F)
389 where
390 F: FnMut(&mut Client) -> Result<(), Error> + 'static,
391 {
392 let _result = self.0.unbounded_send(Box::new(f));
395 }
396}