= Parallel Programming =

 * Some concepts and methods
 * Threading vs. Multiprocessing
 * ParallelPython
 * Processing package
 * Other approaches
 * Clustering, multiple nodes, ...
 * Load balancing, scheduling

As a simple application we use a program which calculates the sum of all prime numbers below a given integer n. We parallelize over n in {100000, 100100, 100200, ...}, i.e. job j (j = 0, 1, 2, ...) computes the sum for n = 100000 + j*100.

As "reference code" we take an implementation in C:

{{{#!cplusplus
/* File: sum_primes_seq.c
   Author: Heinrich Widmann (based on python version of Vitalii Vanovschi
           from Parallel Python Software: http://www.parallelpython.com )
   Desc: This program demonstrates sequential computation implemented as C code.
         It calculates the sum of prime numbers below a given integer.
   Usage: ./sum_primes_seq [njobs]
*/
#include <stdio.h>   // printf
#include <stdlib.h>  // atoi
#include <time.h>    // clock, clock_t, CLOCKS_PER_SEC

// Prototypes: main() calls these functions before they are defined
int sum_primes(unsigned long int n);
int isPrime(int num);

int main(int argc, char *argv[])
{
    clock_t tot_start, tot_end;
    double tot_time_spent;
    // volatile: keeps an optimizing compiler from discarding the otherwise
    // unused results (and with them the whole computation being timed)
    volatile int sum;
    int n, j, njobs;

    njobs = 8;
    if (argc > 1) {
        njobs = atoi(argv[1]);
    }
    printf("Start processing with %d jobs :\n", njobs);
    tot_start = clock();

    // Run njobs jobs one after another; job j handles n = 100000 + j*100
    for (j = 0; j < njobs; j++) {
        n = 100000 + (j * 100);
        sum = sum_primes(n);
    }
    tot_end = clock();
    tot_time_spent = (double)(tot_end - tot_start) / CLOCKS_PER_SEC;
    printf("Time elapsed: %12.4f s \n", tot_time_spent);
    return 0;
}

int sum_primes(unsigned long int n)
{
    // Calculates the sum of all primes below the given integer n
    int i;
    int sum;
    clock_t start, end;

    sum = 0;
    start = clock();
    for (i = 0; i < n; i++) {
        if (isPrime(i)) {
            sum += i;
        }
    }
    end = clock();
    // printf("Sum of primes below %lu is %d and lasted %8.4f seconds \n",
    //        n, sum, (double)(end - start) / CLOCKS_PER_SEC);
    return sum;
}

int isPrime(int num)
{
    // Returns 1 if num is prime and 0 otherwise (trial division up to sqrt(num))
    int x;
    if (num < 2) {
        return 0;
    }
    // "x * x <= num" also tests x == sqrt(num), so squares of primes
    // (4, 9, 25, ...) are correctly rejected
    for (x = 2; x * x <= num; ++x) {
        if (num % x == 0) {
            return 0;
        }
    }
    return 1;
}
}}}

Here is the code of the ''sequential'' implementation in Python. (Hint by Alex: avoid explicit loops such as `while i <= max: ...` below and replace them by array (vectorized) operations!)
{{{#!python
#!/usr/bin/python
# File: sum_primes_seq.py
# Author: Heinrich Widmann (based on parallel version of Vitalii Vanovschi
#         from Parallel Python Software: http://www.parallelpython.com )
# Description: This program demonstrates sequential computation and
#              calculates the sum of prime numbers below a given integer
import math, sys, time

def isprime(n):
    """Returns True if n is prime and False otherwise"""
    if not isinstance(n, int):
        raise TypeError("argument passed to isprime is not of 'int' type")
    if n < 2:
        return False
    if n == 2:
        return True
    # Trial division only up to ceil(sqrt(n)) is sufficient
    limit = int(math.ceil(math.sqrt(n)))
    i = 2
    while i <= limit:
        if n % i == 0:
            return False
        i += 1
    return True

def sum_primes(n):
    """Calculates sum of all primes below given integer n"""
    return sum(x for x in xrange(2, n) if isprime(x))

print """Usage: python sum_primes_seq.py [njobs]
    [njobs] - the number of jobs executed one after another, if omitted it will be set to 8;
              job i calculates the sum of primes below N = 100000 + i*100
"""
if len(sys.argv) > 1:
    njobs = int(sys.argv[1])
else:
    njobs = 8

result = sum_primes(100)
print "Sum of primes below 100 is", result

start_time = time.time()

# Run the njobs jobs sequentially, with the same inputs as the C and parallel versions
for i in xrange(njobs):
    n = 100000 + i*100
    job_start_time = time.time()
    print "Sum of primes below", n, "is", sum_primes(n), "and lasts", time.time() - job_start_time, "s"

print "Time elapsed: ", time.time() - start_time, "s"
}}}

= Parallel code =

Only the differences to the sequential code above are listed:

== Threading ==

{{{#!python
#!/usr/bin/python
# File: sum_primes_threads.py
# Author: Heinrich Widmann
# Desc: This program demonstrates parallel computations with python threading
#       It calculates the sum of prime numbers below a given integer using threads
# Note: in CPython the global interpreter lock (GIL) lets only one thread at a time
#       execute Python bytecode, so these CPU-bound threads do not run in parallel
#       on several cores (compare with the multiprocessing version below)
..
import math, sys, time
import threading

def isprime(n):
...

class SumPrimesThread(threading.Thread):
    """Thread that calculates the sum of all primes below n and stores it in self.psum"""
    def __init__(self, n):
        threading.Thread.__init__(self)
        self.n = n
        self.psum = 0

    def run(self):
        self.psum = sum(x for x in xrange(2, self.n) if isprime(x))
        ## print "Sum of primes below " + str(self.n) + " is", self.psum

print """Usage: ./sum_primes_threads.py [njobs]
"""
if len(sys.argv) > 1:
    njobs = int(sys.argv[1])
else:
    njobs = 8

print "Starting threading with", njobs, "jobs"

# Calculate sum_primes(100) in a single thread; psum is only valid after join()
thread0 = SumPrimesThread(100)
thread0.start()   # This actually causes the thread to run
thread0.join()    # This waits until the thread has completed
result = thread0.psum
print "Sum of primes below 100 is", result

start_time = time.time()
threads = []
for i in xrange(njobs):
    n = 100000 + i*100
    thread = SumPrimesThread(n)
    threads.append(thread)
    thread.start()   # This actually causes the thread to run
    ## thread.join()   # This waits until the thread has completed --> test with and without join after each thread !!
print "Time elapsed before joined: ", time.time() - start_time, "s"

# Wait for every thread (including the last one) before taking the time
for thread in threads:
    thread.join()
print "Time elapsed after joined: ", time.time() - start_time, "s"
}}}

== Multiprocessing ==

{{{#!python
#!/usr/bin/python
# File: sum_primes_mprocessing.py
# Author: Heinrich Widmann
# Desc: This program demonstrates parallel computations with the python multiprocessing package
#       It calculates the sum of prime numbers below a given integer in parallel
import math, sys, time
from multiprocessing import Pool

def isprime(n):
............

def sum_primes(n):
    """Calculates sum of all primes below given integer n"""
    return sum(x for x in xrange(2, n) if isprime(x))

# The guard is required on platforms without fork() (e.g. Windows): child
# processes re-import this module and must not start new pools themselves
if __name__ == "__main__":
    print """Usage: python sum_primes_mprocessing.py [njobs]
"""
    if len(sys.argv) > 1:
        njobs = int(sys.argv[1])
    else:
        njobs = 8

    print "Starting multiprocessing with", njobs, "jobs"

    start_time = time.time()
    p = Pool(njobs)
    f_inputs = [100000 + i*100 for i in xrange(njobs)]
    results = p.map(sum_primes, f_inputs)   # blocks until all jobs have finished
    p.close()
    p.join()
    print "Time elapsed: ", time.time() - start_time, "s"
}}}

== ParallelPython ==

The number of jobs and the number of CPUs (workers) can be given as arguments, e.g. to run 8 jobs on two CPUs:
{{{
$ python sum_primes_pp.py 8 2
}}}

{{{#!python
#!/usr/bin/python
# File: sum_primes_pp.py
# Author: Vitalii Vanovschi
# Modified by : Heinrich Widmann
# Desc: This program demonstrates parallel computations with pp module
#       It calculates the sum of prime numbers below a given integer in parallel
# Parallel Python Software: http://www.parallelpython.com
import math, sys, time
import pp

def isprime(n):
.....

def sum_primes(n):
    """Calculates sum of all primes below given integer n"""
    return sum(x for x in xrange(2, n) if isprime(x))

print """Usage: python sum_primes_pp.py [njobs] [ncpus]
    [njobs] - the number of jobs to run in parallel, if omitted it will be set to 8
    [ncpus] - the number of workers (CPUs) used, if omitted it will be set to
              the number of processors in the system
"""

# tuple of all parallel python servers to connect with
ppservers = ()
#ppservers = ("10.0.0.1",)

if len(sys.argv) > 1:
    njobs = int(sys.argv[1])
else:
    njobs = 8

if len(sys.argv) > 2:
    ncpus = int(sys.argv[2])
    # Creates jobserver with ncpus workers
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    # Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)

print "Starting pp processing with", njobs, "jobs and", job_server.get_ncpus(), "workers"

start_time = time.time()

# First submit ALL jobs so that the workers can process them concurrently;
# isprime is shipped as a dependent function and "math" is imported on the worker
jobs = []
for i in xrange(njobs):
    n = 100000 + i*100
    jobs.append((n, job_server.submit(sum_primes, (n,), (isprime,), ("math",))))

# ... then retrieve the results; calling job() waits until that job has finished
for n, job in jobs:
    result = job()
    ## print "Sum of primes below", n, "is", result

print "Time elapsed: ", time.time() - start_time, "s"
job_server.print_stats()
}}}

== Cython ==

The Cython code differs from Python, but is still "Python-like":

{{{#!python
import time
from cython.parallel import prange
cimport cython
from libc.math cimport sqrt
from libc.math cimport ceil

@cython.boundscheck(False)
@cython.cdivision(True)
cdef unsigned int isprime(unsigned int n) nogil:
    # Returns 1 if n is prime and 0 otherwise; pure C, callable without the GIL
    cdef unsigned int limit
    cdef unsigned int i
    if n < 2:
        return 0
    if n == 2:
        return 1
    # C cast instead of Python int(): no Python objects are allowed in nogil code
    limit = <unsigned int>ceil(sqrt(n))
    i = 2
    while i <= limit:
        if n % i == 0:
            return 0
        i += 1
    return 1

@cython.boundscheck(False)
@cython.wraparound(False)
cdef unsigned long long sum_primes(unsigned int n) nogil:
    # Calculates the sum of all primes below the given integer n
    # (64-bit accumulator: the sum exceeds 2**32 already for n = 10**6)
    cdef unsigned int x
    cdef unsigned long long psum = 0
    for x in range(2, n):
        if isprime(x):
            psum += x
    return psum

def run(unsigned int n, unsigned int i1, unsigned int i2):
    """Runs n jobs in parallel with prange (OpenMP); job i sums the primes
    below i1 + i*i2. Prints the total elapsed wall-clock time."""
    cdef double start_time
    cdef unsigned int i
    start_time = time.time()
    for i in prange(n, nogil=True):
        # No Python calls (time.time(), print, ...) are possible in this nogil body
        sum_primes(i1 + i*i2)
    print "Time elapsed: ", time.time() - start_time, "s"
}}}

The Cython code (file sum_primes_cython.pyx) must be translated to C, and the C code must then be compiled.
This is done by
{{{
$ python setup_prim_cython.py build_ext --inplace
}}}
where setup_prim_cython.py is:

{{{#!python
from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext

# -fopenmp is needed at compile AND link time; without it the OpenMP
# pragmas generated for prange are ignored and the loop runs serially
ext_module = Extension(
    "sum_primes_cython",
    ["sum_primes_cython.pyx"],
    extra_compile_args=['-fopenmp'],
    extra_link_args=['-fopenmp'],
)

setup(
    name = 'Sum primes test',
    cmdclass = {'build_ext': build_ext},
    ext_modules = [ext_module],
)
}}}

Run it with
{{{
$ python run_sum_primes_cython.py [njobs]
}}}
where run_sum_primes_cython.py is:

{{{#!python
from sum_primes_cython import run
import sys

if len(sys.argv) > 1:
    njobs = int(sys.argv[1])
else:
    njobs = 8

print "Starting Cython prange with", njobs, "jobs"
run(njobs, 100000, 100)
}}}