Parallel Programming
- Some concepts and methods
- Threading vs. Multiprocessing
- Processing package
- Other approaches
- Clustering, multiple nodes, ...
- Load balancing, scheduling
As a simple application we use a program that calculates the sum of all prime numbers below a given integer n. We parallelize over n ∈ {100000, 100100, 100200, ...}, i.e. job i uses n = 100000 + 100·i.
As reference code we use an implementation in C:
1 /* File: sum_primes_seq.c
2 Author: Heinrich Widmann (based on python version of Vitalii Vanovschi
3 from Parallel Python Software: http://www.parallelpython.com )
4 Desc: This program demonstrates sequential computation implemented
5 as C code
6 It calculates the sum of prime numbers below a given integer
7 */
8
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
/* Prototypes: sum_primes() and isPrime() are defined after main(). Without
 * these declarations the calls would be implicit declarations (invalid since
 * C99) and n would be passed as int to an unsigned long parameter. */
int sum_primes(unsigned long int n);
int isPrime(int num);

/* Runs njobs jobs one after the other; job j computes
 * sum_primes(100000 + j*100). The optional first command-line argument sets
 * njobs (default 8). Prints the total processor time measured with clock(). */
int main(int argc, char *argv[]) {
  clock_t tot_start, tot_end;
  double tot_time_spent;
  int sum, n;
  int j, njobs;

  njobs = 8;
  if (argc > 1) {
    njobs = atoi(argv[1]);
  }

  printf("Start processing with %d jobs :\n", njobs);
  tot_start = clock();

  // Run njobs jobs; the sums are only computed so that they can be timed.
  for (j = 0; j < njobs; j++) {
    n = 100000 + (j * 100);
    sum = sum_primes(n);
  }
  tot_end = clock();
  tot_time_spent = (double)(tot_end - tot_start) / CLOCKS_PER_SEC;
  printf("Time elapsed: %12.4f s \n", tot_time_spent);
  return 0;
}
33
34
int isPrime(int num); /* defined below; declared here so the call is not an implicit declaration */

int sum_primes(unsigned long int n) {
  // Calculates sum of all primes below given integer n.
  // NOTE: the sum is accumulated in an int; with a 32-bit int it overflows
  // somewhere above n = 2e5 (the benchmark uses n around 1e5).
  int i;
  int sum;
  clock_t start, end;

  sum = 0;
  start = clock();
  for (i = 0; i < n; i++) {
    if (isPrime(i)) {
      sum += i;
    }
  }
  end = clock();
  // Per-job timing; uncomment to print it (sum is an int, hence %d).
  // printf("Sum of primes below %lu is %d and lasted %8.4f seconds \n", n, sum, (double)(end - start) / CLOCKS_PER_SEC);
  return sum;
}
52
/* Returns 1 if num is prime and 0 otherwise, by trial division with every
 * x from 2 up to sqrt(num). */
int isPrime(int num) {
  int x;
  if (num < 2) {
    return 0;
  }

  /* x <= num / x is equivalent to x*x <= num but cannot overflow.
   * The bound must be inclusive, otherwise squares of primes
   * (4, 9, 25, 49, ...) are never tested and are reported as prime. */
  for (x = 2; x <= num / x; ++x) {
    if (num % x == 0) {
      return 0;
    }
  }
  return 1;
}
Here is the code of the sequential implementation in Python. (Hint by Alex: avoid explicit loops like the one below (while i <= max ...) and replace them by array operations!)
1 #!/usr/bin/python
2 # File: sum_primes_seq.py
3 # Author: Heinrich Widmann (based on parallel version of Vitalii Vanovschi
4 # from Parallel Python Software: http://www.parallelpython.com )
5 # Description: This program demonstrates sequential computation and
6 # calculates the sum of prime numbers below a given integer
7
8 import math, sys, time
9
def isprime(n):
    """Return True if n is prime and False otherwise.

    Uses trial division by every integer from 2 up to ceil(sqrt(n)).

    Raises:
        TypeError: if n is not an int.
    """
    if not isinstance(n, int):
        raise TypeError("argument passed to is_prime is not of 'int' type")
    if n < 2:
        return False
    if n == 2:
        # ceil(sqrt(2)) == 2, so 2 would "divide" itself below; special-case it.
        return True
    limit = int(math.ceil(math.sqrt(n)))
    # n is prime iff no candidate divisor in [2, limit] divides it evenly.
    return all(n % i != 0 for i in range(2, limit + 1))
25
def sum_primes(n):
    """Calculates sum of all primes below given integer n"""
    # A generator expression lets sum() consume the primes one at a time
    # instead of first materialising the whole list.
    return sum(x for x in xrange(2, n) if isprime(x))
29
30 print """Usage: python sum_primes_seq.py [njobs]
31 njobs are the number of jobs (calculate sum of primes up to N=10000+(njobs*100)) excecuted
32 """
33
34 if len(sys.argv) > 1:
35 njobs = int(sys.argv[1])
36 else:
37 njobs = 8
38
39 result = sum_primes(100)
40
41 print "Sum of primes below 100 is", result
42
43 start_time = time.time()
44
45 # The following submits njobs jobs and then retrieves the results
46 for i in xrange(njobs):
47 input=10000+i*100
48 job_start_time = time.time()
49 print "Sum of primes below", input, "is", sum_primes(input), "and lasts", time.time() - job_start_time, "s"
50 print "Time elapsed: ", time.time() - start_time, "s"
Parallel code
Only the differences from the sequential code above are listed:
Threading
1 #!/usr/bin/python
2 # File: sum_primes_threads.py
3 # Author: Heinrich Widmann
4 # Desc: This program demonstrates parallel computations with python threading
5 # It calculates the sum of prime numbers below a given integer using threads ..
6
7 import math, sys, time
8 import threading
9
def isprime(n):
    """Return True if n is prime and False otherwise.

    Same trial-division implementation as in the sequential version.

    Raises:
        TypeError: if n is not an int.
    """
    if not isinstance(n, int):
        raise TypeError("argument passed to is_prime is not of 'int' type")
    if n < 2:
        return False
    if n == 2:
        # ceil(sqrt(2)) == 2, so 2 would "divide" itself below; special-case it.
        return True
    limit = int(math.ceil(math.sqrt(n)))
    # n is prime iff no candidate divisor in [2, limit] divides it evenly.
    return all(n % i != 0 for i in range(2, limit + 1))
12
class SumPrimesThread(threading.Thread):
    """Thread that computes the sum of all primes below n.

    The result is stored in ``self.psum``; it stays 0 until ``run`` has
    finished, so read it only after ``join()``.
    """

    def __init__(self, n):
        threading.Thread.__init__(self)
        self.n = n       # primes strictly below n are summed
        self.psum = 0    # result, filled in by run()

    def run(self):
        """Compute the sum of the primes below self.n into self.psum."""
        # Generator expression: no intermediate list of primes is built.
        self.psum = sum(x for x in xrange(2, self.n) if isprime(x))
22
23 print """Usage: ./sum_primes_threads.py [njobs]
24 """
25
26 if len(sys.argv) > 1:
27 njobs = int(sys.argv[1])
28 else:
29 njobs = 8
30
31 print "Starting threading with", njobs, "jobs"
32
33
34 # Retrieves the result calculated by job1
35 # The value of job1() is the same as sum_primes(100)
36 # If the job has not been finished yet, execution will wait here until result is available
37 thread0 = SumPrimesThread(100)
38 thread0.start() # This actually causes the thread to run
39 thread0.join() # This waits until the thread has completed
40 result=thread0.psum
41 print "Sum of primes below 100 is", result
42
43 start_time = time.time()
44 threads=[]
45 for i in xrange(njobs):
46 input=100000+i*100
47 threads.insert(i, SumPrimesThread(input))
48 threads[i].start() # This actually causes the thread to run
49 ## threads[i].join() # This waits until the thread has completed --> test with and without join after each thread !!
50
51 print "Time elapsed before joined: ", time.time() - start_time, "s"
52
53 for j in xrange(i):
54 threads[j].join()
55
56 print "Time elapsed after joined: ", time.time() - start_time, "s"
Multiprocessing
1 #!/usr/bin/python
2 # File: sum_primes_mprocessing.py
3 # Author: Heinrich Widmann
4 # Desc: This program demonstrates parallel computations with python processing package
5 # It calculates the sum of prime numbers below a given integer in parallel
6
7 import math, sys, time
8 from multiprocessing import Pool
9
def isprime(n):
    """Return True if n is prime and False otherwise.

    Same trial-division implementation as in the sequential version.

    Raises:
        TypeError: if n is not an int.
    """
    if not isinstance(n, int):
        raise TypeError("argument passed to is_prime is not of 'int' type")
    if n < 2:
        return False
    if n == 2:
        # ceil(sqrt(2)) == 2, so 2 would "divide" itself below; special-case it.
        return True
    limit = int(math.ceil(math.sqrt(n)))
    # n is prime iff no candidate divisor in [2, limit] divides it evenly.
    return all(n % i != 0 for i in range(2, limit + 1))
12
def sum_primes(n):
    """Calculates sum of all primes below given integer n"""
    # A generator expression lets sum() consume the primes one at a time
    # instead of first materialising the whole list.
    return sum(x for x in xrange(2, n) if isprime(x))
16
17 print """Usage: python sum_primes_processing.py
18 """
19
20 if len(sys.argv) > 1:
21 njobs = int(sys.argv[1])
22 else:
23 njobs = 8
24
25 print "Starting multiprocessing with", njobs, "jobs"
26
27 start_time = time.time()
28
29 p=Pool(njobs)
30 f_inputs = []
31 for i in xrange(njobs):
32 f_inputs.append(100000+i*100)
33
34 p.map(sum_primes, f_inputs)
35
36 print "Time elapsed: ", time.time() - start_time, "s"
ParallelPython
The number of jobs and the number of worker CPUs can be given as arguments (first njobs, then ncpus); e.g., to run 8 jobs on two CPUs:
- $ python sum_primes_pp.py 8 2
1 #!/usr/bin/python
2 # File: sum_primes.py
3 # Author: Vitalii Vanovschi
4 # Modified by : Heinrich Widmann
5 # Desc: This program demonstrates parallel computations with pp module
6 # It calculates the sum of prime numbers below a given integer in parallel
7 # Parallel Python Software: http://www.parallelpython.com
8
9 import math, sys, time
10 import pp
11
def isprime(n):
    """Return True if n is prime and False otherwise.

    Same trial-division implementation as in the sequential version.

    Raises:
        TypeError: if n is not an int.
    """
    if not isinstance(n, int):
        raise TypeError("argument passed to is_prime is not of 'int' type")
    if n < 2:
        return False
    if n == 2:
        # ceil(sqrt(2)) == 2, so 2 would "divide" itself below; special-case it.
        return True
    limit = int(math.ceil(math.sqrt(n)))
    # n is prime iff no candidate divisor in [2, limit] divides it evenly.
    return all(n % i != 0 for i in range(2, limit + 1))
14
def sum_primes(n):
    """Calculates sum of all primes below given integer n"""
    # A generator expression lets sum() consume the primes one at a time
    # instead of first materialising the whole list.
    return sum(x for x in xrange(2, n) if isprime(x))
18
19 print """Usage: python sum_primes.py [njobs] [ncpus]
20 [njobs] - the number of jobs to run in parallel,
21 if omitted it will be set to 8
22 [ncpus] - the number of workers (CPUs) used,
23 if omitted it will be set to the number of processors in the system
24 """
25
26 # tuple of all parallel python servers to connect with
27 ppservers = ()
28 #ppservers = ("10.0.0.1",)
29
30 if len(sys.argv) > 1:
31 njobs = int(sys.argv[1])
32 else:
33 njobs = 8
34
35 if len(sys.argv) > 2:
36 ncpus = int(sys.argv[2])
37 # Creates jobserver with ncpus workers
38 job_server = pp.Server(ncpus, ppservers=ppservers)
39 else:
40 # Creates jobserver with automatically detected number of workers
41 job_server = pp.Server(ppservers=ppservers)
42
43 print "Starting pp processing with", njobs, "jobs and", job_server.get_ncpus(), "workers"
44
45 start_time = time.time()
46
47 # The following submits njobs jobs and then retrieves the results
48 for i in xrange(njobs):
49 input=100000+i*100
50 job = job_server.submit(sum_primes,(input,), (isprime,), ("math",))
51 job()
52 ## print "Sum of primes below", input, "is", job()
53
54 print "Time elapsed: ", time.time() - start_time, "s"
55 job_server.print_stats()
Cython
The Cython code differs from the Python code but is still Python-like:
1 import time
2
3 from cython.parallel import prange
4 cimport cython
5
6 from libc.math cimport sqrt
7 from libc.math cimport ceil
8
@cython.boundscheck(False)
@cython.cdivision(True)
cdef unsigned int isprime(unsigned int n) nogil:
    # Returns 1 if n is prime and 0 otherwise, by trial division with every
    # candidate divisor d from 2 up to ceil(sqrt(n)). Pure C, so it can be
    # called without the GIL from inside prange.
    cdef unsigned int limit
    cdef unsigned int d
    if n < 2:
        return 0
    if n == 2:
        # ceil(sqrt(2)) == 2 would make 2 divide itself below.
        return 1
    limit = <unsigned int>ceil(sqrt(n))
    for d in range(2, limit + 1):
        if n % d == 0:
            return 0
    return 1
25
@cython.boundscheck(False)
@cython.wraparound(False)
cdef unsigned long long sum_primes(unsigned int n) nogil:
    # Returns the SUM of all primes below n, like sum_primes() in the Python
    # versions. A 64-bit accumulator is used because the sum of the primes
    # below roughly 3.3e5 already exceeds the range of a 32-bit unsigned int.
    cdef unsigned int x
    cdef unsigned long long total = 0
    for x in range(2, n):
        if isprime(x):
            total += x
    return total
36
37
def run(unsigned int n, unsigned int i1, unsigned int i2):
    # Runs n sum_primes() jobs in a prange loop (parallel only when the module
    # is compiled and linked with OpenMP) and prints the elapsed wall time.
    # NOTE(review): every iteration calls sum_primes(i1 + i2*100); the loop
    # index i is never used, so all n jobs compute the same input rather than
    # 100000 + i*100 as in the Python versions. Presumably i1 + i*100 (or
    # similar) was intended -- confirm against run_sum_primes_cython.py.
    # NOTE(review): the result of sum_primes() is discarded and the call has
    # no side effects, so an optimizing C compiler may be allowed to drop it,
    # which would make the timing meaningless; consider accumulating the
    # results in a prange reduction variable.
    cdef double start_time
    start_time = time.time()

    cdef unsigned int i
    for i in prange(n, nogil=True):
        ## job_start_time = time.time()
        sum_primes(i1 + i2*100)
        ##print "Sum of primes below", i1 + i2*100, "is", sum_primes(i1 + i2*100), "and lasts", time.time() - job_start_time, "s"
        ##print "JJJ" ## , job_start_time, "s"
    print "Time elapsed: ", time.time() - start_time, "s"
The Cython source sum_primes_cython.pyx must be translated to C, and the C code must then be compiled. This is done by
- $ python setup_prim_cython.py build_ext --inplace
where setup_prim_cython.py is:
# Build script for the Cython extension module "sum_primes_cython"; run it as
#   python setup_prim_cython.py build_ext --inplace
from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext

# -fopenmp is passed both when compiling and when linking so that the prange
# loop in sum_primes_cython.pyx is built with OpenMP support.
ext_module = Extension(
    "sum_primes_cython",
    ["sum_primes_cython.pyx"],
    extra_compile_args=["-fopenmp"],
    extra_link_args=["-fopenmp"],
)

setup(
    name="Sum primes test",
    cmdclass={"build_ext": build_ext},
    ext_modules=[ext_module],
)
Run it with $ python run_sum_primes_cython.py [njobs]
where run_sum_primes_cython.py is: