summaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs215
1 files changed, 123 insertions, 92 deletions
diff --git a/src/main.rs b/src/main.rs
index 61fa02a..9821b2d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -9,71 +9,37 @@ extern crate rustc_serialize;
#[macro_use] extern crate sota;
extern crate time;
-use chan::{Sender, Receiver};
+use chan::{Sender, Receiver, WaitGroup};
use chan_signal::Signal;
use env_logger::LogBuilder;
use getopts::Options;
use log::{LogLevelFilter, LogRecord};
-use std::env;
+use std::{env, process, thread};
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use sota::datatype::{Command, Config, Event, SystemInfo};
+use sota::datatype::{Command, Config, Event};
use sota::gateway::{Console, DBus, Gateway, Interpret, Http, Socket, Websocket};
use sota::broadcast::Broadcast;
use sota::http::{AuthClient, set_ca_certificates};
use sota::interpreter::{EventInterpreter, CommandInterpreter, Interpreter, GlobalInterpreter};
-use sota::package_manager::PackageManager;
use sota::rvi::{Edge, Services};
macro_rules! exit {
- ($fmt:expr, $($arg:tt)*) => {{
+ ($code:expr, $fmt:expr, $($arg:tt)*) => {{
print!(concat!($fmt, "\n"), $($arg)*);
- std::process::exit(1);
+ process::exit($code);
}}
}
-fn start_signal_handler(signals: Receiver<Signal>) {
- loop {
- match signals.recv() {
- Some(Signal::INT) | Some(Signal::TERM) => std::process::exit(0),
- _ => ()
- }
- }
-}
-
-fn start_update_poller(interval: u64, itx: Sender<Interpret>) {
- let (etx, erx) = chan::async::<Event>();
- let tick = chan::tick(Duration::from_secs(interval));
- loop {
- let _ = tick.recv();
- itx.send(Interpret {
- command: Command::GetNewUpdates,
- response_tx: Some(Arc::new(Mutex::new(etx.clone())))
- });
- let _ = erx.recv();
- }
-}
-
-fn send_startup_commands(config: &Config, ctx: &Sender<Command>) {
- ctx.send(Command::Authenticate(None));
- ctx.send(Command::RefreshSystemInfo(true));
-
- if config.device.package_manager != PackageManager::Off {
- config.device.package_manager.installed_packages().map(|packages| {
- ctx.send(Command::SendInstalledPackages(packages));
- }).unwrap_or_else(|err| exit!("Couldn't get list of packages: {}", err));
- }
-}
-
fn main() {
- setup_logging();
+ let version = start_logging();
+ let config = build_config(&version);
- let config = build_config();
set_ca_certificates(Path::new(&config.device.certificates_path));
let (etx, erx) = chan::async::<Event>();
@@ -81,16 +47,25 @@ fn main() {
let (itx, irx) = chan::async::<Interpret>();
let mut broadcast = Broadcast::new(erx);
- send_startup_commands(&config, &ctx);
+ let wg = WaitGroup::new();
+
+ ctx.send(Command::Authenticate(None));
crossbeam::scope(|scope| {
- // Must subscribe to the signal before spawning ANY other threads
+ // subscribe to signals first
let signals = chan_signal::notify(&[Signal::INT, Signal::TERM]);
scope.spawn(move || start_signal_handler(signals));
- let poll_tick = config.device.polling_interval;
- let poll_itx = itx.clone();
- scope.spawn(move || start_update_poller(poll_tick, poll_itx));
+ if config.core.polling {
+ let poll_tick = config.core.polling_sec;
+ let poll_itx = itx.clone();
+ let poll_wg = wg.clone();
+ scope.spawn(move || start_update_poller(poll_tick, poll_itx, poll_wg));
+ }
+
+ //
+ // start gateways
+ //
if config.gateway.console {
let cons_itx = itx.clone();
@@ -99,7 +74,7 @@ fn main() {
}
if config.gateway.dbus {
- let dbus_cfg = config.dbus.as_ref().unwrap_or_else(|| exit!("{}", "dbus config required for dbus gateway"));
+ let dbus_cfg = config.dbus.as_ref().unwrap_or_else(|| exit!(1, "{}", "dbus config required for dbus gateway"));
let dbus_itx = itx.clone();
let dbus_sub = broadcast.subscribe();
let mut dbus = DBus { dbus_cfg: dbus_cfg.clone(), itx: itx.clone() };
@@ -109,20 +84,21 @@ fn main() {
if config.gateway.http {
let http_itx = itx.clone();
let http_sub = broadcast.subscribe();
- let mut http = Http { server: config.network.http_server.clone() };
+ let mut http = Http { server: *config.network.http_server };
scope.spawn(move || http.start(http_itx, http_sub));
}
- let mut rvi = None;
- if config.gateway.rvi {
- let _ = config.dbus.as_ref().unwrap_or_else(|| exit!("{}", "dbus config required for rvi gateway"));
- let rvi_cfg = config.rvi.as_ref().unwrap_or_else(|| exit!("{}", "rvi config required for rvi gateway"));
+ let rvi_services = if config.gateway.rvi {
+ let _ = config.dbus.as_ref().unwrap_or_else(|| exit!(1, "{}", "dbus config required for rvi gateway"));
+ let rvi_cfg = config.rvi.as_ref().unwrap_or_else(|| exit!(1, "{}", "rvi config required for rvi gateway"));
let rvi_edge = config.network.rvi_edge_server.clone();
let services = Services::new(rvi_cfg.clone(), config.device.uuid.clone(), etx.clone());
let mut edge = Edge::new(services.clone(), rvi_edge, rvi_cfg.client.clone());
scope.spawn(move || edge.start());
- rvi = Some(services);
- }
+ Some(services)
+ } else {
+ None
+ };
if config.gateway.socket {
let socket_itx = itx.clone();
@@ -142,48 +118,83 @@ fn main() {
scope.spawn(move || ws.start(ws_itx, ws_sub));
}
+ //
+ // start interpreters
+ //
+
let event_sub = broadcast.subscribe();
let event_ctx = ctx.clone();
let event_mgr = config.device.package_manager.clone();
+ let event_sys = config.device.system_info.clone();
+ let event_wg = wg.clone();
scope.spawn(move || EventInterpreter {
- package_manager: event_mgr
- }.run(event_sub, event_ctx));
+ pacman: event_mgr,
+ sysinfo: event_sys,
+ }.run(event_sub, event_ctx, event_wg));
let cmd_itx = itx.clone();
- scope.spawn(move || CommandInterpreter.run(crx, cmd_itx));
+ let cmd_wg = wg.clone();
+ scope.spawn(move || CommandInterpreter.run(crx, cmd_itx, cmd_wg));
scope.spawn(move || GlobalInterpreter {
config: config,
token: None,
http_client: Box::new(AuthClient::default()),
- rvi: rvi,
- loopback_tx: itx,
- }.run(irx, etx));
+ rvi: rvi_services,
+ }.run(irx, etx, wg));
scope.spawn(move || broadcast.start());
});
}
-fn setup_logging() {
- let version = option_env!("SOTA_VERSION").unwrap_or("?");
+fn start_logging() -> String {
+ let version = option_env!("SOTA_VERSION").unwrap_or("unknown");
+
let mut builder = LogBuilder::new();
builder.format(move |record: &LogRecord| {
let timestamp = format!("{}", time::now_utc().rfc3339());
format!("{} ({}): {} - {}", timestamp, version, record.level(), record.args())
});
builder.filter(Some("hyper"), LogLevelFilter::Info);
+ builder.parse(&env::var("RUST_LOG").unwrap_or("INFO".to_string()));
+ builder.init().expect("builder already initialized");
+
+ version.to_string()
+}
- let _ = env::var("RUST_LOG").map(|level| builder.parse(&level));
- builder.init().expect("env_logger::init() called twice, blame the programmers.");
+fn start_signal_handler(signals: Receiver<Signal>) {
+ loop {
+ match signals.recv() {
+ Some(Signal::INT) | Some(Signal::TERM) => process::exit(0),
+ _ => ()
+ }
+ }
}
-fn build_config() -> Config {
+fn start_update_poller(interval: u64, itx: Sender<Interpret>, wg: WaitGroup) {
+ info!("Polling for new updates every {} seconds.", interval);
+ let (etx, erx) = chan::async::<Event>();
+ let wait = Duration::from_secs(interval);
+ loop {
+ wg.wait(); // wait until not busy
+ thread::sleep(wait); // then wait `interval` seconds
+ itx.send(Interpret {
+ command: Command::GetUpdateRequests,
+ response_tx: Some(Arc::new(Mutex::new(etx.clone())))
+ }); // then request new updates
+ let _ = erx.recv(); // then wait for the response
+ }
+}
+
+fn build_config(version: &str) -> Config {
let args = env::args().collect::<Vec<String>>();
let program = args[0].clone();
let mut opts = Options::new();
- opts.optflag("h", "help", "print this help menu");
- opts.optopt("", "config", "change config path", "PATH");
+ opts.optflag("h", "help", "print this help menu then quit");
+ opts.optflag("p", "print", "print the parsed config then quit");
+ opts.optflag("v", "version", "print the version then quit");
+ opts.optopt("c", "config", "change config path", "PATH");
opts.optopt("", "auth-server", "change the auth server", "URL");
opts.optopt("", "auth-client-id", "change the auth client id", "ID");
@@ -191,6 +202,8 @@ fn build_config() -> Config {
opts.optopt("", "auth-credentials-file", "change the auth credentials file", "PATH");
opts.optopt("", "core-server", "change the core server", "URL");
+ opts.optopt("", "core-polling", "toggle polling the core server for updates", "BOOL");
+ opts.optopt("", "core-polling-sec", "change the core polling interval", "SECONDS");
opts.optopt("", "dbus-name", "change the dbus registration name", "NAME");
opts.optopt("", "dbus-path", "change the dbus path", "PATH");
@@ -203,7 +216,6 @@ fn build_config() -> Config {
opts.optopt("", "device-vin", "change the device vin", "VIN");
opts.optopt("", "device-packages-dir", "change downloaded directory for packages", "PATH");
opts.optopt("", "device-package-manager", "change the package manager", "MANAGER");
- opts.optopt("", "device-polling-interval", "change the package polling interval", "INTERVAL");
opts.optopt("", "device-certificates-path", "change the OpenSSL CA certificates file", "PATH");
opts.optopt("", "device-system-info", "change the system information command", "PATH");
@@ -225,25 +237,37 @@ fn build_config() -> Config {
opts.optopt("", "rvi-timeout", "change the rvi timeout", "TIMEOUT");
let matches = opts.parse(&args[1..]).unwrap_or_else(|err| panic!(err.to_string()));
- if matches.opt_present("h") {
- exit!("{}", opts.usage(&format!("Usage: {} [options]", program)));
+
+ if matches.opt_present("help") {
+ exit!(0, "{}", opts.usage(&format!("Usage: {} [options]", program)));
+ } else if matches.opt_present("version") {
+ exit!(0, "{}", version);
}
- let config_file = matches.opt_str("config").unwrap_or_else(|| {
- env::var("SOTA_CONFIG").unwrap_or_else(|_| exit!("{}", "No config file provided."))
- });
- let mut config = Config::load(&config_file).unwrap_or_else(|err| exit!("{}", err));
+ let mut config = match matches.opt_str("config").or(env::var("SOTA_CONFIG").ok()) {
+ Some(file) => Config::load(&file).unwrap_or_else(|err| exit!(1, "{}", err)),
+ None => {
+ warn!("No config file given. Falling back to defaults.");
+ Config::default()
+ }
+ };
config.auth.as_mut().map(|auth_cfg| {
matches.opt_str("auth-client-id").map(|id| auth_cfg.client_id = id);
matches.opt_str("auth-client-secret").map(|secret| auth_cfg.client_secret = secret);
matches.opt_str("auth-server").map(|text| {
- auth_cfg.server = text.parse().unwrap_or_else(|err| exit!("Invalid auth-server URL: {}", err));
+ auth_cfg.server = text.parse().unwrap_or_else(|err| exit!(1, "Invalid auth-server URL: {}", err));
});
});
matches.opt_str("core-server").map(|text| {
- config.core.server = text.parse().unwrap_or_else(|err| exit!("Invalid core-server URL: {}", err));
+ config.core.server = text.parse().unwrap_or_else(|err| exit!(1, "Invalid core-server URL: {}", err));
+ });
+ matches.opt_str("core-polling").map(|polling| {
+ config.core.polling = polling.parse().unwrap_or_else(|err| exit!(1, "Invalid core-polling boolean: {}", err));
+ });
+ matches.opt_str("core-polling-sec").map(|secs| {
+ config.core.polling_sec = secs.parse().unwrap_or_else(|err| exit!(1, "Invalid core-polling-sec: {}", err));
});
config.dbus.as_mut().map(|dbus_cfg| {
@@ -253,7 +277,7 @@ fn build_config() -> Config {
matches.opt_str("dbus-software-manager").map(|mgr| dbus_cfg.software_manager = mgr);
matches.opt_str("dbus-software-manager-path").map(|mgr_path| dbus_cfg.software_manager_path = mgr_path);
matches.opt_str("dbus-timeout").map(|timeout| {
- dbus_cfg.timeout = timeout.parse().unwrap_or_else(|err| exit!("Invalid dbus timeout: {}", err));
+ dbus_cfg.timeout = timeout.parse().unwrap_or_else(|err| exit!(1, "Invalid dbus-timeout: {}", err));
});
});
@@ -261,48 +285,55 @@ fn build_config() -> Config {
matches.opt_str("device-vin").map(|vin| config.device.vin = vin);
matches.opt_str("device-packages-dir").map(|path| config.device.packages_dir = path);
matches.opt_str("device-package-manager").map(|text| {
- config.device.package_manager = text.parse().unwrap_or_else(|err| exit!("Invalid device package manager: {}", err));
- });
- matches.opt_str("device-polling-interval").map(|interval| {
- config.device.polling_interval = interval.parse().unwrap_or_else(|err| exit!("Invalid device polling interval: {}", err));
+ config.device.package_manager = text.parse().unwrap_or_else(|err| exit!(1, "Invalid device-package-manager: {}", err));
});
matches.opt_str("device-certificates-path").map(|certs| config.device.certificates_path = certs);
- matches.opt_str("device-system-info").map(|cmd| config.device.system_info = SystemInfo::new(cmd));
+ matches.opt_str("device-system-info").map(|cmd| {
+ config.device.system_info = if cmd.len() > 0 { Some(cmd) } else { None }
+ });
matches.opt_str("gateway-console").map(|console| {
- config.gateway.console = console.parse().unwrap_or_else(|err| exit!("Invalid console gateway boolean: {}", err));
+ config.gateway.console = console.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-console boolean: {}", err));
});
matches.opt_str("gateway-dbus").map(|dbus| {
- config.gateway.dbus = dbus.parse().unwrap_or_else(|err| exit!("Invalid dbus gateway boolean: {}", err));
+ config.gateway.dbus = dbus.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-dbus boolean: {}", err));
});
matches.opt_str("gateway-http").map(|http| {
- config.gateway.http = http.parse().unwrap_or_else(|err| exit!("Invalid http gateway boolean: {}", err));
+ config.gateway.http = http.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-http boolean: {}", err));
});
matches.opt_str("gateway-rvi").map(|rvi| {
- config.gateway.rvi = rvi.parse().unwrap_or_else(|err| exit!("Invalid rvi gateway boolean: {}", err));
+ config.gateway.rvi = rvi.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-rvi boolean: {}", err));
});
matches.opt_str("gateway-socket").map(|socket| {
- config.gateway.socket = socket.parse().unwrap_or_else(|err| exit!("Invalid socket gateway boolean: {}", err));
+ config.gateway.socket = socket.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-socket boolean: {}", err));
});
matches.opt_str("gateway-websocket").map(|websocket| {
- config.gateway.websocket = websocket.parse().unwrap_or_else(|err| exit!("Invalid websocket gateway boolean: {}", err));
+ config.gateway.websocket = websocket.parse().unwrap_or_else(|err| exit!(1, "Invalid gateway-websocket boolean: {}", err));
});
- matches.opt_str("network-http-server").map(|server| config.network.http_server = server);
- matches.opt_str("network-rvi-edge-server").map(|server| config.network.rvi_edge_server = server);
+ matches.opt_str("network-http-server").map(|addr| {
+ config.network.http_server = addr.parse().unwrap_or_else(|err| exit!(1, "Invalid network-http-server: {}", err));
+ });
+ matches.opt_str("network-rvi-edge-server").map(|addr| {
+ config.network.rvi_edge_server = addr.parse().unwrap_or_else(|err| exit!(1, "Invalid network-rvi-edge-server: {}", err));
+ });
matches.opt_str("network-socket-commands-path").map(|path| config.network.socket_commands_path = path);
matches.opt_str("network-socket-events-path").map(|path| config.network.socket_events_path = path);
matches.opt_str("network-websocket-server").map(|server| config.network.websocket_server = server);
config.rvi.as_mut().map(|rvi_cfg| {
matches.opt_str("rvi-client").map(|url| {
- rvi_cfg.client = url.parse().unwrap_or_else(|err| exit!("Invalid rvi-client URL: {}", err));
+ rvi_cfg.client = url.parse().unwrap_or_else(|err| exit!(1, "Invalid rvi-client URL: {}", err));
});
matches.opt_str("rvi-storage-dir").map(|dir| rvi_cfg.storage_dir = dir);
matches.opt_str("rvi-timeout").map(|timeout| {
- rvi_cfg.timeout = Some(timeout.parse().unwrap_or_else(|err| exit!("Invalid rvi timeout: {}", err)));
+ rvi_cfg.timeout = Some(timeout.parse().unwrap_or_else(|err| exit!(1, "Invalid rvi-timeout: {}", err)));
});
});
+ if matches.opt_present("print") {
+ exit!(0, "{:#?}", config);
+ }
+
config
}