Friday, March 20, 2020

Python: implementing Strategy design pattern without class hierarchy

The essence of Strategy design pattern is to enable algorithm selection to happen at run-time. Assume we would have the following two simple functions in a source file. Note, that the content of these functions is only a side-show. The main point of interest here is, how do we call these functions from somewhere else.

# functions.py
import numpy as np

def calculate_average(arr):
    return np.mean(arr)

def calculate_standard_deviation(arr):
    return np.std(arr)

In the main program, we need to select desired algorithm to process calculation for array input.

Hard-coded implementation


Below is kind of 'level-one' implementation for the main program. Needless to say, we are forced to modify the program as soon as any new function will be implemented in functions module and we would like to use that new function in the main program.

import functions

try:
    arr = [1, 2, 3, 4, 5]
    selection = 1

    if(selection == 1):
        result = functions.calculate_average(arr)
    elif(selection == 2):
        result = functions.calculate_standard_deviation(arr)
    else:
        raise Exception('Selected function is not implemented.')

    print(result)

except Exception as e:
    print(e)

Flexible implementation


Fortunately, there are ways to get out of such hard-coded scheme for selecting algorithm and this post is only presenting one such possibility. First, let us implement the following json configuration file. In this file, we are configuring the name of the function what we would like to use in our main program.

{
  "function_name": "calculate_standard_deviation"
}

In our new main program, we create configurations from the previous file and read function name from it. Then, by using hasattr function we check, if functions module is containing configured function. After this, we use getattr function to get reference to configured function. Finally, we will use the function and print calculation result.

import json
import functions

try:
    arr = [1, 2, 3, 4, 5]

    # create configurations
    path = '//temp/configurations.json'
    configurations = json.load(open(path, 'r'))
    function_name = configurations['function_name']

    if(hasattr(functions, function_name)):
        function = getattr(functions, function_name)
        result = function(arr)
    else:
        raise Exception('Calculation cannot be processed due to incorrect function configuration.')

    print(result)

except Exception as e:
    print(e)

Assume there would be a new function implementation in functions module and we would like to use that new function in our main program. In this flexible implementation, there would be no need to touch the main program here. Required change (run-time information of what function will be used) has now been isolated to configurations file.

From purely technical point of view, presented scheme is far from being 'traditional' Strategy pattern implementation starting from the fact, that there is no class hierarchy. However, code is receiving run-time instructions (from configurations file) for selecting a specific algorithm (from all functions available). At least for me, this is the essence of Strategy design pattern.

Finally, thanks for reading this blog.
-Mike

Saturday, March 14, 2020

Python: Implementing Flexible Logging Mechanism

This post is presenting a way to implement flexible logging mechanism for Python program. However, just for the sake of being curious, I have also implemented another logging mechanism by using Observer Design Pattern. The both programs can be downloaded from my github page.

For flexible logging, I have the following two criteria:
  1. Number of loggers (medium into which log feed will be written) can be chosen.
  2. Log feed (which will be written into chosen medium) can be chosen, based on log levels.

Using Observer Pattern


import datetime as dt   
import enum

class LogLevel(enum.Enum):
    debug = 1
    info = 2
    warning = 3
    error = 4
    critical = 5
    
# subscriber in observer pattern, receives updates from publisher.
class LogHandler:
    def __init__(self, logging_function, log_level):
        self.logging_function = logging_function
        self.log_level = log_level
    
    # receive update from publisher
    def update_log(self, message, log_level):
        # log levels: debug=1, info=2, warning=3, error=4, critical=5
        # ex. class log level 1 will send log updates for all incoming messages
        if(self.log_level.value <= log_level.value):
            date_time_string = str(dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
            self.logging_function('{} {} {}'.format(date_time_string, log_level.name, message))
        
# publisher in observer pattern, send updates for subscribers.
class Logger:
    def __init__(self):
        self.log_handlers = set()
        
    def register_log_handler(self, log_handler):
        self.log_handlers.add(log_handler)
        
    def unregister_log_handler(self, log_handler):
        self.log_handlers.discard(log_handler)
    
    # send update for all registered subscribers
    def send_log_update(self, message, log_level):
        for log_handler in self.log_handlers:
            log_handler.update_log(message, log_level)  

# create publisher
logger = Logger()

# create console log handler (subscriber), receiving updates only when update message log level is greater than/equal to warning
console_log_handler = LogHandler(lambda message: print(message), LogLevel.warning)
logger.register_log_handler(console_log_handler)

# create file log handler (subscriber), receiving all possible updates
log_file = open('/home/mikejuniperhill/log.txt', 'w')
file_log_handler = LogHandler(lambda message: print(message, file=log_file), LogLevel.debug)
logger.register_log_handler(file_log_handler)

# process log updates
logger.send_log_update('sending calculation task to engine', LogLevel.debug)
logger.send_log_update('engine is processing calculation task', LogLevel.info)
logger.send_log_update('incorrect grid configurations, using backup grid settings', LogLevel.warning)
logger.send_log_update('analytical error retrieved for this calculation task', LogLevel.error)
logger.send_log_update('unable to process calculations task due to incorrect market data', LogLevel.critical)

log_file.close()

Log from processing feed is being printed to console and file, according to the chosen log levels.









Using Python Logging


Python is full of positive surprises and as one might expect, logging issues have already been thoroughly chewed by Python community. More information on in-built logging tools can be read from here. In the program, we create loggers and handlers (for console and file) by using method Initialize_logHandlers. This method uses Configurations object, which will be constructed from a given json file. The content of json configuration file is as follows.

{
  "LOGFORMATSTRING": "%(asctime)s %(levelname)-1s %(filename)-1s %(funcName)-1s %(message)s", 
  "CLEANLOGFILEBEFOREWRITE": 1, 
  "LOGFILEPATH": "/home/mikejuniperhill/log.txt",
  "CONSOLELOGLEVEL": 30,
  "FILELOGLEVEL": 10, 
  "LOGDATEFORMAT": "%d.%m.%Y %H:%M:%S",
  "LOGFILEMODE": "w"
}

The program below will create configurations object from json file, initializes logging handlers (console and file) and process small feed for logger.

import os, logging, json

class Configurations:
    inner = {}
    # read JSON configuration file to dictionary
    def __init__(self, filePathName):
        self.inner = json.load(open(filePathName))
    # return value for a given configuration key
    # 'overload' indexing operator
    def __getitem__(self, key):
        return self.inner[key.upper()]

def Initialize_logHandlers(configurations):
    # conditionally, delete the existing log file before starting to write log for this session
    if(bool(int(configurations['CleanLogFileBeforeWrite'])) == True):
        path = configurations['LogFilePath']
        if(os.path.exists(path) == True): os.remove(path)
 
    # create logger
    logger = logging.getLogger()
    logger.setLevel(logging.NOTSET)
 
    # console log handler
    c_handler = logging.StreamHandler()
    c_handler.setLevel(int(configurations['ConsoleLogLevel']))
    c_formatter = logging.Formatter(configurations['LogFormatString'], datefmt=configurations['LogDateFormat'])
    c_handler.setFormatter(c_formatter)
    logger.addHandler(c_handler)
 
    # file log handler
    f_handler = logging.FileHandler(configurations['LogFilePath'], mode=configurations['LogFileMode'], encoding=None, delay=True)
    f_handler.setLevel(int(configurations['FileLogLevel']))
    f_formatter = logging.Formatter(configurations['LogFormatString'], datefmt=configurations['LogDateFormat'])
    f_handler.setFormatter(f_formatter)
    logger.addHandler(f_handler)

# read configurations for this program
path = '/home/mikejuniperhill/configurations.json'
# create configurations object
configurations = Configurations(path)

# initialize log handlers
Initialize_logHandlers(configurations)
 
# process log updates
logging.debug('sending calculation task to engine')
logging.info('engine is processing calculation task')
logging.warning('incorrect grid configurations, using backup grid settings')
logging.error('analytical error retrieved for this calculation task')
logging.critical('unable to process calculations task due to incorrect market data')

Log from processing feed is being printed to console and file, according to the chosen log levels.







Finally, thanks for reading this blog.
-Mike

Saturday, March 7, 2020

Python: Implementing Factory Method Design Pattern

Ideally, program should be closed for modifications, but open for extensions and hard-coded stuff should be avoided like plague. This is the point, where Design Patterns are usually stepping into picture. When the code base will grow enough in size and we are in charge of production-level programs, it is relatively easy to justify the usage of patterns. In this post, Python mutation of Factory Method Pattern is introduced for creating Task class objects, based on argument feed received from different types of sources. Complete program can be downloaded from my github page.

Imaginary scheme


Third-party analytical software is used for processing different types of complex calculations. One calculation request requires a set of specific arguments from the client program. Moreover, different types of requests will require different set of arguments (number of arguments, types of arguments). Before sending calculation request for analytical software, client program must collect arguments for all tasks to be processed from somewhere and we would like to leave the source for such task arguments open (json, text, console, xml, etc.).

Json source file


{
  "taskName": "ANZ.XVA",
  "taskType": "XVA",
  "paths": 1000,
  "storeExposures": "True" 
}

Text source file


taskName,RBC.PV
taskType,PV

Program


import sys, json, csv

# class for hosting calculation task-related information
# NOTE: the actual method for creating 'calculation request' is currently not implemented
class Task(object):
    def __init__(self, arguments):
        self.arguments = arguments
        
    # non-relevant dummy method, which just writes out given arguments
    def print(self):
        for k, v in self.arguments.items():
            print(k, 'is', v)
        
    @classmethod
    # factory: create task object from json file
    def from_json(cls, path)->'Task':
        arguments = json.load(open(path, 'r'))
        return cls(arguments)
    
    @classmethod
    # factory: create task object from text file
    def from_text(cls, path)->'Task':
        arguments = dict((k, v) for k, v in csv.reader(open(path, 'r')))
        return cls(arguments)
    
    @classmethod
    # factory: create task object from console input
    def from_console(cls)->'Task':
        arguments = json.loads(input())
        return cls(arguments)        

path = '/home/mikejuniperhill/json.feed/task.json'
task = Task.from_json(path)
task.print()

path = '/home/mikejuniperhill/json.feed/task.txt'
task = Task.from_text(path)
task.print()

task = Task.from_console()
# console feed string: {"taskName":"ANZ.WHATIF.XVA","taskType":"WHATIF","referenceTask":"ANZ.XVA"}
task.print()

Program output




















Thanks for reading this blog.
-Mike


Monday, March 2, 2020

Python: Simulating Exposures Using Multiprocessing Pool

This post is presenting a scheme for simulating exposures for European call option on a non-dividend-paying stock by using Multiprocessing.Pool class (note: in Linux). There are several different options available when stepping into multiprocessing world, but for this specific purpose, Multiprocessing.Pool is robust enough. In a nutshell, we need to simulate a path for the underlying equity price and then calculate call option present value for each point in time. Complete program can be downloaded from my github.

Library imports


import multiprocessing as mp
import numpy as np
import scipy.stats as sc
import matplotlib.pyplot as pl
import time, math

Methods


# black scholes valuation of call option on non-dividend-paying equity
def VanillaCall(s, x, t, r, v):
    pv = 0.0
    # option value at maturity
    if((t - 0.0) < (1 / 365)):
        pv = max(s - x, 0.0)
    # option value before its maturity
    else:
        d1 = (math.log(s / x) + (r + 0.5 * v ** 2) * t) / (v * math.sqrt(t))
        d2 = (math.log(s / x) + (r - 0.5 * v ** 2) * t) / (v * math.sqrt(t))
        pv = (s * sc.norm.cdf(d1, 0.0, 1.0) - x * math.exp(-r * t) * sc.norm.cdf(d2, 0.0, 1.0))
    return pv

# globalize local parameters
def Globalizer(local_parameters):
    global global_parameters
    global_parameters = local_parameters

# simulate exposures by using
# - a given one-factor process (func_process)
# - a given pricing function (func_pricing)
def SimulateExposures(arg):
    # seed random number generator
    np.random.seed(arg)
    # unzip globalized tuple
    n_paths_per_process, n_steps, t, s_0, r, vol, func_pricing, func_process = global_parameters
    s = 0.0
    pv = 0.0
    dt = (t / n_steps)
    # create container for results
    paths = np.zeros(shape=(n_paths_per_process, n_steps + 1), dtype=float)
    for i in range(n_paths_per_process):
        s = s_0
        t_expiry = t
        # value product at path inception
        paths[i, 0] = func_pricing(s, t_expiry)
        # create array of normal random variates
        e = np.random.standard_normal(n_steps)
        for j in range(n_steps):
            s = func_process(s, e[j])
            t_expiry = (t - dt * (j + 1))
            # value product at after inception
            paths[i, j + 1] = func_pricing(s, t_expiry)
    return paths

Main program


The main program will create a pool containing two processes, which will use SimulateExposures method for calculating exposures for a call option. Method takes only random seed as its direct input argument. All the other required arguments (product- or process-related parameters) will be packed into tuple data structure, which will be globalized for each process separately in initializer method. After processing, the both worker methods will return an array of simulated exposures, which then will be aggregated into array (exposures).

t_0 = time.time()

# monte carlo parameters
n_paths = 10000
n_steps = 250
n_processes = 2
n_paths_per_process = int(n_paths / n_processes)
seeds = np.random.randint(low=1, high=999999, size=n_processes)

# product- and process-related parameters
t = 1.0
s_0 = 100.0
r = 0.05
vol = 0.15
x = 100.0
drift = ((r - 0.5 * vol * vol) * (t / n_steps))
diffusion = (vol * math.sqrt(t / n_steps))

# method for calculating pv as a function of spot and time to maturity
func_pricing = lambda s_, t_: VanillaCall(s_, x, t_, r, vol)
# method for simulating stochastic process
func_process = lambda s_, e_: s_ * math.exp(drift + diffusion * e_)

# container for storing results from worker methods
results = []

# local parameters, which will be globalized for pool workers
simulation_parameters = (n_paths_per_process, n_steps, t, s_0, r, vol, func_pricing, func_process)
pool = mp.Pool(processes=n_processes, initializer=Globalizer, initargs=(simulation_parameters,))
for r in pool.imap_unordered(SimulateExposures, seeds): results.append(r)

t_1 = time.time()
print('processing time', t_1 - t_0)

# stack results from workers into one array
exposures = np.empty(shape = (0, results[0].shape[1]))
for i in range(n_processes):
    exposures = np.vstack([exposures, results[i]])

# average of all exposures
expected_exposure = np.mean(exposures, axis=0)
pl.plot(expected_exposure)
pl.show()

Thanks for reading this blog.
-Mike