Lecture Notes on Big Geospatial Data


Students will get an overview of methods and infrastructures for parallel computing with large data sets and of methods for parallel processing of geo-information. At the end of the course, they will be able to evaluate and use frameworks and approaches to solve problems in their own projects.

First, different methods of parallel processing are discussed (e.g. processes, threads, semaphores, OpenMP, CUDA/GPGPU, MPI, VGAS Systems, Hadoop MapReduce, NoSQL, Spark). Following this, standard approaches for the processing of location data are introduced. Topics such as aggregation (e.g. mean values, location entropy, KDE, rasterization, hotspot detection), data locality (e.g. space filling curves and geohash, space-time cubes, clustering), and processing of navigation and trajectory data are discussed using real-world examples.

1 Introduction

1.1 Programming Languages for Big Data

This section gives an overview of some important tools for processing big geospatial data. It is not the aim of this section to introduce them in full detail. Instead, some aspects of these tools are highlighted and links to useful online tutorials and introductions are collected for those who really need to learn to work in such environments. For the remainder of this lecture, we want to keep aspects such as the environment or programming language open. Still, some programming environments are more flexible than others, and we will actually use different programming languages in different contexts. But for now, let us briefly look at what is relevant for big geospatial data today.

1.1.1 Python

Python is a very flexible scripting language in which many modern concepts (such as list comprehensions and lazy evaluation) are integrated, while classical imperative styles are also possible. As the core of Python is so small, it is ideal for gluing together many different algorithms and libraries in a flexible way. For example, you can combine matplotlib, numpy and scipy into an environment which is quite similar to MATLAB. Using networkx, you can do graph analytics flexibly and simply (at least at medium scale).

Anyone working in the field of scientific computing (including, but not limited to, big geospatial data) will sooner or later run into Python. It is definitely a language to take seriously, especially as it is also often used to glue high-performance C++ code to Java-based distributed systems such as Apache Spark.

1.1.2 R

R is a free implementation of a scripting language based on the S language. At its heart, R is a functional programming language. This essentially means that functions are intended to be used as arguments to other functions.

R is extremely simple to extend with modules written in modern C++ and is very platform independent, as it brings its own C++ compiler suite for Windows. Additionally, R objects usually support named rows and columns, which reduces the effort of book-keeping indices and of remembering the exact meaning of each column of a matrix.

Furthermore, R is based on a small set of generic functions which behave differently given different objects. For example, you can call plot on almost every object and it will create the plot which the programmer of the specific class intended to provide. Similarly, with respect to machine learning, most packages follow the same pattern: one function fits a model and the generic function predict applies the fitted model to new data.

Possibly one of the most important advantages of R over other environments is, however, the Comprehensive R Archive Network (CRAN). It is not only a huge repository of software for the R environment, it is also one with high quality. Most R packages come with usage documentation (called vignettes) and scientific documentation (usually papers in well-known scientific journals). Additionally, most packages must be open source and are compiled on each specific system during installation. As vignettes are most often written in R Markdown, a mixture of the Markdown language for writing text and R source code, one is usually able to see how the library should be used. Additionally, R is widely accepted in the field of statistics and commercially supported, leading to integration into Microsoft SQL Server and very easy methods to write interactive web applications using Shiny.

The following list of references should get you started with learning R:

1.1.3 MATLAB and Octave

MATLAB is a system mainly intended for matrix algebra and numerical computations. It is widely accepted in the engineering domains and - together with its extensions called toolboxes - a fully-fledged scientific computing environment.

Octave is a free clone of MATLAB. However, it is not fully compatible, and the number and quality of its toolboxes are lower.

Still, it is possible to develop big data applications in Octave or MATLAB, especially when the core of the envisioned application can easily be formulated by matrix operations. With respect to spatial computing, linear algebra problems such as multilateration-based positioning or Kalman filtering can be formulated in a quite natural way. However, both systems decided not to support call by reference. Therefore, code reusability is limited, as the creation of large copies of data can often only be avoided by using global variables.

1.1.4 C++

C and C++ have been the most important programming languages since their invention. They reach the perfect mixture between simple and modern constructs and nearness to the actual machine code. You can use pointers, but you need not. You can embed assembler, but you need not. You can manage your memory using malloc, but you need not.

This is, of course, only true for modern C++ as of C++11. Therefore, you will find a lot of sources claiming that C/C++ is a bad programming language lacking whatever their favourite language provides for them.

While this is - in general - not true, one should understand that the flexibility and power of C++ come with responsibility. The claim that it is very easy to introduce security holes into C++ applications, for example, is actually true.

With respect to modern computing, however, C++ is the language in which you write source code for your favourite graphics card (for example, NVIDIA CUDA), in which the SDK of your favourite CPU (for example, Intel Xeon Phi) is given to you, in which supercomputers and computer games are usually programmed, because you can unlock a lot of performance with C++.

One aspect of modern C++ is that it provides extremely flexible generic programming mechanisms. This means that algorithms are written in terms of concepts and not in terms of types. A concept is best introduced as an expectation of what you can do with a type. For example, you could expect your types to be addable. You can later plug in any type you like, as long as the type supports the concept.

1.1.5 Java

Java is a programming language designed by James Gosling at Sun Microsystems and introduced in 1995. It is based on a strong interpretation of object orientation and runs everything on a virtual machine. As Java was fully specified from scratch, it was possible to do this in a way such that the most important drawbacks of other languages were mitigated. However, the strictly object-oriented design increasingly turns out to be a limitation, as object orientation incurs considerable overhead for object creation and memory management.

Still, the fully specified error and memory handling of the Java platform introduces a new level of quality for software. In summary, the platform independence of Java through its virtual machine architecture, the clean programming language, and the comparably high quality of software even written by non-experts has led to the introduction of Java as one of the most important languages in universities and corporations.

Many distributed systems (scale-out systems) like Apache Hadoop, Apache Cassandra, or Apache HBase at the heart of major Internet companies are largely written in Java, because some severe security issues are prevented and because the software is easy to manage over distributed, heterogeneous systems.

There is an ongoing discussion on whether Java is in general slower than C++. Of course, this question does not make sense, as performance is a property of a specific implementation of an algorithm. What is correct is that it is possible to write quite efficient software in both worlds. However, you can easily find big data situations in which experienced Java programmers leave the object-oriented principles and use, for example, pre-allocated space for their data structures avoiding overheads from excessive garbage collection.

Java is the programming language of software architects and is a very good choice for distributed systems. However, it is limited in what it can unlock from single computers due to the virtual machine model. Additionally, as Java is quite strict with respect to error handling (for example, catching exceptions), Java programs are sometimes "noisy" in the sense that error handling code reduces the readability and compactness of the source code.

The following links should get you started with writing Java code:

1.1.6 Scala

Big datasets often consist of many semantically similar things which need to be related to each other. For example, one might have a very large set of points and want to find the country in which each point lies. This raises the question whether computations like this should be defined as a loop over points or by telling the computer what it should do with any point it finds. Functional programming languages such as LISP, R or Scala are often designed to let functions operate on collections of things with expressions such as apply (more recently called map), filter, and reduce.

As many of these typical idioms are easily executed in parallel over many computers, functional programming languages are well-suited for writing software for distributed systems.

In this setting, Scala is a relatively new language that has become popular within the Apache Hadoop big data family. You can write Spark applications directly in Scala and, thereby, reduce the amount of source code drastically.

The future will show whether Scala will keep its role as an important language. A promising aspect of Scala is that it runs on top of the Java virtual machine and can easily interact with Java. Hence, many big data projects can first be prototyped in Scala, and performance bottlenecks can be removed one after another by implementing the relevant parts in Java.

2 Part I: Parallel Computing

2.1 Introduction

Parallel computing is a set of ideas and tools that enable us to apply computing at large scale. While classical personal computers were purely serial computing devices, leading to the dominance of imperative programming languages in which programs run step by step from a beginning towards an end, modern computing infrastructures are becoming increasingly parallel. Even smartphones nowadays have more than a single core and are able to execute several programs in parallel.

This text, which eventually might become a book on the topic, intends to bring together the core ideas, tools and techniques of parallel programming with a focus on geospatial data. The world is a highly parallel system: think of each person on the planet or each car on the street as a single object. Most of these objects behave independently from others, while some (usually those that are near each other) depend on each other. For example, congestion in New York will not have any effect on traffic in Europe, but congestion on a specific route increases traffic on alternative routes nearby. This fact eases the use of highly parallel systems for spatial data analysis, in contrast to fully dependent networks in which it is not obvious how problems can be split across multiple processing nodes.

However, parallel processing requires much more complicated techniques to tackle the inevitable problems of distributed computing.

In the following section, we first introduce parallel computing roughly from small-scale parallelism inside single nodes towards large scale parallel computing. We also introduce some general problems of distributed computing which illustrate how difficult it is to provide consistent, reliable, and high performance services over a distributed computing infrastructure.

While going through the various areas of Geospatial Big Data, we try to give concrete examples, which can all be found on GitHub. You, my dear reader, are encouraged to help by implementing solutions to problems in your favourite programming environment and by correcting, supplementing, and discussing all this.

But for now, let us skip over to the first topic: let us give an overview of parallel programming which is the art of writing computer programs that can efficiently exploit parallelism.

2.2 An Overview of Parallel Programming

In the following, we will discuss three areas of parallel programming which take place within a single personal computer or server. The first section discusses processes, threads and semaphores in general, which are tools that have been invented to control parallelism in all current operating systems. Then, we will dive into annotation-based parallel programming, which is a set of ideas on how we can annotate sequential source code in order to enable parallel execution of at least parts of it. The last topic in this section will be a very short (and incomplete) introduction to GPU programming using NVIDIA CUDA.

2.2.1 Processes and Threads

Modern operating systems including Windows, Linux, Android and many others allow for multi-tasking. In this area, several different concepts are needed, which we briefly recall below:

  • an algorithm is an abstract sequence of actions that should be performed in order to solve a specific problem.
  • a computer program is a formulation of an algorithm consisting of actions that a computer can perform and in a form that a computer can interpret.
  • a process is a running instance of a program, that is, an executable computer program together with some metadata used by the operating system for actually executing the program.

These three concepts are connected with each other by actions: transforming an abstract algorithm into a computer program is usually called implementation and might be correct or not. A computer program can exist as a binary object directly runnable on a CPU or as source code in some programming language. The process of transforming a program written in a programming language into a form runnable on a specific computer is called compilation. We will call compiled computer programs native programs. Alternatively, computer programs can be run by an interpreter, which basically executes actions based on a program description that is not directly executable by a computer. Creating a process for executing a given computer program is called instantiation. In general, many instances of the same program can be running (for example, several instances of your favourite text editor or browser). However, all running processes need to share the given computational resources including CPU, memory and hard disc. The assignment of actual hardware resources to processes is generally managed by the operating system.

When discussing the concepts involved in running programs on modern computers, it is best to start from the perspective of the simplest computer, which consists of a CPU and some main memory called RAM. A CPU contains some internal memory slots called registers, which are used in calculations as well as for specifying memory locations using pointers. One such pointer is the instruction pointer (IP), which points to the location in memory holding the next operation to be executed. The native program needs to be placed at those memory locations before a process can be executed. A CPU has a specific instruction set. On simple architectures, a single instruction is often encoded in a word that is twice as wide as the CPU's registers, that is, an 8-bit CPU uses 16-bit instructions, and a 16-bit CPU often uses 32-bit instructions. So, keeping with the simple example of an 8-bit CPU, say a modern microcontroller such as an Atmel AVR, 16 bits of program code make up a complete instruction. The following groups of instructions need to be implemented in order to have a fully functioning computer:

  • load data instructions for at least one register
  • move data instructions for copying data between registers
  • computation instructions to calculate with values in registers (e.g., add, multiply, divide, xor, etc.)
  • memory instructions for loading and storing data to and from main memory
  • port instructions for interfacing with external hardware
  • branching instructions allowing for jumping back and forth in the native program based on conditions and results of computations.
  • stack and call instructions for subprograms with arguments and recursion.

With these groups of operations, microcontrollers and also single cores of modern computers perform all their operations. The instruction set is sufficient for many algorithms and it is easy to write programs using these instruction sets.

However, such systems can only perform a single task at a given time. Therefore, the first and possibly most important thing a computer should learn is to share the given CPU resource. A classical way of doing this is to introduce an operating system, which is the first and only program running exclusively on the hardware. The operating system provides a mechanism such that different programs can run at different times. Such programs running inside an operating system are usually called processes.

The operating system has many choices of how to run processes. Back in the old days (and even today on some larger supercomputers), processes are managed in a queue and run one after another. Many strategies (shortest job first (SJF), shortest job next (SJN),…) have been proposed to manage the execution of processes across computers. However, these strategies assume that only a single process can run at a time and that each process runs from its beginning to its end. This is called non-preemptive scheduling.

But computers have always been interacting with the outside world, at least through a keyboard. The mechanism to handle this is called an interrupt: very short subprograms are triggered by external events such as someone pressing a key on a keyboard. In summary, this means that computers have long had the ability to interrupt the execution of their program in order to react to external events. This ability was soon extended such that multiple processes can run at the same time. To this end, time is split into short slices and every process gets assigned slices of computation time in which its program runs on the CPU. This is called preemptive scheduling.

As programmers will still assume full control over the computer, the values of all CPU registers must remain the same between different time slices assigned to the same process. This is a task of the operating system, which is supported by specific CPU instructions. In practice, when the operating system switches from executing one process to executing the next, it first stores the current values of all registers (including the stack pointer and instruction pointer) of the running process and then restores the last values of all registers for the next process. This operation is called a context change (more commonly known as a context switch). An efficient implementation of context changes led to the introduction of multi-tasking, that is, operating systems which can run different programs at the same time. Among home computers, the first fully working, preemptively multi-tasking operating systems were those of the Sinclair QL (1984) and the Commodore Amiga (1985). For Windows, preemptive multitasking was only introduced in Windows NT and later.

For preemptive multitasking systems, there are many ways to assign processor time to processes. One strategy is called Round-Robin, in which time is split into equal slots and one slot is assigned to each process in turn. Once every process has had its slot, we start over from the first process. While Round-Robin is a fair scheduling strategy, it is not capable of guaranteeing processing times. Therefore, real-time operating systems might assign processor time to processes based on priority, and all intermediate forms exist. For example, for the Windows operating system, processes belong to one of the following six priority classes (cf. https://msdn.microsoft.com/de-de/library/windows/desktop/ms685100(v=vs.85).aspx):

  • IDLE_PRIORITY_CLASS
  • BELOW_NORMAL_PRIORITY_CLASS
  • NORMAL_PRIORITY_CLASS
  • ABOVE_NORMAL_PRIORITY_CLASS
  • HIGH_PRIORITY_CLASS
  • REALTIME_PRIORITY_CLASS

This mechanism can be used to increase the performance of your favourite big data application in contrast to, for example, your email client. But, these priorities should be used with care. For example, multiple processes with high priority can easily block each other such that the overall gain in performance vanishes.

In order to reduce the amount of work done in changing context, a new and more lightweight unit of execution has been defined: the thread. From an operating system perspective, a thread is the same as a process, but several threads belong to the same process, meaning that they share several resources with each other, including the code segment, the data segment, and file descriptors. However, threads do not share their stack and register values with each other. Threads provide several benefits besides the fact that context switching is - in general - faster for threads than for processes. However, the programmer has to take care that these shared resources are not used in a conflicting way by threads.

Wrapping up all previous sections, we learned that single cores can execute a single sequential program by exploiting instructions for calculation, decision, jumping and calling sub-programs. We further learned that the ability of modern computers to interrupt programs and change context can be utilized by operating systems to create the illusion of parallel execution. The entities of parallel execution are then called processes. These are completely isolated from each other in the sense that each one has the impression of running exclusively on the computer. Finally, threads have been invented to share resources between different execution paths, making context changes faster. Note that modern computers have many cores, meaning that the operating system can now actually assign thread or process execution slots in parallel. From the perspective of a single program, however, this is often only noticeable as a larger amount of available processing time.

2.2.2 Thread Synchronization: Atomics, Semaphores and Mutual Exclusion

As described in the last section, threads have the capability of increasing the computational performance of a computer by reducing the overhead introduced by managing a full process per execution path. Threads can share segments of main memory, for example, the dataset being analyzed. Threads can additionally communicate with each other through main memory, while processes need a full inter-process communication (IPC) mechanism similar to network communication.

However, threads can also interfere with each other, leading to severe data corruption. Therefore, there are several mechanisms that programmers must apply to write correct and performant multi-threaded applications. One of the biggest issues here is that a faulty program might work most of the time and fail only in specific situations.

There has been a long discussion on threads due to the differences in how hardware vendors designed the multi-threading API. A first step towards standardization was POSIX threads (Pthreads), specified by the IEEE POSIX 1003.1c standard in 1995. Since then, most hardware vendors have moved over to provide at least Pthread compatibility layers. Some performance data showing the superior performance of threads as compared to MPI or process creation (fork() on UNIX) is given on https://computing.llnl.gov/tutorials/pthreads/#Pthread

A more modern treatment has been given during the standardization of C++11. We will follow the results of this standardization, which hides several historical issues behind a very clean API that can be applied directly. Only four major operations for threads are important:

  • create a thread, giving a C++ sub-program as the thread entry point.
  • join a thread, meaning that we pause execution of the calling thread until the thread being joined finishes.
  • detach a thread, meaning that we fully decouple the thread from the current thread.
  • yield a thread, meaning that the thread wants to stop execution, but immediately re-enter scheduling. At least with Round-Robin, this means that this thread is put at the end of the scheduling list such that other threads can be scheduled if they are ready.

In fact, create should be used to create a thread, and join can be used if the created threads should complete before our execution continues. Before a thread object is destroyed, either join or detach must have been called on it: if the thread object is still joinable at destruction (i.e., neither join nor detach has been called), the whole program is terminated.

The following example program illustrates this:

// thread example
// taken from http://www.cplusplus.com/reference/thread/thread/
// compile with  g++ -std=c++11 -pthread -o thread thread.cpp
#include <iostream>       // std::cout
#include <thread>         // std::thread

void foo()
{
  // do stuff...
}

void bar(int x)
{
  // do stuff...
}

int main()
{
  std::thread first (foo);     // spawn new thread that calls foo()
  std::thread second (bar,0);  // spawn new thread that calls bar(0)

  std::cout << "main, foo and bar now execute concurrently...\n";

  // synchronize threads:
  first.join();                // pauses until first finishes
  second.join();               // pauses until second finishes

  std::cout << "foo and bar completed.\n";

  return 0;
}

First, two threads are created, then the main program waits for both threads to finish (in fact, it first waits for the first thread and then for the second).

In other programming environments, the thread API is slightly larger, but does not actually differ too much:

Now that we have seen how threads are created, we have to discuss what to do such that threads do not interfere with each other. Remember that they share their memory and file descriptors. There are two philosophies of how thread interference can be avoided: by using atomic operations on shared variables or by using mutual exclusion (mutex). Atomic operations are operations that are guaranteed to be executed indivisibly, typically by a single CPU instruction. Hence, context changes can only occur before or after an atomic operation has been performed. Modern CPUs have specific instructions for the most important atomic operations such as setting a flag or adding some value to an integer. However, atomic operations are limited to extremely simple operations or bring their own inter-thread synchronization mechanism. The list of atomic types in C++11 gives a good feeling of what is possible atomically and what needs explicit synchronization, see http://en.cppreference.com/w/cpp/atomic/atomic.

In contrast to relying only on atomic operations, programmers might want to know inside a thread that they own something, in the sense that the thread has the exclusive right to access a specific resource. Such sections of programs are often called critical sections. A critical section is an area of shared code where some mechanism has been put in place such that only one of the parallel threads executes this section at a time. In fact, each thread trying to enter a critical section usually blocks until no other thread is inside this section. From a more general perspective, a critical section is a piece of code that should not run in parallel.

In general, there are two widely-used concepts for implementing critical sections: semaphores and mutexes. A semaphore is a data structure largely consisting of a counter and two operations. One operation decreases the counter, the other one increases the counter. These two operations are designed in a way that they cannot interfere. Semaphores can then be used to assign a limited resource (e.g., four cores in a multicore system) to services (e.g., processes asking for a CPU). Usually, a semaphore starts with a number describing the amount of resources available, which can be fixed, like the number of CPUs, or dynamic, as in producer-consumer networks. A mechanism guarantees that the decreasing operation of the semaphore blocks until the resulting number is non-negative. This means that when the semaphore has a value of zero and a thread tries to decrement it, this thread will have to wait until another thread has incremented the semaphore.

A special case of a semaphore is the so-called mutex, which stands for mutual exclusion. A mutex is a shared data structure which can be used to realize ownership in a conflict-free way. In contrast to a semaphore, however, it can only be locked or unlocked (i.e., the counter of a semaphore would be limited to two states) and unlocking can only be performed by the entity owning the lock. These two properties lead to completely different implementations, making mutexes and semaphores really different. Every thread can at any time try to lock the mutex. Locking a mutex can fail or block, depending on the actual implementation of the mutex. After locking the mutex, the thread can be sure that no one else has successfully obtained a lock on this mutex. Therefore, a mutex can represent the right to use a specific resource, for example, writing to a shared memory data structure in a non-atomic way.

Of course, mutexes can be used to implement critical sections: a thread locks the mutex before entering the section, executes the section as the only owner, and releases the mutex afterwards. But mutexes can also be used to resolve conflicts in more flexible ways. However, mutexes also raise risks such as deadlocks (e.g., thread 1 has acquired a lock on mutex 1 and waits for a lock on mutex 2, while mutex 2 is already locked by thread 2, which is trying to acquire a lock on mutex 1).

In some environments, mutexes and semaphores are part of the thread class. A modern and compact mutex API was introduced in the C++11 standard together with the C++ threads library. It tidies up several historical variants and provides a clean interface of three functions, which are available on nearly all systems:

  • lock tries to obtain a lock and waits, if it does not directly succeed.
  • try_lock tries to obtain a lock and returns immediately with a return value of false, if it was unable to obtain the lock immediately.
  • unlock releases a previously obtained lock such that other threads can obtain a lock on the mutex.

In C++, however, the member functions of the mutex class are seldom called directly. Instead, several wrapper classes are provided, which make correct usage easier:

  • std::unique_lock manages a mutex on a higher level (for example, it releases the mutex during destruction). It is a wrapper leading to safer mutex usage (similar to std::unique_ptr<> for pointers).
  • std::lock_guard is a mutex wrapper for protecting a specific scope of code similar to a critical section. It fulfills RAII semantics (Resource Acquisition Is Initialization) and is exception-safe and local.
  • std::scoped_lock (C++17) is a mutex wrapper with RAII-semantics. In addition to lock_guard, it allows for locking multiple mutexes at a time employing a deadlock-avoidance algorithm.

As you can see from these real-world standardizations, mutex usage can be complicated and programmers must be careful, for example, to avoid deadlocks. Therefore, simple locking of sections is often favoured over complicated multi-mutex architectures.

2.2.3 Annotation-based Parallelism Using OpenMP

In the last section, we learned that processes can be used to let computers run code in parallel and that threads can be used to do the same while all those threads can share resources like main memory. We also learned that these resources need to be protected from destructive access by different threads at the same time, either by using only atomic operations that cannot interfere with other operations or by using critical sections or explicit locks.

Using all these specific libraries had been a very problematic topic, especially, because the APIs of operating systems and libraries had extremely different semantics (partly reflected by already five specializations of mutex in C++17 for different semantics). Therefore, it has often been a burden for writing distributed programs and even more for reading and understanding distributed programs.

A completely different approach has evolved known as OpenMP: Let us just stick with regular sequential code in some programming language (Fortran or C++). Then, we will sooner or later trip into situations, where a specific pattern or parallelism should be used. The idea was now to leave the source code semantics intact and provide something called annotation-based parallelism. For this, specially-formatted comments are embedded into the source code allowing for parallel execution of sections, most notably for loops, scopes and sub-programs. Thread creation and management is left to the library and the programmer does not need to take care of that.

The following source shows a very simple example: a large number of square roots is calculated and stored in a large array.

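/* OpenMP sample */
#include <cmath>
#include <cstddef>
#include <iostream>
#include <vector>

using namespace std;
vector<double> roots;

const size_t num_values = 1024*1024*256;

int main(void)
{
  roots.resize(num_values); // first allocate memory

  // the iterations are independent, so they may be distributed over threads
  #pragma omp parallel for
  for (size_t i=0; i < roots.size(); i++)
    roots[i] = sqrt(i);
  cout << "Roots generated." << endl;

  return 0;
}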

The pragma line tells the compiler that the following for loop can be safely parallelized, and OpenMP does just that. Let us look at the result:

bgd:~$ g++ -o openmp openmp.cpp
bgd:~$ time ./openmp
Roots generated.

real    0m3.422s
user    0m2.992s
sys     0m0.428s
bgd:~$ g++ -fopenmp -o openmp openmp.cpp
bgd:~$ time ./openmp
Roots generated.

real    0m1.596s
user    0m4.348s
sys     0m0.380s

As you can see, the user time has increased, while the real time has decreased. This is essentially what one should expect from parallelization: The used CPU time is higher than it was without parallel code, as we need to set up all threads and coordinate the parallel program. But the wall-clock time (real time) decreases, as we exploit more than a single core of our computer.
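
As a rough reading of this single run (the symbols \(S\) for speed-up and \(U\) for average core utilisation are introduced here only for illustration), the measurements give

\[ S = \frac{T_{\mathrm{real}}^{\mathrm{seq}}}{T_{\mathrm{real}}^{\mathrm{par}}} = \frac{3.422}{1.596} \approx 2.14, \qquad U = \frac{T_{\mathrm{user}}^{\mathrm{par}} + T_{\mathrm{sys}}^{\mathrm{par}}}{T_{\mathrm{real}}^{\mathrm{par}}} = \frac{4.348 + 0.380}{1.596} \approx 2.96 \]

So the parallel version finishes about twice as fast while keeping roughly three cores busy on average, and it spends about 38 % more CPU time in total (4.728 s instead of 3.420 s).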

The following example shows how you can run an arbitrary section of code in parallel using OpenMP. We illustrate two additional directives, one for a critical section and one for performing an operation atomically. Note that you will get compiler errors if your system is unable to implement a specific section atomically. Furthermore, note that all OpenMP directives work on the next C++ block, which is either a block enclosed in braces or the next statement.
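
A minimal sketch of such a program could look as follows. It is not the original listing: the function name parallel_wait_demo, its two parameters for the waiting range in microseconds, and the use of the capture form of the atomic directive are assumptions made for illustration.

```cpp
#include <chrono>
#include <iostream>
#include <random>
#include <thread>
#ifdef _OPENMP
#include <omp.h>
#endif

// Lets every thread wait for a random time and returns the total waiting
// time of all threads in microseconds.
long parallel_wait_demo(long min_us, long max_us)
{
    long global_wait = 0;
    std::cout << "Sequential processing." << std::endl;

    #pragma omp parallel
    {
        int id = 0;
#ifdef _OPENMP
        id = omp_get_thread_num();   // only available with -fopenmp
#endif
        // each thread draws its own waiting time
        std::mt19937 gen(std::random_device{}() + id);
        long wait = std::uniform_int_distribution<long>(min_us, max_us)(gen);
        std::this_thread::sleep_for(std::chrono::microseconds(wait));

        // atomically add to the shared counter and keep the new value
        long total;
        #pragma omp atomic capture
        total = global_wait += wait;

        // only one thread at a time may write to the stream
        #pragma omp critical
        {
            std::cout << "Thread " << id << " waited " << wait << std::endl;
            std::cout << "Globally, we waited already " << total << std::endl;
        }
    }

    std::cout << "Now, all threads finished. Going back to sequential processing."
              << std::endl;
    return global_wait;
}
```

Calling parallel_wait_demo(1000000, 3000000) from main in a program compiled with -fopenmp prints one pair of lines per thread. Without -fopenmp the pragmas are ignored and the same code runs with a single thread.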

The output of this program looks similar to

Sequential processing.
Thread 3 waited 1224379
Globally, we waited already 1224379
Thread 5 waited 1267788
Globally, we waited already 2492167
Thread 4 waited 1327394
Globally, we waited already 3819561
Thread 0 waited 1701956
Globally, we waited already 5521517
Thread 7 waited 1727761
Globally, we waited already 7249278
Thread 1 waited 1798996
Globally, we waited already 9048274
Thread 6 waited 2085095
Globally, we waited already 11133369
Thread 2 waited 2525504
Globally, we waited already 13658873
Now, all threads finished. Going back to sequential processing.

All threads wait for a random time. No output is corrupted, as the stream operation is put into a critical section, and the global counter works well, as it is accessed atomically.

2.3 Message Passing with MPI

Message passing is a traditional paradigm for creating scalable and highly parallel computer programs. The central idea of message passing is to avoid any type of shared-state synchronization, such as mutexes for threads, because it becomes harder and harder to manage such locks in a distributed environment as applications scale.

This paradigm has finally converged into a single standard used by all major high performance computing stakeholders, including researchers as well as hardware vendors. This standard is named Message Passing Interface (MPI) and consists of a simple API that makes it possible to write scalable applications easily.

2.3.1 Message Passing Interface

The Message Passing Interface (MPI) is based on several assumptions that are reasonable for supercomputers, but, for example, stand in contrast to the dynamic scaling world of cloud computing. In terms of MPI, a cluster is a set of computers and each MPI invocation spawns one or more processes on each computer. These running processes are called processors. The number of processors is known beforehand and fixed for the whole run. That is, while the program is scalable, an instance cannot scale. From this assumption, it is reasonable to number the processors from \(0 \ldots N-1\). By tradition, processor \(0\) is often used as a master, which is responsible for all non-parallel tasks (for example, providing a user interface). The number assigned to each processor is called its rank, while the total number of processors \(N\) is called size.

MPI provides several methods for sending and receiving messages while the basic behavior is that the send and receive operations run in parallel on all processors. Therefore, each processor uses its rank to find its role in communication. The following sections first explain the general anatomy of MPI programs, introduce point-to-point and collective communications, and finally give hints on advanced topics that make working with MPI even simpler.
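
How a processor can derive its share of the work from nothing but its rank and the size can be sketched without any MPI call. The function block_range and the contiguous block decomposition it implements are assumptions chosen for illustration; other splitting schemes are equally possible.

```cpp
#include <cstddef>
#include <utility>

// Half-open index range [first, last) that processor `rank` out of `size`
// handles when n items are split into nearly equal contiguous blocks.
std::pair<std::size_t, std::size_t>
block_range(std::size_t n, int rank, int size)
{
    const std::size_t s = static_cast<std::size_t>(size);
    const std::size_t r = static_cast<std::size_t>(rank);
    const std::size_t base = n / s;    // items that every rank gets
    const std::size_t extra = n % s;   // the first `extra` ranks get one more
    const std::size_t first = r * base + (r < extra ? r : extra);
    const std::size_t last = first + base + (r < extra ? 1 : 0);
    return {first, last};
}
```

For 10 items and 4 processors, ranks 0 and 1 each handle three items and ranks 2 and 3 each handle two. Every processor computes its own range locally, so no communication is needed to agree on the split.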

The following code snippet is possibly the simplest sensible MPI program:

// compile with mpicxx helloworld.cpp -o helloworld.mpi
// run with mpirun -n 8 ./helloworld.mpi
#include <iostream>
#include "mpi.h"

int main( int argc, char *argv[] )
{
    int rank, size;

    MPI_Init( &argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    std::cout << "Hello world from processor " << rank << " out of " << size << std::endl;

    MPI_Finalize(); // close the MPI session
    return 0;
}

This example shows that the general anatomy of an MPI program is quite simple: the library needs to be initialized with MPI_Init, then each process determines its rank and the size of the cluster using MPI_Comm_rank and MPI_Comm_size. Then, the program is ready to perform the distributed tasks, deriving the behavior of each node from its rank. When everything is done, MPI_Finalize finalizes the MPI session.

It can be compiled using the MPI compiler and run through mpirun. Note that we need to give a number of processes to mpirun:

bgd:~$ mpicxx helloworld.cpp -o helloworld.mpi
bgd:~$ mpirun -n 8 ./helloworld.mpi
Hello world from processor 1 out of 8
Hello world from processor Hello world from processor 6 out of 8
Hello world from processor 7 out of 8
3 out of 8
Hello world from processor 2 out of 8
Hello world from processor 5 out of 8
Hello world from processor 4 out of 8
Hello world from processor 0 out of 8

One interesting thing to note is the fact that all output appeared on our screen. In many MPI implementations, the output and error streams (stdout and stderr) are sent back to the terminal that started the MPI session. This means that all output is visible on the host from which you started the MPI program. Note, however, that the ordering of messages is arbitrary and that messages from different nodes are not guaranteed to be output consistently (as seen in the example above). Additionally, this type of communication can severely degrade MPI performance. Hence, you should not rely on debug output sent in this way. If needed, you could write debug output to a different file per process and include a sufficiently accurate time stamp.

2.3.2 Sending and Receiving Messages

Parallelism in MPI is based on running the very same program on every compute node, however, with a different rank. Therefore, the control flow of the program is often split between roles, for example sending and receiving messages. As a first simplification, note that the basic message passing mechanisms are synchronous: at the same time as one node sends a message, another node should receive it. The following snippet shows how two nodes can send messages to each other.

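// compile with mpicxx pingpong.cpp -o pingpong.mpi
// run with mpirun -n 2 ./pingpong.mpi
#include <iostream>
#include "mpi.h"

int main( int argc, char *argv[] )
{
    int rank, size;

    MPI_Init( &argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    if (size != 2)
    {
      if (rank == 0)
        std::cout << "Error: Run only with 2 processors" << std::endl;
      MPI_Abort(MPI_COMM_WORLD, -1);
    }

    int ping = 0;
    while (ping < 10)
    {
       if (rank == ping % 2)
       {
          // sender role: increment, then send to the other rank
          ping ++;
          MPI_Send(&ping, 1, MPI_INT,
                   (rank == 0)?1:0,
                   0, MPI_COMM_WORLD);
          if (rank == 0)
             std::cout << "Rank 0 sent ping value: "<< ping << std::endl;
       }
       else
       {
          // receiver role: wait for the value from the other rank
          MPI_Recv(&ping, 1, MPI_INT,
                    (rank == 0)?1:0,
                    0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
          if (rank == 0)
              std::cout << "Rank 0 recv ping value: "<< ping << std::endl;
       }
    }

    MPI_Finalize();
    return 0;
}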

This program first checks that exactly 2 processors are instantiated. Otherwise, it uses MPI_Abort to abort the MPI session, first writing an error message on the processor with rank 0. Then, the variable ping is sent and received. Whether a node sends or receives is determined by the condition rank == ping % 2. This means that rank 0 is the sender whenever ping is even, while rank 1 is the sender whenever ping is odd. As the sender increments ping before sending, rank 0 sends the odd values and rank 1 the even values. The other processor operates as the receiver.

The node responsible for sending first increments ping and then uses MPI_Send to send a message. The parameters are as follows: First, a pointer to the data is given, in this case just the address of the variable ping. Then, the number of items and the type, here one and MPI_INT, are given to the MPI library. The next parameter specifies the send destination: for rank 0, the destination is rank 1, and vice versa. The next parameter is a message tag, which is simply set to 0 here. The parameter MPI_COMM_WORLD just means that we are communicating in the context of the full cluster. The parameters for the matching MPI_Recv are identical, except that the rank now names the source of the message. In addition, there is one more argument expecting an MPI_Status pointer, which we set to the value MPI_STATUS_IGNORE as we want to ignore any status information for this message.

Look at the following result of running this program:

bgd:~$ mpicxx -o pingpong.mpi pingpong.cpp 
bgd:~$ mpirun -n 3 ./pingpong.mpi
Error: Run only with 2 processors
application called MPI_Abort(MPI_COMM_WORLD, -1) - process 1
application called MPI_Abort(MPI_COMM_WORLD, -1) - process 2
application called MPI_Abort(MPI_COMM_WORLD, -1) - process 0
bgd:~$ mpirun -n 2 ./pingpong.mpi
Rank 0 sent ping value: 1
Rank 0 recv ping value: 2
Rank 0 sent ping value: 3
Rank 0 recv ping value: 4
Rank 0 sent ping value: 5
Rank 0 recv ping value: 6
Rank 0 sent ping value: 7
Rank 0 recv ping value: 8
Rank 0 sent ping value: 9
Rank 0 recv ping value: 10

First, the program is compiled. When running with a wrong number of processors, you first see the message from rank 0 on the screen and then are informed about which process called MPI_Abort including the error code. When run with a correct number of two processors, the output is as expected: Rank 0 alternates between sending and receiving and the number counts smoothly from 1 to 10 as it is always incremented before output.
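
The alternation of roles can also be traced without MPI. The following sketch only simulates the control flow of the loop. The function pingpong_trace is a hypothetical helper, and no message is actually sent.

```cpp
#include <utility>
#include <vector>

// Simulates the control flow only (no MPI involved): for every round,
// records which rank acts as sender and which value it sends.
std::vector<std::pair<int, int>> pingpong_trace(int limit)
{
    std::vector<std::pair<int, int>> trace;
    int ping = 0;
    while (ping < limit) {
        const int sender = ping % 2;       // the rank with rank == ping % 2
        ++ping;                            // the sender increments first ...
        trace.emplace_back(sender, ping);  // ... and then sends the new value
    }
    return trace;
}
```

For a limit of 10, the trace starts with rank 0 sending 1 and rank 1 sending 2, and ends with rank 1 sending 10. This matches the alternating sent and recv lines that rank 0 prints in the run above.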

3 Part III: Examples and Appendix

3.1 OpenMP and C++

3.1.1 Program Snippet: Timing C++ Code in Modern C++

// needs #include <chrono> and #include <iostream>
auto start = std::chrono::system_clock::now();
// Do the long calculations here
auto end = std::chrono::system_clock::now();
// Use the time caster for changing units.
std::cout << "Did the work in " << std::chrono::duration_cast<std::chrono::seconds>(end-start).count()
          << " seconds" << std::endl;



[1] If a part of a program owning a mutex is aborted due to an exception, the exception handler would have to release the locks on mutexes. However, this is often not possible, as the exception handlers (if any) are in a different scope. If the ownership of the mutex is, however, tied to the existence of an instance of the class lock_guard or unique_lock, it is released together with the instance during exception handling.

Author: Martin Werner

Created: 2017-05-11 Thu 07:47
