1 package Parallel::MPI::Simple;
# NOTE(review): `use vars` is obsolete; `our (@ISA, $VERSION)` is the
# modern replacement — confirm minimum supported perl before changing.
5 use vars qw(@ISA $VERSION);
6 use Storable qw(nfreeze thaw);
11 bootstrap Parallel::MPI::Simple;
13 # evil, but makes everything MPI_*, which is sort of expected
# Poor man's exporter: alias each listed sub into the *calling*
# package under an MPI_ prefix via typeglob assignment, so callers
# see MPI_Init(), MPI_Send(), etc.
15 my $call = (caller())[0];
17 # subs (MPI_ function calls)
18 foreach (qw(Init Finalize COMM_WORLD ANY_SOURCE Comm_rank Comm_size
19 Recv Send Barrier Bcast Gather
20 Scatter Allgather Alltoall Reduce
21 Comm_compare Comm_dup Comm_free Comm_split
23 *{$call.'::MPI_'.$_} = \&$_;
# Same aliasing for the comparison/undefined constants below.
26 *{$call.'::MPI_'.$_} = \&$_
27 foreach (qw(IDENT CONGRUENT SIMILAR UNEQUAL UNDEFINED));
36 mpirun -np 2 perl script.pl
39 use Parallel::MPI::Simple;
41 my $rank = MPI_Comm_rank(MPI_COMM_WORLD);
43 my $msg = "Hello, I'm $rank";
44 MPI_Send($msg, 0, 123, MPI_COMM_WORLD);
47 my $msg = MPI_Recv(1, 123, MPI_COMM_WORLD);
48 print "$rank received: '$msg'\n";
52 =head1 COMPILING AND RUNNING
54 Please view the README file in the module tarball if you are having
55 trouble compiling or running this module.
59 Perl is not a strongly typed language, Perl does not enforce data
60 structures of a fixed size or dimensionality, Perl makes things easy.
61 Parallel processing solves problems faster and is commonly programmed
62 using a message passing paradigm. Traditional message passing systems
63 are designed for strongly typed languages like C or Fortran, there
64 exist implementations of these for Perl but they concentrate on
65 perfectly mimicking the standards, forcing the poor Perl programmer to
66 use strongly typed data despite all his best instincts.
68 This module provides a non-compliant wrapper around the widely
69 implemented MPI libraries, allowing messages to consist of arbitrarily
70 nested Perl data structures whose size is limited by available memory.
71 This hybrid approach should allow you to quickly write programs which
72 run anywhere which supports MPI (both Beowulf and traditional MPP
75 =head1 Message Passing and Multiprocessing
77 The message passing paradigm is simple and easy to use. Multiple
78 versions of the same program are run on multiple processors (or
79 nodes). Each running copy should call C<MPI_Init> to announce that it
80 is running. It can then find out who it is by calling
81 C<MPI_Comm_rank> and who else it can talk to by calling
82 C<MPI_Comm_size>. Using this information to decide what part it is to
83 play in the ensuing computation, it then exchanges messages, or
84 parcels of data, with other nodes allowing all to cooperate.
86 Once the computation is finished, the node calls C<MPI_Finalize> and
87 exits cleanly, ready to run another day.
89 These processes are all copies of the I<same> perl script and are invoked
90 using: C<mpirun -np [number of nodes] perl script.pl> .
92 Remember you may need to start a daemon before mpirun will work, for
93 C<mpich> this is often as easy as running: C<mpd &>.
95 =head1 Starting and Stopping a process
97 A process must formally enter and leave the MPI pool by calling these
104 Initialises the message passing layer. This should be the first C<MPI_*>
105 call made by the program and ideally one of the first things the
106 program does. After completing this call, all processes will be
107 synchronised and will become members of the C<MPI_COMM_WORLD>
108 communicator. It is an error for programs invoked with C<mpirun> to
109 fail to call C<MPI_Init> (not to mention being a little silly).
115 Shuts down the message passing layer. This should be called by every
116 participating process before exiting. No more C<MPI_*> calls may be made
117 after this function has been called. It is an error for a program to
118 exit I<without> calling this function.
122 All processes are members of one or more I<communicators>. These are
123 like channels over which messages are broadcast. Any operation
124 involving more than one process will take place in a communicator,
125 operations involving one communicator will not interfere with those in
128 On calling C<MPI_Init> all nodes automatically join the
129 C<MPI_COMM_WORLD> communicator. A communicator can be split into
130 smaller subgroups using the C<MPI_Comm_split> function.
132 =head2 MPI_COMM_WORLD
134 $global_comm = MPI_COMM_WORLD;
136 Returns the global communicator shared by all processes launched at
137 the same time. Can be used as a "constant" where a communicator is
138 required. Most MPI applications can get by using only this
143 $rank = MPI_Comm_rank($comm);
145 Returns the rank of the process within the communicator given by
146 $comm. Processes have ranks from 0..(size-1).
157 $size = MPI_Comm_size($comm);
159 Returns the number of processes in communicator $comm.
167 =head2 MPI_Comm_compare
169 $result = MPI_Comm_compare($comm1, $comm2);
171 Compares the two communicators $comm1 and $comm2. $result will be equal
174 MPI_IDENT : communicators are identical
175 MPI_CONGRUENT: membership is same, ranks are equal
176 MPI_SIMILAR : membership is same, ranks not equal
177 MPI_UNEQUAL : at least one member of one is not in the other
182 sub CONGRUENT () { 2 }
# Unpack the two communicator SVs and delegate to the C helper, which
# translates MPI's comparison constants into this module's
# IDENT/CONGRUENT/SIMILAR/UNEQUAL values.
187 my ($c1, $c2) = (@_);
188 _Comm_compare($c1, $c2);
193 $newcomm = MPI_Comm_dup($comm);
195 Duplicates $comm but creates a new context for messages.
204 =head2 MPI_Comm_split
206 $newcomm = MPI_Comm_split($comm, $colour, $key);
208 Every process in $comm calls C<MPI_Comm_split> at the same time. A
209 new set of communicators is produced, one for each distinct value of
210 $colour. All those processes which specified the same value of
211 $colour end up in the same communicator and are ranked on the values
212 of $key, with their original ranks in $comm being used to settle ties.
214 If $colour is negative (or C<MPI_UNDEFINED>), the process will not be
215 allocated to any of the new communicators and C<undef> will be
220 sub UNDEFINED () { -1 }
222 my ($comm, $colour, $key) = @_;
# A negative $colour (MPI_UNDEFINED) means this rank joins none of the
# new communicators; presumably the remainder of this sub turns the C
# result into undef in that case — not visible in this excerpt.
223 my $rt = _Comm_split($comm, $colour, $key);
234 MPI_Comm_free($comm, [$comm2, ...] );
236 Frees the underlying object in communicator $comm, do not attempt to
237 do this to MPI_COMM_WORLD, wise to do this for any other communicators
238 that you have created. If given a list of communicators, will free
239 all of them, make sure there are no duplicates...
244 _Comm_free($_) foreach @_;
247 =head1 Communications operations
253 Waits for every process in $comm to call MPI_Barrier, once done, all
254 continue to execute. This causes synchronisation of processes. Be
255 sure that every process does call this, else your computation will
260 MPI_Send($scalar, $dest, $msg_tag, $comm);
262 This takes a scalar (which can be an anonymous reference to a more
263 complicated data structure) and sends it to process with rank $dest in
264 communicator $comm. The message also carries $msg_tag as an
265 identifier, allowing nodes to receive and send out of order.
266 Completion of this call does not imply anything about the progress of
272 # my ($ref,$dest,$tag,$comm) = @_;
# Freeze the scalar with Storable::nfreeze so nested Perl structures
# travel as a flat byte string; $_[0] is used directly to avoid
# copying the caller's (possibly large) scalar.
273 my $stor = nfreeze(\$_[0]);
274 _Send($stor, $_[1], $_[2], $_[3]);
279 $scalar = MPI_Recv($source, $msg_tag, $comm);
281 Receives a scalar from another process. $source and $msg_tag must both
282 match a message sent via MPI_Send (or one which will be sent in future)
283 to the same communicator given by $comm.
286 MPI_Send([qw(This is a message)], 1, 0, MPI_COMM_WORLD);
289 my $msg = MPI_Recv(1,0,MPI_COMM_WORLD);
290 print join(' ', @{ $msg } );
293 Will output "This is a message". Messages with the same source,
294 destination, tag and communicator will be delivered in the order in
295 which they were sent. No other guarantees of timeliness or ordering
296 can be given. If needed, use C<MPI_Barrier>.
298 C<$source> can be C<MPI_ANY_SOURCE> which will do what it says.
304 my ($source, $tag, $comm, $status) = @_;
# _Recv returns the frozen byte string; thaw() rebuilds the structure
# and we dereference the top-level scalar ref created by Send's
# nfreeze(\$_[0]).
305 $out = _Recv($source, $tag, $comm, $status);
306 return ${thaw($out)};
311 $data = MPI_Bcast($scalar, $root, $comm);
313 This sends $scalar in process $root from the root process to every
314 other process in $comm, returning this scalar in every process. All
315 non-root processes should provide a dummy message (such as C<undef>),
316 this is a bit ugly, but maintains a consistent interface between the
317 other communication operations. The scalar can be a complicated data
320 if ($rank == 0) { # send from 0
321 my $msg = [1,2,3, {3=>1, 5=>6} ];
322 MPI_Bcast( $msg, 0, MPI_COMM_WORLD);
324 else { # everything else receives, note dummy message
325 my $msg = MPI_Bcast(undef, 0, MPI_COMM_WORLD);
332 # my ($data, $from, $comm) = @_;
# Every rank freezes its argument (non-root payloads are dummies and
# ignored), then all thaw whatever the C-level broadcast delivers.
333 my $data = nfreeze(\$_[0]);
334 $out = _Bcast($data, $_[1], $_[2]);
335 return ${thaw($out)};
341 @list = MPI_Gather($scalar, $root, $comm);
343 (nothing) = MPI_Gather($scalar, $root, $comm);
345 Sends $scalar from every process in $comm (each $scalar can be
346 different, root's data is also sent) to the root process which
347 collects them as a list of scalars, sorted by process rank order in
353 # my ($ref, $root, $comm) = @_;
# Each rank contributes one frozen scalar; _Gather returns an array
# ref of frozen strings which are thawed back into Perl structures in
# rank order (meaningful data at root only).
355 my $data = nfreeze(\$_[0]);
356 foreach (@{ _Gather($data, $_[1], $_[2]) }) {
357 push @rt, ${thaw($_)};
364 $data = MPI_Scatter([N items of data], $root, $comm);
366 Sends list of scalars (anon array as 1st arg) from $root to all
367 processes in $comm, with process of rank N-1 receiving the Nth item in
368 the array. Very bad things might happen if number of elements in
369 array != N. This does not call the C function at any time, so do not
370 expect any implicit synchronisation.
# Implemented in Perl with point-to-point Send/Recv on tag 11002
# rather than C-level MPI_Scatter, so (per the POD) there is no
# implicit synchronisation.
375 my ($aref, $root, $comm) = @_;
376 if (Comm_rank($comm) == $root) {
377 for my $i (0..@$aref-1) {
379 Send($aref->[$i], $i, 11002, $comm);
384 Recv($root, 11002, $comm);
390 @list = MPI_Allgather($scalar, $comm);
392 Every process receives an ordered list containing an element from every
393 other process. Again, this is implemented without a call to the C function.
398 # my ($data, $comm) = @_;
# Implemented as Comm_size successive broadcasts, one rooted at each
# rank in turn, so every process accumulates everyone's data in rank
# order.
400 my $frozen = nfreeze(\$_[0]);
401 for my $i (0..Comm_size($_[1])-1) {
402 push @rt, ${ thaw(_Bcast($frozen, $i, $_[1])) };
409 @list = MPI_Alltoall([ list of scalars ], $comm);
411 Similar to Allgather, each process (with rank I<rank>) ends up with a
412 list such that element I<i> contains the data which started in element
413 I<rank> of process I<i>s data.
418 my ($data, $comm) = @_;
419 my ($rank, $size) = (Comm_rank($comm), Comm_size($comm));
# First loop: send element $_ of my list to rank $_ (tag 1); second
# loop: collect one element from each rank in order, keeping my own
# element locally instead of sending to self.  NOTE(review): the
# self-rank guard lines are not visible in this excerpt — confirm
# against the full source before editing.
422 foreach (0..$size-1) {
424 Send($data->[$_], $_, 1, $comm);
426 foreach (0..$size-1) {
428 push @rt, $data->[$_]; next;
430 push @rt, Recv($_, 1, $comm);
437 $value = MPI_Reduce($input, \&operation, $comm);
439 Every process receives in $value the result of performing &operation
440 between every processes $input. If there are three processes in
441 $comm, then C<$value = $input_0 op $input_1 op $input_2>.
443 Operation should be a sub which takes two scalar values (the $input
444 above) and returns a single value. The operation it performs should
445 be commutative and associative, otherwise the result will be undefined.
447 For instance, to return the sum of some number held by each process, perform:
449 $sum = MPI_Reduce($number, sub {$_[0] + $_[1]}, $comm);
451 To find which process holds the greatest value of some number:
453 ($max, $mrank) = @{ MPI_Reduce([$number, $rank],
454 sub { $_[0]->[0] > $_[1]->[0] ? $_[0] : $_[1]}
459 # This version is deprecated, but may be faster
# O(P) linear reduction: every non-root rank sends its value to rank
# 0, which folds them together with &$code and then sends the final
# result back out to each rank (tag 1 throughout).
461 my ($ref, $code, $comm) = @_;
462 my ($rank, $size) = (Comm_rank($comm), Comm_size($comm));
464 Barrier($comm); # safety first
466 Send($ref, 0, 1, $comm);
467 $rt = Recv(0,1,$comm);
472 $rt = &$code($rt, Recv($_,1,$comm));
475 Send($rt, $_,1,$comm);
481 # This should be O(log(P)) in calc and comm
482 # This version first causes odds to send to evens which reduce, then etc.
484 my ($val, $code, $comm) = @_;
485 my ($rank, $size) = (Comm_rank($comm), Comm_size($comm));
487 my @nodes = (0..$size-1);
# Pad the node list to an even length so sender/receiver pairing is
# well defined each round.
489 $#nodes += @nodes % 2;
491 my %to = reverse %from;
492 if ($from{$rank}) { # I'm receiving something
493 $rt = &$code($rt, Recv($from{$rank}, 1, $comm));
495 elsif (defined($to{$rank})) {# I'm sending something
496 Send($rt, $to{$rank}, 1, $comm);
# The receivers survive to form the node set for the next round.
498 @nodes = sort {$a <=> $b} keys %from;
500 # node 0 only to distribute via Broadcast
501 Bcast($rt, 0, $comm);
504 1; # I am the ANTI-POD!
508 I have decided to loosely follow the MPI calling and naming
509 conventions but do not want to stick strictly to them in all cases.
510 In the interests of being simple, I have decided that all errors
511 should result in the death of the MPI process rather than huge amounts
512 of error checking being foisted onto the module's user.
514 Many of the MPI functions have not been implemented, some of this is
515 because I feel they would complicate the module (I chose to have a
516 single version of the Send command, for instance) but some of this is
517 due to my not having finished yet. I certainly do not expect to
518 provide process topologies or inter-communicators, I also do not
519 expect to provide anything in MPI-2 for some time.
523 This module has been tested on a variety of platforms. I have not
524 been able to get it running with the mpich MPI implementation in
525 a clustered environment.
527 In general, I expect that most programs using this module will make
528 use of little other than C<MPI_Init>, C<MPI_Send>, C<MPI_Recv>,
529 C<MPI_COMM_WORLD>, C<MPI_Barrier>, C<MPI_Comm_size>, C<MPI_Comm_rank>
532 Please send bugs to github: L<https://github.com/quidity/p5-parallel-mpi-simple/issues>
536 Alex Gough (alex@earth.li)
540 This module is copyright (c) Alex Gough, 2001,2011.
542 You may use and redistribute this software under the Artistic License as
550 #define GATHER_TAG 11001 /* used to be unlikely to upset other sends */
553 root process first broadcasts length of stored data then broadcasts
554 the data. Non-root processes receive length (via bcast), allocate
555 space to take incoming data from root
557 Both root and non-root processes then create and return a new scalar
558 with contents identical to those root started with.
561 SV* _Bcast (SV* data, int root, SV* comm) {
/* Two-phase broadcast: first the byte length of the frozen scalar
   (as one MPI_INT), then the bytes themselves.  Root reuses its own
   PV buffer; every other rank mallocs just enough to receive. */
565 MPI_Comm_rank((MPI_Comm)SvIVX(comm), &rank);
567 buf_len[0] = sv_len(data);
568 MPI_Bcast(buf_len, 1, MPI_INT, root, (MPI_Comm)SvIVX(comm));
569 MPI_Bcast(SvPVX(data), buf_len[0], MPI_CHAR, root, (MPI_Comm)SvIVX(comm));
570 rval = newSVpvn(SvPVX(data), buf_len[0]);
574 MPI_Bcast(buf_len, 1, MPI_INT, root, (MPI_Comm)SvIVX(comm));
575 recv_buf = (char*)malloc((buf_len[0]+1)*sizeof(char));
576 if (recv_buf == NULL) croak("Allocation error in _Bcast");
577 MPI_Bcast(recv_buf, buf_len[0], MPI_CHAR, root, (MPI_Comm)SvIVX(comm));
578 rval = newSVpvn(recv_buf, buf_len[0]);
585 Finds length of data in stor_ref, sends this to receiver, then
586 sends actual data, uses same tag for each message.
589 int _Send(SV* stor_ref, int dest, int tag, SV*comm) {
/* Length-prefixed send: one MPI_INT carrying the frozen string's
   byte length, then the raw bytes, both under the caller's tag. */
591 str_len[0] = sv_len(stor_ref);
592 MPI_Send(str_len, 1, MPI_INT, dest, tag, (MPI_Comm)SvIVX(comm));
593 MPI_Send(SvPVX(stor_ref), sv_len(stor_ref),MPI_CHAR,
594 dest, tag, (MPI_Comm)SvIVX(comm));
599 Receives int for length of data it should then expect, allocates space
600 then receives data into that space. Creates a new SV and returns it.
603 SV* _Recv (int source, int tag, SV*comm, SV*status) {
/* Mirror of _Send: receive the length, allocate, receive the bytes,
   wrap them in a fresh SV, and report the actual sender's rank back
   through the status SV (useful with MPI_ANY_SOURCE). */
609 MPI_Recv(len_buf, 1, MPI_INT, source, tag, (MPI_Comm)SvIVX(comm), &tstatus);
610 recv_buf = (char*)malloc((len_buf[0]+1)*sizeof(char));
611 if (recv_buf == NULL) croak("Allocation error in _Recv");
612 MPI_Recv(recv_buf, len_buf[0], MPI_CHAR, source, tag,
613 (MPI_Comm)SvIVX(comm), &tstatus);
614 rval = newSVpvn(recv_buf, len_buf[0]);
615 sv_setiv(status, tstatus.MPI_SOURCE);
620 /* Calls MPI_Init with dummy arguments, a bit dodgy but sort of ok */
/* NOTE(review): MPI_Init's signature is (int *argc, char ***argv);
   the first cast should be (int *)NULL, not (int)NULL — on LP64
   platforms (int)NULL truncates the intended null pointer.  Confirm
   and fix against the MPI standard. */
622 MPI_Init((int) NULL, (char ***)NULL);
625 /* Returns rank of process within comm */
626 int _Comm_rank (SV* comm) {
/* The MPI_Comm handle is stored cast into the SV's IV slot, as
   throughout this C section. */
628 MPI_Comm_rank((MPI_Comm)SvIVX(comm),&trank);
632 /* returns total number of processes within comm */
633 int _Comm_size (SV* comm) {
/* Handle recovered from the SV's IV slot, as elsewhere. */
635 MPI_Comm_size((MPI_Comm)SvIVX(comm), &tsize);
639 /* returns SV whose IV slot is a cast pointer to the MPI_COMM_WORLD object */
641 return newSViv((IV)MPI_COMM_WORLD);
644 /* calls MPI_Barrier for comm */
645 int Barrier (SV*comm) {
/* Blocks until every rank in comm has entered the barrier. */
646 MPI_Barrier((MPI_Comm)SvIVX(comm));
649 /* ends MPI participation */
655 If non-root: participates in Gather so that root finds length of data
656 to expect from this process. Then send (using MPI_Send)
659 If root: receives array of ints detailing length of scalars held by
660 other processes, then receives from each in turn (using MPI_Recv)
661 returns an array ref to root process only.
/* Root gathers every rank's frozen-string length via MPI_Gather,
   allocates one buffer sized to the largest, then receives each
   rank's bytes in turn on GATHER_TAG.  Non-root ranks participate in
   the length gather and then plain-send their bytes.  Returns a ref
   to an AV of the gathered strings (empty AV on non-root). */
664 SV* _Gather (SV* data, int root, SV* comm) {
665 int rank, size, *buf_lens, i, max;
671 /* find out how long data is */
672 ret_arr = av_make(0,(SV**)NULL);
673 my_buf[0] = sv_len(data);
674 if (_Comm_rank(comm) == root) {
675 MPI_Comm_size((MPI_Comm)SvIVX(comm), &size);
676 buf_lens = malloc(size*sizeof(int));
677 if (buf_lens == NULL) croak("Allocation error (lens) in _Gather");
678 /* gather all scalar length data */
679 MPI_Gather(my_buf, 1, MPI_INT, buf_lens, 1,
680 MPI_INT, root, (MPI_Comm)SvIVX(comm));
681 max = 0; // keep buffer allocation calls to minimum
682 for (i=0;i<size;i++) {
683 max = max < buf_lens[i] ? buf_lens[i] : max;
685 recv_buf = malloc(max * sizeof(char));
686 if (recv_buf == NULL) croak("Allocation error (recv) in _Gather");
687 for (i=0;i<size;i++) {
/* NOTE(review): pushing `data` without SvREFCNT_inc while sibling
   elements are sv_2mortal'd looks asymmetric — confirm refcount
   handling against perlapi before touching. */
689 av_push(ret_arr, data);
690 continue; /* me, no point sending */
692 MPI_Recv(recv_buf, buf_lens[i], MPI_CHAR, i, GATHER_TAG,
693 (MPI_Comm)SvIVX(comm), &tstatus );
694 av_push(ret_arr, sv_2mortal( newSVpvn(recv_buf, buf_lens[i]) ) );
700 /* send out how long my scalar data is */
/* buf_lens is unallocated on non-root ranks; MPI ignores the recv
   buffer argument except at root, so this is legal if ugly. */
701 MPI_Gather(my_buf, 1, MPI_INT, buf_lens, 1,
702 MPI_INT, root, (MPI_Comm)SvIVX(comm) );
703 /* send out my scalar data as normal send with tag of ???? */
704 MPI_Send(SvPVX(data), my_buf[0], MPI_CHAR,
705 root, GATHER_TAG,(MPI_Comm)SvIVX(comm));
708 return newRV_inc((SV*)ret_arr);
711 /* compares two communicators, translates MPI constants into something I
712 can use as constants in the module interface */
713 int _Comm_compare(SV* comm1, SV* comm2) {
/* Result is translated (below this excerpt) into the module's
   IDENT/CONGRUENT/SIMILAR/UNEQUAL values. */
715 MPI_Comm_compare((MPI_Comm)SvIVX(comm1), (MPI_Comm)SvIVX(comm2), &result);
730 /* frees a communicator, once all pending communication has taken place */
731 void _Comm_free (SV* comm) {
/* NOTE(review): (MPI_Comm*)&SvIVX(comm) assumes the IV slot is exactly
   the size of an MPI_Comm — confirm on all supported platforms.
   MPI_Comm_free sets the handle to MPI_COMM_NULL, which the check
   below relies on. */
732 MPI_Comm_free((MPI_Comm*)&SvIVX(comm));
733 if ((MPI_Comm)SvIVX(comm) != MPI_COMM_NULL)
734 croak("Communicator not freed properly\n");
737 SV* _Comm_dup (SV*comm) {
/* Duplicate gets its own message context; new handle returned cast
   into a fresh SV's IV slot. */
739 MPI_Comm_dup((MPI_Comm)SvIVX(comm), &newcomm);
740 return newSViv((IV)newcomm);
743 SV* _Comm_split (SV* comm, int colour, int key) {
/* Negative colour maps to MPI_UNDEFINED so a rank can opt out of the
   split; the resulting handle is returned cast into an IV. */
746 MPI_Comm_split((MPI_Comm)SvIVX(comm),
747 (colour < 0 ? MPI_UNDEFINED : colour),
749 return newSViv((IV)newcomm);