001/*
002 * geordi
003 *
004 * Copyright (C) 2018 Richard "Shred" Körber
005 *   https://github.com/shred/geordi
006 *
007 * This program is free software: you can redistribute it and/or modify
008 * it under the terms of the GNU General Public License as
009 * published by the Free Software Foundation, either version 3 of the
010 * License, or (at your option) any later version.
011 *
012 * This program is distributed in the hope that it will be useful,
013 * but WITHOUT ANY WARRANTY; without even the implied warranty of
014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
015 */
016package org.shredzone.geordi;
017
018import static java.util.stream.Collectors.toList;
019
020import java.util.LinkedList;
021import java.util.List;
022import java.util.Objects;
023
024import javax.inject.Inject;
025import javax.inject.Singleton;
026
027import org.quartz.CronScheduleBuilder;
028import org.quartz.Job;
029import org.quartz.JobBuilder;
030import org.quartz.JobDetail;
031import org.quartz.JobExecutionContext;
032import org.quartz.Scheduler;
033import org.quartz.SchedulerException;
034import org.quartz.Trigger;
035import org.quartz.TriggerBuilder;
036import org.shredzone.geordi.data.Sample;
037import org.shredzone.geordi.device.Device;
038import org.shredzone.geordi.service.CompactingService;
039import org.shredzone.geordi.service.DatabaseService;
040import org.shredzone.geordi.util.GuiceJobFactory;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Geordi's main runner.
046 */
047@Singleton
048public class GeordiRunner {
049
050    private final Logger log = LoggerFactory.getLogger(getClass());
051
052    private static final String ID_KEY = "id";
053
054    @Inject
055    private DatabaseService databaseService;
056
057    @Inject
058    private Scheduler scheduler;
059
060    @Inject
061    private GuiceJobFactory guiceJobFactory;
062
063    /**
064     * Starts Geordi.
065     * <p>
066     * The Quartz scheduler is started, and the cron expressions of all {@link Device} in
067     * the database are added, so each device is triggered on the desired frequency.
068     */
069    public void start() {
070        try {
071            scheduler.start();
072            scheduler.setJobFactory(guiceJobFactory);
073
074            for (Device dev : databaseService.fetchDevices()) {
075                log.info("Registered device: {}", dev.getName());
076
077                JobDetail job = JobBuilder.newJob(DeviceJob.class)
078                        .withIdentity(dev.getName())
079                        .usingJobData(ID_KEY, dev.getId())
080                        .build();
081
082                Trigger trigger = TriggerBuilder.newTrigger()
083                        .withIdentity(dev.getName())
084                        .startNow()
085                        .withSchedule(CronScheduleBuilder.cronSchedule(dev.getCron()))
086                        .build();
087
088                scheduler.scheduleJob(job, trigger);
089            }
090        } catch (SchedulerException ex) {
091            throw new IllegalStateException(ex);
092        }
093
094        log.info("Geordi is in the engine room!");
095    }
096
097    /**
098     * A Quartz {@link Job} that fetches a {@link Device} from database, reads all the
099     * sensor values and stores them into the database.
100     */
101    private static class DeviceJob implements Job {
102        private final Logger log = LoggerFactory.getLogger(getClass());
103
104        @Inject
105        private DatabaseService databaseService;
106
107        @Inject
108        private CompactingService compactingService;
109
110        @Override
111        public void execute(JobExecutionContext context) {
112            int devId = context.getJobDetail().getJobDataMap().getIntValue(ID_KEY);
113            try {
114                Device device = databaseService.getDevice(devId);
115
116                List<Sample> samples = new LinkedList<>(device.readSensors());
117                samples.removeIf(compactingService::wasUnchanged);
118
119                List<Sample> preSamples = samples.stream()
120                        .map(compactingService::lastUnchanged)
121                        .filter(Objects::nonNull)
122                        .collect(toList());
123
124                databaseService.storeSamples(preSamples);
125                databaseService.storeSamples(samples);
126
127                samples.forEach(compactingService::rememberSample);
128            } catch (Exception ex) {
129                log.error("Failed to poll device {}", devId, ex);
130            }
131        }
132    }
133
134}