I need to calculate a matrix multiplication, A'B, that is, A transpose B, using map/reduce (on Hadoop, with mrjob and Python). The dimensions are m by n for A and m by k for B, where m and n are very large and k is very small. The resulting product is (n x m) * (m x k), so n by k. Both matrices carry a line number as their first column.
I have an implementation, but I am worried about its performance and think it can be improved. I just wanted to ask here for input. The algorithm is as follows:
I first join on line number on both sides (simple enough); I do this in the reducer. Then for each matching row pair of A and B, I multiply that row of B by each cell j of A, emitting a 1xk vector each time, keyed by j. There are potentially n such cells (one per j), but I exploit sparsity and only multiply by the nonzero cells, so each B row yields just 15-20 1xk vectors. These vectors then get summed up per key j in the next reducer, reduce_sum.
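For reference, what the two steps compute is the outer-product decomposition A'B = sum over i of outer(a_i, b_i), where a_i and b_i are the i-th rows of A and B: each step-one reducer call emits the nonzero slices of one outer product, and step two sums them. A small numpy check of that identity (illustrative only, not part of the job):

```python
import numpy as np

m, n, k = 6, 4, 2
rng = np.random.default_rng(0)
A = rng.integers(0, 5, size=(m, n)).astype(float)
B = rng.integers(0, 5, size=(m, k)).astype(float)

# Step-one view: each joined row pair (a_i, b_i) contributes the
# n x k outer product a_i b_i^T; step two sums these contributions.
partials = [np.outer(A[i], B[i]) for i in range(m)]
result = sum(partials)

assert np.allclose(result, A.T @ B)
```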
Now, multiplying an A of 6000 x 3000 by a B of 6000 x 7 can take 4-5 minutes (single-machine Hadoop). I don't know how else to implement this; in local mode I see a big file output by the first reducer with around ~15 * m rows. That makes sense given the way I implemented this (nonzero A cells times rows of B). I am not sure whether this issue would matter less in a true multi-node Hadoop cluster environment, though.
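One standard way to shrink that intermediate file (roughly 15·m rows of 1xk vectors) is to pre-aggregate before the shuffle: accumulate a partial sum per column index j within a task and emit each j only once, the same idea as a Hadoop combiner. A hypothetical sketch of just the accumulation logic in plain Python (the `PartialSummer` class and its interface are my invention for illustration, not mrjob API):

```python
import numpy as np

class PartialSummer(object):
    """Accumulate 1xk partial sums keyed by column index j, so each j
    is emitted once per batch instead of once per nonzero A cell."""

    def __init__(self, k):
        self.k = k
        self.sums = {}

    def add(self, j, vec):
        # vec is the 1xk contribution v * b_i for column index j
        if j in self.sums:
            self.sums[j] += vec
        else:
            self.sums[j] = vec.copy()

    def emit(self):
        # yield (j, partial_sum) pairs; the final reducer still sums
        # partials across tasks, but far fewer records are shuffled
        for j, s in self.sums.items():
            yield j, s

# usage: two joined rows that both touch column j=0
ps = PartialSummer(k=2)
ps.add(0, 3.0 * np.array([8.0, 1.0]))
ps.add(0, 5.0 * np.array([8.0, 2.0]))
out = dict(ps.emit())   # one record for j=0 instead of two
```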
from mrjob.job import MRJob
from mrjob.protocol import PickleProtocol
from mrjob.protocol import RawProtocol
import numpy as np
from scipy import sparse

'''
We feed two files into this job, A and B, then we calculate the matrix
multiplication A transpose B (AtB), where A and B have m rows (large)
and n and k columns (large and small respectively).
'''
class MRAtB(MRJob):

    INTERNAL_PROTOCOL = PickleProtocol
    INPUT_PROTOCOL = RawProtocol

    def configure_options(self):
        super(MRAtB, self).configure_options()
        self.add_passthrough_option('--k')

    '''
    No mapper, only a reducer in the first step. Without a mapper, the
    two lines (and there are only two lines per key - one from A, one
    from B) with the same key will end up in the same reducer call.
    '''
    def reducer(self, key, value):
        left = None; right = None
        for line in value:
            line = line.replace('"', '')
            line_vals = map(lambda x: float(x or 0), line.split(';'))
            if len(line_vals) == int(self.options.k):
                right = np.array(line_vals)          # the B row (length k)
            else:
                left = sparse.coo_matrix(line_vals)  # the A row (length n)
        # iterate only over the non-zero elements of the bigger (left) vector
        for j, v in zip(left.col, left.data):
            yield j, v * right

    '''
    In the second step, again no mapper, one reducer: sum up all the
    1xk vectors that were emitted for each column index j in the
    previous step.
    '''
    def reduce_sum(self, key, value):
        mat_sum = np.zeros((1, int(self.options.k)))
        for val in value:
            mat_sum += val
        yield (int(key), ";".join(map(str, mat_sum[0])))

    def steps(self):
        return [
            self.mr(reducer=self.reducer),
            self.mr(reducer=self.reduce_sum)
        ]

if __name__ == '__main__':
    MRAtB.run()
Sample A
1 3;4;5
2 3;5;5
3 5;5;9
4 9;9;9
Sample B
1 8;1
2 8;2
3 8;3
4 8;4
Run it as
python AtB.py A_matrix B_matrix --k=2
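To sanity-check the job's output on the sample data above, the same product can be computed directly with numpy (strip the line-number column first):

```python
import numpy as np

# sample matrices from above, without the line-number column
A = np.array([[3, 4, 5],
              [3, 5, 5],
              [5, 5, 9],
              [9, 9, 9]], dtype=float)
B = np.array([[8, 1],
              [8, 2],
              [8, 3],
              [8, 4]], dtype=float)

expected = A.T.dot(B)   # 3 x 2: one output row per column of A
assert np.allclose(expected, [[160, 60], [184, 65], [224, 78]])
```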
asked Oct 15 '13 at 06:16 by Stat Q
I don't think this forum is appropriate for this question. We also don't really like to read your code and tell you what's wrong with it; more directed questions are easier to answer.
The code is not wrong; I posted it for possible improvements, and as something educational, because even in this form it could be useful for someone.