#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Daemon that reads log lines from a serial port, stores them in a MySQL
database and sends Telegram notifications for selected events.

The script is written for Python 2 (``unicode``, ``raw_input``,
``ConfigParser``).  It can either run as a daemon (start/stop/restart) or
process a text file given via ``--testfile``.
"""

import logging
import time
import datetime
import os
import argparse
import ConfigParser
import sys
import getpass
import re
import threading

import MySQLdb
import serial
from daemon import runner
from pprint import pprint
from twx.botapi import TelegramBot, ReplyKeyboardMarkup

LOGLEVEL = logging.DEBUG
CONFIGFILE = "/etc/terxonchecker.conf"
RUNPATH = "/var/run/terxonchecker"
PIDFILENAME = "terxonchecker.pid"
LOGPATH = "/var/log"
LOGFILENAME = "terxonchecker.log"

logger = logging.getLogger("Daemon")
# The file handler is module-global because the main section must hand its
# stream to the daemon context (see ``files_preserve`` at the bottom).
handler = logging.FileHandler(os.path.join(LOGPATH, LOGFILENAME))


def initLogger():
    """Configure the module logger to write formatted records to the log file."""
    logger.setLevel(LOGLEVEL)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(funcName)s - %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)


def initCmdLogger():
    """Additionally send log records to the console (used in test-file mode)."""
    ch = logging.StreamHandler()
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)


def parseArgs():
    """Parse the command line and return the argparse namespace.

    The positional ``start``/``stop``/``restart`` arguments are only declared
    so that argparse accepts them; nothing in this file reads them from the
    returned namespace.
    """
    parser = argparse.ArgumentParser(description = "Optional arguments for command line")
    parser.add_argument("-c", "--config", help = "specifies a config file holding all information")
    parser.add_argument("--dbinit", help = "initializes the databse") #TODO handling extra mode
    # The help text used to be a copy-paste leftover ("username to access your
    # owncloud"); it now describes what the option actually does.
    parser.add_argument("--testfile", help = "processes the given log file instead of reading from the serial port", nargs = '?')
    parser.add_argument("-v", "--verbose", help = "increases verbosity", action = "store_true")
    parser.add_argument("start", help = "", nargs = '?')
    parser.add_argument("stop", help = "", nargs = '?')
    parser.add_argument("restart", help = "", nargs = '?')
    args = parser.parse_args()
    return args


def runInTestMode(filename, config, database):
    """Feed every non-empty line of ``filename`` through a ``logAnalyser``.

    Args:
        filename: path of a latin1-encoded text file with log lines.
        config: configuration object passed on to the analyser.
        database: ``Database`` instance; opened here and closed on exit.

    Returns:
        True when the whole file was processed, False when the file does not
        exist or an unexpected error occurred (the error is logged).
    """
    if not os.path.isfile(filename):
        logger.error(u"Given file {} does not exist".format(filename))
        return False

    la = logAnalyser(config, database)
    # Pre-set so the error path can always log it, even if the failure
    # happens before the first line was read.
    lineRead = None
    try:
        database.openDatabase()
        with open(filename, 'r') as testFile:
            for rawLine in testFile:
                lineRead = unicode(rawLine.decode('latin1').strip())
                if not lineRead:
                    # Skip blank lines (as App.run does) instead of treating
                    # the first one as end of file.
                    continue
                logger.debug(u"Got: {}".format(lineRead))
                la.processLine(lineRead)
        return True
    except Exception as e:
        logger.exception(u"Unexpected error occurred: {}".format(e))
        logger.error(u"Line processed was:")
        logger.error(lineRead)
    finally:
        la.waitForThreadsToTerminate()
        database.closeDatabase()
    return False


def Log (str, force = 0):
    """Print ``str`` when running verbosely or when ``force`` is truthy.

    Relies on the module-level ``cmdargs`` set in the main section.
    """
    if cmdargs.verbose or force:
        print(str)
    return


class logAnalyser():
    """Turns raw log lines into database entries and notification threads.

    The three regular expressions used to classify a line are read from the
    ``global`` section of the configuration (``terxonseparatorregex``,
    ``terxondatetimeregex``, ``terxonpartitionregex``).
    """

    def __init__(self, config, db):
        self._database = db
        self._config = config
        self._threads = []       # Notifier threads that may still be running
        self._lastEvent = None   # logEvent the next line is attributed to

    def getRunningThreads(self):
        """Return the list of notifier threads tracked by this analyser."""
        return self._threads

    def waitForThreadsToTerminate(self):
        """Block until every tracked notifier thread has finished."""
        logger.info(u"Waiting for {} threads to terminate".format(len(self._threads)))
        for t in self._threads:
            t.join()
        while (self.threadsAlive()):
            time.sleep(0.5)
        logger.debug(u"All threads have been terminated")
        self.cleanUpThreads()

    def threadsAlive(self):
        """Return True if at least one tracked thread is still alive."""
        logger.debug(u"Checking list of {} thread for active elements".format(len(self._threads)))
        for t in self._threads:
            if (t.is_alive()):
                logger.debug(u"Found active thread in list")
                return True
        logger.debug(u"No active threads found")
        return False

    def cleanUpThreads(self):
        """Drop finished threads from the tracking list."""
        logger.debug(u"Cleaing up {} old threads.".format(len(self._threads)))
        # Build a new list instead of removing while iterating, which would
        # skip the element following each removed one.
        stillRunning = []
        for t in self._threads:
            if (t.is_alive()):
                stillRunning.append(t)
            else:
                logger.debug(u"Found a thread that has completed its work. Deleting...")
        # Slice assignment keeps the identity of the list handed out by
        # getRunningThreads().
        self._threads[:] = stillRunning
        logger.debug(u"Number of threads remaining in list: {}".format(len(self._threads)))

    def processLine(self, text):
        """Classify one log line and store what it describes.

        A separator line (or the very first line seen) only starts a new
        event.  An event line stores event, detail and activity and may start
        a notifier thread.  A partition line links partitions to the activity
        of the current event.

        Returns:
            True when the line was handled, False for empty or unknown lines
            and for partition lines without a preceding event line.
        """
        if (not text):
            return False
        logger.debug(u"Text={}".format(text))
        separatorMatch = re.match(self._config.get("global", "terxonseparatorregex"), text)
        datetimeMatch = re.match(self._config.get("global", "terxondatetimeregex"), text)
        partitionMatch = re.match(self._config.get("global", "terxonpartitionregex"), text)

        # NOTE(review): the source had lost its indentation; the branch
        # nesting below follows the "skipping line" log message - confirm.
        if (separatorMatch or not self._lastEvent):
            logger.info(u"Found separator or no events in list; creating event and skipping line...")
            event = logEvent()
            #separator found -> do nothing
        else:
            event = self._lastEvent
            if (datetimeMatch):
                logger.info(u"Found event line; processing...")
                self._processEventLine(event, datetimeMatch, text)
            elif (partitionMatch):
                logger.info(u"Found partition information; processing...")
                relatedActivityId = event.getDbActivityId()
                if (relatedActivityId is None):
                    # No event line was stored for this event yet, so there is
                    # no activity the partitions could be linked to.
                    logger.warn(u"Found unknown line: {}".format(text))
                    return False
                partitions = event.parsePartitions(partitionMatch.group(0).split(',')) #array of text
                for partition in partitions:
                    dbPartitionId = self._database.addPartition(unicode(partition)) #int
                    self._database.addReference(dbPartitionId, relatedActivityId)
            else:
                logger.warn(u"Found unknown line: {}".format(text))
                return False

        logger.debug(u"Will save event: {}".format(event))
        self._lastEvent = event
        self.cleanUpThreads()
        return True

    def _processEventLine(self, event, datetimeMatch, text):
        """Store one event line and start a notifier thread if configured."""
        # Regex groups 1..6 are passed as day, month, year, hour, minute,
        # second; group 7 is the comma separated event text.
        dateTime = event.setDateTime(
            datetimeMatch.group(3),
            datetimeMatch.group(2),
            datetimeMatch.group(1),
            datetimeMatch.group(4),
            datetimeMatch.group(5),
            datetimeMatch.group(6)
        )
        eventDetails = datetimeMatch.group(7).split(",") #array of text
        detail = unicode()
        eventText = unicode()
        if (len(eventDetails) >= 1):
            eventText = unicode(event.setEvent(eventDetails[0]))
        if (len(eventDetails) >= 2):
            detail = unicode(event.setDetail(eventDetails[1].strip('\'')))

        dbEventId = self._database.addEvent(eventText) #int
        dbDetailId = self._database.addDetail(detail) #int
        dbActivityId = self._database.addActivity(dateTime, dbEventId, dbDetailId) #int
        event.addDbEventId(dbEventId)
        event.addDbDetailId(dbDetailId)
        event.addDbActivityId(dbActivityId)

        if (self._database.getNotificationSetting(dbEventId)):
            notificationText = unicode(self._database.getNotificationText(dbEventId, dbDetailId, text))
            #create the notification thread
            notifier = Notifier(
                self._config,
                dbActivityId,
                u"Terxon Alarmanlage löste das folgende Event aus: {}".format(notificationText)
            )
            notifier.start()
            self._threads.append(notifier)


# Sample of the input, as given in the original source:
#----------------------------------------
#23/10/16 22:03:46 Start Ausg-z.:- ,'Ben. 01'
#A1,A2
class logEvent():
    """In-memory record of one logged event and its database ids."""

    def __init__(self):
        logger.debug(u"Creting new logEvent")
        self._date = None
        self._event = None
        self._detail = None
        self._partitions = []
        self._dbEventId = None
        self._dbDetailId = None
        self._dbActivityId = None

    def __str__(self):
        return u" --> Event: {}; on: {}; for: {}; by: {}".format(self._event, self._date, self._partitions, self._detail)

    def __repr__(self):
        return u"Event: " + self.__str__()

    def addDbEventId(self, dbId):
        """Remember the id of the row in table ``event``."""
        self._dbEventId = dbId

    def addDbDetailId(self, dbId):
        """Remember the id of the row in table ``detail``."""
        self._dbDetailId = dbId

    def addDbActivityId(self, dbId):
        """Remember the id of the row in table ``activity``."""
        self._dbActivityId = dbId

    def getDbEventId(self):
        return self._dbEventId

    def getDbDetailId(self):
        return self._dbDetailId

    def getDbActivityId(self):
        return self._dbActivityId

    def setDateTime(self, year, month, day, hour, minute, second):
        """Build, store and return the event timestamp.

        All arguments are strings; ``year`` is a two digit year that gets
        prefixed with "20".
        """
        logger.debug(u"Setting date for event based with year={}, month={}, day={}, hour={}, minute={}, second={}".format(year, month, day, hour, minute, second))
        self._date = datetime.datetime(int("20" + year), int(month), int(day), int(hour), int(minute), int(second)) # YEAR HACK
        return self._date

    def setEvent(self, text):
        """Store and return the event description."""
        logger.debug(u"Setting event description={}".format(text))
        self._event = text
        return self._event

    def setDetail(self, text):
        """Store and return the event detail."""
        logger.debug(u"Setting event detail={}".format(text))
        self._detail = text
        return self._detail

    def parsePartitions(self, array):
        """Store and return the list of partition names."""
        logger.debug(u"Setting event partitions={}".format(array))
        self._partitions = array
        return self._partitions


class ConfigParse():
    """Loads the configuration file and checks the database settings.

    Members:
        config: the ``RawConfigParser`` (set by ``readConfig``)
        configFile: path given on the command line (may be empty)
        defaultConfigFile: path used when ``configFile`` is empty
    """

    # (option in section "global", message logged when it is missing/empty)
    _REQUIRED_OPTIONS = (
        ("host", u"No host name for database defined in config file"),
        ("database", u"No database name for database defined in config file"),
        ("username", u"No user name for database defined in config file"),
        ("password", u"No password for database user defined in config file"),
    )

    def __init__(self, configfile, defaultfile):
        self.configFile = configfile
        self.defaultConfigFile = defaultfile

    def readConfig(self):
        """Read the config file.

        Returns:
            The ``RawConfigParser`` on success, False when the file is
            missing/empty or a required database option is not set.
        """
        if (not self.configFile):
            logger.warn(u"No config file paramater specified. Will try to read default: {}".format(self.defaultConfigFile))
            self.configFile = self.defaultConfigFile
        self.config = ConfigParser.RawConfigParser()
        self.config.read(self.configFile)
        if (not self.config.sections()):
            logger.error(u"No or empty config file found at: {}. Exiting...".format(self.configFile))
            return False
        for option, message in self._REQUIRED_OPTIONS:
            # has_option() first: get() raises for a missing option instead of
            # returning something falsy, which would bypass the error message.
            if (not self.config.has_option("global", option)
                    or not self.config.get("global", option)):
                logger.error(message)
                return False
        return self.config


class Database():
    """Thin wrapper around a MySQLdb connection for the terxonchecker schema."""

    def __init__(self, config):
        logger.debug(u"Database __init__ called")
        self.dbHost = config.get("global", "host")
        self.dbName = config.get("global", "database")
        self.dbUser = config.get("global", "username")
        self.dbPass = config.get("global", "password")
        # Defined up front so closeDatabase() is safe even when no connection
        # could be established.
        self.db = None
        # The password is deliberately not written to the log.
        logger.debug(u"Got Host: {}, Database: {}, Username: {} from config file".format(self.dbHost, self.dbName, self.dbUser))

    def testDb(self):
        """Return True if the configured database can be opened and selected."""
        result = True
        try:
            self.openDatabase()
            self.queryDb("USE {}".format(MySQLdb.escape_string(self.dbName)))
        except MySQLdb.Error as e:
            logger.error(u"Error opring database; perhaps it does not exist or the user you specified does not have access to it")
            logger.error(u"Run script with parameter --dbinit")
            result = False
        finally:
            self.closeDatabase()
        return result

    def openDatabase(self):
        """Connect to the configured database with the configured user."""
        logger.info(u"Opening database...")
        self.db = MySQLdb.connect(host=self.dbHost, user=self.dbUser, passwd=self.dbPass, db=self.dbName, charset='utf8', use_unicode=True)

    def closeDatabase(self):
        """Close the connection if one is open; safe to call repeatedly."""
        logger.info(u"Closing database...")
        if (self.db):
            try:
                self.db.close()
            finally:
                # Forget the handle so a later query reconnects instead of
                # using (or re-closing) a closed connection.
                self.db = None

    def initDatabase(self):
        """Interactively create database, user grants and tables.

        Asks on the console for credentials of an administrative MySQL user.

        Returns:
            True when the structure was created, False otherwise.
        """
        created = False
        defaultAdmin = "root"
        adminPassword = False
        logger.info(u"Database seems not to be existant. Creating now!")
        print("We need to create a databse for terxonchecker")
        print("Please provide credentials for an existing mysql server on localhost")
        adminUser = raw_input("Username wir rights to create database and user (if not \"{}\"): ".format(defaultAdmin)) or defaultAdmin
        while (not adminPassword):
            adminPassword = getpass.getpass("Password for {}: ".format(adminUser))
        DBACCESSREGEX = "GRANT ALL PRIVILEGES ON *.* TO"
        try:
            self.db = MySQLdb.connect(host=self.dbHost,      # your host, usually localhost
                                      user=adminUser,         # your username
                                      passwd=adminPassword,   # your password
                                      db="")                  # name of the data base
            results = self.queryDb("SHOW GRANTS for CURRENT_USER").fetchall()
            grantPattern = re.escape(DBACCESSREGEX)
            hasRights = any(re.match(grantPattern, row[0]) for row in results)
            if (hasRights):
                logger.info(u"User {} has sufficient rights to create database structure.".format(adminUser))
                # `partition` is quoted because it is a reserved word in
                # newer MySQL versions.
                sqls = [
                    ("CREATE DATABASE {}".format(MySQLdb.escape_string(self.dbName))),
                    ("GRANT SELECT,INSERT,UPDATE,DELETE,CREATE,DROP ON {}.* TO '{}'@'{}' IDENTIFIED BY '{}';".format(
                        MySQLdb.escape_string(self.dbName),
                        MySQLdb.escape_string(self.dbUser),
                        MySQLdb.escape_string(self.dbHost),
                        MySQLdb.escape_string(self.dbPass))),
                    ("FLUSH PRIVILEGES;"),
                    ("USE {}".format(MySQLdb.escape_string(self.dbName))),
                    ("""CREATE TABLE detail (
                        id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
                        name CHAR(20)
                    )"""),
                    ("""CREATE TABLE event (
                        id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
                        name CHAR(100) UNIQUE,
                        description TEXT,
                        alertOnEvent BOOL NOT NULL DEFAULT 1
                    )"""),
                    ("""CREATE TABLE `partition` (
                        id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
                        name CHAR(2)
                    )"""),
                    ("""CREATE TABLE activity (
                        id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
                        datetime DATETIME NOT NULL,
                        eventId INT UNSIGNED NOT NULL,
                        detailId INT UNSIGNED NOT NULL,
                        alerted BOOL
                    )"""),
                    ("""CREATE TABLE ref_partition_activity (
                        partitionId INT UNSIGNED NOT NULL,
                        activityId INT UNSIGNED NOT NULL,
                        FOREIGN KEY (partitionId) REFERENCES `partition`(id) ON UPDATE CASCADE ON DELETE CASCADE,
                        FOREIGN KEY (activityId) REFERENCES activity(id) ON UPDATE CASCADE ON DELETE CASCADE
                    )""")
                ]
                for sql in sqls:
                    self.queryDb(sql)
                created = True
            else:
                logger.error(u"User {} does NOT have sufficient rights to create database structure.".format(adminUser))
        except MySQLdb.Error as e:
            logger.error(u"Error opring database; perhaps it does not exist or the user you specified does not have access to it")
            logger.error(u"Error: {}".format(e))
        finally:
            self.closeDatabase()
        return created

    def addEvent(self, eventText):
        """Return the id of the event named ``eventText``, creating it if needed."""
        return self.addGeneric("event", eventText)

    def addDetail(self, detail):
        """Return the id of the detail named ``detail``, creating it if needed."""
        return self.addGeneric("detail", detail)

    def addPartition(self, partition):
        """Return the id of the partition named ``partition``, creating it if needed."""
        return self.addGeneric("partition", partition)

    def addActivity(self, datetime, eventId, detailId):
        """Insert an activity row (not yet alerted) and return its id."""
        entryId = self.queryDb(
            u"INSERT INTO activity (datetime, eventId, detailId, alerted) VALUES (%s, %s, %s, FALSE)",
            (datetime, eventId, detailId)).lastrowid
        logger.info(u"Added activity with id={}".format(entryId))
        return entryId

    def addReference(self, dbPartitionId, relatedActivityId):
        """Link a partition to an activity; returns the cursor's lastrowid."""
        entryId = self.queryDb(
            u"INSERT INTO ref_partition_activity (partitionId, activityId) VALUES (%s, %s)",
            (dbPartitionId, relatedActivityId)).lastrowid
        logger.debug(u"Added reference with id={}".format(entryId))
        return entryId

    def addGeneric(self, tablename, value):
        """Look up ``value`` in column ``name`` of ``tablename``; insert if absent.

        ``tablename`` is only ever passed from within this class; ``value``
        originates from the log line and is therefore bound as a parameter.

        Returns:
            The id of the existing or newly inserted row.
        """
        entryId = False
        tablename = unicode(tablename)
        entry = self.queryDb(u"SELECT id from `{}` where name = %s".format(tablename), (value,)).fetchone()
        if (entry):
            entryId = entry[0] #use [0] to access 1st column of result (even though we should only have one
        if (not entryId):
            logger.info(u"No entry for {}: {} found. Will add to database".format(tablename, value))
            entryId = self.queryDb(u"INSERT INTO `{}` (name) VALUES (%s)".format(tablename), (value,)).lastrowid
            logger.info(u"Added with id={}".format(entryId))
        else:
            logger.debug(u"Found entry for {}: {} with id: {}".format(tablename, value, entryId))
        return entryId

    def setAlerted(self, activityId):
        """Mark the activity as alerted."""
        self.queryDb(u"UPDATE activity SET alerted = '1' where id = %s", (activityId,))

    def getNotificationText(self, eventId, detailId, default):
        """Return the event description plus detail name, or ``default``
        when the event has no description."""
        #get the minimum notification text
        result = self.getEventField("description", eventId)
        if (not result):
            return default
        #add information about details (e.g. zones or users)
        result = self.appendEventDetail(result, detailId)
        return result

    def appendEventDetail(self, appendTo, detailId):
        """Return ``appendTo`` followed by ": <detail name>" if a name exists."""
        row = self.queryDb(u"SELECT name from detail where id = %s", (detailId,)).fetchone()
        if (row):
            dbResult = unicode(row[0])
            logger.debug(u"Got field=name with data={} for detail with id={}".format(dbResult, detailId))
            if (dbResult):
                return u"{}: {}".format(appendTo, dbResult)
        return appendTo

    def getNotificationSetting(self, eventId):
        """Return the ``alertOnEvent`` value of the event as text, or False."""
        return self.getEventField("alertOnEvent", eventId)

    def getEventField(self, dbField, eventId):
        """Return column ``dbField`` of the event as unicode.

        ``dbField`` is a column name supplied by this class, not user input.

        Returns:
            The value as unicode, or False when the event does not exist or
            the value is empty/zero.
        """
        result = False
        row = self.queryDb(u"SELECT {} from event where id = %s".format(dbField), (eventId,)).fetchone()
        if (row):
            if (row[0]):
                result = unicode(row[0])
                logger.debug(u"Got field={} with data={} for event with id={}".format(dbField, result, eventId))
            else:
                logger.info(u"Did not find an additional description for event with id={}. You may want add add this information in the database.".format(eventId))
        else:
            logger.warn(u"Could not find event with id={}. This should not have happened".format(eventId))
        return result

    def getPartitionArray(self, activityId):
        """Return the names of all partitions linked to the activity."""
        result = []
        rows = self.queryDb(
            u"SELECT p.name FROM activity a inner join ref_partition_activity r on r.activityID = a.id inner join `partition` p on r.partitionId = p.id where a.id = %s",
            (activityId,)).fetchall()
        if (rows):
            for row in rows:
                partitionText = unicode(row[0])
                logger.debug(u"Got partition={} related to activity={}".format(partitionText, activityId))
                result.append(partitionText)
        return result

    def queryDb(self, sql, params=None):
        """Execute ``sql``, commit and return the cursor.

        Args:
            sql: statement text; use ``%s`` placeholders for values.
            params: optional sequence of values bound to the placeholders.
                When omitted, ``sql`` is executed as is.

        If there is no connection or the statement fails with an
        OperationalError, the connection is re-opened and the statement is
        retried once.
        """
        logger.debug(u"Executing sql: {} with params: {}".format(sql, params))
        try:
            cur = self.db.cursor()
            cur.execute(sql, params)
        except (AttributeError, MySQLdb.OperationalError):
            #we seem to have lost the database connection; reconnect
            self.openDatabase()
            cur = self.db.cursor()
            cur.execute(sql, params)
        self.db.commit()
        return cur


class Notifier(threading.Thread):
    """Thread that sends one Telegram notification for one activity.

    It uses its own ``Database`` object, polls a few times for partition
    information to append to the message, sends the message and marks the
    activity as alerted.
    """

    def __init__(self, config, dbActivityId, notificationText):
        threading.Thread.__init__(self)
        self._RETRIES = 3       # number of polls for partition information
        self._SLEEPTIME = 0.5   # seconds between polls
        self._config = config
        # Use the configuration passed in, not the module-level global.
        self._database = Database(config)
        self._bot = TelegramBot(self._config.get("global", "telegrambottoken"))
        self._dbActivityId = dbActivityId
        self._notificationText = notificationText

    def run(self):
        """Wait for partition data, send the notification, flag the activity."""
        logger.info(u"Thread started to send notification for activity={} with msg={}".format(self._dbActivityId, self._notificationText))
        try:
            while (self._RETRIES):
                logger.debug(u"Thread for activity={} woke up. Checking db for partition information".format(self._dbActivityId))
                partitions = unicode(", ".join(self._database.getPartitionArray(self._dbActivityId)))
                if (partitions):
                    logger.debug(u"... got partitions={}".format(partitions))
                    self._notificationText = u"{}; Teilbereiche: {}".format(self._notificationText, partitions)
                    break
                else:
                    time.sleep(self._SLEEPTIME)
                    self._RETRIES -= 1
            self.sendTelegramNotification()
            self._database.setAlerted(self._dbActivityId)
        finally:
            # The connection was opened implicitly by the first query; release
            # it when the thread ends.
            self._database.closeDatabase()

    def sendTelegramNotification(self):
        """Send the text to every configured recipient.

        Returns:
            The result of the last send, or None without recipients.
        """
        result = None
        for telegramUserId in self._config.get("global", "telegramrecipientid").split():
            logger.info(u"Sending telegram notification to: {}".format(telegramUserId))
            logger.info(u"... with content: {}".format(self._notificationText))
            result = self._bot.send_message(telegramUserId, self._notificationText).wait()
        return result


class App():
    """Application object handed to ``daemon.runner.DaemonRunner``.

    The ``*_path`` and ``pidfile_*`` attributes are set for the runner;
    ``run`` contains the daemon's main loop.
    """

    # Seconds to wait before re-opening the serial port after an error, so a
    # missing port does not cause a busy loop that floods the log.
    _SERIAL_RETRY_DELAY = 5

    def __init__(self, config, db):
        if not os.path.exists(RUNPATH):
            os.makedirs(RUNPATH)
        self.stdin_path = "/dev/null"
        self.stdout_path = "/dev/null"
        self.stderr_path = "/dev/null"
        self.pidfile_path = os.path.join(RUNPATH, PIDFILENAME)
        self.pidfile_timeout = 5
        self._database = db
        self._config = config
        self._la = logAnalyser(config, self._database)

    def run(self):
        """Read lines from the serial port forever and process each one.

        Serial errors are logged and the port is re-opened after a short
        delay; any other exception is logged and ends the loop.
        """
        lineRead = None
        try:
            self._database.openDatabase()
            while True:
                ser = None
                try:
                    ser = serial.Serial(
                        self._config.get("global", "serialport"),
                        self._config.get("global", "baudrate"),
                        timeout=2,
                        xonxoff=False,
                        rtscts=False,
                        dsrdtr=False) #Tried with and without the l (original note, truncated in source)
                    while (True):
                        lineRead = ser.readline()
                        lineRead = lineRead.decode('latin1').strip()
                        lineRead = unicode(lineRead)
                        if (lineRead):
                            logger.debug(u"Got: " + lineRead)
                            self._la.processLine(lineRead)
                except serial.SerialException as err:
                    logger.error(u"Got error while reading from serial: {}".format(err))
                    time.sleep(self._SERIAL_RETRY_DELAY)
                finally:
                    if ser is not None:
                        ser.close()
        except Exception as e:
            logger.exception(u"Unexpected error occurred: {}".format(e))
            logger.error(u"Line processed was:")
            logger.error(lineRead)
        finally:
            self._la.waitForThreadsToTerminate()
            self._database.closeDatabase()


#######################
#### Main function ####
#######################
initLogger()
cmdargs = parseArgs()
configParser = ConfigParse(cmdargs.config, CONFIGFILE)
configuration = configParser.readConfig()
if (not configuration):
    # No extra positional argument: the message has no format placeholder.
    logger.error(u"Error while reading config file. Exiting ...")
    sys.exit(1)

db = Database(configuration)
if (not db.testDb()):
    db.initDatabase()

if (cmdargs.testfile):
    initCmdLogger()
    runInTestMode(cmdargs.testfile, configuration, db)
else:
    app = App(configuration, db)
    daemon_runner = runner.DaemonRunner(app)
    #This ensures that the logger file handle does not get closed during daemonization
    daemon_runner.daemon_context.files_preserve=[handler.stream]
    daemon_runner.do_action()