Showing posts with label Functional Programming Paradigm. Show all posts
Showing posts with label Functional Programming Paradigm. Show all posts

Wednesday, December 26, 2018

QuantLib-Python: Multiprocessing Method Wrapper

In this post, I published a program for simulating term structure up to 30 years with daily time step, using Hull-White one-factor model. The resulting curve was able to replicate the given initial yield curve without any notable differences. The only serious issue here was the cost of running the program in time units.

In order to improve this specific issue, I got familiar with some multi-threading possibilities in Python. Now, there are some deep issues related to the way Python threads are actually implemented and especially the issues with thread locking known as GIL. Related stuff has been completely chewed in here. In order to avoid facing GIL-related issues, another way is to use Python multiprocessing, which allows the programmer to fully leverage multiple processors on a given machine. Moreover, separate processes are completely separate and one process cannot affect another's variables.

Comparison statistics


Again, I simulated term structure up to 30 years with daily time step, using 10000 paths. I did some quick profiling, in order to find the bottlenecks in the original program (sequential). By looking the column task share, we can see, that there are two tasks, which are consuming the most part of the complete running time: path generations (~54%) and path integrations (~46%). After this, I isolated these two parts and processed these by using multiprocessing scheme.











By using multiprocessing (two configured processes), I managed to decrease the complete running time from 163 seconds to 107 seconds. In general, for all those parts which were enjoying the benefits of multiprocessing, improvement ratio (multiprocessing per sequential) is around 0.65.

CPU puzzle


In order to understand the reason for this improvement, let us take a look at processor architecture in my laptop. First, let us check the "Grand Promise" made by System Monitor.













Based on this view, I actually expected to have four CPU for processing. I was then really surprised, that adding third and fourth process was not decreasing running time any further, than having just two processes. After some usual Stackoverflow gymnastics, I finally got the definition to calculate the real number of CPU available in my laptop.












CPU available is "Core(s) per socket * Socket(s)", which is 2 * 1 in my laptop. So, all in all I have only two CPU available for processing, not four as was shown in that System Monitor. This means, that having more than two CPU available in a laptop, one should expect even better improvement ratio than reported in my statistics here.

Wrapper method


In order to avoid code duplication and to come up with something a bit more generic, I started to dream about the possibility to create a mechanism for applying multiprocessing for any given method, if so desired. Such solution is possible by using Python lambda methods.

# method for executing given lambdas in parallel
def MultiprocessingWrapper(targetFunctionList):
    processList = []
    aggregatedResults = []
    queue = mp.Manager().Queue()

    # execute lambda from a given list based on a given index
    # storing results into queue
    def Worker(index):
        result = targetFunctionList[index]()
        queue.put(result)
    
    # start processes, call worker method with index number
    for i in range(len(targetFunctionList)):
        process = mp.Process(target = Worker, args = (i,))
        processList.append(process)
        process.start()
    
    # join processes, extract queue into results list
    for process in processList:
        process.join()
        aggregatedResults.append(queue.get())
        
    # return list of results for a client
    return aggregatedResults

Previous method is receiving a list of lambda methods as its argument. The idea is, that there would be always one lambda method for each process to be started. Why? We might face a situation, in which different set of parameters would be required for each lambda (say, all lambdas would be using the same uniform random generator, but with a different values for seeding the generator). In wrapper method, process is then created to start each configured lambda method. Results calculated by given lambda will be stored into queue. Finally, all results will be imported from queue into result list and returned for a client.

As concrete example, the following program segment is first creating two lambda methods for starting GeneratePaths method, but with different seed value for both processes. After this, wrapper method is then fed with the list of lambdas and processed paths will be received into list of results.

    # task of generating paths is highly time critical
    # use multiprocessing for path generation
    # create lambdas for multiprocessing wrapper
    # target signature: def GeneratePaths(seed, process, timeGrid, n)
    nPaths = 10000
    nProcesses = 2
    seeds = [1834, 66023]
    nPathsPerProcess = int(nPaths / nProcesses)
    target_1 = lambda:GeneratePaths(seeds[0], HW1F, grid.GetTimeGrid(), nPathsPerProcess)
    target_2 = lambda:GeneratePaths(seeds[1], HW1F, grid.GetTimeGrid(), nPathsPerProcess)
    targetFunctionList = [target_1, target_2]
    results = MultiprocessingWrapper(targetFunctionList)

The complete program can be found in my GitHub repository. Finally, a couple of discussion threads, which may be useful in order to understand some QuantLib-related issues, are given in here and here. Thanks for reading my blog and again, Merry Christmas for everyone.
-Mike
  

Sunday, August 6, 2017

C++11 : modelling one-factor processes using functional programming paradigm


There was an interesting technical article on July 2017 Wilmott magazine written by Daniel Duffy and Avi Palley. This multi-page article was giving an overview on some "game changing" Boost libraries, which have been accepted as a part of C++11/14 standard, such as smart pointers, function wrappers, lambda expressions and tuples. The second part of this article series will be published in the next Wilmott magazine and it will present (according to editor) design and implementation for Monte Carlo option pricing framework. It is also an important point of mentioning, that (according to Amazon.com) Daniel Duffy is about to publish long-awaited second edition of his book on pricing derivatives using C++. The book was initially published somewhere back in 2004 and the landscape has changed quite dramatically since these days.

Within the last chapter of this article, functional programming paradigm was nicely applied for modelling one-factor stochastic differential equations, generally used in Finance. By applying more functional programming paradigm, the usually observed code bloating can be substantially reduced. As an example of such code bloat, I reviewed my own implementation for path generator, which models one-factor processes for Monte Carlo purposes. There is an abstract base class (OneFactorProcess) and implementations for GBM and Vasicek processes. Even there is nothing fundamentally wrong with this approach (class hierarchy), one may ask, whether there would be a bit more flexible ways to implement this kind of a scheme.

Within this post, I have been re-designing modelling part for one-factor processes by applying functional programming paradigm, as presented in that article. Reduction in code bloating is present, since there is currently only one single class for modelling different types of processes (before, there was a class hierarchy). Moreover, since the both functions for handling drift and diffusion terms will be constructed outside of this class, their construction process is now much more flexible than before.


The program

 

Implement the following program (two header files and one implementation file for tester) into a new project. For brevity reasons, I have re-designed only the part of the program, which models one-factor processes. Monte Carlo part has been implemented as free function in tester implementation file. First, one-factor process object (Process) will be created by using ProcessBuilder object (Builder Pattern). Within this example, I have implemented a builder for constructing Process object by using console. However, the flexibility in this program allows different types of builders to be implemented. As soon as Process object is created, "skeleton path" (std::vector) will be sent to Monte Carlo method (MonteCarloLite), along with all simulation-related attributes and Process object. As a result, this method will fill vector with the values from one simulation path for chosen parameters and applied stochastic process. Finally, a path will be printed back to console.

#pragma once
#include <functional>
// OneFactorSDE.h
namespace FunctionalOneFactorProcessExampleNamespace
{
 // blueprint for a function, which models drift or diffusion
 using Function = std::function<double(double s, double t)>;
 // wrapper for drift and diffusion function components
 using Functions = std::tuple<Function, Function>;
 //
 // class for modeling one-factor processes by employing drift and diffusion functions
 class Process
 {
 public:
  Process(Functions functions, double initialCondition)
   : driftFunction(std::get<0>(functions)), diffusionFunction(std::get<1>(functions)),
   initialCondition(initialCondition) { }
  double drift(double s, double t) { return this->driftFunction(s, t); }
  double diffusion(double s, double t) { return this->diffusionFunction(s, t); }
  double InitialCondition() const { return this->initialCondition; }
  //
 private:
  double initialCondition; // spot price for a process
  Function driftFunction; // function for modelling process drift
  Function diffusionFunction;  // function for modelling process diffusion
 };
}
//
//
//
// ProcessFactory.h
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include "OneFactorSDE.h"
//
namespace FunctionalOneFactorProcessExampleNamespace
{
 // abstract base class for all process builders
 class ProcessBuilder
 {
 public:
  // return process object which is wrapped inside shared pointer
  // let implementation classes decide, how and from where to built process object
  virtual std::shared_ptr<Process> Build() = 0;
 };
 //
 // specific implementation for console process builder
 // process type and corresponding parameters will be requested from a client by using console
 class ConsoleProcessBuilder : public ProcessBuilder
 {
 public:
  std::shared_ptr<Process> Build() override
  {
   Functions functions;
   double initialCondition = 0.0;
   std::string consoleSelection;
   std::cout << "Select process [1 = GBM, 2 = Vasicek] > ";
   // if conversion cannot be performed, stoi will throw invalid argument exception
   std::getline(std::cin, consoleSelection);
   int processID = std::stoi(consoleSelection);
   //
   switch (processID)
   {
   // GBM process
   case 1:
   {
    // receive client inputs
    std::cout << "spot price > ";
    std::getline(std::cin, consoleSelection);
    initialCondition = std::stod(consoleSelection);
    //
    std::cout << "risk-free rate > ";
    std::getline(std::cin, consoleSelection);
    double r = std::stod(consoleSelection);
    //
    std::cout << "volatility > ";
    std::getline(std::cin, consoleSelection);
    double v = std::stod(consoleSelection);
    //
    // build drift and diffusion functions for GBM process object
    auto driftFunction = [r](double S, double T){ return r * S; };
    auto diffusionFunction = [v](double S, double T){ return v * S; };
    // wrap drift and diffusion functions into tuple
    functions = std::make_tuple(driftFunction, diffusionFunction);
    break;
   }
   case 2:
   {
    // receive client inputs
    std::cout << "spot price > ";
    std::getline(std::cin, consoleSelection);
    initialCondition = std::stod(consoleSelection);
    //
    std::cout << "reversion > ";
    std::getline(std::cin, consoleSelection);
    double reversion = std::stod(consoleSelection);
    //
    std::cout << "long-term rate > ";
    std::getline(std::cin, consoleSelection);
    double longTermRate = std::stod(consoleSelection);
    //
    std::cout << "rate volatility > ";
    std::getline(std::cin, consoleSelection);
    double v = std::stod(consoleSelection);
    //
    // build drift and diffusion functions for Vasicek process object
    auto driftFunction = [reversion, longTermRate](double S, double T)
     { return reversion * (longTermRate - S); };
    auto diffusionFunction = [v](double S, double T){ return v; };
    // wrap drift and diffusion functions into tuple
    functions = std::make_tuple(driftFunction, diffusionFunction);
    break;
   }
   default:
    // if selected process is not configured, program will throw invalid argument exception
    throw std::invalid_argument("invalid process ID");
    break;
   }
   // build and return constructed process object for a client
   // wrapped into shared pointer
   return std::shared_ptr<Process>(new Process(functions, initialCondition));
  }
 };
}
//
//
//
// Tester.cpp
#include <vector>
#include <random>
#include <algorithm>
#include <chrono>
#include "ProcessFactory.h"
namespace MJ = FunctionalOneFactorProcessExampleNamespace;
//
void MonteCarloLite(std::vector<double>& path, 
 std::shared_ptr<MJ::Process>& process, const double maturity);
//
int main()
{
 try
 {
  // create process object by using console builder
  MJ::ConsoleProcessBuilder builder;
  std::shared_ptr<MJ::Process> process = builder.Build();
  //
  // create simulation-related attributes
  int nSteps = 100;
  double timeToMaturity = 1.25;
  // create, process and print one path
  std::vector<double> path(nSteps);
  MonteCarloLite(path, process, timeToMaturity);
  std::for_each(path.begin(), path.end(), 
   [](double v) { std::cout << v << std::endl; });
 }
 catch (std::exception e)
 {
  std::cout << e.what() << std::endl;
 }
 return 0;
}
//
void MonteCarloLite(std::vector<double>& path, 
 std::shared_ptr<MJ::Process>& process, const double maturity)
{
 // lambda method for seeding uniform random generator
 std::function<unsigned long(void)> seeder =
  [](void) -> unsigned long { return static_cast<unsigned long>
  (std::chrono::steady_clock::now().time_since_epoch().count()); };
 //
 // create uniform generator, distribution and random generator function
 std::mt19937 uniformGenerator(seeder());
 std::normal_distribution<double> distribution;
 std::function<double(double)> randomGenerator;
 //
 // lambda method for processing standard normal random numbers
 randomGenerator = [uniformGenerator, distribution](double x) mutable -> double
 {
  x = distribution(uniformGenerator);
  return x;
 };
 //
 // create vector of standard normal random numbers
 // use lambda method created earlier
 std::transform(path.begin(), path.end(), path.begin(), randomGenerator);
 //
 double dt = maturity / (path.size() - 1);
 double dw = 0.0;
 double s = (*process).InitialCondition();
 double t = 0.0;
 path[0] = s; // 1st path element is always the current spot price
 //
 // transform random number vector into a path containing asset prices
 for (auto it = path.begin() + 1; it != path.end(); ++it)
 {
  t += dt;
  dw = (*it) * std::sqrt(dt);
  (*it) = s + (*process).drift(s, t) * dt + (*process).diffusion(s, t) * dw;
  s = (*it);
 }
}


As always, thanks a lot for reading this blog.
-Mike