I need to calculate a matrix multiplication, A'B (A transpose times B), using map/reduce (on Hadoop, with mrjob and Python). The dimensions are m x n for A and m x k for B, where m and n are very large and k is very small. The resulting product is (n x m)(m x k), so n x k. Both matrices carry a line number as their first column.
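The job below relies on the fact that A'B decomposes into a sum of outer products of matching rows, which is what lets it be computed row-pair by row-pair. A quick NumPy check of that identity (toy sizes, not the real data):

```python
import numpy as np

# A'B = sum_i outer(a_i, b_i), where a_i is row i of A and b_i is row i of B.
A = np.array([[3., 4., 5.],
              [3., 5., 5.]])   # m x n (m=2, n=3)
B = np.array([[8., 1.],
              [8., 2.]])       # m x k (k=2)

direct = A.T.dot(B)                                         # n x k
by_rows = sum(np.outer(A[i], B[i]) for i in range(len(A)))  # same, one row pair at a time

assert np.allclose(direct, by_rows)
```

Each `outer(a_i, b_i)` term is exactly what one reducer call below emits, split into one 1xk vector per nonzero cell of a_i.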

I have an implementation, but I am worried about its performance and think it can be made better, so I wanted to ask here for input. The algorithm is as follows:

First I join A and B on line number (simple enough); I do this in the reducer. Then, for each matched pair of rows, I multiply the row of B by each cell j of the row of A, emitting a 1xk vector each time with key j. There are potentially n such cells, but I exploit sparsity and only multiply by the nonzero cells, so each B row emits roughly 15-20 1xk vectors. These vectors are then summed in the second reducer, reduce_sum, grouped by key j.

With A of size 6000 x 3000 and B of size 6000 x 7, the job can take 4-5 minutes (single-machine Hadoop). I don't know how else to implement this; in local mode I see a big intermediate file emitted by the first reducer, with around ~15 * m rows. That makes sense given the implementation (nonzero A cells times rows of B). I am not sure whether this issue would matter less in a true multi-node Hadoop cluster environment, though.
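One standard way to shrink that ~15 * m-row shuffle is a combiner, which pre-sums the 1xk vectors for each key on the map side before they cross the network. The name `combine_sum` and the wiring shown in the comment are my own sketch, not part of the job below; the aggregation logic it would need is just:

```python
import numpy as np

def combine_sum(key, values, k=2):
    # Hypothetical combiner for the second step: locally sum the 1xk
    # partial vectors sharing a column index j before they are shuffled
    # to reduce_sum. Would be wired up roughly as:
    #   self.mr(combiner=self.combine_sum, reducer=self.reduce_sum)
    total = np.zeros(k)
    for v in values:
        total += v
    yield key, total
```

Since vector addition is associative and commutative, combining partial sums early does not change the final result, only the volume of intermediate data.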

from mrjob.job import MRJob
from mrjob.protocol import PickleProtocol
from mrjob.protocol import RawProtocol
from mrjob.protocol import RawValueProtocol
import numpy as np
from scipy import sparse

'''
We feed two files into this job, A and B, then calculate the matrix
multiplication A transpose B (AtB), where A and B both have m rows
(large) and have n and k columns respectively (n large, k small).
'''
class MRAtB(MRJob):
    INTERNAL_PROTOCOL = PickleProtocol
    INPUT_PROTOCOL = RawProtocol

    def configure_options(self):
        super(MRAtB, self).configure_options()
        self.add_passthrough_option('--k')

    '''
    No mapper, only a reducer in the first step. Without a mapper, the
    two lines sharing a key (there are exactly two per key: one from A,
    one from B) end up in the same reducer call.
    '''
    def reducer(self, key, value):
        left = None
        right = None
        for line in value:
            line = line.replace('"', '')
            line_vals = map(lambda x: float(x or 0), line.split(';'))
            if len(line_vals) == int(self.options.k):
                right = np.array(line_vals)          # the short B row (1 x k)
            else:
                left = sparse.coo_matrix(line_vals)  # the long A row (1 x n), kept sparse

        # iterate over only the nonzero elements of the longer (A) row
        for j, v in zip(left.col, left.data):
            yield j, v * right

    '''
    In the second step (again no mapper, one reducer), the 1xk vectors
    emitted for each column index j in the previous step are summed.
    '''
    def reduce_sum(self, key, value):
        mat_sum = np.zeros((1, int(self.options.k)))
        for val in value:
            mat_sum += val
        yield int(key), ";".join(map(str, mat_sum[0]))

    def steps(self):
        return [
            self.mr(reducer=self.reducer),
            self.mr(reducer=self.reduce_sum)
        ]

if __name__ == '__main__':
    MRAtB.run()

Sample A

1   3;4;5
2   3;5;5
3   5;5;9
4   9;9;9

Sample B

1   8;1
2   8;2
3   8;3
4   8;4

Run it as

python AtB.py A_matrix B_matrix --k=2
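For the sample inputs above, the job's output can be checked against a plain NumPy computation (line-number columns stripped):

```python
import numpy as np

# Sample A and Sample B from above, without the line-number column.
A = np.array([[3, 4, 5],
              [3, 5, 5],
              [5, 5, 9],
              [9, 9, 9]])   # 4 x 3
B = np.array([[8, 1],
              [8, 2],
              [8, 3],
              [8, 4]])      # 4 x 2

print(A.T.dot(B))           # 3 x 2
# → [[160  60]
#    [184  65]
#    [224  78]]
```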

asked Oct 15 '13 at 06:16

Stat Q

I don't think this forum is appropriate for this question. We also don't really like to read your code and tell you what's wrong with it; more directed questions are easier to answer.

(Oct 15 '13 at 14:59) Alexandre Passos ♦

the code is not wrong asshole. i put it up there for possible improvements, and as something educational bcz even in this form it could be useful for someone.

(Oct 16 '13 at 03:53) Stat Q