From cd2c88b43bd45c75f83be1c2df1420eeb0934829 Mon Sep 17 00:00:00 2001 From: Lara Date: Wed, 16 Jan 2019 22:46:12 +0100 Subject: [PATCH] Improve mqtt code --- Cargo.lock | 3 ++ Cargo.toml | 3 ++ src/lifx.rs | 16 +++++-- src/light.rs | 52 ++++++++++++++++++++++- src/main.rs | 39 ++++++++--------- src/mqtt.rs | 99 ++++++++++++++++---------------------------- src/mqtt_commands.rs | 60 +++++++++++++++++++++++++++ src/mqtt_updates.rs | 57 +++++++++++++++++++++++++ 8 files changed, 240 insertions(+), 89 deletions(-) create mode 100644 src/mqtt_commands.rs create mode 100644 src/mqtt_updates.rs diff --git a/Cargo.lock b/Cargo.lock index 40ad525..08e0c85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -478,7 +478,10 @@ name = "lifx-mqtt-bridge" version = "0.1.0" dependencies = [ "clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "lifxi 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "regex 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "rumqtt 0.30.0 (git+https://github.com/AtherEnergy/rumqtt)", "serde 1.0.84 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.84 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 494ebdd..613413e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,6 @@ lifxi = "0.1" clap = "2" serde = "1" serde_derive = "1" +regex = "1" +lazy_static = "1" +crossbeam-channel = "0.3" diff --git a/src/lifx.rs b/src/lifx.rs index c0d2a29..99bf5bd 100644 --- a/src/lifx.rs +++ b/src/lifx.rs @@ -1,14 +1,22 @@ -use crate::light::Light; +use crate::light::{Command, Light, Status}; use lifxi::http::prelude::*; pub struct Lifx { client: Client, + updates: crossbeam_channel::Sender, + commands: crossbeam_channel::Receiver, } impl Lifx { - pub fn new(secret: S) -> Self { + pub fn new( + secret: S, + updates: crossbeam_channel::Sender, + commands: crossbeam_channel::Receiver, + ) -> Self { Lifx { client: Client::new(secret), + updates, + commands, } } @@ -22,7 +30,7 @@ impl Lifx { .unwrap() } - pub fn set_power(&self, id: String, state: bool) -> Result<(), lifxi::http::Error> { + fn set_power(&self, id: String, state: bool) -> Result<(), lifxi::http::Error> { self.client .select(Selector::Id(id)) .change_state() @@ -31,7 +39,7 @@ impl Lifx { .and(Ok(())) } - pub fn set_brightness(&self, id: String, brightness: f32) -> Result<(), lifxi::http::Error> { + fn set_brightness(&self, id: String, brightness: f32) -> Result<(), lifxi::http::Error> { self.client .select(Selector::Id(id)) .change_state() diff --git a/src/light.rs b/src/light.rs index e6c3bf1..18af289 100644 --- a/src/light.rs +++ b/src/light.rs @@ -1,4 +1,3 @@ - #[derive(Deserialize, Debug)] pub struct Color { pub hue: f32, @@ -16,3 +15,54 @@ pub struct Light { pub brightness: f32, } +const POWER: &str = "power"; +const BRIGHTNESS: &str = "brightness"; + +pub enum Value { + Power(String), + Brightness(f32), +} + +impl Value { + pub fn new(label: &str, value: Vec) -> Self { + match label { + POWER => Value::Power(String::from_utf8(value).unwrap()), + BRIGHTNESS => Value::Brightness(Self::vec_to_f32(value)), + _ => unimplemented!(), + } + } + + fn vec_to_f32(vec: Vec) -> f32 { + assert!(vec.len() == 4); + let mut value_u32: u32 = 0; + for val in vec.clone() { + value_u32 = value_u32 << 8; + value_u32 = value_u32 | val as u32; + } + println!("{:?} -> {}", vec, value_u32); + f32::from_bits(value_u32) + } + + pub fn unravel(self) -> (&'static str, Vec) { + match self { + Value::Power(val) => (POWER, val.into_bytes()), + Value::Brightness(val) => (BRIGHTNESS, Vec::new()), + } + } +} + +pub struct Command { + pub lampname: String, + pub command: Value, +} + +pub struct Update { + pub lampname: String, + pub status: Value, +} + +pub enum Status { + Update(Update), + New(Light), + Remove(String), +} diff --git a/src/main.rs b/src/main.rs index 8263c39..5f60c6d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,20 @@ extern crate clap; +#[macro_use] +extern crate lazy_static; +extern crate regex; +#[macro_use] +extern crate serde_derive; mod lifx; mod light; mod mqtt; +mod mqtt_commands; +mod mqtt_updates; -use crate::mqtt::Mqtt; use clap::App; use clap::Arg; -#[macro_use] -extern crate serde_derive; +use crossbeam_channel::unbounded; +use std::thread; pub const MQTT_ID: &str = "lifx-mqtt-bridge"; @@ -45,28 +51,19 @@ fn main() { let lifx_secret = matches.value_of("lifx_secret").unwrap(); println!("Connecting to {}:{}", host, port); - let mut mqtt = match Mqtt::connect(host, port) { + let (s_commands, r_commands) = unbounded(); + let (s_updates, r_updates) = unbounded(); + + let (mqtt_commands, mut mqtt_updates) = match mqtt::mqtt_connect(host, port, s_commands, r_updates) + { Ok(mqtt) => mqtt, Err(err) => panic!("Error connecting: {}", err), }; - let lifx_client = lifx::Lifx::new(lifx_secret); - let lights = lifx_client.find_lights(); - println!("lights: {:#?}", lights); + let lifx_client = lifx::Lifx::new(lifx_secret, s_updates, r_commands); - for light in lights { - mqtt.add_light(&light.id, &light.label); - } + thread::spawn(move || mqtt_commands.listen()); + thread::spawn(move || mqtt_updates.listen()); - loop { - match mqtt.notifications.recv() { - Ok(notification) => { - println!("MQTT notification received: {:#?}", notification); - } - Err(recv_error) => { - println!("MQTT channel closed: {}", recv_error); - return; - } - } - } + loop {} } diff --git a/src/mqtt.rs b/src/mqtt.rs index f9b37a3..79be112 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -1,70 +1,43 @@ +use crate::light::{Command, Status}; +use crate::mqtt_commands::MqttCommands; +use crate::mqtt_updates::MqttUpdates; use rumqtt; -use rumqtt::LastWill; -use rumqtt::MqttClient; -use rumqtt::Notification; -use rumqtt::Receiver; -use rumqtt::{ConnectionMethod, MqttOptions, QoS, ReconnectOptions}; +use rumqtt::{ConnectionMethod, LastWill, MqttClient, MqttOptions, QoS, ReconnectOptions}; -pub struct Mqtt { - pub client: MqttClient, - pub notifications: Receiver, -} +pub fn mqtt_connect( + host: &str, + port: u16, + commands: crossbeam_channel::Sender, + updates: crossbeam_channel::Receiver, +) -> Result<(MqttCommands, MqttUpdates), String> { + let last_will = LastWill { + topic: format!("{}/status", crate::MQTT_ID), + message: "disconnected".to_string(), + qos: QoS::AtLeastOnce, + retain: true, + }; + let options = MqttOptions::new(crate::MQTT_ID, host, port); + let options = options + .set_connection_method(ConnectionMethod::Tcp) + .set_keep_alive(10) + .set_last_will(last_will) + .set_reconnect_opts(ReconnectOptions::Always(20)); -impl Mqtt { - pub fn connect(host: &str, port: u16) -> Result { - let last_will = LastWill { - topic: format!("{}/status", crate::MQTT_ID), - message: "disconnected".to_string(), - qos: QoS::AtLeastOnce, - retain: true, - }; - let options = MqttOptions::new(crate::MQTT_ID, host, port); - let options = options - .set_connection_method(ConnectionMethod::Tcp) - .set_keep_alive(10) - .set_last_will(last_will) - .set_reconnect_opts(ReconnectOptions::Always(20)); - - match MqttClient::start(options) { - Ok((mut client, notifications)) => { - match client.publish( - format!("{}/status", crate::MQTT_ID), - QoS::AtLeastOnce, - true, - "connected", - ) { - Ok(()) => Ok(Mqtt { - client, - notifications, - }), - Err(conn_err) => Err(conn_err.to_string()), - } + match MqttClient::start(options) { + Ok((mut client, notifications)) => { + match client.publish( + format!("{}/status", crate::MQTT_ID), + QoS::AtLeastOnce, + true, + "connected", + ) { + Ok(()) => Ok(( + MqttCommands::new(notifications, commands), + MqttUpdates::new(client, updates), + )), + Err(conn_err) => Err(conn_err.to_string()), } - Err(conn_err) => Err(conn_err.to_string()), } - } - - pub fn add_light(&mut self, id: &str, label: &str) -> Result<(), rumqtt::ClientError> { - self.client.publish( - format!("{}/lights", crate::MQTT_ID), - QoS::AtLeastOnce, - false, - format!("{}:{}", id, label), - )?; - self.client.publish( - format!("{}/{}/status/connected", crate::MQTT_ID, label), - QoS::AtLeastOnce, - false, - "true", - )?; - self.client.subscribe( - format!("{}/{}/command/power", crate::MQTT_ID, label), - QoS::AtLeastOnce, - )?; - self.client.subscribe( - format!("{}/{}/command/brightness", crate::MQTT_ID, label), - QoS::AtLeastOnce, - )?; - Ok(()) + Err(conn_err) => Err(conn_err.to_string()), } } diff --git a/src/mqtt_commands.rs b/src/mqtt_commands.rs new file mode 100644 index 0000000..7252a6f --- /dev/null +++ b/src/mqtt_commands.rs @@ -0,0 +1,60 @@ +use crate::light::{Command, Value}; +use regex::Regex; +use rumqtt; +use rumqtt::{Notification, Publish, Receiver}; + +pub struct MqttCommands { + notifications: Receiver, + commands: crossbeam_channel::Sender, +} + +impl MqttCommands { + pub fn new( + notifications: Receiver, + commands: crossbeam_channel::Sender, + ) -> Self { + MqttCommands { + notifications, + commands, + } + } + + pub fn listen(&self) { + loop { + match self.notifications.recv() { + Ok(notification) => { + println!("MQTT notification received: {:#?}", notification); + match notification { + Notification::Publish(data) => self.handle_publish(data), + Notification::PubAck(_) => {} + Notification::PubRec(_) => {} + Notification::PubRel(_) => {} + Notification::PubComp(_) => {} + Notification::SubAck(_) => {} + Notification::None => {} + } + } + Err(recv_error) => { + println!("MQTT channel closed: {}", recv_error); + } + } + } + } + + fn handle_publish(&self, data: Publish) { + lazy_static! { + static ref matchStr: String = format!(r"^{}/(\w+)/command/(\w+)$", crate::MQTT_ID); + static ref RE: Regex = Regex::new(&matchStr).unwrap(); + } + let mut matching = RE.find_iter(&data.topic_name); + let lamp = matching.next().unwrap().as_str(); + let command = matching.next().unwrap().as_str(); + + self.commands + .send(Command { + lampname: lamp.to_owned(), + command: Value::new(command, data.payload.to_vec()), + }) + .unwrap(); + } +} diff --git a/src/mqtt_updates.rs b/src/mqtt_updates.rs new file mode 100644 index 0000000..7f7e77d --- /dev/null +++ b/src/mqtt_updates.rs @@ -0,0 +1,57 @@ +use crate::light::Status; +use rumqtt; +use rumqtt::{MqttClient, QoS}; + +pub struct MqttUpdates { + client: MqttClient, + updates: crossbeam_channel::Receiver, +} + +impl MqttUpdates { + pub fn new(client: MqttClient, updates: crossbeam_channel::Receiver) -> Self { + MqttUpdates { client, updates } + } + pub fn add_light(&mut self, id: &str, lampname: &str) -> Result<(), rumqtt::ClientError> { + self.client.publish( + format!("{}/lights", crate::MQTT_ID), + QoS::AtLeastOnce, + false, + format!("{}:{}", id, lampname), + )?; + self.client.publish( + format!("{}/{}/status/connected", crate::MQTT_ID, lampname), + QoS::AtLeastOnce, + false, + "true", + )?; + self.client.subscribe( + format!("{}/{}/command/power", crate::MQTT_ID, lampname), + QoS::AtLeastOnce, + )?; + self.client.subscribe( + format!("{}/{}/command/brightness", crate::MQTT_ID, lampname), + QoS::AtLeastOnce, + )?; + Ok(()) + } + + pub fn listen(&mut self) { + while let Ok(status) = self.updates.recv() { + match status { + Status::New(light) => self.add_light(&light.id, &light.label).unwrap(), + Status::Remove(_name) => unimplemented!(), + Status::Update(update) => { + let (detail, value) = update.status.unravel(); + self.client + .publish( + format!("{}/{}/status/{}", crate::MQTT_ID, update.lampname, detail), + QoS::AtLeastOnce, + true, + value, + ) + .unwrap(); + } + } + } + } +}