use strict;
use warnings;

# Distributed k-means clustering on Hadoop, synchronised with AllReduce.
#
# Each mapper buffers its input lines (whitespace-separated coordinates) as
# points in map(). cooperate() then repeatedly assigns every local point to
# its nearest centre, allReduces the per-centre [count, coordinate sums], and
# moves each centre to the mean of its points. It stops when the total centre
# movement drops below `tolerance` or after `max_iterations` rounds.
#
# Environment:
#   CLUSTERS_FILE - path of a file holding the initial centres, one per line
#   CLUSTERS_NUM  - number of centres to read from that file
package My::AllReduceMapper;

use Moose;
use Time::HiRes 'time';

with 'Hadoop::AllReduceMapper';

# Dimensionality of the points; 0 until the first non-empty line is seen.
has 'dim' => (is => 'rw', isa => 'Int', default => 0);

# Points buffered by map(), each an arrayref of `dim` coordinates.
has 'points' => (
    is      => 'rw',
    isa     => 'ArrayRef[ArrayRef[Num]]',
    default => sub { [] },
);

# Upper bound on the number of k-means iterations.
has 'max_iterations' => (is => 'ro', isa => 'Int', default => 100);

# Iteration stops once the summed movement of all centres is below this.
has 'tolerance' => (is => 'ro', isa => 'Num', default => 1e-3);

# Buffer one input record as a point.
#   $key, $context - unused
#   $value         - the record: whitespace-separated coordinates
# Blank lines are skipped. Dies if a line's dimensionality differs from that
# of the first non-empty line.
sub map {
    my ($self, $key, $value, $context) = @_;

    # split ' ' ignores leading whitespace; split /\s+/ would produce a
    # spurious empty first coordinate for indented lines.
    my @coords = split ' ', $value;
    return unless @coords;

    $self->dim(scalar @coords) if $self->dim == 0;
    die "Bad number of dimensions" if $self->dim != @coords;
    push @{ $self->points }, \@coords;
}

# Run k-means over the buffered points, cooperating with the other mappers
# through allReduce.
#   $context      - task context; receives the final centres
#   $writeResults - when true, log progress to STDERR and write each final
#                   centre as ($index, "x1 x2 ...") to $context
sub cooperate {
    my ($self, $context, $writeResults) = @_;

    my $points = $self->points;
    my ($clusters, $sums) = _load_clusters($self->dim);

    # Centre each point was assigned to in the previous iteration; it is
    # tried first so the scan over the other centres can stop early.
    my @assignment = (0) x @$points;

    for my $iter (1 .. $self->max_iterations) {
        printf STDERR "%.3f: Starting iteration %d.\n", time(), $iter
            if $writeResults;

        # 0. Clear sums in place (keeps the same inner arrays).
        @$_ = (0) x @$_ for @$sums;

        # 1. Add points to their clusters.
        _assign_points($points, \@assignment, $clusters, $sums);

        # 2. AllReduce: combine the per-mapper sums. Presumably every mapper
        #    receives the global totals - TODO confirm against
        #    Hadoop::AllReduceMapper.
        $self->allReduce($sums, $self->REDUCE_ADD);

        # 3. Compute new clusters
        my $change = _recompute_clusters($clusters, $sums);
        printf STDERR "%.3f: Finished iteration %d with change %f.\n",
            time(), $iter, $change
            if $writeResults;

        # 4. Synchronize on change value so every mapper takes the same
        #    stopping decision.
        $self->allReduce(\$change, $self->REDUCE_MAX);
        printf STDERR "%.3f: Change synchronized to %f.\n", time(), $change
            if $writeResults;

        last if $change < $self->tolerance;
    }
    printf STDERR "%.3f: Done iterating.\n", time() if $writeResults;

    return unless $writeResults;
    for my $i (0 .. $#$clusters) {
        $context->write($i, join " ", @{ $clusters->[$i] });
    }
}

# Read the initial centres from $ENV{CLUSTERS_FILE}.
#   $dim - dimensionality of this mapper's points, or 0 if it saw none; in
#          that case the first centre fixes it, so a mapper with an empty
#          input split still contributes correctly-shaped sums.
# Returns (\@clusters, \@sums): the centres, and one zeroed accumulator per
# centre laid out as [count, sum_x1, ..., sum_xdim].
# Dies on a missing/invalid environment, a short file or a dimension mismatch.
sub _load_clusters {
    my ($dim) = @_;
    my $file = $ENV{CLUSTERS_FILE};
    my $num  = $ENV{CLUSTERS_NUM};

    die "CLUSTERS_FILE is not set" unless defined $file;
    die "CLUSTERS_NUM must be a positive integer"
        unless defined $num && $num =~ /\A[0-9]+\z/ && $num > 0;

    open(my $fh, "<", $file)
        or die "Cannot open file specified by CLUSTERS_FILE=$file: $!";

    my (@clusters, @sums);
    for my $n (1 .. $num) {
        my $line = <$fh>;
        die "CLUSTERS_FILE=$file has only " . ($n - 1)
            . " lines, CLUSTERS_NUM=$num"
            unless defined $line;

        my @coords = split ' ', $line;
        $dim = @coords if $dim == 0;
        die "Bad number of dimensions" if !@coords || $dim != @coords;

        push @clusters, \@coords;
        push @sums, [(0) x (1 + $dim)];
    }
    close $fh;

    return (\@clusters, \@sums);
}

# Squared Euclidean distance between two equal-length coordinate arrayrefs.
sub _squared_distance {
    my ($u, $v) = @_;
    my $total = 0;
    for my $j (0 .. $#$u) {
        my $diff = $u->[$j] - $v->[$j];
        $total += $diff * $diff;
    }
    return $total;
}

# Assign each point to its nearest centre (squared Euclidean distance) and
# add it into that centre's accumulator in @$sums.
# @$assignment holds each point's previous centre, which seeds the search;
# a candidate is abandoned as soon as its partial distance exceeds the best
# so far. @$assignment is updated with the new centres.
sub _assign_points {
    my ($points, $assignment, $clusters, $sums) = @_;

    for my $p (0 .. $#$points) {
        my $point  = $points->[$p];
        my $best_i = $assignment->[$p];
        my $best   = _squared_distance($point, $clusters->[$best_i]);

        CLUSTER:
        for my $i (0 .. $#$clusters) {
            next CLUSTER if $i == $best_i;

            my $cluster  = $clusters->[$i];
            my $distance = 0;
            for my $j (0 .. $#$point) {
                my $diff = $point->[$j] - $cluster->[$j];
                $distance += $diff * $diff;
                last if $distance > $best;    # cannot beat the best any more
            }
            ($best, $best_i) = ($distance, $i) if $distance < $best;
        }

        $assignment->[$p] = $best_i;
        my $acc = $sums->[$best_i];
        $acc->[0]++;
        $acc->[$_ + 1] += $point->[$_] for 0 .. $#$point;
    }
    return;
}

# Move each centre to the mean of its assigned points, using the reduced
# accumulators [count, sum_x1, ...]; centres with no points stay put.
# Returns the total Euclidean distance moved by all centres.
sub _recompute_clusters {
    my ($clusters, $sums) = @_;

    my $change = 0;
    for my $i (0 .. $#$clusters) {
        my ($count, @coord_sums) = @{ $sums->[$i] };
        next if $count == 0;

        my $cluster = $clusters->[$i];
        my $moved   = 0;
        for my $j (0 .. $#coord_sums) {
            my $coord = $coord_sums[$j] / $count;
            my $diff  = $coord - $cluster->[$j];
            $moved += $diff * $diff;
            $cluster->[$j] = $coord;
        }
        $change += sqrt $moved;
    }
    return $change;
}

__PACKAGE__->meta->make_immutable;
no Moose;

package main;

use Hadoop::Runner;

# Job driver. copy_environment presumably forwards these variables to the
# task environment so cooperate() can read them - TODO confirm.
my $runner = Hadoop::Runner->new(
    mapper           => My::AllReduceMapper->new(),
    input_format     => 'TextInputFormat',
    copy_environment => ['CLUSTERS_FILE', 'CLUSTERS_NUM'],
);
$runner->run();