lifx-mqtt-bridge/src/mqtt_commands.rs

83 lines
2.6 KiB
Rust

use crate::light::{Command, Value};
use log::{error, info, warn};
use regex::Regex;
use rumqtt;
use rumqtt::{Notification, Publish, Receiver};
pub struct MqttCommands {
notifications: Receiver<Notification>,
commands: crossbeam_channel::Sender<Command>,
}
impl MqttCommands {
pub fn new(
notifications: Receiver<Notification>,
commands: crossbeam_channel::Sender<Command>,
) -> Self {
MqttCommands {
notifications,
commands,
}
}
pub fn listen(&self) {
info!("Listening for mqtt commands");
loop {
match self.notifications.recv() {
Ok(notification) => {
info!("MQTT notification received: {:#?}", notification);
match notification {
Notification::Publish(data) => self.handle_publish(data),
Notification::PubAck(_) => {}
Notification::PubRec(_) => {}
Notification::PubRel(_) => {}
Notification::PubComp(_) => {}
Notification::SubAck(_) => {}
Notification::None => {}
}
}
Err(recv_error) => {
error!("MQTT channel closed: {}", recv_error);
}
}
}
}
fn handle_publish(&self, data: Publish) {
lazy_static! {
static ref MATCH_STR: String =
format!(r"^/{}/lights/([^/]+)/command/(\w+)$", crate::MQTT_ID);
static ref RE: Regex = Regex::new(&MATCH_STR).unwrap();
}
let matching = match RE.captures(&data.topic_name) {
None => {
warn!("Failed to parse command: returned None");
return;
}
Some(matching) => {
if matching.len() != 3 {
warn!("Failed to parse command: match length = {}", matching.len());
return;
} else {
matching
}
}
};
let light_name = &matching[1];
let command = &matching[2];
debug!("light_name: {}, command: {}", light_name, command);
match Value::new(command, data.payload.to_vec()) {
Some(value) => {
if let Err(err) = self.commands.send(Command {
lightname: light_name.to_owned(),
command: value,
}) {
warn!("{}", err);
}
}
None => warn!("Command value could not be created"),
}
}
}