For flexible logging, I have the following two criteria:
- Number of loggers (medium into which log feed will be written) can be chosen.
- Log feed (which will be written into chosen medium) can be chosen, based on log levels.
Using Observer Pattern
import datetime as dt import enum class LogLevel(enum.Enum): debug = 1 info = 2 warning = 3 error = 4 critical = 5 # subscriber in observer pattern, receives updates from publisher. class LogHandler: def __init__(self, logging_function, log_level): self.logging_function = logging_function self.log_level = log_level # receive update from publisher def update_log(self, message, log_level): # log levels: debug=1, info=2, warning=3, error=4, critical=5 # ex. class log level 1 will send log updates for all incoming messages if(self.log_level.value <= log_level.value): date_time_string = str(dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) self.logging_function('{} {} {}'.format(date_time_string, log_level.name, message)) # publisher in observer pattern, send updates for subscribers. class Logger: def __init__(self): self.log_handlers = set() def register_log_handler(self, log_handler): self.log_handlers.add(log_handler) def unregister_log_handler(self, log_handler): self.log_handlers.discard(log_handler) # send update for all registered subscribers def send_log_update(self, message, log_level): for log_handler in self.log_handlers: log_handler.update_log(message, log_level) # create publisher logger = Logger() # create console log handler (subscriber), receiving updates only when update message log level is greater than/equal to warning console_log_handler = LogHandler(lambda message: print(message), LogLevel.warning) logger.register_log_handler(console_log_handler) # create file log handler (subscriber), receiving all possible updates log_file = open('/home/mikejuniperhill/log.txt', 'w') file_log_handler = LogHandler(lambda message: print(message, file=log_file), LogLevel.debug) logger.register_log_handler(file_log_handler) # process log updates logger.send_log_update('sending calculation task to engine', LogLevel.debug) logger.send_log_update('engine is processing calculation task', LogLevel.info) logger.send_log_update('incorrect grid configurations, using backup grid settings', LogLevel.warning) logger.send_log_update('analytical error retrieved for this calculation task', LogLevel.error) logger.send_log_update('unable to process calculations task due to incorrect market data', LogLevel.critical) log_file.close()
Log from processing feed is being printed to console and file, according to the chosen log levels.
Using Python Logging
Python is full of positive surprises and as one might expect, logging issues have already been thoroughly chewed by Python community. More information on in-built logging tools can be read from here. In the program, we create loggers and handlers (for console and file) by using method Initialize_logHandlers. This method uses Configurations object, which will be constructed from a given json file. The content of json configuration file is as follows.
{ "LOGFORMATSTRING": "%(asctime)s %(levelname)-1s %(filename)-1s %(funcName)-1s %(message)s", "CLEANLOGFILEBEFOREWRITE": 1, "LOGFILEPATH": "/home/mikejuniperhill/log.txt", "CONSOLELOGLEVEL": 30, "FILELOGLEVEL": 10, "LOGDATEFORMAT": "%d.%m.%Y %H:%M:%S", "LOGFILEMODE": "w" }
The program below will create configurations object from json file, initializes logging handlers (console and file) and process small feed for logger.
import os, logging, json class Configurations: inner = {} # read JSON configuration file to dictionary def __init__(self, filePathName): self.inner = json.load(open(filePathName)) # return value for a given configuration key # 'overload' indexing operator def __getitem__(self, key): return self.inner[key.upper()] def Initialize_logHandlers(configurations): # conditionally, delete the existing log file before starting to write log for this session if(bool(int(configurations['CleanLogFileBeforeWrite'])) == True): path = configurations['LogFilePath'] if(os.path.exists(path) == True): os.remove(path) # create logger logger = logging.getLogger() logger.setLevel(logging.NOTSET) # console log handler c_handler = logging.StreamHandler() c_handler.setLevel(int(configurations['ConsoleLogLevel'])) c_formatter = logging.Formatter(configurations['LogFormatString'], datefmt=configurations['LogDateFormat']) c_handler.setFormatter(c_formatter) logger.addHandler(c_handler) # file log handler f_handler = logging.FileHandler(configurations['LogFilePath'], mode=configurations['LogFileMode'], encoding=None, delay=True) f_handler.setLevel(int(configurations['FileLogLevel'])) f_formatter = logging.Formatter(configurations['LogFormatString'], datefmt=configurations['LogDateFormat']) f_handler.setFormatter(f_formatter) logger.addHandler(f_handler) # read configurations for this program path = '/home/mikejuniperhill/configurations.json' # create configurations object configurations = Configurations(path) # initialize log handlers Initialize_logHandlers(configurations) # process log updates logging.debug('sending calculation task to engine') logging.info('engine is processing calculation task') logging.warning('incorrect grid configurations, using backup grid settings') logging.error('analytical error retrieved for this calculation task') logging.critical('unable to process calculations task due to incorrect market data')
Log from processing feed is being printed to console and file, according to the chosen log levels.
Finally, thanks for reading this blog.
-Mike
No comments:
Post a Comment