diff options
Diffstat (limited to 'src/rvi/edge.rs')
-rw-r--r-- | src/rvi/edge.rs | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/src/rvi/edge.rs b/src/rvi/edge.rs new file mode 100644 index 0000000..cadea74 --- /dev/null +++ b/src/rvi/edge.rs @@ -0,0 +1,127 @@ +use hyper::StatusCode; +use hyper::net::{HttpStream, Transport}; +use hyper::server::{Server as HyperServer, Request as HyperRequest}; +use rustc_serialize::json; +use rustc_serialize::json::Json; +use std::{mem, str}; +use std::net::ToSocketAddrs; + +use datatype::{RpcRequest, RpcOk, RpcErr, Url}; +use http::{Server, ServerHandler}; +use super::services::Services; + + +/// The HTTP server endpoint for `RVI` client communication. +pub struct Edge { + rvi_edge: Url, + services: Services, +} + +impl Edge { + /// Create a new `Edge` by registering each `RVI` service. + pub fn new(mut services: Services, rvi_edge: String, rvi_client: Url) -> Self { + services.register_services(|service| { + let req = RpcRequest::new("register_service", RegisterServiceRequest { + network_address: rvi_edge.clone(), + service: service.to_string(), + }); + let resp = req.send(rvi_client.clone()) + .unwrap_or_else(|err| panic!("RegisterServiceRequest failed: {}", err)); + let rpc_ok = json::decode::<RpcOk<RegisterServiceResponse>>(&resp) + .unwrap_or_else(|err| panic!("couldn't decode RegisterServiceResponse: {}", err)); + rpc_ok.result.expect("expected rpc_ok result").service + }); + + Edge { rvi_edge: rvi_edge.parse().expect("couldn't parse edge server as url"), services: services } + } + + /// Start the HTTP server listening for incoming RVI client connections. + pub fn start(&mut self) { + let mut addrs = self.rvi_edge.to_socket_addrs() + .unwrap_or_else(|err| panic!("couldn't parse edge url: {}", err)); + let server = HyperServer::http(&addrs.next().expect("no SocketAddr found")) + .unwrap_or_else(|err| panic!("couldn't start rvi edge server: {}", err)); + let (addr, server) = server.handle(move |_| EdgeHandler::new(self.services.clone())).unwrap(); + info!("RVI server edge listening at http://{}.", addr); + server.run(); + } +} + + +#[derive(RustcEncodable)] +struct RegisterServiceRequest { + pub network_address: String, + pub service: String, +} + +#[derive(RustcDecodable)] +struct RegisterServiceResponse { + pub service: String, + pub status: i32, +} + + + +struct EdgeHandler { + services: Services, + resp_code: StatusCode, + resp_body: Option<Vec<u8>> +} + +impl EdgeHandler { + fn new(services: Services) -> ServerHandler<HttpStream> { + ServerHandler::new(Box::new(EdgeHandler { + services: services, + resp_code: StatusCode::InternalServerError, + resp_body: None, + })) + } +} + +impl<T: Transport> Server<T> for EdgeHandler { + fn headers(&mut self, _: HyperRequest<T>) {} + + fn request(&mut self, body: Vec<u8>) { + let outcome = || -> Result<RpcOk<i32>, RpcErr> { + let text = try!(str::from_utf8(&body).map_err(|err| RpcErr::parse_error(err.to_string()))); + let data = try!(Json::from_str(text).map_err(|err| RpcErr::parse_error(err.to_string()))); + let object = try!(data.as_object().ok_or(RpcErr::parse_error("not an object".to_string()))); + let id = try!(object.get("id").and_then(|x| x.as_u64()) + .ok_or(RpcErr::parse_error("expected id".to_string()))); + let method = try!(object.get("method").and_then(|x| x.as_string()) + .ok_or(RpcErr::invalid_request(id, "expected method".to_string()))); + + match method { + "services_available" => Ok(RpcOk::new(id, None)), + + "message" => { + let params = try!(object.get("params").and_then(|p| p.as_object()) + .ok_or(RpcErr::invalid_request(id, "expected params".to_string()))); + let service = try!(params.get("service_name").and_then(|s| s.as_string()) + .ok_or(RpcErr::invalid_request(id, "expected params.service_name".to_string()))); + self.services.handle_service(service, id, text) + } + + _ => Err(RpcErr::method_not_found(id, format!("unknown method: {}", method))) + } + }(); + + match outcome { + Ok(msg) => { + let body = json::encode::<RpcOk<i32>>(&msg).expect("couldn't encode RpcOk response"); + self.resp_code = StatusCode::Ok; + self.resp_body = Some(body.into_bytes()); + } + + Err(err) => { + let body = json::encode::<RpcErr>(&err).expect("couldn't encode RpcErr response"); + self.resp_code = StatusCode::BadRequest; + self.resp_body = Some(body.into_bytes()); + } + } + } + + fn response(&mut self) -> (StatusCode, Option<Vec<u8>>) { + (self.resp_code, mem::replace(&mut self.resp_body, None)) + } +} |