wayland_bridge/
client.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::display::{Display, DISPLAY_SINGLETON_OBJECT_ID};
6use crate::object::{MessageReceiver, ObjectLookupError, ObjectMap, ObjectRef, RequestReceiver};
7use crate::seat::InputDispatcher;
8use crate::xdg_shell::XdgSurface;
9use anyhow::{anyhow, Error};
10use futures::channel::mpsc;
11use futures::prelude::*;
12use futures::select;
13use std::any::Any;
14use std::cell::{Cell, RefCell};
15use std::rc::Rc;
16use wayland_server_protocol::WlDisplayEvent;
17use {fuchsia_async as fasync, fuchsia_trace as ftrace, fuchsia_wayland_core as wl};
18
19type Task = Box<dyn FnMut(&mut Client) -> Result<(), Error> + 'static>;
20
21#[derive(Clone)]
22enum ClientChannel {
23    Local(Rc<RefCell<mpsc::UnboundedReceiver<zx::MessageBuf>>>),
24    Remote(Rc<fasync::Channel>),
25}
26
27impl ClientChannel {
28    async fn recv_msg(&mut self, buffer: &mut zx::MessageBuf) -> Result<(), Error> {
29        match self {
30            ClientChannel::Local(receiver) => {
31                let buf = receiver
32                    .borrow_mut()
33                    .next()
34                    .await
35                    .ok_or_else(|| anyhow!("Error receiving message."))?;
36                *buffer = buf;
37                Ok(())
38            }
39            ClientChannel::Remote(chan) => {
40                chan.recv_msg(buffer).await.map_err(|e| anyhow!("Error receiving message: {:?}", e))
41            }
42        }
43    }
44}
45
46/// The state of a single client connection. Each client connection will have
47/// have its own zircon channel and its own set of protocol objects. The
48/// |Display| is the only piece of global state that is shared between
49/// clients.
50pub struct Client {
51    client_channel: ClientChannel,
52
53    /// The display for this client.
54    display: Display,
55
56    /// The set of objects for this client.
57    objects: ObjectMap,
58
59    /// An incoming task queue of closures to be invoked on the client. These
60    /// closures will be invoked with a mutable reference to the `Client`,
61    /// providing a way for background tasks to access client resources.
62    tasks: mpsc::UnboundedReceiver<Task>,
63
64    /// The sending endpoint for the task channel.
65    task_queue: TaskQueue,
66
67    /// The sending endpoint for protocol events.
68    event_queue: EventQueue,
69
70    /// If `true`, all requests and events will be logged.
71    protocol_logging: Rc<Cell<bool>>,
72
73    /// Decode and dispatch Scenic input events.
74    pub input_dispatcher: InputDispatcher,
75
76    /// XDG surfaces. Last surface created at the back.
77    pub xdg_surfaces: Vec<ObjectRef<XdgSurface>>,
78}
79
80impl Client {
81    /// Creates a new client.
82    pub fn new(chan: fasync::Channel, display: Display) -> Self {
83        let (sender, receiver) = mpsc::unbounded();
84        let log_flag = Rc::new(Cell::new(false));
85        let chan = Rc::new(chan);
86        let event_queue = EventQueue {
87            chan: EventQueueChannel::Remote(chan.clone()),
88            log_flag: log_flag.clone(),
89            next_serial: Rc::new(Cell::new(0)),
90        };
91        Client {
92            display,
93            client_channel: ClientChannel::Remote(chan),
94            objects: ObjectMap::new(),
95            tasks: receiver,
96            task_queue: TaskQueue(sender),
97            protocol_logging: log_flag,
98            input_dispatcher: InputDispatcher::new(event_queue.clone()),
99            event_queue,
100            xdg_surfaces: vec![],
101        }
102    }
103
104    pub fn new_local(
105        sender: mpsc::UnboundedSender<zx::MessageBuf>,
106        receiver: mpsc::UnboundedReceiver<zx::MessageBuf>,
107        display: Display,
108    ) -> Self {
109        let (task_sender, tasks) = mpsc::unbounded();
110        let log_flag = Rc::new(Cell::new(false));
111        let event_queue = EventQueue {
112            chan: EventQueueChannel::Local(Rc::new(RefCell::new(sender))),
113            log_flag: log_flag.clone(),
114            next_serial: Rc::new(Cell::new(0)),
115        };
116        Client {
117            display,
118            client_channel: ClientChannel::Local(Rc::new(RefCell::new(receiver))),
119            objects: ObjectMap::new(),
120            tasks,
121            task_queue: TaskQueue(task_sender),
122            protocol_logging: log_flag,
123            input_dispatcher: InputDispatcher::new(event_queue.clone()),
124            event_queue,
125            xdg_surfaces: vec![],
126        }
127    }
128
129    /// Enables or disables protocol message logging.
130    pub fn set_protocol_logging(&mut self, enabled: bool) {
131        self.protocol_logging.set(enabled);
132    }
133
134    /// Returns `true` if protocol messages should be logged.
135    pub(crate) fn protocol_logging(&self) -> bool {
136        self.protocol_logging.get()
137    }
138
139    /// Returns a object that can post messages to the `Client`.
140    pub fn task_queue(&self) -> TaskQueue {
141        self.task_queue.clone()
142    }
143
144    /// Returns an object that can post events back to the client.
145    pub fn event_queue(&self) -> &EventQueue {
146        &self.event_queue
147    }
148
149    /// Spawns an async task that waits for messages to be received on the
150    /// zircon channel, decodes the messages, and dispatches them to the
151    /// corresponding |MessageReceiver|s.
152    pub fn start(mut self) {
153        fasync::Task::local(async move {
154            let mut buffer = zx::MessageBuf::new();
155            loop {
156                select! {
157                    // Fusing: we exit when `recv_msg` fails, so we don't
158                    // need to worry about fast-looping when the channel is
159                    // closed.
160                    message = self.client_channel.recv_msg(&mut buffer).fuse() => {
161                        // We got a new message over the zircon channel.
162                        if let Err(e) = message {
163                            println!("Failed to receive message on the channel: {}", e);
164                            break;
165                        }
166                        // Dispatch the message.
167                        if let Err(e) = self.handle_message(buffer.into()) {
168                            println!("Failed to handle message on the channel: {}", e);
169                            break;
170                        }
171                        buffer = zx::MessageBuf::new();
172                    },
173                    // Fusing: we panic immediately if the task queue ever returns
174                    // `None`, so no need to track state of the channel between
175                    // loop iterations. NOTE: for this to remain true, no other code
176                    // can be given access to mutate `self.tasks`.
177                    task = self.tasks.next() => {
178                        // A new closure has been received.
179                        //
180                        // We unwrap since we retain a reference to the
181                        // sending endpoint of the channel, preventing it
182                        // from closing.
183                        if let Err(e) = self.handle_task(task.expect("Task stream has unexpectedly closed.")) {
184                            println!("Failed to run wayland task: {}", e);
185                            break;
186                        }
187                    },
188                }
189            }
190            // We need to shutdown the client. This includes tearing down
191            // all views associated with this client.
192            self.xdg_surfaces.iter().for_each(|surface| {
193                if let Some(t) = surface.try_get(&self) {
194                    t.shutdown(&self);
195                }
196            });
197        }).detach();
198    }
199
200    /// The `Display` for this client.
201    pub fn display(&self) -> &Display {
202        &self.display
203    }
204
205    /// Looks up an object in the map and returns a downcasted reference to
206    /// the implementation.
207    pub fn get_object<T: Any>(&self, id: wl::ObjectId) -> Result<&T, ObjectLookupError> {
208        let result = self.objects.get(id);
209        #[cfg(feature = "fatal_object_lookup_failures")]
210        if !result.is_ok() {
211            panic!("Invalid object: {:?}", id);
212        }
213        result
214    }
215
216    /// Looks up an object in the map and returns a downcasted reference to
217    /// the implementation, if it exists.
218    pub fn try_get_object<T: Any>(&self, id: wl::ObjectId) -> Option<&T> {
219        self.objects.get(id).ok()
220    }
221
222    /// Looks up an object in the map and returns a downcasted mutable
223    /// reference to the implementation.
224    pub fn get_object_mut<T: Any>(
225        &mut self,
226        id: wl::ObjectId,
227    ) -> Result<&mut T, ObjectLookupError> {
228        let result = self.objects.get_mut(id);
229        #[cfg(feature = "fatal_object_lookup_failures")]
230        if !result.is_ok() {
231            panic!("Invalid object: {:?}", id);
232        }
233        result
234    }
235
236    /// Looks up an object in the map and returns a downcasted mutable
237    /// reference to the implementation, if it exists.
238    pub fn try_get_object_mut<T: Any>(&mut self, id: wl::ObjectId) -> Option<&mut T> {
239        self.objects.get_mut(id).ok()
240    }
241
242    /// Adds a new object into the map that will handle messages with the sender
243    /// set to |id|. When a message is received with the corresponding |id|, the
244    /// message will be decoded and forwarded to the |RequestReceiver|.
245    ///
246    /// Returns Err if there is already an object for |id| in this |ObjectMap|.
247    pub fn add_object<I: wl::Interface + 'static, R: RequestReceiver<I> + 'static>(
248        &mut self,
249        id: u32,
250        receiver: R,
251    ) -> Result<ObjectRef<R>, Error> {
252        self.objects.add_object(id, receiver)
253    }
254
255    /// Adds an object to the map using the low-level primitives. It's favorable
256    /// to use instead |add_object| if the wayland interface for the object is
257    /// statically known.
258    pub fn add_object_raw(
259        &mut self,
260        id: wl::ObjectId,
261        receiver: Box<dyn MessageReceiver>,
262        request_spec: &'static wl::MessageGroupSpec,
263    ) -> Result<(), Error> {
264        self.objects.add_object_raw(id, receiver, request_spec)
265    }
266
267    /// Deletes the object `id` from the local object map and send a notification to the
268    /// client confirming that `id` can be reused.
269    pub fn delete_id(&mut self, id: wl::ObjectId) -> Result<(), Error> {
270        self.objects.delete(id)?;
271        self.event_queue().post(DISPLAY_SINGLETON_OBJECT_ID, WlDisplayEvent::DeleteId { id })
272    }
273
274    /// Reads the message header to find the target for this message and then
275    /// forwards the message to the associated |MessageReceiver|.
276    ///
277    /// Returns Err if no object is associated with the sender field in the
278    /// message header, or if the objects receiver itself fails.
279    pub(crate) fn handle_message(&mut self, mut message: wl::Message) -> Result<(), Error> {
280        ftrace::duration!(c"wayland", c"Client::handle_message");
281        while !message.is_empty() {
282            let header = message.read_header()?;
283            // Lookup the table entry for this object & fail if there is no entry
284            // found.
285            let (receiver, spec) = self.objects.lookup_internal(&header)?;
286
287            // Decode the argument stream and invoke the |MessageReceiver|.
288            let args = message.read_args(spec.0)?;
289            receiver(header.sender, header.opcode, args, self)?;
290        }
291        Ok(())
292    }
293
294    fn handle_task(&mut self, mut task: Task) -> Result<(), Error> {
295        ftrace::duration!(c"wayland", c"Client::handle_task");
296        task(self)
297    }
298
299    pub fn take_view_provider_request(&mut self) -> bool {
300        self.display.take_view_provider_requests()
301    }
302}
303
304#[derive(Clone)]
305enum EventQueueChannel {
306    Local(Rc<RefCell<mpsc::UnboundedSender<zx::MessageBuf>>>),
307    Remote(Rc<fasync::Channel>),
308}
309
310impl EventQueueChannel {
311    fn write(&self, message: wl::Message) -> Result<(), Error> {
312        ftrace::duration!(c"wayland", c"EventQueue::write_to_chan");
313        let (bytes, mut handles) = message.take();
314        match self {
315            EventQueueChannel::Local(sender) => {
316                let buf = zx::MessageBuf::new_with(bytes, handles);
317                sender.borrow_mut().unbounded_send(buf)?;
318                Ok(())
319            }
320            EventQueueChannel::Remote(chan) => chan
321                .write(&bytes, &mut handles)
322                .map_err(|e| anyhow!("Error writing to channel {:?}", e)),
323        }
324    }
325}
326
327/// An `EventQueue` enables protocol events to be sent back to the client.
328#[derive(Clone)]
329pub struct EventQueue {
330    chan: EventQueueChannel,
331    log_flag: Rc<Cell<bool>>,
332    next_serial: Rc<Cell<u32>>,
333}
334
335impl EventQueue {
336    /// Serializes `event` and writes it to the client channel.
337    ///
338    /// The 'sender' will be embedded in the message header indicating what
339    /// protocol object dispatched the event.
340    pub fn post<E: wl::IntoMessage + std::marker::Send>(
341        &self,
342        sender: wl::ObjectId,
343        event: E,
344    ) -> Result<(), Error>
345    where
346        <E as wl::IntoMessage>::Error: std::marker::Send + 'static,
347    {
348        ftrace::duration!(c"wayland", c"EventQueue::post");
349        if self.log_flag.get() {
350            println!("<-e-- {}", event.log(sender));
351        }
352        let message = Self::serialize(sender, event)?;
353        self.chan.write(message)
354    }
355
356    fn serialize<E: wl::IntoMessage>(sender: wl::ObjectId, event: E) -> Result<wl::Message, Error>
357    where
358        <E as wl::IntoMessage>::Error: std::marker::Send + 'static,
359    {
360        ftrace::duration!(c"wayland", c"EventQueue::serialize");
361        Ok(event.into_message(sender).unwrap())
362    }
363
364    /// Returns a monotonically increasing value. Many protocol events rely
365    /// on an event serial number, which can be obtained with this method.
366    pub fn next_serial(&self) -> u32 {
367        let serial = self.next_serial.get();
368        self.next_serial.set(serial + 1);
369        serial
370    }
371}
372
373/// A `TaskQueue` enables asynchronous operations to post tasks back to the
374/// `Client`.
375///
376/// Ex:
377///   let foo: ObjectRef<Foo> = get_foo_ref();
378///   let tasks = client.task_queue();
379///   task.post(|client| {
380///       let foo = foo.get(client);
381///       foo.handle_delayed_operation();
382///   });
383#[derive(Clone)]
384pub struct TaskQueue(mpsc::UnboundedSender<Task>);
385
386impl TaskQueue {
387    /// Posts the closure to be run as soon as possible.
388    pub fn post<F>(&self, f: F)
389    where
390        F: FnMut(&mut Client) -> Result<(), Error> + 'static,
391    {
392        // Failure here means the client is shutting down and we don't want to
393        // accept any more tasks.
394        let _result = self.0.unbounded_send(Box::new(f));
395    }
396}