Rustie Lin

Exploring Parallel Particle Simulation

Summary

The main problem is to simulate the interactions of particles within a finite space. Particles exert forces of repulsion on each other and can also bounce off the walls of the simulation grid.

[Figure: n-body simulation with cutoff]

Here, we present iterative improvements on a toy particle simulation. As with the previous post on parallelization, all results were measured on Intel Xeon E5-2698 v3 2.30 GHz CPUs, this time across multiple processors. Graphs show speedup as a function of the number of processors $p$, as well as simulation time against the number of particles and the number of processors allocated.

We present three main algorithms:

  • Serial algorithm
  • Shared memory algorithm, OpenMP
  • Distributed memory algorithm, MPI

Serial Algorithm

We can reduce the running time complexity of our particle simulation to O(n) if for every particle we only consider the particles nearby. To formalize this, we split the simulation grid into bins and assign each particle to a bin based on its location in the grid. Then, for each particle in each bin, we consider the particles in its own bin as well as those in adjacent bins. So in total, there are 9 bins to consider. This reduces the standard O(n^2) running time complexity to O(n) due to the constant 9 bins.

To optimize the number of bins, we need to consider the effective radius around each particle within which the repulsive force applies; this is the cutoff. The dimensions of each bin should thus be a function of cutoff.

In our implementation, we define the datatype bin_t as follows, where a bin_t is a vector of particles:

typedef std::vector<particle_t> bin_t;

We choose to store a vector of particles rather than a vector of particle pointers to avoid concurrency issues when we later parallelize.

Before the main simulation loop, we place all particles into their respective bins based on their starting x and y coordinates with some simple geometry:

//
// Puts particles into the vector of all bins
// (gridSize, numBins, binSize, and DENSITY are globals/constants defined elsewhere)
//
void putIntoBins (particle_t* particles, std::vector<bin_t>& allBins, int n) {
    gridSize = sqrt(n * DENSITY);
    numBins = int(gridSize / binSize) + 1;
    allBins.resize(numBins * numBins);
    for (int i = 0; i < n; i++) {
        int x = int(particles[i].x / binSize);
        int y = int(particles[i].y / binSize);
        allBins[x*numBins + y].push_back(particles[i]);
    }
}

When considering the bins at the edge of the simulation grid, we do not have to consider the full 9 bins: a corner bin involves only 4 bins and an edge bin only 6. There are a few approaches to handling this:

  1. Using conditionals to check whether the bin is a corner or edge and then performing the appropriate simulation steps
  2. Padding the simulation grid with one layer of empty bins and changing the bin iteration from [0, n) to [1, n-1), where n is the grid dimension

We choose to pursue the first approach to avoid additional memory usage in large simulations, since the padding grows linearly with the dimensions of the simulation grid.

After computing and applying the forces on each particle, we move the particles to their new bins for the next simulation step. We choose to recycle the bins data structure rather than reallocate it.
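
As a minimal sketch, one way to recycle the bins between time steps is to clear each bin in place (which keeps its allocated capacity) and re-run the same binning geometry. The helper name below is hypothetical, and the actual implementation may instead move particles bin by bin:

//
// Hypothetical re-binning pass between time steps: clear bins in place, then reinsert
//
void rebinAll(particle_t* particles, std::vector<bin_t>& allBins, int n) {
    for (int b = 0; b < allBins.size(); b++)
        allBins[b].clear();                       // clear() keeps capacity, avoiding reallocations
    for (int i = 0; i < n; i++) {
        int x = int(particles[i].x / binSize);
        int y = int(particles[i].y / binSize);
        allBins[x*numBins + y].push_back(particles[i]);
    }
}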

Grid corner checking

In our main simulation loop, we consider the 8 bins surrounding each bin (plus the bin itself), using conditionals to check bounds. This is implemented as follows:

  int dx[3] = {-1, 0, 1};
  int dy[3] = {-1, 0, 1};

  for (int dx_index = 0; dx_index < 3; dx_index++) {
      for (int dy_index = 0; dy_index < 3; dy_index++) {
          if (i + dx[dx_index] >= 0 
              && i + dx[dx_index] < numBins 
              && j + dy[dy_index] >= 0 
              && j + dy[dy_index] < numBins) {
            ... // apply forces
          }
      }
  }

The arrays dx and dy hold the offsets to the surrounding bins (i and j are the current bin's row and column indices from the enclosing loops), and the innermost conditional performs simple bounds checking against the simulation grid.
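
For concreteness, the elided apply forces body might look roughly like the following, assuming apply_force is the routine from the assignment's common.cpp that accumulates dmin, davg, and navg (the exact body in our code may differ):

  bin_t& bin = allBins[i*numBins + j];
  bin_t& neighbor = allBins[(i + dx[dx_index])*numBins + (j + dy[dy_index])];
  for (int a = 0; a < bin.size(); a++)
      for (int b = 0; b < neighbor.size(); b++)
          apply_force(bin[a], neighbor[b], &dmin, &davg, &navg);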

Additional bin size considerations

We observed that bin size should be a function of the cutoff distance of each particle. Initially, we reasoned that if we make the dimensions of each bin cutoff x cutoff, then in the worst case each particle only affects the 9 bins described above. We tested various bin sizes:

[Figure: serial_binsize]

All simulation times are measured in seconds on the y-axis. In practice, a bin size of cutoff * 2 consistently outperformed the originally speculated size of simply cutoff. We hypothesize this is due to the cost of potentially checking 9 bins compared to the relatively low cost of our conditional checks: if the bin size is cutoff * 2, there is a good chance that a particle's force field of radius cutoff will not cross into all 9 bins. A bin size of cutoff * 2 takes advantage of this while avoiding bins so large that memory performance suffers.
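
In code, the choice amounts to roughly the following sketch, assuming cutoff and DENSITY are the constants from the assignment's common files and using the same global names as putIntoBins above (n is the particle count):

double binSize = 2 * cutoff;                  // empirically the fastest of the sizes tested
double gridSize = sqrt(n * DENSITY);          // side length of the square simulation area
int numBins = int(gridSize / binSize) + 1;    // bins per grid dimension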

Shared Memory Algorithm, OpenMP

We used OpenMP to parallelize the particle simulation, keeping the algorithm largely the same as the serial one, with some minor adjustments for synchronization. First, we use the clause private(dmin) so that each thread has a private copy of the dmin variable. Similarly, for davg and navg, we use #pragma omp for reduction(+:navg,davg) in the simulation loop, after noting that apply_force in common.cpp accumulates navg and davg by addition.
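
A rough sketch of how these clauses fit together is shown below; apply_forces_in_bin is a hypothetical helper wrapping the serial per-bin force loop, NSTEPS is the step count from the assignment framework, and per-step resets of navg and davg are elided:

#pragma omp parallel private(dmin)
{
    for (int step = 0; step < NSTEPS; step++) {
        dmin = 1.0;                                 // each thread resets its private minimum
        #pragma omp for reduction(+:navg, davg)
        for (int b = 0; b < numBins * numBins; b++)
            apply_forces_in_bin(b, allBins, &dmin, &davg, &navg);  // hypothetical helper
        // ... move particles and re-bin (see the synchronization section below)
    }
}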

Synchronization

We also introduce an additional data structure, bin_t moved_particles, to keep track of the moved particles. We first tried giving each thread its own bin_t moved_particles and collecting the results thread by thread at the end, rather than using a lock around a shared variable.

[Figure: parallel_synch]

The above graph shows the results we achieved with a separate bin_t moved_particles per thread, combined at the end.

With synchronization in mind, we then changed the design so that only the master thread re-bins the moved particles, and put a lock around the shared moved_particles variable. This gave a performance boost over the previous method. Also notable is the #pragma omp barrier used as an explicit barrier for all threads at each step of the main simulation loop.
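
Sketched below is this synchronization pattern; local_moved (each thread's private list of movers) and rebin_moved are hypothetical names standing in for our actual variables:

#pragma omp critical
{
    // each thread appends its locally detected movers to the shared list under a lock
    moved_particles.insert(moved_particles.end(), local_moved.begin(), local_moved.end());
}
#pragma omp barrier                              // wait until every thread has contributed
#pragma omp master
{
    rebin_moved(moved_particles, allBins);       // only the master re-bins the moved particles
    moved_particles.clear();
}
#pragma omp barrier                              // explicit barrier before the next simulation step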

Results

Different values of binsize, Parallel implementation

As with our serial implementation of the simulation algorithm, we considered various bin sizes.

[Figure: parallel_binsize]

The two plots above show the simulation times for increasing numbers of particles, for both the serial and parallel implementations. In both, a bin size equal to cutoff * 2 performs best, which is why we chose it as the bin dimension in our implementations.

Serial vs Parallel Runtimes, log-log scale

[Figure: serial_loglog]

[Figure: parallel_loglog]

On a log-log scale, the runtimes of both the serial and parallel implementations appear as straight lines consistent with O(n) growth, as seen in the plots above. This matches the expected behavior, since the algorithm we use lowers the computational complexity of the program from O(n^2) to O(n).

Num_threads vs Runtimes, Parallel

[Figure: numThreads]

Performance increases as we add more threads, with an optimum seemingly reached at num_threads = 32, which achieves a speedup factor of around 19.3.

[Figure: speedup_plt]

Above is the speedup plot, with linear speedup shown as the orange line. Beyond roughly 20 threads, the observed speedup departs from the linear pattern and levels off, suggesting that at this point the parallelization overhead (e.g., communication and synchronization) outweighs the gains from additional threads.

Design choice note

Originally, in designing our parallel algorithm, we had envisioned using nested parallelism. Instead of iterating through all bins and, for each bin, checking the 9 bins around it, we would parallelize further: split the entire simulation grid into tiles (much like the block tiling from matrix multiply), each tile operated on by a thread. For each tile, that thread would then spawn a thread for each of the 9 surrounding bins, since applying the force from one particle to all others can be parallelized as long as we fork and join for each particle in each bin. However, we quickly discovered that this kind of nested parallelism is not recommended due to oversubscription: the number of threads would far exceed the number of cores, increasing the parallel overhead.
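
For illustration, the nested scheme we rejected would have looked roughly like the sketch below (the tile and bin helpers are hypothetical); with nesting enabled, the thread count multiplies across the two levels and quickly oversubscribes the cores:

omp_set_nested(1);                               // nested parallel regions must be enabled explicitly
#pragma omp parallel for                         // one thread per tile of the simulation grid
for (int t = 0; t < numTiles; t++) {
    #pragma omp parallel for num_threads(9)      // a nested thread per surrounding bin
    for (int k = 0; k < 9; k++)
        process_neighbor_bin(t, k);              // hypothetical helper applying forces for one neighbor bin
}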

Distributed Memory Algorithm, MPI

The key observation in reducing the running time complexity from quadratic to linear was to leverage data locality by placing nearby particles into bins. Each particle then only needs to apply force to particles in the 8 surrounding bins and in the bin it belongs to. Under the shared memory paradigm, we used OpenMP to write a multithreaded program in which each thread applied forces to a subset of bins and, in a critical section, aggregated all forces by Newton's second law and then moved the particles. This was part 1.

To leverage multiple processors connected through an interconnection network, we need to redesign the particle simulation algorithm for distributed memory. Our main design decision was to weigh communication overhead against computational running time.

NOTE: In this post, we use the terms root processor and master processor interchangeably; both refer to the main processor with rank == 0. Also used interchangeably are the terms processor and process.

Geometry & Row Abstraction

We consider simulation on a per-row basis, such that each processor is responsible for applying forces to all particles in a set number of rows of bins. We choose the row abstraction (as opposed to, say, quadrants) because it makes a row's neighbors easy to identify. Since simulating the force on a particle requires at most the 8 bins surrounding it plus its own bin, we need to consider all neighbors of a bin; with rows, we only have to worry about vertical neighbors, as horizontal neighbors are by definition contained in the same row.

Since each processor receives a fixed set of rows of bins, we must account for the particles that move into and out of the scope of that processor's simulation.
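
A minimal sketch of how each processor might compute its start and end row indices, assuming numBins rows of bins, n_proc processes, and rank from MPI_Comm_rank (the actual formula in our code may differ):

int rowsPerProc = numBins / n_proc;              // rows of bins per processor
int start = rank * rowsPerProc;                  // first row this processor simulates
int end = (rank == n_proc - 1) ? numBins         // last processor absorbs any remainder rows
                               : start + rowsPerProc;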

Global & Local Scope of Calculations

Initially, all particles are broadcast to all processors, which then group the particles into bins deterministically, so all processors start with a trivially global view of the simulation. However, each time step makes the processors' global views diverge. A processor simulates only the rows allocated to it, but also considers the particles in the rows directly above and below (its vertical neighbors), because those particles have the potential to move into or out of the rows in scope.


if (rank != 0) {
  for (int j = 0; j < numBins; j++) {
    bin_t& temp_bin = allBins[(start) * numBins + j];
    for (int i = 0; i < temp_bin.size(); i++)
      toMove.push_back(temp_bin[i]);
    temp_bin.clear();
  }
}

...

//Delete neighbours as we will get them in the new timestep
if (rank != 0) {
  for (int j = 0; j < numBins; j++) {
    bin_t& temp_bin = allBins[(start - 1) * numBins + j];
    temp_bin.clear();
  }
}


In the above code snippet, we clear out the bins in the row directly above the start row index, and we also move all particles in the row at the start index itself, since they have the potential to move outside the simulated rows, i.e., between processors.

Distributed Memory Algorithm

We implement the following distributed memory algorithm using MPI functions in C++. All calls that distribute, gather, and scatter particles amongst processors are blocking; this is discussed in the next section.

  • Broadcast all particles to all processors, paying a large one-time communication cost up front to save communication later
  • All processors place particles into bins
  • All processors calculate start and end row indices, to define the rows of bins that each processor will be simulating the particles of
  • Simulate for N timesteps:
    • All processors compute forces for all particles for all bins in their rows from start to end row indices
    • (davg, navg, and dmin must be reduced across processors when statistics are enabled)
    • All processors calculate intra and inter-processor particle movement (with respect to the rows of bins each processor simulates)
    • All processors perform intra-processor moves
    • All processors consider vertical neighbors (top and bottom rows) and empty them
    • All processors send inter-processor moves, as well as information about the vertical neighbors, and gather at the root process
    • Root process gathers all information about moved particles between processors, and scatters back to all processors
    • All processors receive particles from root processor about inter-process moves and re-bins these inbound particles
    • All processors have new local state of particles and can continue with next simulation step

NOTE: while each processor starts with a copy of all particles, the majority of each processor's view quickly becomes stale. A processor is only aware of the narrow band of rows it simulates, plus its vertical neighbors; all other particles in other bins/rows are inconsistent and do not affect the state of the simulation. Only the root process, after gathering the particle movements, gains a global view of the entire simulation.

MPI Usage

We use standard MPI calls to communicate between processors. Our general pattern is to handle most of the global reduction and aggregation logic in the root process and to distribute state to be worked on symmetrically by the other processors. We use the MPI datatype PARTICLE, defined as six contiguous MPI_DOUBLEs, since the serial analog consists of six C doubles representing position, velocity, and acceleration in the x and y directions.


MPI_Datatype PARTICLE;
MPI_Type_contiguous( 6, MPI_DOUBLE, &PARTICLE );
MPI_Type_commit( &PARTICLE );


Initially, the particles are initialized by the root process, since init_particles is not deterministic. The particles are then broadcast to all other processors using MPI_Bcast(particles, n, PARTICLE, 0, MPI_COMM_WORLD) over the communicator MPI_COMM_WORLD. Once all processors receive the broadcast of all particles, they place them into bins. The initial binning is deterministic and is replicated identically on every processor:


if( rank == 0 )
    init_particles( n, particles );

// broadcast all particles from root and put them into bins
MPI_Bcast(particles, n, PARTICLE, 0, MPI_COMM_WORLD);
putIntoBins(particles, allBins, n);


In the main simulation loop, when computing the davg, dmin, and navg statistics, the per-process values must be reduced. This is done with the following code; the calls to MPI_Reduce specify MPI datatypes and reduction operators and reduce into the root processor 0:


if( find_option( argc, argv, "-no" ) == -1 )
{
  MPI_Reduce(&davg,&rdavg,1,MPI_DOUBLE,MPI_SUM,0,MPI_COMM_WORLD);
  MPI_Reduce(&navg,&rnavg,1,MPI_INT,MPI_SUM,0,MPI_COMM_WORLD);
  MPI_Reduce(&dmin,&rdmin,1,MPI_DOUBLE,MPI_MIN,0,MPI_COMM_WORLD);
  if (rank == 0){
    //
    // Computing statistical data
    //
    if (rnavg) {
      absavg +=  rdavg/rnavg;
      nabsavg++;
    }
    if (rdmin < absmin) absmin = rdmin;
  }
}


The last major usage of MPI comes when considering the mechanism by which processes receive particles that move between bins. When particles move between bins, they can either move to a bin belonging to the same processor, or to a bin belonging to a neighboring processor.

Intra-processor movement can be handled locally without MPI. Inter-processor movement is trickier, especially because MPI collective calls need message sizes known in advance. Therefore, when communicating inter-process particle moves, we must first communicate the number of particles leaving each process; only then does the receiving side know how much data to expect.


int sendSize = toMove.size();
int rcvSize[n_proc];

// master process collects sizes to determine offsets
MPI_Gather(&sendSize, 1, MPI_INT, rcvSize, 1, MPI_INT, 0, MPI_COMM_WORLD);  

The array rcvSize on the root records, at index i, the sendSize reported by processor i, i.e., the number of particles leaving that processor's rows. Each processor records its own sendSize, since only it knows how many particles from its local simulation move out of its rows. A call to MPI_Gather collects all of the sendSize values into the array rcvSize on the root processor; once the root has sorted the gathered particles by destination, the same arrays are reused to describe how many particles each processor will receive back.

The goal is to scatter a variable amount of data amongst the processors, depending on their simulation space, so we have to compute the offsets and sizes to pass into the gather and scatter MPI calls:

int offset[n_proc];
int totalSize = 0;
bin_t inboundParticles;

if (rank == 0) {
    offset[0] = 0;
    for (int i = 1; i < n_proc; ++i) {
        offset[i] = offset[i-1] + rcvSize[i-1];
        totalSize += rcvSize[i - 1];
    }
    totalSize += rcvSize[n_proc-1];

    inboundParticles.resize(totalSize);
}

Each offset is the number of particles gathered before it, i.e., the previous offset plus the previous size: offset[i] = offset[i-1] + rcvSize[i-1]. The counter totalSize tracks the total number of received particles, which is why resize is called at the end. Also note that this subroutine runs only on the root process (rank 0). Afterwards, the root process calls MPI_Gatherv with the computed arguments:


// after determining offsets, master process collects the particles to be moved
MPI_Gatherv(toMove.data(), sendSize, PARTICLE, 
    inboundParticles.data(), rcvSize, offset, PARTICLE, 
    0, MPI_COMM_WORLD);

After the root process gathers all the particles to be moved between processes, it must scatter that information back out. We do this the same way as before: scatter the sizes first, then the data. Before doing so, it is convenient to flatten all the particles into a single vector and compute offsets and sizes as before. After extracting all particles into a vector, we call MPI_Scatter and then MPI_Scatterv.


MPI_Scatter(rcvSize, 1, MPI_INT, &sendSize, 1, MPI_INT, 0, MPI_COMM_WORLD);
...
//ready to scatter with offset sizes
MPI_Scatterv(singleVector.data(), rcvSize, offset, PARTICLE, 
    foreignParticles.data(), sendSize, PARTICLE, 0, MPI_COMM_WORLD);

Above, we first scatter the number of particles each process will receive, and in the subsequent call we scatter the vectors of particles from the root process to all the other processors. After these MPI calls, every processor has, in its own memory, the particles that have moved into its rows (foreignParticles). It then puts these particles into bins and continues to the next simulation step. This works because all of the MPI scatter calls are blocking.
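
A sketch of that final re-binning, reusing the geometry from putIntoBins; after the scatter, sendSize holds this process's receive count and foreignParticles is assumed to have been resized accordingly:

// place each inbound particle into its bin; only bins inside this processor's rows
// (and its vertical neighbor rows) are read in the next simulation step
for (int i = 0; i < sendSize; i++) {
    int x = int(foreignParticles[i].x / binSize);
    int y = int(foreignParticles[i].y / binSize);
    allBins[x*numBins + y].push_back(foreignParticles[i]);
}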

![normal](https://rustie.xyz/res/img/normal.png){: .center-image }

After implementing the MPI distributed memory algorithm, we achieved the simulation times seen in the figure above. Results were run on Cori using 16 processors. Varying the number of processors, we saw the speedup below; this benchmark simulated 100,000 particles:

![num_proc_linegraph](https://rustie.xyz/res/img/num_proc_linegraph.png){: .center-image }

The simulation time is generally inversely proportional to the number of processors allocated to the distributed memory algorithm. There seems to be a point where computation overhead and communication overhead trade dominance, roughly between 8 and 16 processors, with a similar occurrence earlier at small processor counts. Anomalies in this data are explained in the next section.

Considerations

Communication overhead

One key factor in achieving speedup in the particle simulation is weighing the communication overhead against the computational complexity. Our initial assumption was that communication dominates computation: the more computation each process does before gathering back at the root process, the less the communication overhead matters in the overall speedup.

The cost of communication also depends on the scope of each processor's simulation. Broadcasting the updated state of all particles to all processors at every time step would be very costly, so in our implementation we broadcast all particles to all processors only once, before the simulation starts. Afterwards, each process is only responsible for simulating and communicating particles from a narrow band of rows, minimizing the communication between processors.

The handling of intra- versus inter-processor particle movement is one of the biggest factors affecting communication overhead, since particles are only communicated when they move between bins owned by different processors. Again, this only helps performance when our assumption that communication overhead dominates compute overhead holds.

One optimization we tested to compare communication and computation overhead was to assume that all particles on the edge rows would move out of those rows. Originally, we had scattered only the particles that we had computed to move to a bin outside a processor's rows, spending more computation in order to reduce communication later on.

[Figure: opt_vs_normal]

Conclusion

After testing various MPI implementation methods and schemes, we achieved a strong scaling efficiency of 0.26 and a weak scaling efficiency of 0.24. Additional efficiency gains can be obtained by combining a multi-process scheme such as MPI with a multi-threaded scheme such as OpenMP, as seen in part 1 of our report.

Especially notable in this project: when weighing the communication and computation overheads and plotting the simulation results, we concluded that for small simulations, computational overhead dominates communication overhead. This could be the result of scheduling and traffic on Cori, as there was unusual difficulty getting jobs scheduled and run recently. It is not a general rule for all systems, but rather the outcome of repeated testing on Cori.

Future work will explore further speedup methods, such as using GPUs to accelerate computation.