Parallel Programming

As a simple application we use a program that calculates the sum of the prime numbers below a given integer n. We parallelize over n in {100000, 100100, ...}, i.e. each job handles one value of n.

As "reference code" we take an implementation in C:

   1 /* File: sum_primes_seq.c
   2  Author: Heinrich Widmann (based on python version of Vitalii Vanovschi
   3   from Parallel Python Software: http://www.parallelpython.com )
   4  Desc: This program demonstrates sequential computation implemented
   5     as C code
   6  It calculates the sum of prime numbers below a given integer 
   7 */
   8 
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
  10 int main(int argc, char *argv[]) {
  11     clock_t tot_start, tot_end;
  12     double tot_time_spent, time_spent;
  13     int sum, n, i ;
  14     int j, njobs;
  15 
  16     njobs=8;
  17     if (argc > 1) {
  18       njobs=atoi( (argv[1]));
  19     }
  20 
  21     printf("Start processing with %d jobs :\n",njobs); 
  22     tot_start = clock();
  23 
  24     // Run njobs jobs
  25     for (j = 0; j < njobs; j++) {
  26       n=100000+(j*100);
  27       sum=sum_primes(n);
  28     }
  29     tot_end = clock();
  30     tot_time_spent = (double)(tot_end - tot_start) / CLOCKS_PER_SEC;
  31     printf("Time elapsed: %12.4f s \n", tot_time_spent);
  32 }
  33 
  34 
  35 int sum_primes(unsigned long int n) {
  36     // Calculates sum of all primes below given integer n
  37     int i;
  38     int sum;
  39     clock_t start, end;
  40 
  41     sum = 0;
  42     start = clock();
  43     for (i = 0; i < n; i++) {
  44       if (isPrime(i)) {
  45         sum += i;
  46       }
  47     }
  48     end = clock();
  49     // printf("Sum of primes below %lu is %lu and lasted %8.4f seconds \n", n, sum, (double)(end - start) / CLOCKS_PER_SEC);
  50     return sum;  
  51 }
  52 
  53 int isPrime(int num) {
  54     int x;
  55     if (num < 2) {
  56         return 0;
  57     }
  58 
  59     for (x = 2; x*x < num; ++x) {
  60         if (num % x == 0) {
  61             return 0;
  62         }
  63     }
  64     return 1;
  65 }

Here is the code of the sequential implementation in Python: (Hint by Alex: avoid explicit loops such as the one below (while i <= max ...) and replace them by array operations!)

   1 #!/usr/bin/python
   2 # File: sum_primes_seq.py
   3 # Author: Heinrich Widmann (based on parallel version of Vitalii Vanovschi
   4 #  from Parallel Python Software: http://www.parallelpython.com )
   5 # Description: This program demonstrates sequential computation and
   6 #  calculates the sum of prime numbers below a given integer
   7 
   8 import math, sys, time
   9 
  10 def isprime(n):
  11     """Returns True if n is prime and False otherwise"""
  12     if not isinstance(n, int):
  13         raise TypeError("argument passed to is_prime is not of 'int' type")
  14     if n < 2:
  15         return False
  16     if n == 2:
  17         return True
  18     max = int(math.ceil(math.sqrt(n)))
  19     i = 2
  20     while i <= max:
  21         if n % i == 0:
  22             return False
  23         i += 1
  24     return True
  25 
  26 def sum_primes(n):
  27     """Calculates sum of all primes below given integer n"""
  28     return sum([x for x in xrange(2,n) if isprime(x)])
  29 
  30 print """Usage: python sum_primes_seq.py [njobs]
  31   njobs are the number of jobs (calculate sum of primes up to N=10000+(njobs*100)) excecuted
  32 """
  33 
  34 if len(sys.argv) > 1:
  35     njobs = int(sys.argv[1])
  36 else:
  37     njobs = 8
  38 
  39 result = sum_primes(100)
  40 
  41 print "Sum of primes below 100 is", result
  42 
  43 start_time = time.time()
  44 
  45 # The following submits njobs jobs and then retrieves the results
  46 for i in xrange(njobs):
  47     input=10000+i*100
  48     job_start_time = time.time()
  49     print "Sum of primes below", input, "is", sum_primes(input), "and lasts", time.time() - job_start_time, "s"
  50 print "Time elapsed: ", time.time() - start_time, "s"

Parallel code

Only the differences from the sequential code above are listed:

Threading

   1 #!/usr/bin/python
   2 # File: sum_primes_threads.py
   3 # Author: Heinrich Widmann
   4 # Desc: This program demonstrates parallel computations with python threading
   5 # It calculates the sum of prime numbers below a given integer using threads ..
   6 
   7 import math, sys, time
   8 import threading
   9 
  10 def isprime(n):
  11     ...
  12 
  13 class SumPrimesThread(threading.Thread):
  14      def __init__(self,n):
  15          threading.Thread.__init__(self)
  16          self.n=n
  17          self.psum=0
  18 
  19      def run(self):
  20          self.psum=sum([x for x in xrange(2,self.n) if isprime(x)])
  21          ## print "Sum of primes below " + str(self.n) + " is", self.psum
  22 
  23 print """Usage: ./sum_primes_threads.py [njobs]
  24 """
  25 
  26 if len(sys.argv) > 1:
  27     njobs = int(sys.argv[1])
  28 else:
  29     njobs = 8
  30 
  31 print "Starting threading with", njobs, "jobs"
  32 
  33 
  34 # Retrieves the result calculated by job1
  35 # The value of job1() is the same as sum_primes(100)
  36 # If the job has not been finished yet, execution will wait here until result is available
  37 thread0 = SumPrimesThread(100)
  38 thread0.start() # This actually causes the thread to run
  39 thread0.join()  # This waits until the thread has completed
  40 result=thread0.psum
  41 print "Sum of primes below 100 is", result
  42 
  43 start_time = time.time()
  44 threads=[]
  45 for i in xrange(njobs):
  46     input=100000+i*100
  47     threads.insert(i, SumPrimesThread(input))
  48     threads[i].start() # This actually causes the thread to run
  49     ##    threads[i].join()  # This waits until the thread has completed --> test with and without join after each thread !!   
  50 
  51 print "Time elapsed before joined: ", time.time() - start_time, "s"
  52 
  53 for j in xrange(i): 
  54     threads[j].join()              
  55  
  56 print "Time elapsed after joined: ", time.time() - start_time, "s"

Multiprocessing

   1 #!/usr/bin/python
   2 # File: sum_primes_mprocessing.py
   3 # Author: Heinrich Widmann
   4 # Desc: This program demonstrates parallel computations with python processing package
   5 # It calculates the sum of prime numbers below a given integer in parallel
   6 
   7 import math, sys, time
   8 from multiprocessing import Pool
   9 
  10 def isprime(n):
  11    ............
  12 
  13 def sum_primes(n):
  14     """Calculates sum of all primes below given integer n"""
  15     return sum([x for x in xrange(2,n) if isprime(x)])
  16          
  17 print """Usage: python sum_primes_processing.py
  18 """
  19 
  20 if len(sys.argv) > 1:
  21     njobs = int(sys.argv[1])
  22 else:
  23     njobs = 8
  24 
  25 print "Starting multiprocessing with", njobs, "jobs"
  26 
  27 start_time = time.time()
  28 
  29 p=Pool(njobs)
  30 f_inputs = []
  31 for i in xrange(njobs):
  32     f_inputs.append(100000+i*100)
  33 
  34 p.map(sum_primes, f_inputs)
  35  
  36 print "Time elapsed: ", time.time() - start_time, "s"

ParallelPython

The number of CPUs can be specified as an argument, e.g. to run on two CPUs:

   1 #!/usr/bin/python
   2 # File: sum_primes.py
   3 # Author: Vitalii Vanovschi
   4 # Modified by : Heinrich Widmann
   5 # Desc: This program demonstrates parallel computations with pp module
   6 # It calculates the sum of prime numbers below a given integer in parallel
   7 # Parallel Python Software: http://www.parallelpython.com
   8 
   9 import math, sys, time
  10 import pp
  11 
  12 def isprime(n):
  13    .....
  14 
  15 def sum_primes(n):
  16     """Calculates sum of all primes below given integer n"""
  17     return sum([x for x in xrange(2,n) if isprime(x)])
  18 
  19 print """Usage: python sum_primes.py [njobs] [ncpus] 
  20     [njobs] - the number of jobs to run in parallel, 
  21     if omitted it will be set to 8
  22     [ncpus] - the number of workers (CPUs) used, 
  23     if omitted it will be set to the number of processors in the system
  24 """
  25 
  26 # tuple of all parallel python servers to connect with
  27 ppservers = ()
  28 #ppservers = ("10.0.0.1",)
  29 
  30 if len(sys.argv) > 1:
  31     njobs = int(sys.argv[1])
  32 else:
  33     njobs = 8
  34 
  35 if len(sys.argv) > 2:
  36     ncpus = int(sys.argv[2])
  37     # Creates jobserver with ncpus workers
  38     job_server = pp.Server(ncpus, ppservers=ppservers)
  39 else:
  40     # Creates jobserver with automatically detected number of workers
  41     job_server = pp.Server(ppservers=ppservers)
  42 
  43 print "Starting pp processing with", njobs, "jobs and", job_server.get_ncpus(), "workers"
  44 
  45 start_time = time.time()
  46 
  47 # The following submits njobs jobs and then retrieves the results
  48 for i in xrange(njobs):
  49     input=100000+i*100
  50     job = job_server.submit(sum_primes,(input,), (isprime,), ("math",))
  51     job()
  52     ## print "Sum of primes below", input, "is", job()
  53 
  54 print "Time elapsed: ", time.time() - start_time, "s"
  55 job_server.print_stats()

Cython

The Cython code differs from the Python code, but is still "Python-like":

   1 import time
   2 
   3 from cython.parallel import prange
   4 cimport cython
   5 
   6 from libc.math cimport sqrt
   7 from libc.math cimport ceil
   8 
   9 @cython.boundscheck(False)
  10 @cython.cdivision(True)
  11 cdef unsigned int isprime(unsigned int n) nogil:
  12     if n < 2:
  13         return 0
  14     if n == 2:
  15         return 1
  16     cdef unsigned int max
  17     max = int(ceil(sqrt(n)))
  18     cdef unsigned int i
  19     i = 2
  20     while i <= max:
  21         if n % i == 0:
  22             return 0
  23         i += 1
  24     return 1
  25 
  26 @cython.boundscheck(False)
  27 @cython.wraparound(False)
  28 cdef unsigned int sum_primes(unsigned int n) nogil:
  29     cdef unsigned int x
  30     cdef unsigned int max
  31     max = 0
  32     for x in xrange(2, n):
  33         if isprime(x) and x > max:
  34             max = x
  35     return max
  36 
  37 
  38 def run(unsigned int n, unsigned int i1, unsigned int i2):
  39     cdef double start_time
  40     start_time = time.time()
  41 
  42     cdef unsigned int i
  43     for i in prange(n, nogil=True):
  44         ## job_start_time = time.time()
  45         sum_primes(i1 + i2*100)
  46         ##print "Sum of primes below", i1 + i2*100, "is", sum_primes(i1 + i2*100), "and lasts", time.time() - job_start_time, "s"
  47         ##print "JJJ" ## , job_start_time, "s"
  48     print "Time elapsed: ", time.time() - start_time, "s"

The Cython code sum_primes_cython.pyx must first be translated to C, and the resulting C code must then be compiled. This is typically done by running `$ python setup_prim_cython.py build_ext --inplace`,

whereby setup_prim_cython.py is

   1 from distutils.core import setup
   2 from distutils.extension import Extension
   3 from Cython.Distutils import build_ext
   4 
   5 ext_module = Extension(
   6     "sum_primes_cython",
   7     ["sum_primes_cython.pyx"],
   8     extra_compile_args=['-fopenmp'],
   9     extra_link_args=['-fopenmp'],
  10 )
  11 
  12 setup(
  13     name = 'Sum primes test',
  14     cmdclass = {'build_ext': build_ext},
  15     ext_modules = [ext_module],
  16 )

Run it with $ python run_sum_primes_cython.py [njobs]

whereby run_sum_primes_cython.py is :

   1 from sum_primes_cython import run
   2 import sys
   3 
   4 if len(sys.argv) > 1:
   5     njobs = int(sys.argv[1])
   6 else:
   7     njobs = 8
   8 
   9 print "Starting pp with", njobs, "jobs"
  10 
  11 run(njobs, 100000, 100) 

LehreWiki: PythonCourse/PythonLES/Parallel_Programming (last edited 2012-11-09 20:22:45 by HeinrichWidmann)