package My::AllReduceMapper;

use strict;
use warnings;

use Moose;
with 'Hadoop::AllReduceMapper';

use List::Util qw(sum);
use POSIX qw(ceil INT_MIN INT_MAX);
use Time::HiRes qw(time);

# Keys collected by map() on this mapper; cooperate() searches among them.
has 'keys' => (is => 'rw', isa => 'ArrayRef[Num]', default => sub { [] });

# map($key, $value, $context)
#
# Stores the key of every input record in the 'keys' attribute.  The value
# and the context are not used.
sub map {
    my ($self, $key, $value, $context) = @_;
    push @{ $self->keys }, $key;
}

# cooperate($context, $writeResults)
#
# Searches for the element of rank ceil(N / 2) (1-based, N = number of keys
# summed over all mappers) without gathering the keys in one place.  Each
# round uses the rounded-up global average of the remaining candidates as a
# split key, counts how many candidates are smaller than / equal to it, and
# then either reports the split key or narrows the [$min, $max] window.
#
# The found key is written through $context only when $writeResults is true.
# Progress messages go to STDERR under the same condition.
#
# NOTE(review): every value is used as a global total right after being
# passed to allReduce(), so this code relies on allReduce() updating its
# first argument in place -- confirm against Hadoop::AllReduceMapper.
#
# NOTE(review): the window is narrowed to $split - 1 / $split + 1, so keys
# lying strictly between two integers can be dropped; the search presumably
# expects integer keys -- confirm with the input data.
sub cooperate {
    my ($self, $context, $writeResults) = @_;

    my ($min, $max) = (INT_MIN, INT_MAX);
    my @keys = @{ $self->keys };

    # Rank of the wanted element among all keys.
    my $index = @keys;
    $self->allReduce($index, $self->REDUCE_ADD);
    $index = ceil($index / 2);

    my $result = undef;
    while (not defined $result) {
        # 1. Get average.
        my $total_count = @keys;
        $self->allReduce($total_count, $self->REDUCE_ADD);

        # sum() returns undef for an empty list; seeding it with 0 keeps the
        # contribution of a mapper without candidates a plain number.
        my $total_sum = sum(0, @keys);
        $self->allReduce($total_sum, $self->REDUCE_ADD);

        # No candidates anywhere (empty input or exhausted window): there is
        # nothing to report, and dividing by the count would die.  The check
        # is placed after both reductions so that the sequence of collective
        # calls stays the same as before.
        if (!$total_count) {
            warn "No keys left to search, no result written.\n" if $writeResults;
            return;
        }

        my $split = ceil($total_sum / $total_count);
        printf STDERR "%.3f: Searching for ${index}th element out of ${total_count}, min = ${min}, split key = ${split}, max = ${max}.\n", time() if $writeResults;

        # 2. Count objects.
        my ($less, $equal) = (0, 0);
        foreach my $key (@keys) {
            $less++  if $key < $split;
            $equal++ if $key == $split;
        }
        $self->allReduce($less, $self->REDUCE_ADD);
        $self->allReduce($equal, $self->REDUCE_ADD);

        # 3. Have we found results?
        if ($index > $less && $index <= $less + $equal) {
            $result = $split;
            last;
        }

        # 4. Update min and max.
        if ($index <= $less) {
            # The wanted element is below the split key.
            $max = $split - 1;
        }
        else {
            # The wanted element is above the split key; its rank within the
            # remaining candidates shrinks by everything discarded.
            $min = $split + 1;
            $index -= $less + $equal;
        }

        # 5. Update keys.
        @keys = grep { $_ >= $min and $_ <= $max } @keys;
    }

    $context->write($result) if $writeResults;
}

package main;

use strict;
use warnings;

use Hadoop::Runner;

# Run the job with the all-reduce mapper over key/value text input.
my $runner = Hadoop::Runner->new(
    mapper       => My::AllReduceMapper->new(),
    input_format => 'KeyValueTextInputFormat',
);

$runner->run();