package My::AllReduceMapper;
use Moose;
with 'Hadoop::AllReduceMapper';

# Number of coordinates per point. 0 means "not yet known"; map() replaces
# it with the size of the first record it sees.
has 'dim' => (is => 'rw', isa => 'Int', default => 0);

# Points collected by map(): one array reference of coordinates per record.
has 'points' => (is => 'rw', isa => 'ArrayRef[ArrayRef[Num]]', default => sub { [] });

# Parse one input record into a point and append it to 'points'.
#
#   $value   - whitespace-separated coordinates of one point.
#   $key, $context - accepted but not used by this method.
#
# The first record fixes 'dim'; dies with "Bad number of dimensions" when a
# later record has a different number of coordinates.
sub map {
    my ($self, $key, $value, $context) = @_;

    # split ' ' (unlike split /\s+/) skips leading whitespace, so an
    # indented record does not gain a spurious empty first coordinate.
    my @coords = split ' ', $value;

    $self->dim(scalar(@coords)) if $self->dim == 0;
    die "Bad number of dimensions" if $self->dim != @coords;

    push @{$self->points}, [@coords];
}

# Load the initial cluster positions and run the cooperative phase.
#
#   $context, $writeResults - accepted for the code still to be inserted
#                             below; not used by the loading part.
#
# Reads the first CLUSTERS_NUM lines of the file named by CLUSTERS_FILE
# (both taken from %ENV); every line must hold exactly 'dim' coordinates.
# Dies when either variable is missing or malformed, when the file cannot
# be opened, when it has too few lines, or when a line has the wrong
# number of coordinates.
#
# NOTE(review): if map() was never called on this instance, 'dim' is still
# 0 and any non-empty cluster line fails the dimension check - confirm
# whether an instance without input records can reach this method.
sub cooperate {
    my ($self, $context, $writeResults) = @_;
    my @points = @{$self->points};
    my $dim = $self->dim;

    # Validate the configuration up front so that a missing variable is
    # reported as such instead of as an open() or dimension failure.
    my $clusters_file = $ENV{CLUSTERS_FILE};
    my $clusters_num  = $ENV{CLUSTERS_NUM};
    die "CLUSTERS_FILE is not set"
        unless defined $clusters_file && length $clusters_file;
    die "CLUSTERS_NUM must be a non-negative integer"
        unless defined $clusters_num && $clusters_num =~ /\A\d+\z/;

    # Read initial cluster positions.
    my @clusters = ();
    open(my $fh, "<", $clusters_file)
        or die "Cannot open file specified by CLUSTERS_FILE=$clusters_file: $!";
    foreach my $cluster_index (1 .. $clusters_num) {
        my $line = <$fh>;
        # A short file would otherwise show up as a dimension error.
        die "CLUSTERS_FILE=$clusters_file has fewer than $clusters_num lines"
            unless defined $line;

        my @coords = split ' ', $line;
        die "Bad number of dimensions" if $dim != @coords;
        push @clusters, [@coords];
    }
    close $fh;

    # INSERT CODE HERE
}

package main;
use strict;
use warnings;
use Hadoop::Runner;

# NOTE(review): 'copy_environment' presumably forwards the listed variables
# to the tasks that execute cooperate() - confirm against Hadoop::Runner.
my $runner = Hadoop::Runner->new(
    mapper => My::AllReduceMapper->new(),
    input_format => 'TextInputFormat',
    copy_environment => ['CLUSTERS_FILE', 'CLUSTERS_NUM']);
$runner->run();