Listening to MQTT topics

Protocol managers and internal command handlers may want to listen to specific MQTT topics. Topics that start with homeassistant/ or energy2mqtt/ are subscribed to as given. Any other topic is prefixed with energy2mqtt/.
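The prefix rule can be pictured with a short sketch. The function name effective_topic is hypothetical and not taken from the energy2mqtt sources; it only restates the rule described above, and the real client may handle edge cases (such as a leading slash) differently.

```rust
/// Hypothetical helper (an assumption for illustration): returns the topic
/// that would effectively be subscribed to under the rule described above.
fn effective_topic(topic: &str) -> String {
    const ALLOWED_PREFIXES: [&str; 2] = ["homeassistant/", "energy2mqtt/"];

    if ALLOWED_PREFIXES.iter().any(|prefix| topic.starts_with(*prefix)) {
        // Already inside one of the two allowed namespaces: keep it unchanged.
        topic.to_string()
    } else {
        // Everything else is moved below energy2mqtt/.
        format!("energy2mqtt/{}", topic)
    }
}
```

Under this reading, subscribing to oms_input would listen on energy2mqtt/oms_input, while homeassistant/status would be left untouched.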

Callbacks are delivered through tokio’s multi-producer, single-consumer (mpsc) channel: for every message, the receiver gets the name of the topic that changed and the new payload. The sender passed to new() is the only way a protocol manager can communicate with the MQTT client thread, so to subscribe you send a Transmission::Subscribe through it. This registration carries a SubscribeData struct that contains the name of the topic to listen to and the sender to which the data should be published.
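To make the message flow easier to follow, here is a minimal sketch of both sides of the exchange. It uses std::sync::mpsc as a stand-in for tokio's channel so that it runs without any dependencies. The type CallbackData, the String payload, and the functions register and deliver_once are assumptions made for illustration; only the names Transmission::Subscribe, SubscribeData, topic, sender and payload come from the text above. Topic prefixing is left out.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Stand-in types: variant and field names follow the text above, everything
// else (payload type, further variants) is an assumption.
struct CallbackData {
    topic: String,
    payload: String,
}

struct SubscribeData {
    topic: String,
    sender: Sender<CallbackData>,
}

enum Transmission {
    Subscribe(SubscribeData),
}

/// Manager side: create a callback channel and register it for `topic`.
fn register(to_mqtt: &Sender<Transmission>, topic: &str) -> Receiver<CallbackData> {
    let (sender, receiver) = channel();
    let request = Transmission::Subscribe(SubscribeData {
        topic: topic.to_string(),
        sender,
    });
    // A failed send means the MQTT side is gone; the receiver then never
    // yields a message.
    let _ = to_mqtt.send(request);
    receiver
}

/// Hypothetical MQTT side: take one registration and deliver one message to it.
fn deliver_once(from_managers: &Receiver<Transmission>, payload: &str) {
    if let Ok(Transmission::Subscribe(sub)) = from_managers.recv() {
        let _ = sub.sender.send(CallbackData {
            topic: sub.topic.clone(),
            payload: payload.to_string(),
        });
    }
}
```

The point of the design is that the manager never talks to the MQTT library directly: it hands over a sender once and afterwards only reads from its own receiver.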

Details
  • You do not need to resubscribe when the MQTT client reconnects; it handles that on its own
  • If you expect to receive a lot of data, you can increase the channel capacity (10 in the example below)
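The effect of the channel capacity can be tried out with a small sketch. It uses std::sync::mpsc::sync_channel as a dependency-free stand-in for tokio's bounded channel; the function accepted_before_full is hypothetical. With tokio, send().await waits while the buffer is full and try_send returns an error instead. How the MQTT client thread reacts to a full channel is not covered here.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to queue `messages` payloads into a channel with room for `capacity`
/// entries and returns how many were accepted before the buffer was full.
fn accepted_before_full(capacity: usize, messages: usize) -> usize {
    // `_receiver` stays alive but is never read, like a manager that is busy.
    let (sender, _receiver) = sync_channel::<String>(capacity);
    let mut accepted = 0;
    for i in 0..messages {
        match sender.try_send(format!("payload {}", i)) {
            Ok(()) => accepted += 1,
            // Buffer is full: nothing more fits until the receiver reads.
            Err(TrySendError::Full(_)) => break,
            // Receiver was dropped: nobody is listening any more.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

A burst of 25 messages fills a channel of capacity 10 after the tenth message, while a capacity of 100 takes the whole burst. A larger capacity therefore buys time for a slow consumer at the cost of more buffered payloads in memory.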

You can find a complete implementation in src/metering_oms/mod.rs. A small example can be seen here:

impl ExampleManager {
    pub fn new(sender: Sender<Transmission>) -> Self {
        ExampleManager { sender }
    }

    pub async fn start_thread(&mut self) {
        info!("Starting Example thread");

        /* We need to subscribe to an MQTT topic and wait for data or commands */
        let (sender, mut receiver) = tokio::sync::mpsc::channel(10);

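        let register = Transmission::Subscribe(SubscribeData {
            topic: "oms_input".to_string(),
            sender,
        });

        /* If this send fails, the MQTT client thread is gone. The callback
           sender is dropped with the message, so the loop below ends at once. */
        let _ = self.sender.send(register).await;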

        info!("Example is starting to wait for messages");
        while let Some(callback) = receiver.recv().await {
            let topic = callback.topic;
            let payload = callback.payload;

            /* ... work with our data here ... */
        }
    }
}