Showing posts with label Data Structures. Show all posts

Friday, July 5, 2019

Python: JSON serialization/deserialization

This post continues the previous story by presenting one possible implementation for JSON serialization/deserialization. Class JsonHandler (technically just a wrapper for the json.load and json.dump functions) has only two methods: FileToObject re-hydrates JSON file content into a custom object (deserialization) and ObjectToFile writes a custom object into a JSON file (serialization).

Class serialization using the Python json package works fine as long as the class data members are built-in Python data types (e.g. integer, string, boolean, float, list, dictionary). However, custom data types, such as a class instance held as a data member, are not serializable, and JsonHandler cannot handle them either. As a safety net, the ObjectToFile method automatically removes all non-serializable items from the dictionary before serialization. I assume there are several more sophisticated third-party packages available for handling this type of serialization issue.
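
To make the safety net concrete, here is a minimal sketch of the filtering idea in isolation. The helper name is_serializable and the sample dictionary are assumptions made for illustration only; the check itself relies on json.dumps raising TypeError for objects it cannot encode.

```python
import json

class Whatever:
    """Stand-in for a custom type that the json package cannot encode."""
    def __init__(self, a, b):
        self.a = a
        self.b = b

def is_serializable(value):
    """Return True if json.dumps accepts the value, False otherwise."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False

# hypothetical data members: three built-in types and one custom object
members = {"A": 100, "B": 3.14, "D": [1, 2, 3], "G": Whatever(100, 3.14)}

# keep only the items that can be written to JSON
kept = {k: v for k, v in members.items() if is_serializable(v)}
print(sorted(kept))  # ['A', 'B', 'D']
```

The custom member G is silently dropped, which is exactly the trade-off described above: nothing fails, but the information is lost unless it can be rebuilt from the remaining members.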

Finally, it should be noted that when performing deserialization, JsonHandler transforms the JSON string into a dictionary and then transforms the dictionary into an object. Correspondingly, when performing serialization, it transforms the object into a dictionary and then transforms the dictionary into a JSON string. These operations (including nested functions) add some extra complexity to this otherwise simple and straightforward utility class.
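
The two-step transformation can be sketched in memory, without any files. This is a simplified illustration and not the JsonHandler code itself: the class Point, the functions to_json and from_json, and the REGISTRY lookup are all assumptions, and the registry replaces the module import that a real implementation would need for locating the class.

```python
import json

class Point:
    """Hypothetical class having only built-in data members."""
    def __init__(self, x, y):
        self.x = x
        self.y = y

# simplification: explicit class lookup instead of importing by module name
REGISTRY = {"Point": Point}

def to_json(obj):
    # object -> dictionary -> JSON string
    dic = {"__class__": type(obj).__name__}
    dic.update(vars(obj))
    return json.dumps(dic)

def from_json(text):
    # JSON string -> dictionary -> object
    dic = json.loads(text)
    cls = REGISTRY[dic.pop("__class__")]
    # remaining dictionary items must match the constructor arguments
    return cls(**dic)

text = to_json(Point(1, 2.5))
restored = from_json(text)
print(text)
print(restored.x, restored.y)
```

Note that the dictionary keys have to match the constructor argument names, since the object is re-created by unpacking the dictionary into the constructor call.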

Assume we would like to serialize a Configurations class instance into a JSON file. Note that this class actually contains a non-serializable data type as its data member G (class Whatever). This specific data member will be completely ignored in the process of serialization. However, when the JSON file is deserialized, an instance of data member G will be created in the Configurations class constructor, based on data members A and B (which are both serializable data types).

import importlib
import json

# class for handling transformations between custom object and JSON file
class JsonHandler:
    
    # transform json file to custom object
    @staticmethod
    def FileToObject(file):
        # nested function: transform dictionary to custom object
        def DictionaryToObject(dic):
            if "__class__" in dic:
                class_name = dic.pop("__class__")
                module_name = dic.pop("__module__")
                # import_module returns the named module itself, also for dotted names
                module = importlib.import_module(module_name)
                class_ = getattr(module, class_name)
                obj = class_(**dic)
            else:
                obj = dic
            return obj
        # context manager closes the file also when json.load raises
        with open(file, 'r') as f:
            return DictionaryToObject(json.load(f))
    
    # transform custom object to json file
    @staticmethod
    def ObjectToFile(obj, file):
        # nested function: check whether an object can be json serialized
        def IsSerializable(obj):
            try:
                # raises TypeError (or ValueError for circular references), if an object is not serializable
                json.dumps(obj)
                return True
            except (TypeError, ValueError):
                return False
        # nested function: transform custom object to dictionary
        def ObjectToDictionary(obj):
            dic = { "__class__": obj.__class__.__name__, "__module__": obj.__module__ }
            dic.update(obj.__dict__)
            # keep only serializable items in dictionary before serialization
            return { k: v for k, v in dic.items() if IsSerializable(v) }
        # context manager flushes and closes the file after writing
        with open(file, 'w') as f:
            json.dump(ObjectToDictionary(obj), f)

class Configurations(object):
    def __init__(self, A, B, C, D, E, F):
        # serializable data types
        self.A = A
        self.B = B
        self.C = C
        self.D = D
        self.E = E
        self.F = F
        # non-serializable data type
        self.G = Whatever(A, B)
        
class Whatever(object):
    def __init__(self, A, B):
        self.A = A
        self.B = B

# create class instance using 'primitive types'
new_config = Configurations(100, 3.14, True, [1, 2, 3], 'qwerty', { 'd1':1, 'd2':2 })

# print class members
print('printing data members of a newly created class instance:')
print(new_config.A)
print(new_config.B)
print(new_config.C)
print(new_config.D)
print(new_config.E)
print(new_config.F)
print(new_config.G.A)
print(new_config.G.B)
print()

# write object to json file
JsonHandler.ObjectToFile(new_config, '/home/mikejuniperhill/config.json')

# read object from json file
restored_config = JsonHandler.FileToObject('/home/mikejuniperhill/config.json')

print('printing data members of a restored class instance:')
print(restored_config.A)
print(restored_config.B)
print(restored_config.C)
print(restored_config.D)
print(restored_config.E)
print(restored_config.F)
print(restored_config.G.A)
print(restored_config.G.B)

Program execution in terminal is shown below.

[Screenshot: program execution in terminal]

Thanks for reading.
-Mike


Friday, March 24, 2017

C++11 : Wrapper for Bloomberg API Reference Data

Bloomberg offers access to its market data API in several different programming languages. Previously, I have presented my wrappers for VBA and C#. This time, I wanted to open up an implementation of a C++ wrapper, which covers different types of reference data requests (snapshot, historical, intraday bar, intraday tick). For reference data, the API workflow is basically the same for all request types: create inputs (security, field), start session, open service, create request, send request to server, poll for results, unpack messages from server and finally, stop session.

Notes


There were some tough decisions to make. In my C# wrapper, I was sub-classing the algorithm for handling messages from the server (unpacking message content into a data structure). Within this implementation, I made a decision to prefer straightforward simplicity, even if it would mean a bit of duplication. Personally, when using this kind of functionality, I would prefer to have just one single class (one header, one implementation), instead of having several classes.

Another big issue concerned the data output coming from the Bloomberg server back to a client: how to pack all requested data into a data structure that is easy for a client to use? In the C# implementation we had the dynamic data type, which enabled us to create arrays of dynamic data types. However, in C++11 the standard library offers no heterogeneous data structure besides std::tuple. Initially, I considered using a tuple as the result data structure, but I rejected the idea due to the material increase in general complexity. Instead, I decided to return all possible data types from the Bloomberg server as std::string within the result data structure. It is then up to a client to convert these values "back to their origins", by using std::stod or some other conversion method.

For the sake of clarity, I have created type definitions for several nested data structures used in the program : Vector [std::vector<std::string>], Matrix for all types of reference data [std::vector<Vector>] and Cube for historical time-series data [std::vector<Matrix>].
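
The client-side conversion of such string results can be sketched as follows. The sketch is written in Python for brevity rather than in C++, and everything in it is an assumption made for illustration: the helper to_number, the sample dates and the sample values. It only mirrors the layout used for historical data in the implementation further down: one matrix per security, one row per observation date, with the date in the first column and "#N/A" as the default for missing items.

```python
def to_number(text, missing=float("nan")):
    """Convert a string cell back to float; non-numeric text such as '#N/A' maps to `missing`."""
    try:
        return float(text)
    except ValueError:
        return missing

# hypothetical cube of strings: [security][observation date][date, field value]
cube = [
    [["2017-03-01", "101.5"], ["2017-03-02", "#N/A"]],
    [["2017-03-01", "55.25"], ["2017-03-02", "55.75"]],
]

# client converts the field column "back to its origin", one list per security
prices = [[to_number(row[1]) for row in matrix] for matrix in cube]
print(prices)
```

In C++ the same role would be played by std::stod, where a failed conversion is reported by an exception instead of being mapped to a default value.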

Finally, in order to "make it happen", a set of header files and libraries has to be configured into the Visual Studio project. An extremely well-written and complete step-by-step tutorial is available here.

Header file


#pragma once
//
#include <memory>
#include <string>
#include <vector>
#include <algorithm>
#include <blpapi_session.h>
//
namespace MikeJuniperhillBloombergCPPDesktopAPI
{
 using namespace BloombergLP::blpapi;
 using Vector = std::vector<std::string>; // string vector
 using Matrix = std::vector<Vector>; // string matrix
 using Cube = std::vector<Matrix>; // string cube
 enum ENUM_PRICING_OPTION { PRICING_OPTION_PRICE, PRICING_OPTION_YIELD };
 enum ENUM_PERIODICITY_ADJUSTMENT { ACTUAL, CALENDAR, FISCAL };
 enum ENUM_PERIODICITY_SELECTION { DAILY, WEEKLY, MONTHLY, QUARTERLY, SEMI_ANNUALLY, YEARLY };
 enum ENUM_NON_TRADING_DAY_FILL_OPTION { NON_TRADING_WEEKDAYS, ALL_CALENDAR_DAYS, ACTIVE_DAYS_ONLY };
 enum ENUM_NON_TRADING_DAY_FILL_METHOD { PREVIOUS_VALUE, NIL_VALUE };
 //
 class BBCOMMDataRequest
 {
 public:
  BBCOMMDataRequest(std::string serverHost = "localhost", unsigned short serverPort = 8194);
  void Start();
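  // note : defaulting a non-const reference to a temporary (Vector& x = Vector())
  // compiles only as a Visual C++ language extension
  void GetReferenceData(Vector& securities, Vector& fields, Matrix& result,
   Vector& overrideFields = Vector(), Vector& overrideValues = Vector());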
  void GetHistoricalData(Vector& securities, Vector& fields, Cube& result,
   std::string startDate, std::string endDate,
   ENUM_PRICING_OPTION pricingOption = PRICING_OPTION_PRICE,
   ENUM_PERIODICITY_SELECTION periodicitySelection = DAILY,
   ENUM_PERIODICITY_ADJUSTMENT periodicityAdjustment = ACTUAL,
   ENUM_NON_TRADING_DAY_FILL_OPTION nonTradingDayFillOption = ALL_CALENDAR_DAYS,
   ENUM_NON_TRADING_DAY_FILL_METHOD nonTradingDayFillMethod = PREVIOUS_VALUE,
   std::string overrideCurrency = std::string(),
   Vector& overrideFields = Vector(),
   Vector& overrideValues = Vector());
  void GetIntradayBarData(std::string security, Vector& fields, Matrix& result, 
   std::string eventType, int intervalInMinutes, Datetime start, Datetime end);
  void GetIntradayTickData(std::string security, Vector& fields, Matrix& result, Vector& eventTypes,
   Datetime start, Datetime end, bool includeConditionCodes = false, bool includeExchangeCodes = false);
  void Stop();
 private:
  std::unique_ptr<SessionOptions> sessionOptions;
  std::unique_ptr<Session> session;
  std::unique_ptr<Service> service;
  std::unique_ptr<Request> request;
  std::string serverHost;
  unsigned short serverPort;
 };
}
//


Implementation file


#include <stdexcept>
#include "BBCOMMDataRequest.h"
using namespace MikeJuniperhillBloombergCPPDesktopAPI;
//
BBCOMMDataRequest::BBCOMMDataRequest(std::string serverHost, unsigned short serverPort) 
 : serverHost(serverHost), serverPort(serverPort)
{
 // ctor with default parameters for server host and port
}
void BBCOMMDataRequest::Start()
{
 // create objects : session options and session
 sessionOptions = std::unique_ptr<SessionOptions>(new SessionOptions);
 sessionOptions->setServerHost(serverHost.c_str());
 sessionOptions->setServerPort(serverPort);
 session = std::unique_ptr<Session>(new Session(*sessionOptions));
 //
 // start session and open service, throw if not succeeded
 // (std::runtime_error is used, since std::exception has no standard constructor taking a message)
 if (!session->start()) throw std::runtime_error("session not started");
 if (!session->openService("//blp/refdata")) throw std::runtime_error("service not opened");
}
void BBCOMMDataRequest::GetReferenceData(Vector& securities, Vector& fields, Matrix& result,
 Vector& overrideFields, Vector& overrideValues)
{
 // initialize result data structure with default '#N/A' for all items
 for (unsigned int i = 0; i < securities.size(); ++i)
 {
  Vector innerVector(fields.size(), "#N/A");
  result.push_back(innerVector);
 }
 // create objects : service and request
 service = std::unique_ptr<Service>(new Service(session->getService("//blp/refdata")));
 request = std::unique_ptr<Request>(new Request(service->createRequest("ReferenceDataRequest")));
 //
 // append securities and field requests into request object
 std::for_each(securities.begin(), securities.end(), [this](std::string s) { request->append("securities", s.c_str()); });
 std::for_each(fields.begin(), fields.end(), [this](std::string f) { request->append("fields", f.c_str()); });
 //
 // conditionally, append overrides into request object
 if (overrideFields.size() > 0)
 {
  Element requestOverrides = request->getElement("overrides");
  for (unsigned int i = 0; i < overrideFields.size(); ++i)
  {
   Element requestOverride = requestOverrides.appendElement();
   requestOverride.setElement("fieldId", overrideFields[i].c_str());
   requestOverride.setElement("value", overrideValues[i].c_str());
  }
 }
 // finally send data request to server
 session->sendRequest(*request);
 //
 // start polling server response for data request
 bool isProcessing = true;
 while (isProcessing)
 {
  // catch all 'response-typed' events from server
  Event bEvent = session->nextEvent();
  if ((bEvent.eventType() == Event::PARTIAL_RESPONSE) || (bEvent.eventType() == Event::RESPONSE))
  {
   // create iterator for accessing server message
   MessageIterator bMessageIterator(bEvent);
   while (bMessageIterator.next())
   {
    // get access to message, extract all included securities
    Message bMessage = bMessageIterator.message();
    Element bSecurities = bMessage.getElement("securityData");
    int nSecurities = bSecurities.numValues();
    //
    // loop through all securities in current server response batch
    for (int i = 0; i < nSecurities; ++i)
    {
     // extract one security and all available fields for this security
     Element bSecurity = bSecurities.getValueAsElement(i);
     Element bFields = bSecurity.getElement("fieldData");
     int sequenceNumber = bSecurity.getElementAsInt32("sequenceNumber");
     int nFieldNames = fields.size();
     //
     // loop through all requested fields
     for (int j = 0; j < nFieldNames; ++j)
     {
      // assign string value only if the field is included in delivery
      if (bFields.hasElement(fields[j].c_str()))
      {
       Element bField = bFields.getElement(fields[j].c_str());
       result[sequenceNumber][j] = bField.getValueAsString();
      }
     }
    }
   }
   // when event type is response, there will be no more messages from server
   if (bEvent.eventType() == Event::RESPONSE) isProcessing = false;
  }
 }
}
void BBCOMMDataRequest::GetHistoricalData(Vector& securities, Vector& fields, Cube& result,
 std::string startDate, std::string endDate,
 ENUM_PRICING_OPTION pricingOption,
 ENUM_PERIODICITY_SELECTION periodicitySelection,
 ENUM_PERIODICITY_ADJUSTMENT periodicityAdjustment,
 ENUM_NON_TRADING_DAY_FILL_OPTION nonTradingDayFillOption,
 ENUM_NON_TRADING_DAY_FILL_METHOD nonTradingDayFillMethod,
 std::string overrideCurrency,
 Vector& overrideFields,
 Vector& overrideValues)
{
 bool resultHasDimension = false;
 // create objects : service and request
 service = std::unique_ptr<Service>(new Service(session->getService("//blp/refdata")));
 request = std::unique_ptr<Request>(new Request(service->createRequest("HistoricalDataRequest")));
 //
 // append securities and field requests into request object
 std::for_each(securities.begin(), securities.end(), [this](std::string s) { request->append("securities", s.c_str()); });
 std::for_each(fields.begin(), fields.end(), [this](std::string f) { request->append("fields", f.c_str()); });
 //
 // conditionally, append overrides into request object
 if (overrideFields.size() > 0)
 {
  Element requestOverrides = request->getElement("overrides");
  for (unsigned int i = 0; i < overrideFields.size(); ++i)
  {
   Element requestOverride = requestOverrides.appendElement();
   requestOverride.setElement("fieldId", overrideFields[i].c_str());
   requestOverride.setElement("value", overrideValues[i].c_str());
  }
 }
 //
 // set optional parameters for historical data request
 request->set("startDate", startDate.c_str());
 request->set("endDate", endDate.c_str());
 request->set("pricingOption", pricingOption);
 request->set("periodicityAdjustment", periodicityAdjustment);
 request->set("periodicitySelection", periodicitySelection);
 request->set("nonTradingDayFillOption", nonTradingDayFillOption);
 request->set("nonTradingDayFillMethod", nonTradingDayFillMethod);
 if (!overrideCurrency.empty()) request->set("currency", overrideCurrency.c_str());
 //
 // finally send data request to server
 session->sendRequest(*request);
 //
 // start polling server response for data request
 bool isProcessing = true;
 while (isProcessing)
 {
  // catch all 'response-typed' events from server
  Event bEvent = session->nextEvent();
  //
  if ((bEvent.eventType() == Event::PARTIAL_RESPONSE) || (bEvent.eventType() == Event::RESPONSE))
  {
   // create iterator for accessing server message
   MessageIterator bMessageIterator(bEvent);
   // unzip and pack messages received from BBCOMM server
   while (bMessageIterator.next())
   {
    // receive one security per message and multiple messages per event
    Message bMessage = bMessageIterator.message();
    Element bSecurity = bMessage.getElement("securityData");
    Element bFields = bSecurity.getElement("fieldData");
    int sequenceNumber = bSecurity.getElementAsInt32("sequenceNumber");
    int nFieldNames = fields.size();
    unsigned int nObservationDates = bFields.numValues();
    //
    // the exact dimension for result data structure will be known 
    // only when the response has been received from BBCOMM server
    if (!resultHasDimension)
    {
     // initialize result data structure with default '#N/A' for all items
     for (unsigned int i = 0; i < securities.size(); ++i)
     {
      Matrix outerVector;
      for (unsigned int j = 0; j < nObservationDates; j++)
      {
       Vector innerVector(fields.size() + 1, "#N/A");
       outerVector.push_back(innerVector);
      }
      result.push_back(outerVector);
     }
     resultHasDimension = true;
    }
    // loop through all observation dates
    for (unsigned int i = 0; i < nObservationDates; ++i)
    {
     Element observationDateFields = bFields.getValueAsElement(i);
     result[sequenceNumber][i][0] = observationDateFields.getElementAsString("date");
     // loop through all requested fields for a given observation date
     // and pack results data into data structure
     for (int j = 0; j < nFieldNames; j++)
     {
      // pack field value into data structure only if such value is available
      if (observationDateFields.hasElement(fields[j].c_str()))
      {
       result[sequenceNumber][i][j + 1] = observationDateFields.getElement(fields[j].c_str()).getValueAsString();
      }
     }
    }
   }
   // when event type is response, there will be no more messages coming from server
   if (bEvent.eventType() == Event::RESPONSE) isProcessing = false;
  }
 }
}
void BBCOMMDataRequest::GetIntradayBarData(std::string security, Vector& fields, Matrix& result, std::string eventType, int intervalInMinutes,
 Datetime start, Datetime end)
{
 // create objects : service and request
 service = std::unique_ptr<Service>(new Service(session->getService("//blp/refdata")));
 request = std::unique_ptr<Request>(new Request(service->createRequest("IntradayBarRequest")));
 request->set("security", security.c_str());
 request->set("eventType", eventType.c_str());
 request->set("interval", intervalInMinutes);
 request->set("startDateTime", start);
 request->set("endDateTime", end);
 // finally send data request to server
 session->sendRequest(*request);
 //
 // start polling server response for data request
 bool isProcessing = true;
 while (isProcessing)
 {
  // catch all 'response-typed' events from server
  Event bEvent = session->nextEvent();
  //
  if ((bEvent.eventType() == Event::PARTIAL_RESPONSE) || (bEvent.eventType() == Event::RESPONSE))
  {
   // create iterator for accessing server message
   MessageIterator bMessageIterator(bEvent);
   // unzip and pack messages received from BBCOMM server
   while (bMessageIterator.next())
   {
    // receive one security per message and multiple messages per event
    Message bMessage = bMessageIterator.message();
    Element bData = bMessage.getElement("barData").getElement("barTickData");
    int nBars = bData.numValues();
    //
    for (int i = 0; i < nBars; ++i)
    {
     Element bBar = bData.getValueAsElement(i);
     if (!bBar.isNull())
     {
      Vector innerVector;
      for (unsigned int j = 0; j < fields.size(); ++j)
      {
       if (bBar.hasElement(fields[j].c_str()))
        innerVector.push_back(bBar.getElementAsString(fields[j].c_str()));
      }
      result.push_back(innerVector);
     }
    }
   }
   // when event type is response, there will be no more messages coming from server
   if (bEvent.eventType() == Event::RESPONSE) isProcessing = false;
  }
 }
}
void BBCOMMDataRequest::GetIntradayTickData(std::string security, Vector& fields, Matrix& result, Vector& eventTypes,
 Datetime start, Datetime end, bool includeConditionCodes, bool includeExchangeCodes)
{
 // create objects : service and request
 service = std::unique_ptr<Service>(new Service(session->getService("//blp/refdata")));
 request = std::unique_ptr<Request>(new Request(service->createRequest("IntradayTickRequest")));
 request->set("security", security.c_str());
 request->set("startDateTime", start);
 request->set("endDateTime", end);
 std::for_each(eventTypes.begin(), eventTypes.end(), [this](std::string e) { request->append("eventTypes", e.c_str()); });
 if (includeConditionCodes) request->set("includeConditionCodes", true);
 if (includeExchangeCodes) request->set("includeExchangeCodes", true);
 // finally send data request to server
 session->sendRequest(*request);
 //
 // start polling server response for data request
 bool isProcessing = true;
 while (isProcessing)
 {
  // catch all 'response-typed' events from server
  Event bEvent = session->nextEvent();
  //
  if ((bEvent.eventType() == Event::PARTIAL_RESPONSE) || (bEvent.eventType() == Event::RESPONSE))
  {
   // create iterator for accessing server message
   MessageIterator bMessageIterator(bEvent);
   // unzip and pack messages received from BBCOMM server
   while (bMessageIterator.next())
   {
    // receive one security per message and multiple messages per event
    Message bMessage = bMessageIterator.message();
    Element bData = bMessage.getElement("tickData").getElement("tickData");
    int nTicks = bData.numValues();
    //
    for (int i = 0; i < nTicks; ++i)
    {
     Element bItem = bData.getValueAsElement(i);
     if (!bItem.isNull())
     {
      Vector innerVector;
      for (unsigned int j = 0; j < fields.size(); ++j)
      {
       if (bItem.hasElement(fields[j].c_str()))
        innerVector.push_back(bItem.getElementAsString(fields[j].c_str()));
      }
      // include conditionally requested condition and exchange codes
      if (bItem.hasElement("conditionCodes") && (includeConditionCodes))
       innerVector.push_back(bItem.getElementAsString("conditionCodes"));
      if (bItem.hasElement("exchangeCodes") && (includeExchangeCodes))
       innerVector.push_back(bItem.getElementAsString("exchangeCodes"));
      result.push_back(innerVector);
     }
    }
   }
   // when event type is response, there will be no more messages coming from server
   if (bEvent.eventType() == Event::RESPONSE) isProcessing = false;
  }
 }
}
void BBCOMMDataRequest::Stop()
{
 session->stop();
}


Tester file


#include <iostream>
#include "BBCOMMDataRequest.h"
//
namespace MJ = MikeJuniperhillBloombergCPPDesktopAPI;
void PrintMatrix(MJ::Matrix& result);
void PrintCube(MJ::Cube& result);
void CreateEquitySecurityList(std::vector<std::string>& securities);
void CreateRateSecurityList(std::vector<std::string>& securities);
//
int main()
{
 try
 {
  // start bloomberg session
  MJ::BBCOMMDataRequest bloomberg;
  bloomberg.Start();
  //
  std::cout << "CASE 1 : create reference data request without overrides >" << std::endl;
  // create list of rate securities and fields, request data and print result
  MJ::Vector securities;
  CreateRateSecurityList(securities);
  MJ::Vector fields{ "PARSEKYABLE_DES", "PX_LAST" };
  MJ::Matrix matrixResult;
  bloomberg.GetReferenceData(securities, fields, matrixResult);
  PrintMatrix(matrixResult);
  //
  std::cout << "CASE 2 : create reference data request with overrides >" << std::endl;
  // create list of equity securities and fields, request data and print result
  CreateEquitySecurityList(securities);
  fields.push_back("BEST_PE_RATIO");
  MJ::Vector overrideFields{ "BEST_FPERIOD_OVERRIDE" };
  MJ::Vector overrideValues{ "3FY" };
  matrixResult.clear();
  bloomberg.GetReferenceData(securities, fields, matrixResult, overrideFields, overrideValues);
  PrintMatrix(matrixResult);
  //
  std::cout << "CASE 3 : create historical data request for previous securities >" << std::endl;
  // use previous securities, re-create fields, request data and print result
  fields.clear();
  fields.push_back("PX_LAST");
  MJ::Cube cubeResult;
  // bloomberg API date format : 'YYYYMMDD'
  std::string startDate = "20170301";
  std::string endDate = "20170322";
  // request actual daily frequency, but only for weekdays and prices converted to JPY
  bloomberg.GetHistoricalData(securities, fields, cubeResult, startDate, endDate, 
   MJ::PRICING_OPTION_PRICE, MJ::DAILY, MJ::ACTUAL, MJ::NON_TRADING_WEEKDAYS, MJ::PREVIOUS_VALUE, "JPY");
  PrintCube(cubeResult);
  //
  std::cout << "CASE 4 : create historical data request for list of securities >" << std::endl;
  // create list of rate securities, use previous field, request data and print result
  CreateRateSecurityList(securities);
  cubeResult.clear();
  bloomberg.GetHistoricalData(securities, fields, cubeResult, startDate, endDate);
  PrintCube(cubeResult);
  //
  std::cout << "CASE 5 : create intraday bar data request for single security >" << std::endl;
  // create one security, request data and print result
  std::string security = "GOOGL UW Equity";
  MJ::Vector barFields{ "time", "open" , "high", "low", "close" };
  std::string eventType = "TRADE";
  int intervalInMinutes = 1;
  MJ::Datetime start;
  start.setDate(2017, 3, 22);
  start.setTime(15, 0, 0);
  MJ::Datetime end;
  end.setDate(2017, 3, 22);
  end.setTime(15, 10, 0);
  matrixResult.clear();
  bloomberg.GetIntradayBarData(security, barFields, matrixResult, eventType, intervalInMinutes, start, end);
  PrintMatrix(matrixResult);
  //
  std::cout << "CASE 6 : create intraday tick data request for single security >" << std::endl;
  // use previous security, request data and print result
  MJ::Vector tickFields{ "time", "value", "size", "type" };
  MJ::Vector eventTypes{ "TRADE" };
  matrixResult.clear();
  bloomberg.GetIntradayTickData(security, tickFields, matrixResult, eventTypes, start, end, true, true);
  PrintMatrix(matrixResult);
  //
  // stop bloomberg session
  bloomberg.Stop();
 }
 catch (std::exception& e)
 {
  std::cout << e.what() << std::endl;
 }
 return 0;
}
//
void PrintMatrix(MJ::Matrix& result)
{
 for (unsigned int i = 0; i < result.size(); ++i)
 {
  for (unsigned int j = 0; j < result[i].size(); ++j)
  {
   std::cout << result[i][j] << std::endl;
  }
 }
 std::cout << std::endl;
}
//
void PrintCube(MJ::Cube& result)
{
 for (unsigned int i = 0; i < result.size(); ++i)
 {
  for (unsigned int j = 0; j < result[i].size(); ++j)
  {
   for (unsigned int k = 0; k < result[i][j].size(); k++)
   {
    std::cout << result[i][j][k] << std::endl;
   }
  }
 }
 std::cout << std::endl;
}
//
void CreateRateSecurityList(std::vector<std::string>& securities)
{
 // EUR Euribor vs. 6M swap curve
 securities.clear();
 securities.push_back("EONIA Index"); securities.push_back("EUR001W Index");
 securities.push_back("EUR001M Index"); securities.push_back("EUR002M Index");
 securities.push_back("EUR003M Index"); securities.push_back("EUR006M Index");
 securities.push_back("EUFR0F1 Curncy"); securities.push_back("EUFR0G1A Curncy");
 securities.push_back("EUFR0H1B Curncy"); securities.push_back("EUFR0I1C Curncy");
 securities.push_back("EUFR0J1D Curncy"); securities.push_back("EUFR0K1E Curncy");
 securities.push_back("EUFR011F Curncy"); securities.push_back("EUSA2 Curncy");
 securities.push_back("EUSA3 Curncy"); securities.push_back("EUSA4 Curncy");
 securities.push_back("EUSA5 Curncy"); securities.push_back("EUSA6 Curncy");
 securities.push_back("EUSA7 Curncy"); securities.push_back("EUSA8 Curncy");
 securities.push_back("EUSA9 Curncy"); securities.push_back("EUSA10 Curncy");
 securities.push_back("EUSA11 Curncy"); securities.push_back("EUSA12 Curncy");
 securities.push_back("EUSA15 Curncy"); securities.push_back("EUSA20 Curncy");
 securities.push_back("EUSA25 Curncy"); securities.push_back("EUSA30 Curncy");
 securities.push_back("EUSA35 Curncy"); securities.push_back("EUSA40 Curncy");
 securities.push_back("EUSA45 Curncy"); securities.push_back("EUSA50 Curncy");
}
//
void CreateEquitySecurityList(std::vector<std::string>& securities)
{
 // some equities
 securities.clear();
 securities.push_back("GOOGL UW Equity");
 securities.push_back("YHOO UW Equity");
 securities.push_back("FB UW Equity");
 securities.push_back("EBAY UW Equity");
}
//

Finally, thanks a lot for reading my blog.
-Mike

Sunday, April 10, 2016

C# : flexible design for processing market data

Third-party analytics software usually requires a full set of market data to be fed into the system before performing any of those precious high-intensity calculations. Market data (curves, surfaces, fixing time-series, etc.) has to be fed into the system following some specific file configurations. Moreover, source data might have to be collected from several different vendor sources. Needless to say, the process can easily turn into a semi-manageable mess involving Excel workbooks and a lot of manual processing, which is always a bad omen.

For this reason, I finally ended up creating one possible design solution for flexible processing of market data files. I went through some iterations, starting with Abstract Factory, before landing on the current one, which uses delegates to pair data and algorithms. With the current solution, I am starting to feel quite comfortable.


UML

[Figure: UML class diagram]

Each market data point (such as the mid USD swap rate for 2 years) is presented as a RiskFactor object. All individual RiskFactor objects are hosted in a list inside a generic MarketDataElements<T> object, which enables hosting any type of data. The MarketDataElements<T> object itself is hosted by the static BaseMarket class.

The actual algorithms needed for creating any type of vendor data are captured in the static ProcessorLibrary class. During the processing task, the MarketDataElements<T> object is handed to a specific library method implementation, which then requests values from the vendor source for all involved RiskFactor objects. In the example program, ProcessorLibrary has a method for processing RiskFactor objects using the Bloomberg market data API. This specific method then uses the DummyBBCOMMWrapper class for requesting values for a RiskFactor object.

For the purpose of pairing specific data (MarketDataElements<T>) with a specific algorithm (ProcessorLibrary), the BaseMarket class hosts a list of ElementProcessor objects as well as a list of delegates bound to specific methods found in ProcessorLibrary. For the processing task, each ElementProcessor object is fed with a delegate for a specific ProcessorLibrary method and a reference to the MarketDataElements<T> object.
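
The pairing of data and algorithm can be sketched in a few lines. The sketch below is in Python and uses plain callables in place of C# delegates; the class and function names only loosely mirror the ones in the design, and the vendor call is replaced by a dummy value, so all of it should be read as an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class RiskFactor:
    provider: str
    ticker: str
    value: float = 0.0

def process_bloomberg(elements: List[RiskFactor]) -> None:
    """Placeholder algorithm: a real one would request values from the vendor API."""
    for risk_factor in elements:
        risk_factor.value = 1.0  # dummy value, no vendor call is made

@dataclass
class ElementProcessor:
    """Pairs one algorithm (callable) with the data it should process."""
    algorithm: Callable[[List[RiskFactor]], None]
    elements: List[RiskFactor]

    def process(self) -> None:
        self.algorithm(self.elements)

# hypothetical ticker, used only as sample data
risk_factors = [RiskFactor("BLOOMBERG", "USD.SWAP.2Y")]
processors = [ElementProcessor(process_bloomberg, risk_factors)]
for processor in processors:
    processor.process()
print(risk_factors[0].value)
```

The processor object knows nothing about the vendor. It only calls whatever algorithm it was given, which is what makes adding a new vendor cheap.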

Finally (not shown in UML), the program also uses a static TextFileHandler class for handling text files and a static Configurations class for hosting configurations, initially read from the App.config file.


Files

 

App.config

[Image: App.config file content]

CSV for all RiskFactor object configurations
Fields (matching with RiskFactor object properties) :
  • Data vendor identification string, matching with the one given in configuration file
  • Identification code (ticker) for a market data element, found in the system for which the data will be created
  • Vendor ticker for a market data element (Bloomberg ISIN code)
  • Field name for a market data element (Bloomberg field PX_MID)
  • Divider (Bloomberg is presenting a rate as percentage 1.234, but target system may need to have an input as absolute value 0.01234)
  • Empty field for a value to be processed by specific processor implementation for specific data vendor. 
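
As a rough sketch of how one such configuration row maps to a RiskFactor object and how the divider is applied, consider the following Python snippet. The comma delimiter, the attribute names, the sample tickers and the helper read_risk_factors are assumptions made for illustration; the actual file layout is defined by the program below.

```python
import csv
import io
from dataclasses import dataclass
from typing import Optional

@dataclass
class RiskFactor:
    provider: str        # data vendor identification string
    system_ticker: str   # identification code in the target system
    vendor_ticker: str   # ticker used by the vendor
    field_name: str      # vendor field name
    divider: float       # scaling applied to the vendor value
    value: Optional[float] = None  # filled in by the vendor processor

def read_risk_factors(text):
    """Parse one RiskFactor per CSV row; the last (value) column is ignored on input."""
    rows = csv.reader(io.StringIO(text))
    return [RiskFactor(r[0], r[1], r[2], r[3], float(r[4])) for r in rows if r]

# hypothetical configuration row; tickers are made up for illustration
source = "BLOOMBERG,USD.SWAP.2Y,XS0000000000,PX_MID,100,\n"
factors = read_risk_factors(source)

vendor_value = 1.234  # pretend the vendor returned the rate as a percentage
factors[0].value = vendor_value / factors[0].divider
print(factors[0])
```

With a divider of 100, the percentage 1.234 becomes approximately 0.01234, which is the absolute value the target system expects.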
 

Result CSV

[Image: result CSV file]

The program


Create a new console project and copy-paste the following program into the corresponding CS files. When testing the program in a real production environment, just add a reference to the Bloomberg API DLL file and replace DummyBBCOMMWrapper class with this.

Adding a new data vendor processor XYZ involves the following four easy steps :
  1. Update providers in App.config file : <add key="RiskFactorDataProviders" value="BLOOMBERG,XYZ" />
  2. Create a new method implementation into ProcessorLibrary :  public static void ProcessXYZRiskFactors(dynamic marketDataElements) { // implement algorithm}
  3. Add selection for a new processor into BaseMarket class method createElementProcessors: if(dataProviderString.ToUpper() == "XYZ") elementProcessors.Add(new ElementProcessor(ProcessorLibrary.ProcessXYZRiskFactors, elements));
  4. Create new RiskFactor object configurations into source file for a new vendor XYZ
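
Step 2 above could be sketched roughly as follows. This is only an illustration under assumptions: the RiskFactor class here is a reduced stand-in for the one in the program, and RequestValueFromXYZ is a hypothetical placeholder for whatever API the vendor XYZ would offer.

```csharp
using System;
using System.Collections.Generic;

// reduced stand-in for the RiskFactor class of the program (only the fields used here)
public class RiskFactor
{
    public string provider;
    public string vendorTicker;
    public string divider;
    public string value;
}

public static class XYZProcessorSketch
{
    // hypothetical vendor lookup, a real implementation would call the vendor API here
    private static double RequestValueFromXYZ(string vendorTicker)
    {
        return 1.5; // fixed dummy value instead of real vendor data
    }

    // same signature as the Processor delegate: void (dynamic marketDataElements)
    public static void ProcessXYZRiskFactors(dynamic marketDataElements)
    {
        List<RiskFactor> riskFactors = (List<RiskFactor>)marketDataElements;
        foreach (RiskFactor factor in riskFactors)
        {
            // scale retrieved value with divider and store it as string
            double divider = Convert.ToDouble(factor.divider);
            factor.value = (RequestValueFromXYZ(factor.vendorTicker) / divider).ToString();
        }
    }

    public static void Main()
    {
        var factors = new List<RiskFactor>
        {
            new RiskFactor { provider = "XYZ", vendorTicker = "ABC", divider = "1" }
        };
        ProcessXYZRiskFactors(factors);
        Console.WriteLine(factors[0].value);
    }
}
```

Since the method only has to match the Processor delegate signature, BaseMarket does not need to know anything else about the vendor.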
Finally, thanks for reading my blog.
-Mike


// MainProgram.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace MarketDataProcess
{
    class MainProgram
    {
        static void Main(string[] args)
        {
            try
            {
                // process base market risk factors to file
                BaseMarket.Process();
                BaseMarket.PrintToFile();
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
    }
}
//
//
//
//
// BaseMarket.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace MarketDataProcess
{
    // class for administrating risk factors and processors for base market
    public static class BaseMarket
    {
        private static List<string> inputFileStreams = new List<string>();
        private static MarketDataElements<RiskFactor> riskFactors = new MarketDataElements<RiskFactor>();
        private static List<ElementProcessor> elementProcessors = new List<ElementProcessor>();
        private static int nRiskFactors = 0;
        //
        public static void Process()
        {
            // read all source data string streams from file into list
            TextFileHandler.Read(Configurations.BaseMarketSourceDataFilePathName, inputFileStreams);
            //
            // extract string streams and create risk factor objects
            foreach (string inputFileStream in inputFileStreams)
            {
                RiskFactor element = new RiskFactor();
                element.Create(inputFileStream);
                riskFactors.AddElement(element);
            }
            nRiskFactors = riskFactors.elements.Count;
            //
            // create and execute market data element processors
            // finally run technical check on created risk factors
            BaseMarket.createElementProcessors();
            elementProcessors.ForEach(processor => processor.Process());
            BaseMarket.Check();
        }
        public static void PrintToFile()
        {
            // write created base market risk factors into file
            StringBuilder stream = new StringBuilder();
            for (int i = 0; i < riskFactors.elements.Count; i++)
            {
                stream.AppendLine(String.Format("{0},{1}", riskFactors.elements[i].systemTicker, riskFactors.elements[i].value));
            }
            TextFileHandler.Write(Configurations.BaseMarketResultDataFilePathName, stream.ToString(), false);
        }
        private static void createElementProcessors()
        {
            // market data element processor types are defined in configuration file
            List<string> dataProviderStrings = Configurations.RiskFactorDataProviders.Split(',').ToList<string>();
            //
            foreach (string dataProviderString in dataProviderStrings)
            {
                if (dataProviderString == String.Empty) throw new Exception("No element processor defined");
                List<RiskFactor> elements = riskFactors.elements.Where(factor => factor.provider == dataProviderString).ToList<RiskFactor>();
                if(dataProviderString.ToUpper() == "BLOOMBERG") elementProcessors.Add(new ElementProcessor(ProcessorLibrary.ProcessBloombergRiskFactors, elements));
            }
        }
        public static MarketDataElements<RiskFactor> GetRiskFactors()
        {
            // create and return deep copy of all base market risk factors
            return riskFactors.Clone();
        }
        private static void Check()
        {
            int nValidRiskFactors = 0;
            //
            // loop through all created risk factors for base market
            for (int i = 0; i < riskFactors.elements.Count; i++)
            {
                // extract risk factor to be investigated for valid value
                RiskFactor factor = riskFactors.elements[i];
                //
                // a valid risk factor value is a double converted to string, ex. "0.05328"
                // Double.TryParse returns true if the string value can be converted to double
                double value = 0;
                bool isValid = Double.TryParse(factor.value, out value);
                if (isValid) nValidRiskFactors++;
                //
                // if value is not convertible to double, get user input for this value
                if (!isValid)
                {
                    while (true)
                    {
                        Console.Write(String.Format("Provide input for {0} >", factor.systemTicker));
                        bool validUserInput = Double.TryParse(Console.ReadLine(), out value);
                        if (validUserInput)
                        {
                            factor.value = value.ToString();
                            nValidRiskFactors++;
                            break;
                        }
                        else
                        {
                            // client is forced to set (at least technically) valid value
                            Console.WriteLine("Invalid value, try again");
                        }
                    }
                }
            }
        }
    }
}
//
//
//
//
// Configurations.cs
using System;
using System.Configuration;

namespace MarketDataProcess
{
    // static class for hosting configurations
    public static class Configurations
    {
        // readonly data for public sharing
        public static readonly string RiskFactorDataProviders;
        public static readonly string BaseMarketSourceDataFilePathName;
        public static readonly string BaseMarketResultDataFilePathName;
        //
        // static constructor will be called once, just before any configuration is first requested
        static Configurations()
        {
            // configuration strings are assigned to static class data members for easy access
            RiskFactorDataProviders = ConfigurationManager.AppSettings["RiskFactorDataProviders"].ToString();
            BaseMarketSourceDataFilePathName = ConfigurationManager.AppSettings["BaseMarketSourceDataFilePathName"].ToString();
            BaseMarketResultDataFilePathName = ConfigurationManager.AppSettings["BaseMarketResultDataFilePathName"].ToString();
        }
    }
}
//
//
//
//
// MarketDataElement.cs
using System;
using System.Collections.Generic;
using System.Linq;

namespace MarketDataProcess
{
    // generic template for all types of market data elements (risk factors, fixings)
    public class MarketDataElements<T> where T : ICloneable, new()
    {
        public List<T> elements = new List<T>();
        public void AddElement(T element)
        {
            elements.Add(element);
        }
        public MarketDataElements<T> Clone()
        {
            // create a deep copy of market data elements object
            MarketDataElements<T> clone = new MarketDataElements<T>();
            //
            // copy content for all elements into a list
            List<T> elementList = new List<T>();
            foreach (T element in elements)
            {
                elementList.Add((T)element.Clone());
            }
            clone.elements.AddRange(elementList);
            return clone;
        }
    }
    // risk factor as market data element
    public class RiskFactor : ICloneable
    {
        public string provider;
        public string systemTicker;
        public string vendorTicker;
        public string vendorField;
        public string divider;
        public string value;
        //
        public RiskFactor() { }
        public void Create(string stream)
        {
            List<string> fields = stream.Split(',').ToList<string>();
            this.provider = fields[0];
            this.systemTicker = fields[1];
            this.vendorTicker = fields[2];
            this.vendorField = fields[3];
            this.divider = fields[4];
            this.value = fields[5];
        }
        public object Clone()
        {
            RiskFactor clone = new RiskFactor();
            clone.provider = this.provider;
            clone.systemTicker = this.systemTicker;
            clone.vendorTicker = this.vendorTicker;
            clone.vendorField = this.vendorField;
            clone.divider = this.divider;
            clone.value = this.value;
            return clone;
        }
    }
    // fixing as market data element
    public class Fixing : ICloneable
    {
        public string provider;
        public string systemTicker;
        public string vendorTicker;
        public string vendorField;
        public string frequency;
        public string nYearsBack;
        public string divider;
        public string value;
        public Dictionary<string, string> timeSeries = new Dictionary<string, string>();
        //
        public Fixing() { }
        public void Create(string stream)
        {
            List<string> fields = stream.Split(',').ToList<string>();
            this.provider = fields[0];
            this.systemTicker = fields[1];
            this.vendorTicker = fields[2];
            this.vendorField = fields[3];
            this.frequency = fields[4];
            this.nYearsBack = fields[5];
            this.divider = fields[6];
            this.value = fields[7];
        }
        public object Clone()
        {
            Fixing clone = new Fixing();
            clone.provider = this.provider;
            clone.systemTicker = this.systemTicker;
            clone.vendorTicker = this.vendorTicker;
            clone.vendorField = this.vendorField;
            clone.frequency = this.frequency;
            clone.nYearsBack = this.nYearsBack;
            clone.divider = this.divider;
            clone.value = this.value;
            //
            // create deep copy of timeseries dictionary
            Dictionary<string, string> timeSeriesClone = new Dictionary<string, string>();
            foreach (KeyValuePair<string, string> kvp in this.timeSeries)
            {
                timeSeriesClone.Add(kvp.Key, kvp.Value);
            }
            clone.timeSeries = timeSeriesClone;
            return clone;
        }
    }
}
//
//
//
//
// ElementProcessor.cs
using System;
using System.Collections.Generic;

namespace MarketDataProcess
{
    // algorithm for creating market data element objects
    public delegate void Processor(dynamic marketDataElements);
    //
    // class for hosting data and algorithm
    public class ElementProcessor
    {
        private Processor taskProcessor;
        dynamic marketDataElements;
        public ElementProcessor(Processor taskProcessor, dynamic marketDataElements)
        {
            this.taskProcessor = taskProcessor;
            this.marketDataElements = marketDataElements;
        }
        public void Process()
        {
            taskProcessor.Invoke(marketDataElements);
        }
    }
}
//
//
//
//
// ProcessorLibrary.cs
using System;
using System.Collections.Generic;
using System.Linq;

namespace MarketDataProcess
{
    public static class ProcessorLibrary
    {
        public static void ProcessBloombergRiskFactors(dynamic marketDataElements)
        {
            List<RiskFactor> riskFactors = (List<RiskFactor>)marketDataElements;
            BBCOMMWrapper.BBCOMMDataRequest request;
            dynamic[, ,] result = null;
            string SYSTEM_DOUBLE = "System.Double";
            int counter = 0;
            //
            // group data list into N lists grouped by distinct bloomberg field names
            var dataGroups = riskFactors.GroupBy(factor => factor.vendorField);
            //
            // process each group of distinct bloomberg field names
            foreach (var fieldGroup in dataGroups)
            {
                // extract group, create data structures for securities and fields
                List<RiskFactor> dataGroup = fieldGroup.ToList();
                List<string> securities = new List<string>();
                List<string> fields = new List<string>() { dataGroup[0].vendorField };
                //
                // import securities into data structure fed to bloomberg api
                for (int j = 0; j < dataGroup.Count; j++)
                {
                    securities.Add(dataGroup[j].vendorTicker);
                }
                //
                // create and use request object to retrieve bloomberg data
                request = new BBCOMMWrapper.ReferenceDataRequest(securities, fields);
                result = request.ProcessData();
                //
                // write retrieved bloomberg data into risk factor group data
                for (int k = 0; k < result.GetLength(0); k++)
                {
                    string stringValue;
                    dynamic value = result[k, 0, 0];
                    //
                    if (value.GetType().ToString() == SYSTEM_DOUBLE)
                    {
                        // handle output value with divider only if retrieved value is double
                        // this means that data retrieval has been successful
                        double divider = Convert.ToDouble(dataGroup[k].divider);
                        stringValue = (value / divider).ToString();
                        dataGroup[k].value = stringValue;
                        counter++;
                    }
                    else
                    {
                        // write non-double value (bloomberg will retrieve #N/A) if retrieved value is not double
                        stringValue = value.ToString();
                        dataGroup[k].value = stringValue;
                    }
                }
            }
        }
    }
}
//
//
//
//
// DummyBBCOMMWrapper.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace BBCOMMWrapper
{
    // abstract base class for data request
    public abstract class BBCOMMDataRequest
    {
        // input data structures
        protected List<string> securityNames = new List<string>();
        protected List<string> fieldNames = new List<string>();
        //
        // output result data structure
        protected dynamic[, ,] result;
        //
        public dynamic[, ,] ProcessData()
        {
            // instead of the actual Bloomberg market data, random numbers are going to be generated
            // Random accepts negative seeds, Math.Abs would throw OverflowException for Int32.MinValue
            Random randomGenerator = new Random(Guid.NewGuid().GetHashCode());
            //
            for (int i = 0; i < securityNames.Count; i++)
            {
                for (int j = 0; j < fieldNames.Count; j++)
                {
                    result[i, 0, j] = randomGenerator.NextDouble();
                }
            }
            return result;
        }
    }
    //
    // concrete class implementation for processing reference data request
    public class ReferenceDataRequest : BBCOMMDataRequest
    {
        public ReferenceDataRequest(List<string> bloombergSecurityNames,
            List<string> bloombergFieldNames)
        {
            securityNames = bloombergSecurityNames;
            fieldNames = bloombergFieldNames;
            result = new dynamic[securityNames.Count, 1, fieldNames.Count];
        }
    }
}
//
//
//
//
// TextFileHandler.cs
using System;
using System.Collections.Generic;
using System.IO;

namespace MarketDataProcess
{
    public static class TextFileHandler
    {
        public static void Read(string filePathName, List<string> output)
        {
            // read file content into list, using-block closes the reader also if reading fails
            using (StreamReader reader = new StreamReader(filePathName))
            {
                while (!reader.EndOfStream)
                {
                    output.Add(reader.ReadLine());
                }
            }
        }
        public static void Write(string filePathName, List<string> input, bool append)
        {
            // write text stream list to file
            using (StreamWriter writer = new StreamWriter(filePathName, append))
            {
                input.ForEach(it => writer.WriteLine(it));
            }
        }
        public static void Write(string filePathName, string input, bool append)
        {
            // write bulk text stream to file
            using (StreamWriter writer = new StreamWriter(filePathName, append))
            {
                writer.Write(input);
            }
        }
    }
}