stress_test_actor/
actor.rs1use anyhow::{format_err, Result};
6use fidl::endpoints::create_request_stream;
7use fidl_fuchsia_stresstest::{
8 Action as FidlAction, ActionIteratorMarker, ActionIteratorRequest, ActorRequest,
9 ActorRequestStream, Error,
10};
11use fuchsia_component::server::ServiceFs;
12use futures::future::BoxFuture;
13use futures::{StreamExt, TryStreamExt};
14use rand::rngs::SmallRng;
15use rand::SeedableRng;
16use rust_measure_tape_for_action::Measurable;
17use zx::sys::ZX_CHANNEL_MAX_MSG_BYTES;
18
19enum OutgoingProtocols {
20 Actor(ActorRequestStream),
21}
22
23pub struct Action<D> {
26 pub name: &'static str,
28
29 pub run: for<'a> fn(&'a mut D, SmallRng) -> BoxFuture<'a, Result<()>>,
33}
34
35impl<D> Action<D> {
36 fn to_fidl(&self) -> FidlAction {
38 FidlAction { name: Some(self.name.to_string()), ..Default::default() }
39 }
40}
41
42pub async fn actor_loop<D>(mut data: D, actions: Vec<Action<D>>) -> Result<()> {
53 let mut service_fs = ServiceFs::new();
54 service_fs.dir("svc").add_fidl_service(OutgoingProtocols::Actor);
55 service_fs.take_and_serve_directory_handle()?;
56
57 let OutgoingProtocols::Actor(mut stream) = service_fs
59 .next()
60 .await
61 .ok_or_else(|| format_err!("Could not get next connection to Actor protocol"))?;
62
63 while let Some(request) = stream
64 .try_next()
65 .await
66 .map_err(|e| format_err!("FIDL error in call to Actor protocol: {}", e))?
67 {
68 match request {
69 ActorRequest::GetActions { responder } => {
70 let (client_end, mut stream) = create_request_stream::<ActionIteratorMarker>();
71 responder.send(client_end)?;
72
73 let actions: Vec<_> = actions.iter().map(|c| c.to_fidl()).collect();
74 let mut remaining_actions = &actions[..];
75
76 while let Some(ActionIteratorRequest::GetNext { responder }) =
77 stream.try_next().await?
78 {
79 let mut bytes_used: usize = 32;
80 let mut action_count = 0;
81
82 for action in remaining_actions {
85 bytes_used += action.measure().num_bytes;
86 if bytes_used > ZX_CHANNEL_MAX_MSG_BYTES as usize {
87 break;
88 }
89 action_count += 1;
90 }
91 responder.send(&remaining_actions[..action_count])?;
92 remaining_actions = &remaining_actions[action_count..];
93
94 if action_count == 0 {
98 break;
99 }
100 }
101 }
102 ActorRequest::Run { action_name, seed, responder } => {
103 if let Some(action) = actions.iter().find(|action| action.name == action_name) {
104 let rng = SmallRng::seed_from_u64(seed);
105 if let Err(e) = (action.run)(&mut data, rng).await {
106 let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
109 let chain = chain.join(" -> ");
110 responder.send(Some(&Error::ErrorString(chain)))?;
111 } else {
112 responder.send(None)?;
114 }
115 } else {
116 responder.send(Some(&Error::ErrorString("Invalid action name".to_string())))?;
117 }
118 }
119 }
120 }
121
122 Ok(())
123}