Parallel QuantLib
Massively parallel execution of QuantLib pricing becomes easy when using our QLW QuantLib wrapper. The combination of the Java/Linux platform, a simpler functional interface and integration with open-source technologies means that parallel, reliable execution of tasks on up to hundreds of nodes is easy to implement and monitor.
In particular, we are able to offer complete, ready-made parallel QuantLib solutions based on Apache Hadoop MapReduce technology.
For more information or quotes for your particular project, contact us at webs@bnikolic.co.uk.
Hadoop MapReduce Example of QuantLib Cloud Computing
Below is an example of a cloud pricer for QuantLib built with QLW and Hadoop MapReduce. It can easily be run on data-centre clusters or on Amazon's Elastic MapReduce (EMR) service on EC2. It offers a high level of reliability, simplicity and security, and is the simplest way of running QuantLib in the Cloud!
// -*- mode: java -*-
// Copyright (C) 2012 Bojan Nikolic <bojan@bnikolic.co.uk>
//
// All rights reserved, for on-screen viewing only
import java.io.IOException;
import java.net.URI;
// QLW stuff
import co.uk.bnikolic.qlw.property_t;
import co.uk.bnikolic.qlw.qlw;
import co.uk.bnikolic.qlw.StringVector;
// End QLW stuff
// Hadoop stuff
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.filecache.DistributedCache;
// end hadoop stuff
public class hadooppricer extends Configured implements Tool {
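/** Build a European put option pricing object for the given strike
(analytic European Black-Scholes engine) and return its id
*/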
public static String mkObjToPrice(double strike) {
final String postf = "s"+strike;
String daycountConvention = "Actual/365 (Fixed)";
String payoffType = "Vanilla";
String optionType = "Put";
String engineType = "AE"; // Analytic European
String calendar = "TARGET";
double dividendYield = 0.00;
double riskFreeRate = 0.06;
double volatility = 0.20;
double underlying = 36;
int evaluationDate = 35930; // 15 May 1998
int settlementDate = 35932; // 17 May 1998
int exerciseDate = 36297; // 17 May 1999
qlw.qlSettingsSetEvaluationDate(new property_t(evaluationDate),
qlw.OH_NULL());
String idBlackConstantVol = qlw.qlBlackConstantVol("my_blackconstantvol"+postf,
new property_t(settlementDate),
calendar,
volatility,
new property_t(daycountConvention),
qlw.OH_NULL(),
qlw.OH_NULL(),
false);
String idGeneralizedBlackScholesProcess =
qlw.qlGeneralizedBlackScholesProcess(
"my_blackscholes"+postf,
idBlackConstantVol,
underlying,
new property_t(daycountConvention),
new property_t(settlementDate),
riskFreeRate,
dividendYield,
qlw.OH_NULL(),
qlw.OH_NULL(),
false);
String idStrikedTypePayoff =
qlw.qlStrikedTypePayoff("my_payoff"+postf,
payoffType,
optionType,
strike,
new property_t(strike),
qlw.OH_NULL(),
qlw.OH_NULL(),
false);
String idExercise = qlw.qlEuropeanExercise(
"my_exercise"+postf,
new property_t(exerciseDate),
qlw.OH_NULL(),
qlw.OH_NULL(),
false);
String idPricingEngine = qlw.qlPricingEngine(
"my_engine"+postf,
engineType,
idGeneralizedBlackScholesProcess,
qlw.OH_NULL(),
qlw.OH_NULL(),
false);
String idVanillaOption = qlw.qlVanillaOption(
"my_option"+postf,
idStrikedTypePayoff,
idExercise,
qlw.OH_NULL(),
qlw.OH_NULL(),
false);
qlw.qlInstrumentSetPricingEngine(idVanillaOption, idPricingEngine, qlw.OH_NULL());
String idPricingObj = qlw.qlPricingObject("pricingobj"+postf,
idVanillaOption,
idPricingEngine,
new property_t(evaluationDate),
qlw.OH_NULL(), qlw.OH_NULL(),
false );
return idPricingObj;
}
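/** Build an American put option pricing object for strike K and
volatility v (binomial "LR" engine with 500 steps) and return its id
*/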
public static String mkAmerOption(double K, double v)
{
property_t today=new property_t(35930);
property_t settlementdate=new property_t(35932);
property_t exercisedate=new property_t(36297);
property_t dcc=new property_t("Actual/365 (Fixed)");
final String postf = "k"+K+"vol"+v;
String payoff=qlw.qlStrikedTypePayoff("payoff"+postf,
"Vanilla",
"Put",
K,
qlw.OH_NULL(),
qlw.OH_NULL(),
qlw.OH_NULL(),
false
);
String exercise=qlw.qlAmericanExercise("exercise"+postf,
settlementdate,
exercisedate,
new property_t(false),
qlw.OH_NULL(),
qlw.OH_NULL(),
false);
String option=qlw.qlVanillaOption("option"+postf,
payoff,
exercise,
qlw.OH_NULL(),
qlw.OH_NULL(),
false);
String vol=qlw.qlBlackConstantVol("vol"+postf,
settlementdate,
"TARGET",
v,
dcc,
qlw.OH_NULL(),
qlw.OH_NULL(),
false);
String process=qlw.qlGeneralizedBlackScholesProcess("process"+postf,
"vol"+postf,
K,
dcc,
settlementdate,
0.06,
0.00,
qlw.OH_NULL(),
qlw.OH_NULL(),
false) ;
String pengine=qlw.qlBinomialPricingEngine("pengine-"+postf,
"LR",
process,
500,
qlw.OH_NULL(),
qlw.OH_NULL(),
false);
String idPricingObj = qlw.qlPricingObject("pricingobj"+postf,
option,
pengine,
new property_t(today),
qlw.OH_NULL(), qlw.OH_NULL(),
false );
return idPricingObj;
}
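/** Serialise the pricing object for strike K and volatility v,
together with all of its precedent objects, to a string
*/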
public static String mkPOString(double K, double v) {
StringVector idList= qlw.allPrecedents(mkAmerOption(K, v));
return qlw.ohObjectSaveString(idList,
new property_t(true),
qlw.OH_NULL());
}
/** Map Pricing objects to NPV
*/
public static class QLNPVMapper extends MapReduceBase
implements Mapper<Text, Text, Text, DoubleWritable> {
private JobConf conf;
@Override
public void configure(JobConf job) {
conf = job;
}
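/** Load the serialised pricing object, compute its NPV and emit
(pricing object id, NPV)
*/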
public void map(Text po_id,
Text po_ser,
OutputCollector<Text, DoubleWritable> out,
Reporter reporter) throws IOException {
qlw.ohObjectLoadString(po_ser.toString(),
qlw.OH_NULL(),
qlw.OH_NULL());
double NPV=qlw.qlPONPV(po_id.toString(),
qlw.OH_NULL());
out.collect(po_id, new DoubleWritable(NPV));
}
}
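/** Working directory (on the default file system) holding the
generated inputs and the job output
*/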
private static final Path PREF = new Path("hadooppricetest");
public int run(String[] args) throws Exception {
final JobConf jobConf = new JobConf(getConf(),
getClass());
// Configuration of Hadoop so that it correctly picks up
// QuantLib libraries
jobConf.setJar("hadooppricer.jar");
jobConf.setJobName("Hadoop pricer");
jobConf.setInputFormat(SequenceFileInputFormat.class);
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(DoubleWritable.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
jobConf.setMapperClass(QLNPVMapper.class);
jobConf.setNumMapTasks(10);
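// Reuse task JVMs so that the native QuantLib libraries are loaded
// only once per node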
jobConf.setNumTasksToExecutePerJvm(-1);
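// Map-only job: the mappers write the NPVs directly to the output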
jobConf.setNumReduceTasks(0);
final FileSystem fs = FileSystem.get(jobConf);
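// Ship the QLW jar and the native QuantLib, ObjectHandler and Boost
// shared libraries to every node via the DistributedCache; the
// "#name" fragments create symlinks in the task working directory so
// that the JNI loader can find them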
DistributedCache.addFileToClassPath(new Path("/opt/qlw.jar"), jobConf, fs) ;
DistributedCache.createSymlink(jobConf);
DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/opt/libqlw.so#libqlw.so"), jobConf) ;
DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/opt/libQuantLibAddinCpp-1.1.0.so#libQuantLibAddinCpp-1.1.0.so"), jobConf) ;
DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/opt/libQuantLibAddin-1.1.0.so#libQuantLibAddin-1.1.0.so"), jobConf) ;
DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/opt/libObjectHandler-1.1.0.so#libObjectHandler-1.1.0.so"), jobConf) ;
DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/opt/libQuantLib.so.0#libQuantLib.so.0"), jobConf) ;
DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/opt/libboost_serialization.so.1.48.0#libboost_serialization.so.1.48.0"), jobConf) ;
DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/opt/libboost_regex.so.1.48.0#libboost_regex.so.1.48.0"), jobConf) ;
DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/opt/libboost_filesystem.so.1.48.0#libboost_filesystem.so.1.48.0"), jobConf) ;
DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/opt/libboost_system.so.1.48.0#libboost_system.so.1.48.0"), jobConf) ;
final Path inDir = new Path(PREF, "in");
final Path outDir = new Path(PREF, "out");
FileInputFormat.setInputPaths(jobConf, inDir);
FileOutputFormat.setOutputPath(jobConf, outDir);
if (fs.exists(PREF)) {
throw new IOException("Tmp directory " + fs.makeQualified(PREF)
+ " already exists. Please remove it first.");
}
if (!fs.mkdirs(PREF)) {
throw new IOException("Cannot create input directory " + PREF);
}
// Set up the job inputs: one SequenceFile of
// (pricing object id, serialised pricing object) pairs per map task
for(int j=0; j < 100; ++j)
{
final Path file = new Path(inDir, "part"+j);
final double vol=0.1+0.2/100*j;
final SequenceFile.Writer writer =
SequenceFile.createWriter(fs, jobConf, file,
Text.class, Text.class,
CompressionType.NONE);
try {
for(int i=0; i < 100; ++i)
{
final double strike=39.5+i*0.01;
final String postf= "k"+strike+"vol"+vol;
final Text poid= new Text("pricingobj"+postf);
final Text poser = new Text(mkPOString(strike,vol));
writer.append(poid, poser);
}
} finally {
writer.close();
}
System.out.println("Wrote input for Map #"+j);
}
System.out.println("Starting Job");
final long startTime = System.currentTimeMillis();
JobClient.runJob(jobConf);
final double duration = (System.currentTimeMillis() - startTime)/1000.0;
System.out.println("Job Finished in " + duration + " seconds");
return 0;
}
/**
* main method for running it as a stand alone command.
*/
public static void main(String[] argv) throws Exception {
System.exit(ToolRunner.run(null, new hadooppricer(), argv));
}
}
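Once the job has completed, the NPVs are available as (Text, DoubleWritable) pairs in SequenceFiles under hadooppricetest/out. Below is a minimal sketch of how these results could be read back; it assumes the same Hadoop 1.x API as the pricer above, and the ReadResults class name is illustrative only.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class ReadResults {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // The pricer above writes its results under hadooppricetest/out
        Path outDir = new Path("hadooppricetest", "out");
        for (FileStatus st : fs.listStatus(outDir)) {
            // Skip _SUCCESS, _logs etc.; only the part-* files hold data
            if (!st.getPath().getName().startsWith("part"))
                continue;
            SequenceFile.Reader reader =
                new SequenceFile.Reader(fs, st.getPath(), conf);
            try {
                Text poid = new Text();
                DoubleWritable npv = new DoubleWritable();
                // Each record is (pricing object id, NPV)
                while (reader.next(poid, npv)) {
                    System.out.println(poid + "\t" + npv.get());
                }
            } finally {
                reader.close();
            }
        }
    }
}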