| 
  
   Size: 2287 
  
  Comment:  
 | 
    ← Revision 8 as of 2012-11-09 20:22:45  ⇥ 
  Size: 10741 
  
  Comment:  
 | 
| Deletions are marked like this. | Additions are marked like this. | 
| Line 1: | Line 1: | 
| = Parallel Programming =   Test code : | 
= Parallel Programming = | 
| Line 15: | Line 12: | 
| calculates the sum of prime numbers below a given integer in parallel  Here the code of the sequential implementation in python :  | 
calculates the sum of prime numbers below a given integer n. We paralize over n in {100000,100100, ....}. As "refernce code" we take an implementation in C : {{{#!python /* File: sum_primes_seq.c Author: Heinrich Widmann (based on python version of Vitalii Vanovschi from Parallel Python Software: http://www.parallelpython.com ) Desc: This program demonstrates sequential computation implemented as C code It calculates the sum of prime numbers below a given integer */ #include <time.h> int main(int argc, char *argv[]) { clock_t tot_start, tot_end; double tot_time_spent, time_spent; int sum, n, i ; int j, njobs; njobs=8; if (argc > 1) { njobs=atoi( (argv[1])); } printf("Start processing with %d jobs :\n",njobs); tot_start = clock(); // Run njobs jobs for (j = 0; j < njobs; j++) { n=100000+(j*100); sum=sum_primes(n); } tot_end = clock(); tot_time_spent = (double)(tot_end - tot_start) / CLOCKS_PER_SEC; printf("Time elapsed: %12.4f s \n", tot_time_spent); } int sum_primes(unsigned long int n) { // Calculates sum of all primes below given integer n int i; int sum; clock_t start, end; sum = 0; start = clock(); for (i = 0; i < n; i++) { if (isPrime(i)) { sum += i; } } end = clock(); // printf("Sum of primes below %lu is %lu and lasted %8.4f seconds \n", n, sum, (double)(end - start) / CLOCKS_PER_SEC); return sum; } int isPrime(int num) { int x; if (num < 2) { return 0; } for (x = 2; x*x < num; ++x) { if (num % x == 0) { return 0; } } return 1; } }}} Here the code of the ''sequential'' implementation in python : (Hint by Alex : Avoid loops as below (while i < max ...) and replace by an array ... !!!)  | 
| Line 72: | Line 142: | 
| ... Parllel code will follow ... ... May be I will find time to show a more realistic application with data processing ... Precip data you need for the exercise: [[https://wiki.zmaw.de/lehre/PythonCourse/PythonLES/Pandas?action=AttachFile&do=get&target=precip2.tar.gz | precip.tar.gz]]  | 
= Parallel code = Only the differences to the sequential code above is listed : == Threading == {{{#!python #!/usr/bin/python # File: sum_primes_threads.py # Author: Heinrich Widmann # Desc: This program demonstrates parallel computations with python threading # It calculates the sum of prime numbers below a given integer using threads .. import math, sys, time import threading def isprime(n): ... class SumPrimesThread(threading.Thread): def __init__(self,n): threading.Thread.__init__(self) self.n=n self.psum=0 def run(self): self.psum=sum([x for x in xrange(2,self.n) if isprime(x)]) ## print "Sum of primes below " + str(self.n) + " is", self.psum print """Usage: ./sum_primes_threads.py [njobs] """ if len(sys.argv) > 1: njobs = int(sys.argv[1]) else: njobs = 8 print "Starting threading with", njobs, "jobs" # Retrieves the result calculated by job1 # The value of job1() is the same as sum_primes(100) # If the job has not been finished yet, execution will wait here until result is available thread0 = SumPrimesThread(100) thread0.start() # This actually causes the thread to run thread0.join() # This waits until the thread has completed result=thread0.psum print "Sum of primes below 100 is", result start_time = time.time() threads=[] for i in xrange(njobs): input=100000+i*100 threads.insert(i, SumPrimesThread(input)) threads[i].start() # This actually causes the thread to run ## threads[i].join() # This waits until the thread has completed --> test with and without join after each thread !! 
print "Time elapsed before joined: ", time.time() - start_time, "s" for j in xrange(i): threads[j].join() print "Time elapsed after joined: ", time.time() - start_time, "s" }}} == Multiprocessing == {{{#!python #!/usr/bin/python # File: sum_primes_mprocessing.py # Author: Heinrich Widmann # Desc: This program demonstrates parallel computations with python processing package # It calculates the sum of prime numbers below a given integer in parallel import math, sys, time from multiprocessing import Pool def isprime(n): ............ def sum_primes(n): """Calculates sum of all primes below given integer n""" return sum([x for x in xrange(2,n) if isprime(x)]) print """Usage: python sum_primes_processing.py """ if len(sys.argv) > 1: njobs = int(sys.argv[1]) else: njobs = 8 print "Starting multiprocessing with", njobs, "jobs" start_time = time.time() p=Pool(njobs) f_inputs = [] for i in xrange(njobs): f_inputs.append(100000+i*100) p.map(sum_primes, f_inputs) print "Time elapsed: ", time.time() - start_time, "s" }}} == ParallelPython == As argument can the number of CPUs speciied, e.g to run on two CPUs : $ sum_primes_pp.py 2 {{{#!python #!/usr/bin/python # File: sum_primes.py # Author: Vitalii Vanovschi # Modified by : Heinrich Widmann # Desc: This program demonstrates parallel computations with pp module # It calculates the sum of prime numbers below a given integer in parallel # Parallel Python Software: http://www.parallelpython.com import math, sys, time import pp def isprime(n): ..... 
def sum_primes(n): """Calculates sum of all primes below given integer n""" return sum([x for x in xrange(2,n) if isprime(x)]) print """Usage: python sum_primes.py [njobs] [ncpus] [njobs] - the number of jobs to run in parallel, if omitted it will be set to 8 [ncpus] - the number of workers (CPUs) used, if omitted it will be set to the number of processors in the system """ # tuple of all parallel python servers to connect with ppservers = () #ppservers = ("10.0.0.1",) if len(sys.argv) > 1: njobs = int(sys.argv[1]) else: njobs = 8 if len(sys.argv) > 2: ncpus = int(sys.argv[2]) # Creates jobserver with ncpus workers job_server = pp.Server(ncpus, ppservers=ppservers) else: # Creates jobserver with automatically detected number of workers job_server = pp.Server(ppservers=ppservers) print "Starting pp processing with", njobs, "jobs and", job_server.get_ncpus(), "workers" start_time = time.time() # The following submits njobs jobs and then retrieves the results for i in xrange(njobs): input=100000+i*100 job = job_server.submit(sum_primes,(input,), (isprime,), ("math",)) job() ## print "Sum of primes below", input, "is", job() print "Time elapsed: ", time.time() - start_time, "s" job_server.print_stats() }}} == Cython == The Cython code differs from Python, but is still "Python similar" {{{#!python import time from cython.parallel import prange cimport cython from libc.math cimport sqrt from libc.math cimport ceil @cython.boundscheck(False) @cython.cdivision(True) cdef unsigned int isprime(unsigned int n) nogil: if n < 2: return 0 if n == 2: return 1 cdef unsigned int max max = int(ceil(sqrt(n))) cdef unsigned int i i = 2 while i <= max: if n % i == 0: return 0 i += 1 return 1 @cython.boundscheck(False) @cython.wraparound(False) cdef unsigned int sum_primes(unsigned int n) nogil: cdef unsigned int x cdef unsigned int max max = 0 for x in xrange(2, n): if isprime(x) and x > max: max = x return max def run(unsigned int n, unsigned int i1, unsigned int i2): cdef double 
start_time start_time = time.time() cdef unsigned int i for i in prange(n, nogil=True): ## job_start_time = time.time() sum_primes(i1 + i2*100) ##print "Sum of primes below", i1 + i2*100, "is", sum_primes(i1 + i2*100), "and lasts", time.time() - job_start_time, "s" ##print "JJJ" ## , job_start_time, "s" print "Time elapsed: ", time.time() - start_time, "s" }}} The Cython code sum_primes_cython.pyx must be converted to C and C code must be compiled. This is done by $ python setup_prim_cython.py build_ext –inplace whereby setup_prim_cython.py is {{{#!python from distutils.core import setup from distutils.extension import Extension from Cython.Distutils import build_ext ext_module = Extension( "sum_primes_cython", ["sum_primes_cython.pyx"], extra_compile_args=['-fopenmp'], extra_link_args=['-fopenmp'], ) setup( name = 'Sum primes test', cmdclass = {'build_ext': build_ext}, ext_modules = [ext_module], ) }}} Let it run by $ python run_sum_primes_cython.py [njobs] whereby run_sum_primes_cython.py is : {{{#!python from sum_primes_cython import run import sys if len(sys.argv) > 1: njobs = int(sys.argv[1]) else: njobs = 8 print "Starting pp with", njobs, "jobs" run(njobs, 100000, 100) }}}  | 
Parallel Programming
- Some concepts and methods
 - Threading vs. Multiprocessing
 - Processing package
 - Other approaches
 - clustering, multi nodes, ...
 - Loadbalancing, scheduling
 
As a simple application we use a program which calculates the sum of the prime numbers below a given integer n. We parallelize over n in {100000, 100100, ...}.
As "reference code" we take an implementation in C:
   1 /* File: sum_primes_seq.c
   2  Author: Heinrich Widmann (based on python version of Vitalii Vanovschi
   3   from Parallel Python Software: http://www.parallelpython.com )
   4  Desc: This program demonstrates sequential computation implemented
   5     as C code
   6  It calculates the sum of prime numbers below a given integer 
   7 */
   8 
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
  10 int main(int argc, char *argv[]) {
  11     clock_t tot_start, tot_end;
  12     double tot_time_spent, time_spent;
  13     int sum, n, i ;
  14     int j, njobs;
  15 
  16     njobs=8;
  17     if (argc > 1) {
  18       njobs=atoi( (argv[1]));
  19     }
  20 
  21     printf("Start processing with %d jobs :\n",njobs); 
  22     tot_start = clock();
  23 
  24     // Run njobs jobs
  25     for (j = 0; j < njobs; j++) {
  26       n=100000+(j*100);
  27       sum=sum_primes(n);
  28     }
  29     tot_end = clock();
  30     tot_time_spent = (double)(tot_end - tot_start) / CLOCKS_PER_SEC;
  31     printf("Time elapsed: %12.4f s \n", tot_time_spent);
  32 }
  33 
  34 
  35 int sum_primes(unsigned long int n) {
  36     // Calculates sum of all primes below given integer n
  37     int i;
  38     int sum;
  39     clock_t start, end;
  40 
  41     sum = 0;
  42     start = clock();
  43     for (i = 0; i < n; i++) {
  44       if (isPrime(i)) {
  45         sum += i;
  46       }
  47     }
  48     end = clock();
  49     // printf("Sum of primes below %lu is %lu and lasted %8.4f seconds \n", n, sum, (double)(end - start) / CLOCKS_PER_SEC);
  50     return sum;  
  51 }
  52 
  53 int isPrime(int num) {
  54     int x;
  55     if (num < 2) {
  56         return 0;
  57     }
  58 
  59     for (x = 2; x*x < num; ++x) {
  60         if (num % x == 0) {
  61             return 0;
  62         }
  63     }
  64     return 1;
  65 }
Here is the code of the sequential implementation in Python: (Hint by Alex: avoid loops like the one below (while i < max ...) and use an array instead!)
   1 #!/usr/bin/python
   2 # File: sum_primes_seq.py
   3 # Author: Heinrich Widmann (based on parallel version of Vitalii Vanovschi
   4 #  from Parallel Python Software: http://www.parallelpython.com )
   5 # Description: This program demonstrates sequential computation and
   6 #  calculates the sum of prime numbers below a given integer
   7 
   8 import math, sys, time
   9 
  10 def isprime(n):
  11     """Returns True if n is prime and False otherwise"""
  12     if not isinstance(n, int):
  13         raise TypeError("argument passed to is_prime is not of 'int' type")
  14     if n < 2:
  15         return False
  16     if n == 2:
  17         return True
  18     max = int(math.ceil(math.sqrt(n)))
  19     i = 2
  20     while i <= max:
  21         if n % i == 0:
  22             return False
  23         i += 1
  24     return True
  25 
  26 def sum_primes(n):
  27     """Calculates sum of all primes below given integer n"""
  28     return sum([x for x in xrange(2,n) if isprime(x)])
  29 
  30 print """Usage: python sum_primes_seq.py [njobs]
  31   njobs are the number of jobs (calculate sum of primes up to N=10000+(njobs*100)) excecuted
  32 """
  33 
  34 if len(sys.argv) > 1:
  35     njobs = int(sys.argv[1])
  36 else:
  37     njobs = 8
  38 
  39 result = sum_primes(100)
  40 
  41 print "Sum of primes below 100 is", result
  42 
  43 start_time = time.time()
  44 
  45 # The following submits njobs jobs and then retrieves the results
  46 for i in xrange(njobs):
  47     input=10000+i*100
  48     job_start_time = time.time()
  49     print "Sum of primes below", input, "is", sum_primes(input), "and lasts", time.time() - job_start_time, "s"
  50 print "Time elapsed: ", time.time() - start_time, "s"
Parallel code
Only the differences to the sequential code above are listed:
Threading
   1 #!/usr/bin/python
   2 # File: sum_primes_threads.py
   3 # Author: Heinrich Widmann
   4 # Desc: This program demonstrates parallel computations with python threading
   5 # It calculates the sum of prime numbers below a given integer using threads ..
   6 
   7 import math, sys, time
   8 import threading
   9 
  10 def isprime(n):
  11     ...
  12 
  13 class SumPrimesThread(threading.Thread):
  14      def __init__(self,n):
  15          threading.Thread.__init__(self)
  16          self.n=n
  17          self.psum=0
  18 
  19      def run(self):
  20          self.psum=sum([x for x in xrange(2,self.n) if isprime(x)])
  21          ## print "Sum of primes below " + str(self.n) + " is", self.psum
  22 
  23 print """Usage: ./sum_primes_threads.py [njobs]
  24 """
  25 
  26 if len(sys.argv) > 1:
  27     njobs = int(sys.argv[1])
  28 else:
  29     njobs = 8
  30 
  31 print "Starting threading with", njobs, "jobs"
  32 
  33 
  34 # Retrieves the result calculated by job1
  35 # The value of job1() is the same as sum_primes(100)
  36 # If the job has not been finished yet, execution will wait here until result is available
  37 thread0 = SumPrimesThread(100)
  38 thread0.start() # This actually causes the thread to run
  39 thread0.join()  # This waits until the thread has completed
  40 result=thread0.psum
  41 print "Sum of primes below 100 is", result
  42 
  43 start_time = time.time()
  44 threads=[]
  45 for i in xrange(njobs):
  46     input=100000+i*100
  47     threads.insert(i, SumPrimesThread(input))
  48     threads[i].start() # This actually causes the thread to run
  49     ##    threads[i].join()  # This waits until the thread has completed --> test with and without join after each thread !!   
  50 
  51 print "Time elapsed before joined: ", time.time() - start_time, "s"
  52 
  53 for j in xrange(i): 
  54     threads[j].join()              
  55  
  56 print "Time elapsed after joined: ", time.time() - start_time, "s"
Multiprocessing
   1 #!/usr/bin/python
   2 # File: sum_primes_mprocessing.py
   3 # Author: Heinrich Widmann
   4 # Desc: This program demonstrates parallel computations with python processing package
   5 # It calculates the sum of prime numbers below a given integer in parallel
   6 
   7 import math, sys, time
   8 from multiprocessing import Pool
   9 
  10 def isprime(n):
  11    ............
  12 
  13 def sum_primes(n):
  14     """Calculates sum of all primes below given integer n"""
  15     return sum([x for x in xrange(2,n) if isprime(x)])
  16          
  17 print """Usage: python sum_primes_processing.py
  18 """
  19 
  20 if len(sys.argv) > 1:
  21     njobs = int(sys.argv[1])
  22 else:
  23     njobs = 8
  24 
  25 print "Starting multiprocessing with", njobs, "jobs"
  26 
  27 start_time = time.time()
  28 
  29 p=Pool(njobs)
  30 f_inputs = []
  31 for i in xrange(njobs):
  32     f_inputs.append(100000+i*100)
  33 
  34 p.map(sum_primes, f_inputs)
  35  
  36 print "Time elapsed: ", time.time() - start_time, "s"
ParallelPython
The number of jobs and, optionally, the number of CPUs can be specified as arguments, e.g. to run 8 jobs on two CPUs:
- $ sum_primes_pp.py 8 2
 
   1 #!/usr/bin/python
   2 # File: sum_primes.py
   3 # Author: Vitalii Vanovschi
   4 # Modified by : Heinrich Widmann
   5 # Desc: This program demonstrates parallel computations with pp module
   6 # It calculates the sum of prime numbers below a given integer in parallel
   7 # Parallel Python Software: http://www.parallelpython.com
   8 
   9 import math, sys, time
  10 import pp
  11 
  12 def isprime(n):
  13    .....
  14 
  15 def sum_primes(n):
  16     """Calculates sum of all primes below given integer n"""
  17     return sum([x for x in xrange(2,n) if isprime(x)])
  18 
  19 print """Usage: python sum_primes.py [njobs] [ncpus] 
  20     [njobs] - the number of jobs to run in parallel, 
  21     if omitted it will be set to 8
  22     [ncpus] - the number of workers (CPUs) used, 
  23     if omitted it will be set to the number of processors in the system
  24 """
  25 
  26 # tuple of all parallel python servers to connect with
  27 ppservers = ()
  28 #ppservers = ("10.0.0.1",)
  29 
  30 if len(sys.argv) > 1:
  31     njobs = int(sys.argv[1])
  32 else:
  33     njobs = 8
  34 
  35 if len(sys.argv) > 2:
  36     ncpus = int(sys.argv[2])
  37     # Creates jobserver with ncpus workers
  38     job_server = pp.Server(ncpus, ppservers=ppservers)
  39 else:
  40     # Creates jobserver with automatically detected number of workers
  41     job_server = pp.Server(ppservers=ppservers)
  42 
  43 print "Starting pp processing with", njobs, "jobs and", job_server.get_ncpus(), "workers"
  44 
  45 start_time = time.time()
  46 
  47 # The following submits njobs jobs and then retrieves the results
  48 for i in xrange(njobs):
  49     input=100000+i*100
  50     job = job_server.submit(sum_primes,(input,), (isprime,), ("math",))
  51     job()
  52     ## print "Sum of primes below", input, "is", job()
  53 
  54 print "Time elapsed: ", time.time() - start_time, "s"
  55 job_server.print_stats()
Cython
The Cython code differs from the Python code, but is still "Python-like":
   1 import time
   2 
   3 from cython.parallel import prange
   4 cimport cython
   5 
   6 from libc.math cimport sqrt
   7 from libc.math cimport ceil
   8 
   9 @cython.boundscheck(False)
  10 @cython.cdivision(True)
  11 cdef unsigned int isprime(unsigned int n) nogil:
  12     if n < 2:
  13         return 0
  14     if n == 2:
  15         return 1
  16     cdef unsigned int max
  17     max = int(ceil(sqrt(n)))
  18     cdef unsigned int i
  19     i = 2
  20     while i <= max:
  21         if n % i == 0:
  22             return 0
  23         i += 1
  24     return 1
  25 
  26 @cython.boundscheck(False)
  27 @cython.wraparound(False)
  28 cdef unsigned int sum_primes(unsigned int n) nogil:
  29     cdef unsigned int x
  30     cdef unsigned int max
  31     max = 0
  32     for x in xrange(2, n):
  33         if isprime(x) and x > max:
  34             max = x
  35     return max
  36 
  37 
  38 def run(unsigned int n, unsigned int i1, unsigned int i2):
  39     cdef double start_time
  40     start_time = time.time()
  41 
  42     cdef unsigned int i
  43     for i in prange(n, nogil=True):
  44         ## job_start_time = time.time()
  45         sum_primes(i1 + i2*100)
  46         ##print "Sum of primes below", i1 + i2*100, "is", sum_primes(i1 + i2*100), "and lasts", time.time() - job_start_time, "s"
  47         ##print "JJJ" ## , job_start_time, "s"
  48     print "Time elapsed: ", time.time() - start_time, "s"
The Cython code sum_primes_cython.pyx must first be translated to C, and the C code must then be compiled. This is done by
- $ python setup_prim_cython.py build_ext --inplace
 
where setup_prim_cython.py is:
   1 from distutils.core import setup
   2 from distutils.extension import Extension
   3 from Cython.Distutils import build_ext
   4 
   5 ext_module = Extension(
   6     "sum_primes_cython",
   7     ["sum_primes_cython.pyx"],
   8     extra_compile_args=['-fopenmp'],
   9     extra_link_args=['-fopenmp'],
  10 )
  11 
  12 setup(
  13     name = 'Sum primes test',
  14     cmdclass = {'build_ext': build_ext},
  15     ext_modules = [ext_module],
  16 )
Run it with $ python run_sum_primes_cython.py [njobs]
where run_sum_primes_cython.py is:
