use chan; use chan::{Sender, Receiver}; use hyper::StatusCode; use hyper::net::{HttpStream, Transport}; use hyper::server::{Server as HyperServer, Request as HyperRequest}; use rustc_serialize::json; use std::net::SocketAddr; use std::thread; use std::sync::{Arc, Mutex}; use datatype::{Command, Event}; use gateway::{Gateway, Interpret}; use http::{Server, ServerHandler}; /// The `Http` gateway parses `Command`s from the body of incoming requests. pub struct Http { pub server: SocketAddr } impl Gateway for Http { fn initialize(&mut self, itx: Sender) -> Result<(), String> { let itx = Arc::new(Mutex::new(itx)); let server = try!(HyperServer::http(&self.server).map_err(|err| { format!("couldn't start http gateway: {}", err) })); thread::spawn(move || { let (_, server) = server.handle(move |_| HttpHandler::new(itx.clone())).unwrap(); server.run(); }); Ok(info!("HTTP gateway listening at http://{}", self.server)) } } struct HttpHandler { itx: Arc>>, response_rx: Option> } impl HttpHandler { fn new(itx: Arc>>) -> ServerHandler { ServerHandler::new(Box::new(HttpHandler { itx: itx, response_rx: None })) } } impl Server for HttpHandler { fn headers(&mut self, _: HyperRequest) {} fn request(&mut self, body: Vec) { String::from_utf8(body).map(|body| { json::decode::(&body).map(|cmd| { info!("Incoming HTTP request command: {}", cmd); let (etx, erx) = chan::async::(); self.response_rx = Some(erx); self.itx.lock().unwrap().send(Interpret { command: cmd, response_tx: Some(Arc::new(Mutex::new(etx))), }); }).unwrap_or_else(|err| error!("http request parse json: {}", err)) }).unwrap_or_else(|err| error!("http request parse string: {}", err)) } fn response(&mut self) -> (StatusCode, Option>) { self.response_rx.as_ref().map_or((StatusCode::BadRequest, None), |rx| { rx.recv().map_or_else(|| { error!("on_response receiver error"); (StatusCode::InternalServerError, None) }, |event| { json::encode(&event).map(|body| { (StatusCode::Ok, Some(body.into_bytes())) }).unwrap_or_else(|err| { error!("on_response encoding json: {:?}", err); (StatusCode::InternalServerError, None) }) }) }) } } #[cfg(test)] mod tests { use chan; use crossbeam; use rustc_serialize::json; use std::path::Path; use std::thread; use super::*; use gateway::{Gateway, Interpret}; use datatype::{Command, Event}; use http::{AuthClient, Client, Response, set_ca_certificates}; #[test] fn http_connections() { set_ca_certificates(&Path::new("run/sota_certificates")); let (etx, erx) = chan::sync::(0); let (itx, irx) = chan::sync::(0); thread::spawn(move || Http { server: "127.0.0.1:8888".parse().unwrap() }.start(itx, erx)); thread::spawn(move || { let _ = etx; // move into this scope loop { let interpret = irx.recv().expect("itx is closed"); match interpret.command { Command::StartDownload(id) => { let tx = interpret.response_tx.unwrap(); tx.lock().unwrap().send(Event::FoundSystemInfo(id)); } _ => panic!("expected AcceptUpdates"), } } }); crossbeam::scope(|scope| { for id in 0..10 { scope.spawn(move || { let cmd = Command::StartDownload(format!("{}", id)); let client = AuthClient::default(); let url = "http://127.0.0.1:8888".parse().unwrap(); let body = json::encode(&cmd).unwrap(); let resp_rx = client.post(url, Some(body.into_bytes())); let resp = resp_rx.recv().unwrap(); let text = match resp { Response::Success(data) => String::from_utf8(data.body).unwrap(), Response::Failed(data) => panic!("failed response: {}", data), Response::Error(err) => panic!("error response: {}", err) }; assert_eq!(json::decode::(&text).unwrap(), Event::FoundSystemInfo(format!("{}", id))); }); } }); } }