Protocol Managers

Each protocol has its own manager, you find them in the source directory. Most managers have the same code path inside. They check if a configuration for their protocol is specified; if not, they sleep until the configuration changes. If a configuration is present they spawn their threads which will do the actual work. As soon as the threads are spawned, the manager returns to wait for config changes. As every aspect of the config can change and setting up everything is pretty fast, most managers will not verify the exact changes but stop the threads and start them again.

Your configuration parsing information needs to be added to src/config/. If your protocol is “example” your config should be named ExampleConfig and be defined in a way that serde_yml can parse the information. To understand how to manage the configuration see Configuration.

pub struct ExampleManager {
    /*
       The sender can be used to subscribe to MQTT topics,
       send metering data or
       send Home Assistant Discovery Messages
    */
    sender: Sender<Transmission>,
    /*
        Config change listener needs to be fetched from the CONFIG singleton
    */ 
    config_change: tokio::sync::broadcast::Receiver<ConfigChange>,
    /*
       If you need more than one thread it's common practice to store all
       handles inside a vector for easier usage.
    */
    threads: Vec<JoinHandle<()>>,
    /*
        Store a copy of our configuration during startup which is the
        initial version and may change later on
    */
    config: Vec<ExampleConfig>,
}

/* The implementation of your manager */
impl ExampleManager {
    pub fn new(sender: Sender<Transmission>) -> Self {
        /*
           Beware: The macro may panic() if you misspell the parameters,
           but it will happen directly after starting up
        */
        let config: Vec<ExampleConfig> = get_config_or_panic!("example",
                                                              ConfigBases::Example);

        ExampleManager {
            sender,
            config_change: CONFIG.read().unwrap().get_change_receiver(),
            threads: Vec::new(),
            config,
        }
    }

    pub async fn start_thread(&mut self) -> ! {
        /* There may be not config to start with, so sleep until there is */
        if self.config.len() == 0 {
            info!("No Example config found, waiting for a config change to wake me up");
            loop {
                let change = self.config_change.recv().await.unwrap();
                if change.operation != ConfigOperation::ADD ||
                   change.base != "example" {
                    continue;
                }
                
                /*
                    we need to read the config now as this change is about our part
                    of the configuration.
                */
                break;
            }
        }

        info!("Started Example setup");
        loop {
            let mut device_count = 0;

            for conf in self.config.iter() {
                /*
                    We expect name and enabled as part of each config here
                */
                if !conf.enabled {
                    info!("[Example {}] disabled by configuration", conf.name);
                    continue;
                }

                let config_clone = conf.clone();
                let sender_clone = self.sender.clone();

                let handle = tokio::spawn( async move {
                    /* Your code goes here ONLY use *_clone here because of async move */

                });

                device_count = device_count + 1;
                self.threads.push(handle);
            }

            /*
                All our threads are setup and have been configured.
                Now move back to wait for the configuration to change again
            */
            info!("Example setup with {device_count} devices, waiting for config changes");

            loop {
                let change = self.config_change.recv().await.unwrap();
                if change.base == "example" {
                    break;
                }
            }

            /*
                We are woken up because some of our config changed, so stop the threads
                and start over
            */
            info!("Example is stopping threads");
            for thread in self.threads.iter() {
                thread.abort();
            }

            self.threads.clear();
    }
}

As soon as your protocol manager is ready then add it to src/main.rs to be loaded and been started up.