Introduction to Distributed Processing (mpi4py)

Hello, World

from mpi4py import MPI

comm = MPI.COMM_WORLD
print("Hello! I'm rank %d of %d processes running in total" % (comm.rank, comm.size))
comm.Barrier()

Executing your code in parallel
mpirun -np 4 python hello-world.py
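
With 4 processes the run prints something like the output below; the exact line order is not deterministic, because each rank writes to stdout independently (the Barrier only synchronizes the ranks after they have printed).

Hello! I'm rank 2 of 4 processes running in total
Hello! I'm rank 0 of 4 processes running in total
Hello! I'm rank 3 of 4 processes running in total
Hello! I'm rank 1 of 4 processes running in total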

Sum Two Vectors

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
Count = 999
# Each of the size-1 worker ranks gets an equal slice of the vectors
# (assumes Count is divisible by size-1, e.g. Count=999 with -np 4).
PCount = Count // (size - 1)
# Uppercase Send/Recv pass numpy buffers with explicit MPI datatypes;
# lowercase send/recv pickle generic Python objects.
if rank == 0:
   Adata = numpy.arange(0, Count, 1, dtype='i')
   Bdata = numpy.arange(Count, 0, -1, dtype='i')
   Cdata = numpy.empty(Count, dtype='i')
   for p in range(1, size):
      Start = PCount * (p - 1)
      End = PCount * p
      print("%d: [%d to %d]" % (p, Start, End))
      Dim = [Start, End]
      comm.send(Dim, dest=p, tag=1)             # slice bounds as a Python object
      comm.Send(Adata[Start:End], dest=p, tag=2)
      comm.Send(Bdata[Start:End], dest=p, tag=2)
   for p in range(1, size):
      Start = PCount * (p - 1)
      End = PCount * p
      comm.Recv(Cdata[Start:End], source=p, tag=3)
   print(Cdata)
else:
   Dim = comm.recv(source=0, tag=1)
   print("Rank %d: received [%d to %d]" % (rank, Dim[0], Dim[1]))
   Adata = numpy.empty(Dim[1] - Dim[0], dtype='i')
   Bdata = numpy.empty(Dim[1] - Dim[0], dtype='i')
   comm.Recv(Adata, source=0, tag=2)
   comm.Recv(Bdata, source=0, tag=2)
   Cdata = numpy.add(Adata, Bdata)
   comm.Send(Cdata, dest=0, tag=3)
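
This example is launched the same way, e.g. mpirun -np 4 python sum-vectors.py (the file name is just an example); Count must be divisible by size-1, otherwise the last few elements of Cdata are never computed. For comparison, the sketch below (not part of the original example) does the same vector sum with the collective Scatter and Gather calls instead of explicit sends and receives; it assumes Count is divisible by the number of ranks, and every rank, including rank 0, processes a chunk.

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
Count = 1000                       # assumed divisible by size (e.g. -np 4)
PCount = Count // size

if rank == 0:
    Adata = numpy.arange(0, Count, 1, dtype='i')
    Bdata = numpy.arange(Count, 0, -1, dtype='i')
    Cdata = numpy.empty(Count, dtype='i')
else:
    Adata = Bdata = Cdata = None

# Scatter hands every rank one equal-sized chunk of A and of B.
Achunk = numpy.empty(PCount, dtype='i')
Bchunk = numpy.empty(PCount, dtype='i')
comm.Scatter(Adata, Achunk, root=0)
comm.Scatter(Bdata, Bchunk, root=0)

# Each rank adds its chunks locally; Gather reassembles the result on rank 0.
Cchunk = numpy.add(Achunk, Bchunk)
comm.Gather(Cchunk, Cdata, root=0)

if rank == 0:
    print(Cdata)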

Accepting any message from any source

status = MPI.Status()
data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
source = status.Get_source()
tag = status.Get_tag()
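
A minimal, self-contained sketch of this pattern (the message contents and tags are only illustrative): every non-zero rank sends a small Python object to rank 0, and rank 0 receives the messages in whatever order they arrive, using the Status object to find out who sent each one and with which tag.

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    status = MPI.Status()
    # Expect one message from every other rank, in arbitrary order.
    for _ in range(size - 1):
        data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        print("Got %r from rank %d (tag %d)"
              % (data, status.Get_source(), status.Get_tag()))
else:
    # Each worker sends a short message, tagged with its own rank.
    comm.send("hello from rank %d" % rank, dest=0, tag=rank)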

Computing pi - Serial Version
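
Both versions estimate pi from the identity pi = integral from 0 to 1 of 4/(1+x^2) dx, approximated with the midpoint rule: with step = 1/num_steps and x_i = (i+0.5)*step, the sum of step*4/(1+x_i^2) over all i approaches pi as num_steps grows.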

import time

def Pi(num_steps):
    start = time.time()
    step = 1.0 / num_steps
    total = 0.0
    for i in range(num_steps):
        x = (i + 0.5) * step
        total = total + 4.0 / (1.0 + x * x)
    pi = step * total
    end = time.time()
    print("Pi with %d steps is %f in %f secs" % (num_steps, pi, end - start))

if __name__ == '__main__':
    Pi(100000000)

Computing pi - MPI Version

import time
from mpi4py import MPI

def Pi(num_steps):
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    # Each rank sums its own contiguous block of steps
    # (assumes num_steps is divisible by size).
    Proc_num_steps = num_steps // size
    Start = rank * Proc_num_steps
    End = (rank + 1) * Proc_num_steps
    print("%d: [%d, %d]" % (rank, Start, End))
    step = 1.0 / num_steps
    total = 0.0
    for i in range(Start, End):
        x = (i + 0.5) * step
        total = total + 4.0 / (1.0 + x * x)

    return total


if __name__ == '__main__':
    num_steps = 100000000
    start = time.time()
    localsum = Pi(num_steps)

    localpi = localsum / num_steps
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    # reduce sums the partial results; only the root rank receives the total,
    # all other ranks get None back.
    pi = comm.reduce(localpi, op=MPI.SUM, root=0)
    end = time.time()
    if rank == 0:
        print("Pi with %d steps is %f in %f secs" % (num_steps, pi, end - start))

Introduction to NeCTAR Cloud