import os
import stat
import json
import copy
import logging
import collections
import logging.handlers

    from StringIO import StringIO

    str_types = (basestring,)
except BaseException:
    from io import StringIO

    str_types = (str,)

class Logger(logging.getLoggerClass()):
    """
    Core logger for all apps and scripts.

    This logger adds the following features to the default:

    - Colors!
    - Cooler line format
    - Module and line number are included in the log format
    - File log
    - File rotation (by default every ``MAXBYTES`` bytes, keeping
      ``BACKUPCOUNT`` backup files)
    """

    LOGDIR = "/var/log"
    """Log directory, it will be created if it does not exist"""

    MAXBYTES = 10000000
    """Default maximum file size allowed, after that rotation is taking place"""

    BACKUPCOUNT = 20
    """Default number of backup files to keep"""

    DATEFORMAT = "%d-%m-%Y %H:%M:%S"
    """Date output format"""

    LOGFORMAT = "%(asctime)s.%(msecs)03d %(process)s:%(thread)u %(levelname)-8s %(module)15.15s %(lineno)-4s: %(message)s"
    """Default log format for all handlers. This can change in :py:meth:`init`"""

    USER_LOGFORMAT = LOGFORMAT
    """Last format supplied to :py:meth:`init`; defaults to ``LOGFORMAT`` so
    that :py:meth:`addFileLogger` also works before ``init`` is called"""

    USER_DATEFORMAT = DATEFORMAT
    """Last date format supplied to :py:meth:`init`; defaults to ``DATEFORMAT``"""

    @classmethod
    def _chkdir(cls):
        """
        Check that the logging dir exists and create it if not.

        Failures are deliberately swallowed (best effort).
        """
        if os.path.isdir(cls.LOGDIR):
            return
        try:
            mkdir_p(cls.LOGDIR)
        except Exception:
            # Do not log... logging here enables the
            # root logger...
            pass

    @classmethod
    def _makeFormatter(cls, specs, fmt, datefmt, color):
        """
        Build the formatter selected by ``specs['format']``.

        :param dict specs: Specs already merged with defaults through
            ``ColorFormatter.parseSpecs`` (``pretty``/``splitLines`` present)
        :param str fmt: Log line format
        :param str datefmt: Date format
        :param bool color: Whether the "console" formatter should emit colors
        :return: ``ColorFormatter`` for "console" (or when no format is
            given), ``JSONFormatter`` for "json", plain ``logging.Formatter``
            for "default" and for any unrecognised value
        """
        kind = specs.get("format", "console")
        if kind == "console":
            return ColorFormatter(
                fmt,
                datefmt=datefmt,
                color=color,
                splitLines=specs["splitLines"],
                pretty=specs["pretty"],
            )
        if kind == "json":
            return JSONFormatter(specs.get("fields", JSONFormatter.FIELDS))
        return logging.Formatter(fmt, datefmt=datefmt)

    @classmethod
    def addFileLogger(cls, specs):
        """
        Add a file logger with the given specs.

        Specs::

            {
                'filename': Filename (under LOGDIR)
                'level': Logging level for this file
                'format': [ 'json' | 'console' | 'default' ]  # TODO: CSV
                'backupCount': Number of files to keep
                'maxBytes': Maximum file size
            }

        If format is set to "console", then ColorFormatter options are also
        supported. The optional keys ``fmt``, ``datefmt`` and ``fields`` are
        read as well. The ``specs`` dict passed in is not modified.

        :raises RuntimeError: if ``filename`` is missing from ``specs``
        """
        root = logging.getLogger()

        if "filename" not in specs:
            raise RuntimeError('"filename" missing from file specs... skipping!')

        filePath = cls.LOGDIR + "/" + specs["filename"]

        # Test that we can access the file (creating it if needed)
        try:
            with open(filePath, "a"):
                pass
        except (IOError, OSError):
            logging.error(
                "NOT Initializing File Logging: File-system permissions error: \n\t%s",
                filePath,
            )
            # No point doing anything else! BUT do not raise exception
            return

        # If we just created it, set permissions so that other users can
        # access it. This often crashes apps in multiuser environments.
        # NOTE(review): the mask grants read/write to group AND others
        # (world-writable log file) - confirm this is intended.
        perms = (
            stat.S_IREAD
            | stat.S_IWRITE
            | stat.S_IWOTH
            | stat.S_IROTH
            | stat.S_IWGRP
            | stat.S_IRGRP
        )
        try:
            os.chmod(filePath, perms)
        except OSError:
            # Error here can appear if another user owns the file...
            pass

        # Merge into a fresh dict so the caller's specs stay untouched; the
        # file defaults supply 'level', 'pretty' and 'splitLines'.
        merged = ColorFormatter.parseSpecs(specs, ColorFormatter.FILEDEFAULTS)

        # Register the rotating file handler
        rotFileH = logging.handlers.RotatingFileHandler(
            filePath,
            backupCount=merged.get("backupCount", cls.BACKUPCOUNT),
            maxBytes=merged.get("maxBytes", cls.MAXBYTES),
        )

        fmt = merged.get("fmt", cls.USER_LOGFORMAT)
        datefmt = merged.get("datefmt", cls.USER_DATEFORMAT)

        # Files never get colors, whatever the specs say
        rotFileH.setFormatter(cls._makeFormatter(merged, fmt, datefmt, False))
        rotFileH.setLevel(merged["level"])
        # NOTE(review): ``propagate`` is a Logger attribute in the stdlib;
        # setting it on a handler looks like a no-op. Kept for compatibility.
        rotFileH.propagate = False
        root.addHandler(rotFileH)

    @classmethod
    def init(
        cls,
        folder="/var/log/lazylog",
        termSpecs=None,
        fileSpecs=None,
        fmt=None,
        datefmt=None,
    ):
        """
        Initialize logging on the root logger: one console handler plus one
        rotating file handler per entry of ``fileSpecs``.

        :param str folder: Log directory; stored in ``LOGDIR`` and created if
            missing (best effort)
        :param dict termSpecs: Console options, merged over
            ``ColorFormatter.TERMDEFAULTS`` ('color', 'splitLines', 'pretty',
            'level', optionally 'format' and 'fields')
        :param fileSpecs: A list of dicts (a single dict is also accepted)
            with 'filename', 'level', etc. See addFileLogger for details
        :param str fmt: Log line format, defaults to ``LOGFORMAT``
        :param str datefmt: Date format, defaults to ``DATEFORMAT``
        """
        logging.setLoggerClass(cls)
        cls.LOGDIR = folder

        if termSpecs is None:
            termSpecs = {}
        if fmt is None:
            fmt = cls.LOGFORMAT
        if datefmt is None:
            datefmt = cls.DATEFORMAT

        # Store those statically as the last user supplied format. This will
        # be used from now on if format is not specified
        cls.USER_LOGFORMAT = fmt
        cls.USER_DATEFORMAT = datefmt

        # Check folder and create if needed
        cls._chkdir()

        # Merge with defaults...
        termSpecs = ColorFormatter.parseSpecs(termSpecs, ColorFormatter.TERMDEFAULTS)

        # Disable default logger
        root = logging.getLogger()
        root.setLevel(logging.DEBUG)
        root.handlers = []

        # Console logger
        console = logging.StreamHandler()
        console.setLevel(termSpecs["level"])
        console.setFormatter(
            cls._makeFormatter(termSpecs, fmt, datefmt, termSpecs["color"])
        )
        console.propagate = False
        root.addHandler(console)

        # File logger(s)
        if fileSpecs is not None:
            if isinstance(fileSpecs, dict):
                # A single specs dict: iterating it would walk its keys
                fileSpecs = [fileSpecs]
            for specs in fileSpecs:
                cls.addFileLogger(specs)

    @staticmethod
    def logFun():
        """Print one message in each level to demo the colours"""
        logging.debug("All set in logger!")
        logging.info("This is how INFO lines are")
        logging.warning("Warnings here")
        logging.critical("Critical stuff appear like this")
        logging.error("Look out for ERRORs")

    @staticmethod
    def setConsoleLevel(level):
        """
        Set the console log level.

        In this logger, by convention, handler 0 is always the console
        handler. Does nothing when no handler is registered.
        """
        handlers = logging.getLogger().handlers
        if not handlers:
            return
        handlers[0].setLevel(level)

    @staticmethod
    def setFileLevel(filenum, level):
        """
        Set a file logger log level.

        Filenum is 1,2,3,.. in the order the files have been added. Does
        nothing when no such handler exists.
        """
        handlers = logging.getLogger().handlers
        if len(handlers) < filenum + 1:
            return
        handlers[filenum].setLevel(level)

    @staticmethod
    def mockHandler(index):
        """
        Redirect root handler ``index`` to a fresh in-memory ``StringIO``.

        The real stream is kept in ``old_stream`` so that
        :py:meth:`restoreHandler` can put it back. Does nothing when no such
        handler exists.
        """
        handlers = logging.getLogger().handlers
        if len(handlers) < index + 1:
            return
        h = handlers[index]
        # Keep the *real* stream even if the handler is mocked twice in a row
        if not hasattr(h, "old_stream"):
            h.old_stream = h.stream
        h.stream = StringIO()

    @staticmethod
    def restoreHandler(index):
        """
        Undo :py:meth:`mockHandler` for root handler ``index``.

        Does nothing when no such handler exists or it was never mocked.
        """
        handlers = logging.getLogger().handlers
        if len(handlers) < index + 1:
            return
        h = handlers[index]
        if not hasattr(h, "old_stream"):
            return
        h.stream = h.old_stream
        del h.old_stream
class ColorFormatter(logging.Formatter):
    """
    Color formatter.

    Wraps each formatted record in an ANSI escape sequence chosen by log
    level, optionally pretty-prints dict/list/tuple messages, and either
    repeats the line header on every continuation line (``splitLines``) or
    escapes new lines so that the record stays on a single line.

    This class is working as expected atm but it can be tidied up to support
    blinking text and other useless features :)
    """

    BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
    """ Bash Colors """

    NORM, BOLD, UNK, ITAL, UNDER, BLINK, UNK2, INV, CON = range(9)
    """ Bash Styles """

    ESC = "\033["
    """Escape sequence for color"""

    COLOR_SEQ = "\033[1;%dm"
    """Escape seq. + color integer placeholder"""

    RESET_SEQ = "\033[0m"
    """Reset color sequence"""

    TERMDEFAULTS = {
        "color": True,
        "splitLines": True,
        "level": logging.DEBUG,
        "pretty": True,
    }
    """Terminal default settings"""

    FILEDEFAULTS = {
        "color": False,
        "splitLines": True,
        "level": logging.INFO,
        "pretty": False,
    }
    """File default settings"""

    STYLES = {
        "DEBUG": NORM,
        "INFO": NORM,
        "WARNING": NORM,
        "CRITICAL": BOLD,
        "ERROR": BOLD,
    }
    """Each log-level's default character style"""

    COLORS = {
        "DEBUG": WHITE,
        "INFO": CYAN,
        "WARNING": YELLOW,
        "CRITICAL": YELLOW,
        "ERROR": RED,
    }
    """Default color values for each log-level"""

    def __init__(
        self,
        fmt,
        datefmt=None,
        color=True,
        splitLines=True,
        pretty=False,
        colors=None,
        styles=None,
    ):
        """
        Init given the log line format, color and date format.

        :param str fmt: Log line format
        :param str datefmt: Date format
        :param bool color: Wrap the output in color escape sequences
        :param bool splitLines: Repeat the header on every line of a
            multi-line message instead of escaping the new lines
        :param bool pretty: Pretty-print dict, list and tuple messages
        :param dict colors: Level name -> color, defaults to ``COLORS``
        :param dict styles: Level name -> style, defaults to ``STYLES``
        """
        logging.Formatter.__init__(self, fmt, datefmt)
        self.color = color
        self.splitLines = splitLines
        self.pretty = pretty
        self.colors = colors if colors is not None else ColorFormatter.COLORS
        self.styles = styles if styles is not None else ColorFormatter.STYLES

    def format(self, record):
        """
        Override format function.

        :param logging.LogRecord record: The record to format
        :return: The formatted (and possibly colored) log text
        """
        levelname = record.levelname

        # Keep reference and copy only if required
        tmp_record = record

        # NOTE: `message` is formatted, `msg` is raw
        if self.pretty and isinstance(record.msg, (dict, tuple, list)):
            # Copy so we do not modify the original message
            tmp_record = copy.copy(record)
            tmp_record.msg = pretty(tmp_record.msg)

        msg = logging.Formatter.format(self, tmp_record)

        if self.splitLines and tmp_record.message != "":
            # Everything before the first occurrence of the message is taken
            # as the header and repeated after each new line.
            # NOTE(review): if the message text also occurs inside the header
            # the repeated prefix is cut short.
            header = msg.split(tmp_record.message)[0]
            msg = msg.replace("\n", "\n" + header)
        elif not self.splitLines and msg != "":
            # Escape new lines so the full message is in a single line
            msg = msg.replace("\n", "\\n")

        # The background is set with 40 plus the number of the color,
        # and the foreground with 30
        if self.color and levelname in self.colors:
            msg = "%s%d;%dm%s%s" % (
                ColorFormatter.ESC,
                # A custom ``colors`` map may name levels ``styles`` lacks
                self.styles.get(levelname, ColorFormatter.NORM),
                30 + self.colors[levelname],
                msg,
                ColorFormatter.RESET_SEQ,
            )
        return msg

    @staticmethod
    def parseSpecs(specs, defaults):
        """
        Merge the user specs with the defaults given (terminal or file).

        Also ensure that if ``pretty`` is set, ``splitLines`` is also set
        (implied).

        :param dict specs: User supplied options; not modified
        :param dict defaults: Default options; not modified (a copy is used)
        :return: A new dict holding the merged options
        """
        specs = merge_dicts(specs, defaults.copy())

        specs["pretty"] = specs.get("pretty", False)
        specs["splitLines"] = specs.get("splitLines", False)

        # Pretty overrides split lines
        if specs["pretty"] and not specs["splitLines"]:
            specs["splitLines"] = True
        return specs
class JSONFormatter(logging.Formatter):
    """
    Format messages as JSON.

    Every record becomes one JSON object holding the message, the record
    attributes listed in ``fields`` and any extra (non standard) attribute
    attached to the record.
    """

    # Attribute names of a log record that are not treated as "extra" data
    RESERVED_ATTRS = (
        "args",
        "asctime",
        "created",
        "exc_info",
        "exc_text",
        "filename",
        "funcName",
        "levelname",
        "levelno",
        "lineno",
        "module",
        "msecs",
        "message",
        "msg",
        "name",
        "pathname",
        "process",
        "processName",
        "relativeCreated",
        "stack_info",
        "thread",
        "threadName",
    )

    # Record attributes emitted by default
    FIELDS = [
        "created",
        "process",
        "thread",
        "levelname",
        "module",
        "filename",
        "funcName",
        "lineno",
        "message",
    ]

    def __init__(self, fields, datefmt=None):
        """
        :param fields: Names of the record attributes to emit; ``created`` is
            always added. The sequence passed in is copied, not modified.
        :param str datefmt: Accepted for signature compatibility and handed
            to the base class; this formatter emits the raw ``created`` value
        """
        logging.Formatter.__init__(self, datefmt=datefmt)
        # Work on a copy so neither the caller's list nor FIELDS is mutated
        self.fields = list(fields)
        if "created" not in self.fields:
            self.fields.append("created")

    def format(self, record):
        """
        Override format function.

        String messages are stored (with their arguments applied) under
        ``message``; dict messages are merged into the top level object; any
        other object is stored under ``object``. ``created`` is emitted as
        ``timestamp``.

        :param logging.LogRecord record: The record to format
        :return: The record serialized as a JSON string
        """
        result = {"message": ""}

        if isinstance(record.msg, str_types):
            # Apply the %-arguments, as the standard formatter does
            result["message"] = record.getMessage()
        elif isinstance(record.msg, dict):
            # Merge as dict
            result = merge_dicts(record.msg, result)
        else:
            # Serialize as object
            result["object"] = record.msg

        # Now merge any extra
        for key, value in record.__dict__.items():
            if key == "message":
                # Already handled above; ``record.message`` only exists when
                # another formatter processed the record first, so copying it
                # would make the output depend on handler order.
                continue
            if key in self.fields:
                # Required attrs
                result["timestamp" if key == "created" else key] = value
            elif key not in JSONFormatter.RESERVED_ATTRS:
                # Custom/extra
                result[key] = value

        # Deliberate ``default=str``: a log line with a stringified value is
        # preferable to losing the record on a non-serializable object.
        return json.dumps(result, default=str)
#
# Helpers and utilities
#
try:  # Python 3.3+ (the ``collections.MutableMapping`` alias is gone in 3.10)
    from collections.abc import MutableMapping as _MutableMapping
except ImportError:  # Python 2
    from collections import MutableMapping as _MutableMapping


def pretty_recursive(value, htchar="\t", lfchar="\n", indent=0):
    """
    Recursive pretty printing of dict, list and tuples.

    NOTE(review): the original docstring credited an external recipe but the
    link is missing from this copy.

    :param value: The object to render
    :param str htchar: Indentation unit, repeated once per nesting level
    :param str lfchar: Line separator
    :param int indent: Current nesting level
    :return: Mappings, lists and tuples rendered one item per line inside
        ``{}``, ``[]`` or ``()``; ``repr(value)`` for anything else
    """
    item_prefix = lfchar + htchar * (indent + 1)
    closing = lfchar + htchar * indent

    if isinstance(value, (dict, _MutableMapping)):
        items = [
            item_prefix
            + repr(key)
            + ": "
            + pretty_recursive(value[key], htchar, lfchar, indent + 1)
            for key in value
        ]
        return "{%s}" % (",".join(items) + closing)

    for kind, template in ((list, "[%s]"), (tuple, "(%s)")):
        if isinstance(value, kind):
            items = [
                item_prefix + pretty_recursive(item, htchar, lfchar, indent + 1)
                for item in value
            ]
            return template % (",".join(items) + closing)

    return repr(value)
def pretty(sth):
    """
    Render an object, tuple, list or dict for pretty printing.

    The text starts with the class name in parentheses followed by the output
    of ``pretty_recursive``. ``None`` is handed back unchanged.
    """
    if sth is None:
        return sth

    type_name = sth.__class__.__name__
    label = "(%s) " % format(type_name) if type_name else ""
    return label + pretty_recursive(sth)
def mkdir_p(path):
    """
    Does the same as 'mkdir -p' in linux.

    :param str path: Directory to create, including missing parents
    :raises OSError: if the directory could not be created

    Returns silently when ``path`` already exists.
    """
    if os.path.exists(path):
        return
    try:
        os.makedirs(path)
    except OSError:
        # Someone else may have created the directory between the check above
        # and makedirs(); that is fine. Anything else is a real failure.
        # (Checking the result instead of ``errno`` avoids depending on a
        # module this file never imports.)
        if not os.path.isdir(path):
            raise
def merge_dicts(source, destination):
    """
    Recursively merge ``source`` into ``destination`` and return the latter.

    Values of ``source`` win. Nested dicts are merged key by key; when
    ``destination`` holds a non-mapping value where ``source`` has a dict,
    that value is replaced by the merged dict.

    :param dict source: Values to merge in; not modified
    :param dict destination: Dict updated in place
    :return: ``destination``

    >>> a = { 'first' : { 'all_rows' : { 'pass' : 'dog', 'number' : '1' } } }
    >>> b = { 'first' : { 'all_rows' : { 'fail' : 'cat', 'number' : '5' } } }
    >>> merge_dicts(b, a) == { 'first' : { 'all_rows' : { 'pass' : 'dog', 'fail' : 'cat', 'number' : '5' } } }
    True
    >>> merge_dicts({'message': {'a': 1}}, {'message': ''})
    {'message': {'a': 1}}
    """
    for key, value in source.items():
        if not isinstance(value, dict):
            destination[key] = value
            continue

        # get node or create one
        node = destination.get(key)
        if not hasattr(node, "setdefault"):
            # Missing, or not a mapping (e.g. a string): start a fresh dict
            # instead of failing on item assignment further down.
            node = destination[key] = {}
        merge_dicts(value, node)
    return destination