# Source code for lazylog

import os
import stat
import json
import copy
import logging
import collections
import logging.handlers

# Python 2 / Python 3 compatibility shim: pick the right StringIO and the
# tuple of "string-like" types used for isinstance checks elsewhere.
try:
    from StringIO import StringIO  # Python 2

    str_types = (basestring,)  # noqa: F821 - only defined on Python 2
except ImportError:  # Python 3: the StringIO module no longer exists
    from io import StringIO

    str_types = (str,)


class Logger(logging.getLoggerClass()):
    """
    Core logger for all apps and scripts.

    This logger adds the following features to the default:

    - Colors!
    - Cooler line format
    - Function is included in the log format
    - File log
    - File rotation (defaults: ``MAXBYTES`` per file, ``BACKUPCOUNT`` kept)
    """

    LOGDIR = "/var/log"
    """Log directory, it will be created if it does not exist"""

    MAXBYTES = 10000000
    """Default maximum file size allowed, after that rotation is taking place"""

    BACKUPCOUNT = 20
    """Default number of backup files to keep"""

    DATEFORMAT = "%d-%m-%Y %H:%M:%S"
    """Date output format"""

    LOGFORMAT = "%(asctime)s.%(msecs)03d %(process)s:%(thread)u %(levelname)-8s %(module)15.15s %(lineno)-4s: %(message)s"
    """Default log format for all handlers. This can change in :py:meth:`init`"""

    @classmethod
    def _chkdir(cls):
        """
        Check that the logging dir exists and create it if not.

        Failures are deliberately swallowed: emitting a log message here
        would initialize the root logger before we have configured it.
        """
        if not os.path.isdir(cls.LOGDIR):
            try:
                mkdir_p(cls.LOGDIR)
            except Exception:
                # Do not log... logging here enables the root logger...
                pass

    @classmethod
    def addFileLogger(cls, specs):
        """
        Add a file logger with the given specs. Specs::

            {
                'filename':    Filename (under LOGDIR)
                'level':       Logging level for this file
                'format':      [ 'json' | 'console' | 'default' ]  # TODO: CSV
                'backupCount': Number of files to keep
                'maxBytes':    Maximum file size
            }

        If format is set to "console", then ColorFormatter options are also
        supported.

        :param dict specs: File handler specification (see above)
        :raises RuntimeError: if 'filename' is missing from the specs
        """
        root = logging.getLogger()

        if "filename" not in specs:
            raise RuntimeError('"filename" missing from file specs... skipping!')
        if "level" not in specs:
            specs["level"] = logging.INFO

        filePath = cls.LOGDIR + "/" + specs["filename"]

        # Test that we can access the file
        try:
            with open(filePath, "a"):
                pass
        except (IOError, OSError):
            logging.error(
                "NOT Initializing File Logging: File-system permissions error: \n\t%s"
                % filePath
            )
            # No point doing anything else! BUT do not raise exception
            return

        # If we just created it, set correct permissions so that it is group
        # accessible. This often crashes apps in multiuser environments.
        try:
            perms = (
                stat.S_IREAD
                | stat.S_IWRITE
                | stat.S_IWOTH
                | stat.S_IROTH
                | stat.S_IWGRP
                | stat.S_IRGRP
            )
            os.chmod(filePath, perms)
        except Exception:
            # Error here can appear if another user owns the file...
            pass

        # Register the rotating file handler
        rotFileH = logging.handlers.RotatingFileHandler(
            filePath,
            backupCount=specs.get("backupCount", cls.BACKUPCOUNT),
            maxBytes=specs.get("maxBytes", cls.MAXBYTES),
        )

        # USER_LOGFORMAT/USER_DATEFORMAT only exist after init() has run;
        # fall back to the class defaults so this method works stand-alone.
        fmt = specs.get("fmt", getattr(cls, "USER_LOGFORMAT", cls.LOGFORMAT))
        datefmt = specs.get("datefmt", getattr(cls, "USER_DATEFORMAT", cls.DATEFORMAT))

        formatter = logging.Formatter(fmt, datefmt=datefmt)
        if "format" not in specs or specs["format"] == "console":
            specs = ColorFormatter.parseSpecs(specs, ColorFormatter.FILEDEFAULTS)
            formatter = ColorFormatter(
                fmt,
                datefmt=datefmt,
                color=False,
                pretty=specs["pretty"],
                splitLines=specs["splitLines"],
            )
        elif specs["format"] == "default":
            pass
        elif specs["format"] == "json":
            formatter = JSONFormatter(specs.get("fields", JSONFormatter.FIELDS))

        rotFileH.setFormatter(formatter)
        rotFileH.setLevel(specs["level"])
        root.addHandler(rotFileH)

    @classmethod
    def init(
        cls,
        folder="/var/log/lazylog",
        termSpecs=None,
        fileSpecs=None,
        fmt=None,
        datefmt=None,
    ):
        """
        Initialize logging based on the requested fileName. This will create
        one logger (global) that writes in both file and terminal.

        :param str folder: Log directory (stored in :py:attr:`LOGDIR`)
        :param dict termSpecs: Terminal options, e.g. booleans for 'color'
            and 'splitLines' plus 'level' (see ColorFormatter.TERMDEFAULTS)
        :param list fileSpecs: A list of dicts with 'filename', 'level', etc.
            See :py:meth:`addFileLogger` for details
        :param str fmt: Log line format (defaults to :py:attr:`LOGFORMAT`)
        :param str datefmt: Date format (defaults to :py:attr:`DATEFORMAT`)
        """
        logging.setLoggerClass(cls)
        cls.LOGDIR = folder

        # NOTE: default was a mutable ``{}`` argument before - use None sentinel
        if termSpecs is None:
            termSpecs = {}
        if fmt is None:
            fmt = cls.LOGFORMAT
        if datefmt is None:
            datefmt = cls.DATEFORMAT

        # Store those statically as the last user supplied format. This will
        # be used from now on if format is not specified
        cls.USER_LOGFORMAT = fmt
        cls.USER_DATEFORMAT = datefmt

        # Check folder and create if needed
        cls._chkdir()

        # Merge with defaults...
        termSpecs = ColorFormatter.parseSpecs(termSpecs, ColorFormatter.TERMDEFAULTS)

        # Disable default logger
        root = logging.getLogger()
        root.setLevel(logging.DEBUG)
        root.handlers = []

        # Console logger
        console = logging.StreamHandler()
        console.setLevel(termSpecs["level"])

        # Any format other than "json" gets the (colored) console formatter;
        # previously an unknown 'format' value left `formatter` unbound.
        if termSpecs.get("format") == "json":
            formatter = JSONFormatter(termSpecs.get("fields", JSONFormatter.FIELDS))
        else:
            formatter = ColorFormatter(
                fmt,
                datefmt=datefmt,
                color=termSpecs["color"],
                splitLines=termSpecs["splitLines"],
                pretty=termSpecs["pretty"],
            )

        console.setFormatter(formatter)
        root.addHandler(console)

        # File logger(s)
        if fileSpecs is not None:
            for specs in fileSpecs:
                cls.addFileLogger(specs)

    @staticmethod
    def logFun():
        """Print one message in each level to demo the colours."""
        logging.debug("All set in logger!")
        logging.info("This is how INFO lines are")
        logging.warning("Warnings here")
        logging.critical("Critical stuff appear like this")
        logging.error("Look out for ERRORs")

    @staticmethod
    def setConsoleLevel(level):
        """
        Set the console log level. In this logger, by convention, handler 0
        is always the console handler.
        """
        logging.getLogger().handlers[0].setLevel(level)

    @staticmethod
    def setFileLevel(filenum, level):
        """
        Set a file logger log level. Filenum is 1,2,3,... in the order the
        files have been added. Out-of-range indices are silently ignored.
        """
        if len(logging.getLogger().handlers) < filenum + 1:
            return
        logging.getLogger().handlers[filenum].setLevel(level)

    @staticmethod
    def mockHandler(index):
        """
        Swap handler ``index``'s stream with a StringIO buffer (for tests).

        :returns: the StringIO buffer now receiving the handler's output,
            or None if the index is out of range
        """
        if len(logging.getLogger().handlers) < index + 1:
            return
        h = logging.getLogger().handlers[index]
        h.old_stream = h.stream
        h.stream = StringIO()
        return h.stream

    @staticmethod
    def restoreHandler(index):
        """Undo :py:meth:`mockHandler`, restoring the original stream."""
        if len(logging.getLogger().handlers) < index + 1:
            return
        h = logging.getLogger().handlers[index]
        if not hasattr(h, "old_stream"):
            return
        h.stream = h.old_stream
        del h.old_stream
class ColorFormatter(logging.Formatter):
    """
    Color formatter. This class is working as expected atm but it can be
    tidied up to support blinking text and other useless features :)
    """

    BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
    """ Bash Colors """

    NORM, BOLD, UNK, ITAL, UNDER, BLINK, UNK2, INV, CON = range(9)
    """ Bash Styles """

    ESC = "\033["
    """Escape sequence for color"""

    COLOR_SEQ = "\033[1;%dm"
    """Escape seq. + color integer placeholder"""

    RESET_SEQ = "\033[0m"
    """Reset color sequence"""

    TERMDEFAULTS = {
        "color": True,
        "splitLines": True,
        "level": logging.DEBUG,
        "pretty": True,
    }
    """Terminal default settings"""

    FILEDEFAULTS = {
        "color": False,
        "splitLines": True,
        "level": logging.INFO,
        "pretty": False,
    }
    """File default settings"""

    STYLES = {
        "DEBUG": NORM,
        "INFO": NORM,
        "WARNING": NORM,
        "CRITICAL": BOLD,
        "ERROR": BOLD,
    }
    """Each log-level's default character style"""

    COLORS = {
        "DEBUG": WHITE,
        "INFO": CYAN,
        "WARNING": YELLOW,
        "CRITICAL": YELLOW,
        "ERROR": RED,
    }
    """Default color values for each log-level"""

    def __init__(
        self,
        fmt,
        datefmt=None,
        color=True,
        splitLines=True,
        pretty=False,
        colors=None,
        styles=None,
    ):
        """
        Build a formatter.

        :param str fmt: Log line format
        :param str datefmt: Date format
        :param bool color: Wrap each line in ANSI color escapes
        :param bool splitLines: Repeat the line prefix on embedded newlines
        :param bool pretty: Pretty-print dict/list/tuple messages
        :param dict colors: Per-level color overrides (default: COLORS)
        :param dict styles: Per-level style overrides (default: STYLES)
        """
        logging.Formatter.__init__(self, fmt, datefmt)
        self.color = color
        self.splitLines = splitLines
        self.pretty = pretty
        if colors is None:
            colors = ColorFormatter.COLORS
        if styles is None:
            styles = ColorFormatter.STYLES
        self.colors = colors
        self.styles = styles

    def format(self, record):
        """Render *record*: prettify, split/escape newlines, then colorize."""
        level = record.levelname

        # Keep a reference; copy only if we need to rewrite the raw message.
        # NOTE: `message` is formatted, `msg` is raw.
        rec = record
        if self.pretty and isinstance(record.msg, (dict, tuple, list)):
            # Copy so we do not modify the original record
            rec = copy.copy(record)
            rec.msg = pretty(rec.msg)

        line = logging.Formatter.format(self, rec)

        if self.splitLines and rec.message != "":
            # Repeat everything before the message (timestamp, level, ...)
            # at the start of each continuation line.
            prefix = line.split(rec.message)[0]
            line = line.replace("\n", "\n" + prefix)
        elif not self.splitLines and line != "":
            # Escape new lines so the full message is in a single line
            line = line.replace("\n", "\\n")

        # The background is set with 40 plus the number of the color,
        # and the foreground with 30
        if self.color and level in self.colors:
            line = "%s%d;%dm%s%s" % (
                ColorFormatter.ESC,
                self.styles[level],
                30 + self.colors[level],
                line,
                ColorFormatter.RESET_SEQ,
            )

        return line

    @staticmethod
    def parseSpecs(specs, defaults):
        """
        Merge the user specs with the defaults given (terminal or file).
        Also ensure that if ``pretty`` is set, ``splitLines`` is also set
        (implied).
        """
        merged = merge_dicts(specs, defaults.copy())
        merged["pretty"] = merged.get("pretty", False)
        merged["splitLines"] = merged.get("splitLines", False)
        # Pretty overrides split lines
        if merged["pretty"]:
            merged["splitLines"] = True
        return merged
class JSONFormatter(logging.Formatter):
    """
    Format messages as JSON.

    Lots of ideas from:
    https://github.com/madzak/python-json-logger/blob/master/src/pythonjsonlogger/jsonlogger.py
    """

    # http://docs.python.org/library/logging.html#logrecord-attributes
    RESERVED_ATTRS = (
        "args",
        "asctime",
        "created",
        "exc_info",
        "exc_text",
        "filename",
        "funcName",
        "levelname",
        "levelno",
        "lineno",
        "module",
        "msecs",
        "message",
        "msg",
        "name",
        "pathname",
        "process",
        "processName",
        "relativeCreated",
        "stack_info",
        "thread",
        "threadName",
    )

    # BUGFIX: a missing comma produced the implicitly-concatenated (and
    # non-existent) field "funcNamelineno", silently dropping both fields.
    FIELDS = [
        "created",
        "process",
        "thread",
        "levelname",
        "module",
        "filename",
        "funcName",
        "lineno",
        "message",
    ]

    def __init__(self, fields, datefmt=None):
        """
        :param list fields: LogRecord attribute names to emit for every
            message; 'created' is always appended (emitted as 'timestamp')
        :param str datefmt: Date format, passed to the base Formatter
            (previously accepted but silently ignored)
        """
        logging.Formatter.__init__(self, datefmt=datefmt)
        self.fields = fields
        if "created" not in self.fields:
            self.fields.append("created")

    def format(self, record):
        """
        Serialize *record* as a single JSON document.

        String messages go to 'message', dict messages are merged into the
        result, anything else is stored under 'object'.
        """
        result = {"message": ""}

        # Assign string to message
        if isinstance(record.msg, str_types):
            result["message"] = record.msg
        # Merge as dict
        elif isinstance(record.msg, dict):
            result = merge_dicts(record.msg, result)
        # Serialize as object
        else:
            result["object"] = record.msg

        # Now merge any extra
        for k, v in record.__dict__.items():
            # Required attrs
            if k in self.fields:
                if k == "created":
                    k = "timestamp"
                result[k] = v
            # Custom/extra
            elif k not in JSONFormatter.RESERVED_ATTRS:
                result[k] = v

        return json.dumps(result)
# # Helpers and utilities #
def pretty_recursive(value, htchar="\t", lfchar="\n", indent=0):
    """
    Recursive pretty printing of dict, list and tuples - full credit to:
    http://stackoverflow.com/questions/3229419/pretty-printing-nested-dictionaries-in-python

    :param value: Object to render
    :param str htchar: Indentation character(s)
    :param str lfchar: Line separator
    :param int indent: Current nesting depth
    :returns: A multi-line repr-style string
    """
    # BUGFIX: collections.MutableMapping was removed from the top-level
    # collections namespace in Python 3.10; import from collections.abc
    # with a Python 2 fallback.
    try:
        from collections.abc import MutableMapping
    except ImportError:  # Python 2
        from collections import MutableMapping

    nlch = lfchar + htchar * (indent + 1)
    if isinstance(value, (dict, MutableMapping)):
        items = [
            nlch
            + repr(key)
            + ": "
            + pretty_recursive(value[key], htchar, lfchar, indent + 1)
            for key in value
        ]
        return "{%s}" % (",".join(items) + lfchar + htchar * indent)
    elif isinstance(value, list):
        items = [
            nlch + pretty_recursive(item, htchar, lfchar, indent + 1)
            for item in value
        ]
        return "[%s]" % (",".join(items) + lfchar + htchar * indent)
    elif isinstance(value, tuple):
        items = [
            nlch + pretty_recursive(item, htchar, lfchar, indent + 1)
            for item in value
        ]
        return "(%s)" % (",".join(items) + lfchar + htchar * indent)
    else:
        return repr(value)
def pretty(sth):
    """
    Format objects, tuples, lists and dicts for pretty printing.

    Returns None unchanged; everything else is rendered by
    pretty_recursive, prefixed with its type name in parentheses.
    """
    if sth is None:
        return None

    type_name = sth.__class__.__name__
    prefix = "(%s) " % type_name if type_name else ""
    return prefix + pretty_recursive(sth)
def mkdir_p(path):
    """
    Does the same as 'mkdir -p' in linux: create the directory and any
    missing parents, and succeed silently if it already exists.

    :param str path: Directory path to create
    :raises OSError: if creation fails and the directory does not exist
    """
    if os.path.exists(path):
        return
    try:
        os.makedirs(path)
    except OSError:
        # BUGFIX: the old code compared against errno.EEXIST but the errno
        # module was never imported, raising NameError on any real failure.
        # Re-raise unless the directory now exists (e.g. a creation race).
        if not os.path.isdir(path):
            raise
def merge_dicts(source, destination):
    """
    Recursively merge *source* into *destination* and return *destination*.
    Full credit to: https://stackoverflow.com/a/20666342/3727050

    >>> a = { 'first' : { 'all_rows' : { 'pass' : 'dog', 'number' : '1' } } }
    >>> b = { 'first' : { 'all_rows' : { 'fail' : 'cat', 'number' : '5' } } }
    >>> merge_dicts(b, a) == { 'first' : { 'all_rows' : { 'pass' : 'dog', 'fail' : 'cat', 'number' : '5' } } }
    True
    """
    for key, value in source.items():
        if not isinstance(value, dict):
            # Leaf value: source wins
            destination[key] = value
            continue
        # Nested dict: descend, creating the node if needed
        merge_dicts(value, destination.setdefault(key, {}))
    return destination