# Copyright (C) 2011-2012 Nicolo' Barbon
#
# This file is part of Calise.
#
# Calise is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# Calise is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Calise. If not, see <http://www.gnu.org/licenses/>.
import os
import argparse
import ConfigParser
import logging
from xdg.BaseDirectory import load_config_paths
from calise.infos import __LowerName__, __version__
# globals
settings = {}
logger = logging.getLogger(".".join([__LowerName__, "options"]))
def get_path(pname="default", sufx=".conf"):
# yield system-wide profile first
yield os.path.join("/", "etc", __LowerName__ + sufx)
# search for profiles other than system-wide one only if NOT running as
# root (uid=0)
if os.getuid() > 0:
if settings.keys().count("profile"):
pname = settings["profile"]
if os.path.dirname(pname):
if os.path.basename(pname):
yield pname
else:
if pname.endswith(sufx):
pname = pname[:-len(sufx)]
xdg_paths = list(load_config_paths(__LowerName__))
xdg_paths.reverse()
for directory in xdg_paths:
yield os.path.join(directory, pname + sufx)
class wlogger():
def __init__(self, loglevel="info", logfile=None):
numLvl = self.getILvl(loglevel)
self.init_logs(numLvl, logfile)
def getILvl(self, loglevel):
numeric_level = getattr(logging, loglevel.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError("Invalid log level: %s" % loglevel)
return numeric_level
def init_logs(self, ilvl, flog=None):
self.logger = logging.getLogger(__LowerName__)
self.logger.setLevel(logging.DEBUG)
self.setStreamHandle(ilvl)
if flog:
self.setFileHandle(flog)
def setStreamHandle(self, ilvl):
self.ch = logging.StreamHandler()
self.ch.setLevel(ilvl)
formatterCh = logging.Formatter(
"[%(asctime)s][%(levelname)s - %(name)s] %(message)s",
datefmt="%H:%M:%S")
self.ch.setFormatter(formatterCh)
self.logger.addHandler(self.ch)
def setFileHandle(self, flog):
self.fh = logging.FileHandler(flog)
self.fh.setLevel(logging.DEBUG)
formatterFh = logging.Formatter(
"[%(asctime)s][%(levelname)s - %(name)s] %(message)s",
datefmt="%Y/%m/%d %H:%M:%S")
self.fh.setFormatter(formatterFh)
self.logger.addHandler(self.fh)
def changeLogFile(self, flog):
self.logger.removeHandler(self.fh)
self.fh.close()
self.setFileHandle(flog)
def changeLogLevel(self, loglevel):
ilvl = self.getILvl(loglevel)
self.ch.setLevel(ilvl)
class daemon_argumenter():
def __init__(self, argslist=None):
self.argslist = argslist
self.arguments = None
def init_args(self):
parser = argparse.ArgumentParser(
description=(
"Daemon to change screen's backlight according to ambient "
"brightness through webcam"),
prog="calised")
parser.add_argument(
'--version',
action='version',
version="%(pro)s %(ver)s" % dict(pro="%(prog)s", ver=__version__),
help="Display current version")
# Daemon execution
parser.add_argument(
"-k", "--stop",
action="store_true", default=None, dest="kill",
help="Terminate daemon")
parser.add_argument(
"--restart",
action="store_true", default=None, dest="restart",
help="Restart daemon")
parser.add_argument(
"-e", "--pause",
action="store_true", default=None, dest="pause",
help="Pause daemon")
parser.add_argument(
"-r", "--resume",
action="store_true", default=None, dest="resume",
help="Terminate daemon execution")
parser.add_argument(
"-c", "--capture",
action="store_true", default=None, dest="capture",
help="Terminate daemon execution")
parser.add_argument(
"--check",
action="store_true", default=None, dest="check",
help="Checks whenever daemon is running or not")
# Variable set
parser.add_argument(
"-p", "--profile",
metavar="<profile>", dest="pname", default="default",
help="Specify either a valid profile name or path")
parser.add_argument(
"--port",
metavar="<port>", dest="dport", default=None,
help="Daemon port number used to communicate")
parser.add_argument(
"--location",
metavar="<lat>:<lon>", dest="position", default=None,
help="Geographical position expressed as float degrees")
parser.add_argument(
"--capture-number",
metavar="<int>", dest="capnum", default=None,
help="Number of captures per \"capture session\" (default: 7)")
parser.add_argument(
"--capture-interval",
metavar="<float>", dest="capint", default=None,
help=(
"Seconds between consecutive captures in a \"capture "
"session\" (default: 0.5)"))
parser.add_argument(
"--weather",
action="store_true", default=None, dest="yweather",
help="Enable internet weather check")
parser.add_argument(
"--no-weather",
action="store_true", default=None, dest="nweather",
help="Disable internet weather check")
# Logging
parser.add_argument(
"--loglevel",
metavar="<level>", dest="loglevel", default=None,
help="Log level: error, warning (default), info or debug")
parser.add_argument(
"--logfile",
metavar="<path>", dest="logfile", default=None,
help="Log output file (None if not specified)")
parser.add_argument(
"-d", "--dump",
action="store_true", default=None, dest="dump",
help="dump last capture data")
parser.add_argument(
"-a", "--dump-all",
action="store_true", default=None, dest="dumpall",
help="dump all captured data from program start")
parser.add_argument(
"--dump-settings",
action="store_true", default=None, dest="dumpsettings",
help="dump current execution's settings")
self.arguments = vars(parser.parse_args(self.argslist))
self.parser = parser
def parse_settings(self):
global settings
args = self.arguments
# NOTE: Daemon execution related arguments won't be processed there
# since daemon communication is done through command existence
# in args variable (actual command content isn't processed).
# At the end of this function all daemon commands with value
# None are stripped so that only given commands remain.
#
# eg. args = {"kill": 0, "dump": "zaczac"} -> kill, dump
# eg. args = {"kill": False, "dump": 1} -> kill, dump
#
# Settings related arguments
if args["pname"]:
settings["profile"] = args["pname"]
if args["dport"]:
settings["dport"] = int(args["dport"])
if args["position"]:
lat, lon = [float(x) for x in args["position"].split(":")]
settings["latitude"], settings["longitude"] = lat, lon
args["latitude"], args["longitude"] = lat, lon
del args["position"]
if args["capnum"]:
settings["capnum"] = int(args["capnum"])
if args["capint"]:
settings["capint"] = float(args["capint"])
if args["yweather"]:
settings["weather"] = True
elif args["nweather"]:
settings["weather"] = False
# Logging related arguments
if args["loglevel"]:
settings["loglevel"] = args["loglevel"]
if args["logfile"]:
settings["logfile"] = args["logfile"]
# Arguments variable post-processing
# every non daemon execution's related var is deleted
daemon_commands = [
"kill", "restart", "pause", "resume", "check", "capture",
"dump", "dumpall", "dumpsettings"]
for key in args.keys():
if args[key] is None:
del args[key]
self.exec_args = args
class profiler():
# options syntax:
# Group: {option: (data_type, self.settings_key)}
options = {
"Camera": {
"offset": (float, "offset"),
"delta": (float, "delta"),
"camera": (str, "cam"),
},
"Backlight": {
"steps": (int, "steps"),
"offset": (int, "bkofs"),
"invert": (bool, "invert"),
"path": (str, "path"),
},
"Daemon": {
"port": (int, "dport"),
"latitude": (float, "latitude"),
"longitude": (float, "longitude"),
"capnum": (int, "capnum"),
"capint": (float, "capint"),
"weather": (bool, "weather"),
},
"Advanced": {
"average": (int, "avg"),
"delay": (float, "gap"),
"screen": (bool, "screen"),
},
"Info": {
"loglevel": (str, "loglevel"),
"logfile": (str, "logfile"),
},
}
def __init__(self):
self.config = ConfigParser.RawConfigParser()
def check_config(self, configFile):
if os.path.isfile(configFile):
try:
self.config.read(configFile)
logger.info("Profile found: %s" % configFile)
except (
ConfigParser.MissingSectionHeaderError,
ConfigParser.ParsingError) as err:
err = str(err).replace("\t", "").splitlines()
logger.warning(err[0])
for line in err[1:]:
logger.debug(line)
return 1
self.parse_config()
return 0
def parse_config(self, config=None):
global settings
if config is None:
config = self.config
# parse keys from options dictionary as self.settings keys
for option in self.options.keys():
for key in self.options[option]:
if config.has_option(option, key):
vtype, skey = self.options[option][key]
if int == vtype:
settings[skey] = config.getint(option, key)
elif float == vtype:
settings[skey] = config.getfloat(option, key)
elif bool == vtype:
settings[skey] = config.getboolean(option, key)
else:
settings[skey] = config.get(option, key)
def read_configs(self):
for conf in get_path():
self.check_config(conf)