Showing posts with label Thread-safe data structures. Show all posts
Showing posts with label Thread-safe data structures. Show all posts

Sunday, January 1, 2017

C++11 : Multi-Threaded Random Generator using PPL

After some woodshedding with C++11 random and publishing distributional random generator in my blog a couple of weeks ago, the next question was, how to make this thing to happen a bit faster? Since I already have been snooping PPL (Parallel Patterns Library), I realized that Parallel Algorithms in that library would be an excellent candidate for this task. There is a great and comprehensive internet book available on this topic. After getting familiar with some of the issues presented in these sources, one should be able to parallelize algorithms succesfully.

There are some very specific issues, related to random generator seeding. As I started to parallelize my random number generation (creating several generators and processing these using parallel algorithm), I realized that my random generators were not working as expected. After some further woodshedding I came to conclusion, that (in a multi-threaded setting) each generator has to be seeded independently. Finally, concerning my current generator implementation, I made a decision that a client has to provide explicit seed values for all generators. Based on my hands-on experience with a system used for counterparty credit risk simulations (widely ranked as state-of-the-art numerical software), I dare to say that this approach is well justified. Now, the question of what kind of tools will be used for creating the actual seed values, has been left to a client judgement. The rabbit hole of these profound issues related to random generators have been chewed in this blog. Do not enter, if you are not absolutely ready to take the red pill.


ROUGH DIAGNOSTICS


I tested the both sequential (for_each) and parallel (parallel_for_each) schemes by using four generators, 10000 paths and 250 time steps for a single run. After this, I repeated this run for 250 times. The average running time for this sample was 5187 milliseconds (sequential) and 2317 milliseconds (parallel). So, parallel scheme will be completed about two times faster. The actual CPU usages during simulation process for the both schemes are shown in the screenshot below. When using parallel algorithm, computer is able to utilize all four cores in my computer.



I also tested all processed random paths for the both schemes, just to be absolutely sure there are no path duplicates (random number generation would not be independent). Based on my analysis made in Excel, all processed paths for the both schemes are different and there are no duplicates. Presented scheme is hereby fulfilling my two requirements : faster creation of random paths and independency of random generators.


RandomGenerator.h


Current implementation for distributional random path generator is presented below. Note, that the actual seeder creation takes place outside of this class.

#pragma once
#include <algorithm>
#include <functional>
#include <vector>
#include <random>
//
namespace MikeJuniperhillRandomGeneratorTemplate
{
 template <typename Generator = std::mt19937, typename Distribution = std::normal_distribution<double>>
 /// <summary>
 /// Template class for creating random number paths using
 /// Mersenne Twister as default uniform random generator and
 /// Standard Normal (0.0, 1.0) as default probability distribution.
 /// </summary> 
 class RandomGenerator
 {
 public:
  /// <summary>
  /// set explicit seed value
  /// </summary>
  RandomGenerator(unsigned long seed)
  {
   // construct lambda method for processing standard normal random number
   randomGenerator = [this](double x)-> double
   {
    x = distribution(uniformGenerator);
    return x;
   };
   // seed generator
   uniformGenerator.seed(seed);
  }
  /// <summary>
  /// fill vector reference with distributional random numbers
  /// </summary> 
  void operator()(std::vector<double>& v)
  {
   std::transform(v.begin(), v.end(), v.begin(), randomGenerator);
  }
 private:
  std::function<double(double)> randomGenerator;
  Generator uniformGenerator;
  Distribution distribution;
 };
}
//


tester.cpp


First, all simulation parameters and explicit seed values for all generators are defined. Simple factory method is then used to create the actual generator function as lambda function. Next, factory method is returning generator function wrapped inside a shared pointer. Main program is then storing each created generator function into vector container and finally processing all generators (sequentially or parallel). Each generator is storing generated paths into thread-safe nested vector (std::vector inside concurrency::concurrent_vector).

#include <iostream>
#include <memory>
#include <chrono>
#include <ppl.h>
#include <concurrent_vector.h>
#include "RandomGenerator.h"
namespace MJ = MikeJuniperhillRandomGeneratorTemplate;
//
//
// thread-safe nested vector for storing random paths processed by generators
concurrency::concurrent_vector<std::vector<double>> paths;
//
// prototype for generator function
using Generator = std::function<void(void)> ;
//
// factory method :
// return generator function as function wrapper
std::shared_ptr<Generator> Factory(int nPaths, int nSteps, unsigned long seed, 
 concurrency::concurrent_vector<std::vector<double>>& paths)
{
 // create generator function
 auto generator = [=, &paths]()->void
 {
  // step 1 : create distributional random generator
  MJ::RandomGenerator<> randomGenerator(seed);
  // step 2 : create container for a random path
  std::vector<double> path(nSteps);
  // step 3 : create path using distributional generator
  // and import created path into thread-safe container
  for (auto i = 0; i != nPaths; ++i)
  {
   randomGenerator(path);
   paths.push_back(path);
  }
 };
 // return generator function as function wrapper
 return std::shared_ptr<Generator>(new Generator(generator));
}
//
int main()
{
 // define parameters and seed values for generator functions
 int nGenerators = 4;
 int nPaths = 10000;
 int nSteps = 250;
 std::vector<unsigned long> seed = { 10322854, 65947, 387528, 772399573 };
 //
 // use factory method for creating generator function
 std::vector<std::shared_ptr<Generator>> generators;
 for (auto i = 0; i < nGenerators; i++) generators.push_back(Factory(nPaths, nSteps, seed[i], paths));
 //
 // parallel processing
 auto start = std::chrono::steady_clock::now();
 concurrency::parallel_for_each(generators.begin(), generators.end(),
  [](std::shared_ptr<Generator> ppg) -> void { (*ppg)(); });
 //
 //// uncomment for sequential processing
 //std::for_each(generators.begin(), generators.end(),
 // [](std::shared_ptr<Generator> ppg) -> void { (*ppg)(); });
 //
 auto end = std::chrono::steady_clock::now();
 auto timeElapsed = (end - start);
 std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(timeElapsed).count() << std::endl;
 return 0;
}
//


Finally, thanks for reading my blog. I would like to wish Happy New Year for everybody.
-Mike

Wednesday, December 31, 2014

Multi-Threaded Copula producer in C#

I wanted to publish one more article for the sake of celebrating the end of the year 2014. This time I am opening one possible solution for producing multi-threaded correlated random numbers with Gaussian Copula in C#. The numerical scheme used for this procedure has already been opened well enough in this blog posting. Usually, programs presented in this blog have been having (more or less) something to do with Microsoft Excel. This time, there is only C# program. However, with all the example programs presented in this blog, the user should be able to interface this program back to Excel in many different ways, if so desired.

PRODUCER-CONSUMER


In this simple example program, Producer-Consumer Pattern has been used. CopulaQueue class is nothing more, than just a wrapper for thread-safe ConcurrentQueue data structure. More specifically, the part of the program which actually creates all correlated random numbers into CopulaQueue, is the Producer. In the program, concrete CopulaEngine implementation class takes concrete RandomGenerator implementation class and correlation matrix as its input parameters. Concrete CopulaEngine implementation class then creates correlated random numbers and uses C# Event for sending created random number matrices to CopulaQueue. Another part of the program is the Consumer. Here, I have created a simple consumer class which simply prints created random numbers to console. Consumer class also uses C# Event for Dequeueing random number matrices from CopulaQueue. Needless to say, this (Consumer) is the part of our program, where we should be utilizing all correlated random numbers created by the Producer.

On design side, this program is closely following the program presented in the blog posting referred above. Again, the beef in this design is on its flexibility to allow any new implementations for Random generator and Copula model. Moreover, our example program is using C# Framework Parallel class method Parallel.ForEach, found under System.Threading.Tasks namespace. Example program creates four producers and consumers (both into List data structure) and using multi-threading (Parallel.ForEach) when processing correlated random numbers into/from CopulaQueue.


C# PROGRAM

In order to make everything as smooth as possible for the program replication, I have been writing all required classes in the same file. Just create a new console application and copy-paste the following program into a new .cs file.

using System;
using System.Diagnostics;
using System.Linq;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Collections.Concurrent;
//
namespace MultithreadingCopula
{
    // public delegate methods needed for sending and receiving matrix data
    public delegate void MatrixSender(double[,] matrix);
    public delegate double[,] MatrixReceiver();
    // ********************************************************************
    //
    // the main program 
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                // start timer
                Stopwatch timer = new Stopwatch();
                timer.Start();
                //
                // PRODUCER PART
                // create copula implementation, correlation matrix data and list of copula engines
                CopulaQueue copulaQueue = new CopulaQueue();
                double[,] correlationMatrix = createCorrelationMatrix();
                List<CopulaEngine> engines = new List<CopulaEngine>();
                int nEngines = 4;
                int nRandomArrays = 25000;
                for (int i = 0; i < nEngines; i++)
                {
                    engines.Add(new GaussianCopula(nRandomArrays, correlationMatrix, new CSharpRandom()));
                    ((GaussianCopula)engines[i]).SendMatrix += copulaQueue.Enqueue;
                }
                // process all copula engines (producers) in parallel
                Parallel.ForEach(engines, engine => engine.Process());
                //
                // CONSUMER PART
                // create list of consumers
                List<Consumer> consumers = new List<Consumer>();
                int nConsumers = nEngines;
                for (int i = 0; i < nConsumers; i++)
                {
                    consumers.Add(new Consumer(nRandomArrays, "consumer_" + (i + 1).ToString()));
                    consumers[i].ReceiveMatrix += copulaQueue.Dequeue;
                }
                // process all consumers in parallel
                Parallel.ForEach(consumers, consumer => consumer.Process());
                //
                // stop timer, print elapsed time to console
                timer.Stop();
                Console.WriteLine("{0}  {1}", "time elapsed", timer.Elapsed.TotalSeconds.ToString());
            }
            catch (AggregateException e)
            {
                Console.WriteLine(e.Message.ToString());
            }
        }
        // hard-coded data for correlation matrix
        static double[,] createCorrelationMatrix()
        {
            double[,] correlation = new double[5, 5];
            correlation[0, 0] = 1.0; correlation[0, 1] = 0.748534249620189;
            correlation[0, 2] = 0.751; correlation[0, 3] = 0.819807151586399;
            correlation[0, 4] = 0.296516443152486; correlation[1, 0] = 0.748534249620189;
            correlation[1, 1] = 1.0; correlation[1, 2] = 0.630062355599386;
            correlation[1, 3] = 0.715491543000545; correlation[1, 4] = 0.329358787086333;
            correlation[2, 0] = 0.751; correlation[2, 1] = 0.630062355599386;
            correlation[2, 2] = 1.0; correlation[2, 3] = 0.758158276137995;
            correlation[2, 4] = 0.302715778081627; correlation[3, 0] = 0.819807151586399;
            correlation[3, 1] = 0.715491543000545; correlation[3, 2] = 0.758158276137995;
            correlation[3, 3] = 1.0; correlation[3, 4] = 0.286336231914179;
            correlation[4, 0] = 0.296516443152486; correlation[4, 1] = 0.329358787086333;
            correlation[4, 2] = 0.302715778081627; correlation[4, 3] = 0.286336231914179;
            correlation[4, 4] = 1.0;
            return correlation;
        }
    }
    // ********************************************************************
    //
    // consumer class for correlated random numbers
    public class Consumer
    {
        public event MatrixReceiver ReceiveMatrix;
        public string ID;
        private int n;
        //
        public Consumer(int n, string ID) { this.n = n; this.ID = ID; }
        public void Process()
        {
            for (int i = 0; i < n; i++)
            {
                // get correlated 1xM random matrix from CopulaQueue and print it to console
                double[,] matrix = ReceiveMatrix();
                Console.WriteLine("{0}  {1}  {2}  {3}  {4}  {5}", this.ID, Math.Round(matrix[0, 0], 4), 
                    Math.Round(matrix[0, 1], 4), Math.Round(matrix[0, 2], 4), 
                    Math.Round(matrix[0, 3], 4), Math.Round(matrix[0, 4], 4));
                //
            }
        }
    }
    // ********************************************************************
    //
    // abstract base class for all uniform random generators
    public abstract class RandomGenerator
    {
        public abstract double GetUniformRandom();
    }
    // ********************************************************************
    //
    // uniform random generator implementation by using CSharp Random class
    public class CSharpRandom : RandomGenerator
    {
        private Random generator;
        //
        public CSharpRandom()
        {
            // using unique GUID hashcode as a seed
            generator = new Random(Guid.NewGuid().GetHashCode());
        }
        public override double GetUniformRandom()
        {
            // Random.NextDouble method returns a double greater than or equal to 0.0 
            // and less than 1.0. Built-in checking procedure prevents out-of-bounds random 
            // number to be re-distributed further to CopulaEngine implementation.
            double rand = 0.0;
            while (true)
            {
                rand = generator.NextDouble();
                if (rand > 0.0) break;
            }
            return rand;
        }
    }
    // ********************************************************************
    //
    // abstract base class for all specific copula implementations
    public abstract class CopulaEngine
    {
        // Uniform random generator and correlation matrix are hosted in this abstract base class.
        // Concrete classes are providing algorithm implementation for a specific copula
        // and event mechanism for sending created correlated random numbers matrix data to CopulaQueue class.
        protected RandomGenerator generator;
        protected double[,] correlation;
        //
        public CopulaEngine(double[,] correlation, RandomGenerator generator)
        {
            this.correlation = correlation;
            this.generator = generator;
        }
        public abstract void Process();
    }
    // ********************************************************************
    //
    // concrete implementation for gaussian copula
    public class GaussianCopula : CopulaEngine
    {
        public event MatrixSender SendMatrix;
        private double[,] normalRandomMatrix;
        private double[,] choleskyMatrix;
        private double[,] correlatedNormalRandomMatrix;
        private double[,] correlatedUniformRandomMatrix;
        private int rows;
        private int cols;
        //
        public GaussianCopula(int n, double[,] correlation, RandomGenerator generator)
            : base(correlation, generator)
        {
            this.cols = base.correlation.GetUpperBound(0) + 1; // M, number of variables
            this.rows = n; // N, number of 1xM matrices
        }
        public override void Process()
        {
            // NxM matrix containing of uniformly distributed correlated random variables are calculated 
            // as a sequence of specific matrix operations
            createNormalRandomMatrix();
            createCholeskyMatrix();
            correlatedNormalRandomMatrix = MatrixTools.Transpose(MatrixTools.Multiplication(choleskyMatrix, MatrixTools.Transpose(normalRandomMatrix)));
            createCorrelatedUniformRandomMatrix();
        }
        private void createNormalRandomMatrix()
        {
            // create NxM matrix consisting of normally distributed independent random variables
            normalRandomMatrix = new double[rows, cols];
            for (int i = 0; i < rows; i++)
            {
                for (int j = 0; j < cols; j++)
                {
                    normalRandomMatrix[i, j] = StatTools.NormSInv(base.generator.GetUniformRandom());
                }
            }
        }
        private void createCholeskyMatrix()
        {
            // create lower-triangular NxM cholesky decomposition matrix
            choleskyMatrix = new double[cols, cols];
            double s = 0.0;
            //
            for (int j = 1; j <= cols; j++)
            {
                s = 0.0;
                for (int k = 1; k <= (j - 1); k++)
                {
                    s += Math.Pow(choleskyMatrix[j - 1, k - 1], 2.0);
                }
                choleskyMatrix[j - 1, j - 1] = base.correlation[j - 1, j - 1] - s;
                if (choleskyMatrix[j - 1, j - 1] <= 0.0) break;
                choleskyMatrix[j - 1, j - 1] = Math.Sqrt(choleskyMatrix[j - 1, j - 1]);
                //
                for (int i = j + 1; i <= cols; i++)
                {
                    s = 0.0;
                    for (int k = 1; k <= (j - 1); k++)
                    {
                        s += (choleskyMatrix[i - 1, k - 1] * choleskyMatrix[j - 1, k - 1]);
                    }
                    choleskyMatrix[i - 1, j - 1] = (base.correlation[i - 1, j - 1] - s) / choleskyMatrix[j - 1, j - 1];
                }
            }
        }
        private void createCorrelatedUniformRandomMatrix()
        {
            // map normally distributed numbers back to uniform plane
            // process NxM matrix row by row
            for (int i = 0; i < rows; i++)
            {
                // create a new 1xM matrix for each row and perform uniform transformation
                correlatedUniformRandomMatrix = new double[1, cols];
                for (int j = 0; j < cols; j++)
                {
                    correlatedUniformRandomMatrix[0, j] = StatTools.NormSDist(correlatedNormalRandomMatrix[i, j]);
                }
                // event : send transformed 1xM matrix to CopulaQueue
                this.SendMatrix(correlatedUniformRandomMatrix);
            }
        }
    }
    // ********************************************************************
    //
    // this is technically just a wrapper class for ConcurrentQueue data structure
    public class CopulaQueue
    {
        private ConcurrentQueue<double[,]> queue;
        public CopulaQueue()
        {
            queue = new ConcurrentQueue<double[,]>();
        }
        public void Enqueue(double[,] matrix)
        {
            // insert a matrix into data structure
            queue.Enqueue(matrix);
        }
        public double[,] Dequeue()
        {
            // remove a matrix from data structure
            double[,] matrix = null;
            bool hasValue = false;
            while (!hasValue) hasValue = queue.TryDequeue(out matrix);
            return matrix;
        }
    }
    // ********************************************************************
    //
    // collection of methods for different types of statistical calculations
    public static class StatTools
    {
        // rational approximation for the inverse of standard normal cumulative distribution. 
        // the distribution has a mean of zero and a standard deviation of one.
        // source : Peter Acklam web page home.online.no/~pjacklam/notes/invnorm/
        public static double NormSInv(double x)
        {
            const double a1 = -39.6968302866538; const double a2 = 220.946098424521;
            const double a3 = -275.928510446969; const double a4 = 138.357751867269;
            const double a5 = -30.6647980661472; const double a6 = 2.50662827745924;
            //
            const double b1 = -54.4760987982241; const double b2 = 161.585836858041;
            const double b3 = -155.698979859887; const double b4 = 66.8013118877197;
            const double b5 = -13.2806815528857;
            //
            const double c1 = -7.78489400243029E-03; const double c2 = -0.322396458041136;
            const double c3 = -2.40075827716184; const double c4 = -2.54973253934373;
            const double c5 = 4.37466414146497; const double c6 = 2.93816398269878;
            //
            const double d1 = 7.78469570904146E-03; const double d2 = 0.32246712907004;
            const double d3 = 2.445134137143; const double d4 = 3.75440866190742;
            //
            const double p_low = 0.02425; const double p_high = 1.0 - p_low;
            double q = 0.0; double r = 0.0; double c = 0.0;
            //
            if ((x > 0.0) && (x < 1.0))
            {
                if (x < p_low)
                {
                    // rational approximation for lower region
                    q = Math.Sqrt(-2.0 * Math.Log(x));
                    c = (((((c1 * q + c2) * q + c3) * q + c4) * q + c5) * q + c6) / ((((d1 * q + d2) * q + d3) * q + d4) * q + 1);
                }
                if ((x >= p_low) && (x <= p_high))
                {
                    // rational approximation for middle region
                    q = x - 0.5; r = q * q;
                    c = (((((a1 * r + a2) * r + a3) * r + a4) * r + a5) * r + a6) * q / (((((b1 * r + b2) * r + b3) * r + b4) * r + b5) * r + 1);
                }
                if (x > p_high)
                {
                    // rational approximation for upper region
                    q = Math.Sqrt(-2 * Math.Log(1 - x));
                    c = -(((((c1 * q + c2) * q + c3) * q + c4) * q + c5) * q + c6) / ((((d1 * q + d2) * q + d3) * q + d4) * q + 1);
                }
            }
            else
            {
                // throw if given x value is out of bounds (less or equal to 0.0, greater or equal to 1.0) 
                throw new Exception("input parameter out of bounds");
            }
            return c;
        }
        public static double NormSDist(double x)
        {
            // returns the standard normal cumulative distribution function. 
            // the distribution has a mean of zero and a standard deviation of one.
            double sign = 1.0;
            if (x < 0) sign = -1.0;
            return 0.5 * (1.0 + sign * errorFunction(Math.Abs(x) / Math.Sqrt(2.0)));
        }
        private static double errorFunction(double x)
        {
            // uses Hohner's method for improved efficiency.
            const double a1 = 0.254829592; const double a2 = -0.284496736;
            const double a3 = 1.421413741; const double a4 = -1.453152027;
            const double a5 = 1.061405429; const double a6 = 0.3275911;
            x = Math.Abs(x); double t = 1 / (1 + a6 * x);
            return 1 - ((((((a5 * t + a4) * t) + a3) * t + a2) * t) + a1) * t * Math.Exp(-1 * x * x);
        }
    }
    // ********************************************************************
    //
    // collection of methods for different types of matrix operations
    public static class MatrixTools
    {
        public static double[,] Transpose(double[,] matrix)
        {
            // method for transposing matrix
            double[,] result = new double[matrix.GetUpperBound(1) + 1, matrix.GetUpperBound(0) + 1];
            //
            for (int i = 0; i < matrix.GetUpperBound(0) + 1; i++)
            {
                for (int j = 0; j < matrix.GetUpperBound(1) + 1; j++)
                {
                    result[j, i] = matrix[i, j];
                }
            }
            return result;
        }
        public static double[,] Multiplication(double[,] matrix1, double[,] matrix2)
        {
            // method for matrix multiplication
            double[,] result = new double[matrix1.GetUpperBound(0) + 1, matrix2.GetUpperBound(1) + 1];
            double v = 0.0;
            //
            for (int i = 0; i < matrix1.GetUpperBound(0) + 1; i++)
            {
                for (int j = 0; j < matrix2.GetUpperBound(1) + 1; j++)
                {
                    v = 0.0;
                    for (int k = 0; k < matrix1.GetUpperBound(1) + 1; k++)
                    {
                        v += matrix1[i, k] * matrix2[k, j];
                    }
                    result[i, j] = v;
                }
            }
            return result;
        }
    }
}


TEST RUN

When running the program, I get the following console printout. The printout shows, how our four consumers used in the example program are Dequeueing correlated 1x5 random matrices from CopulaQueue in parallel.

























EFFICIENCY AND CORRELATION

One million rows for 5x5 correlation matrix was processed 100 times, when testing efficiency of the producer part of the program. More specifically, four producers (CopulaEngine) are creating 1x5 matrices, consisting of five correlated uniformly distributed random numbers one by one and then sending these matrices to ConcurrentQueue wrapped in CopulaQueue class in parallel. Each of the four producers are then performing this operation 250 thousand times. Finally, the whole operation was repeated 1 hundred times and time elapsed for each repetition was saved for further analysis. The results of this experiment are reported in the table below.


As we can see in the table, multi-threading is nicely reducing processing time, but in a non-linear fashion. Next, I printed out one batch of correlated random numbers (1000000x5 matrix) into text file and compared theoretical and simulated correlation in Excel. Results are reported in the table below. We can see, that simulated correlation is in line with theoretical correlation (enough, IMHO).


READING REFERENCES


For me personally, the following two books are standing in their own class. Ben and Joseph Albahari C# in a nutshell is presenting pretty much everything one has to know (and a lot more) about C# and especially on .NET framework. Very nice thing is also, that Albahari brothers have been kindly offering a free chapter on multi-threading from this book on their website. Daniel Duffy and Andrea Germani C# for financial markets is presenting how to apply C# in financial programs. There are a lot of extremely useful examples on multi-threaded programs, especially on using multi-threading in Monte Carlo applications and the general usage of Producer-Consumer pattern, when using multi-threading.

Again, Big thanks for reading my blog and Happy and Better New Year 2015 for everybody.

-Mike