-
Notifications
You must be signed in to change notification settings - Fork 1
/
design.tex
567 lines (511 loc) · 25.5 KB
/
design.tex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
\section{Design}
FlashR parallelizes and scales matrix operations in R for machine learning and
statistics in a non-uniform memory access (NUMA) machine. Figure \ref{fig:arch}
shows the architecture of FlashR.
FlashR supports a small number of classes of generalized operations (GenOps)
and uses GenOps to implement many matrix operations in the R base package
to provide users a familiar programming interface. The GenOps simplify
the implementation and improve expressiveness of the framework. The optimizer
aggressively merges operations to reduce data movement in the memory hierarchy.
FlashR stores matrices on SSDs through SAFS \cite{safs}, a user-space filesystem
for SSD arrays, to fully utilize high I/O throughput of SSDs.
FlashR supports both sparse matrices and dense matrices. For large sparse matrices,
FlashR integrates with the work \cite{SEM_SpMM} that performs sparse matrix
multiplication in semi-external memory.
\begin{figure}
\centering
\includegraphics[scale=0.3]{FlashMatrix_figs/architecture.pdf}
\vspace{-5pt}
\caption{The architecture of FlashR.}
\label{fig:arch}
\vspace{-10pt}
\end{figure}
\subsection{Programming interface} \label{sec:api}
FlashR provides a matrix-oriented functional programming interface built
on a small set of GenOps (Table \ref{tbl:genops}). GenOps take matrices
and some functions
as input and output new matrices that represent computation results.
Input functions define computation on individual elements in matrices,
and all of these functions for GenOps in the current implementation are
predefined. All GenOps are lazily evaluated for better performance (Section
\ref{sec:lazyeval}).
\begin{table}
\begin{center}
\caption{Generalized operations (GenOps) in FlashR. $A$, $B$ and $C$ are
matrices, and $c$ is a scalar. $f$ is a user-defined function that
operates on elements of matrices. $A_{i,j}$ indicates the element
in row $i$ and column $j$ of matrix $A$.}
\vspace{-10pt}
\footnotesize
\begin{tabular}{|l|l|l|}
\hline
GenOp & Description \\
\hline
$C=sapply(A, f)$ & $C_{i,j}=f(A_{i,j})$ \\
$C=mapply(A, B, f)$ & $C_{i,j}=f(A_{i,j}, B_{i,j})$ \\
\hline
$c=agg(A, f)$ & $c=f(A_{i,j}, c)$, $\forall i, j$ \\
$C=agg.row(A, f)$ & $C_i=f(A_{i,j}, C_i)$, $\forall j$ \\
$C=agg.col(A, f)$ & $C_j=f(A_{i,j}, C_j)$, $\forall i$ \\
\hline
$C=groupby(A, f)$ & $C_{k}=f(A_{i,j}, C_{k})$,\\ & where $A_{i, j}=k$, $\forall i,j$ \\
$C=groupby.row(A, B, f)$ & $C_{k,j}=f(A_{i,j}, C_{k,j})$,\\ & where $B_i=k$, $\forall i$ \\
$C=groupby.col(A, B, f)$ & $C_{i,k}=f(A_{i,j}, C_{i,k})$,\\ & where $B_j=k$, $\forall j$ \\
\hline
$C=inner.prod(A, B, f1, f2)$ & $t=f1(A_{i,k}, B_{k,j})$,
\\ & $C_{i,j}=f2(t, C_{i,j})$, $\forall k$ \\
\hline
$C=cum.row(A, f)$ & $C_{i,j}=f(A_{i,j}, C_{i,j-1})$ \\
$C=cum.col(A, f)$ & $C_{i,j}=f(A_{i,j}, C_{i-1,j})$ \\
\hline
\end{tabular}
\normalsize
\label{tbl:genops}
\vspace{-10pt}
\end{center}
\end{table}
GenOps are classified into four categories that describe different data access
patterns.
\noindent \textbf{Element-wise operations}:
\textit{sapply} is an element-wise unary operation; \textit{mapply}
is an element-wise binary operation.
\noindent \textbf{Aggregation}: \textit{agg} computes aggregation over
all elements in a matrix and outputs a scalar; \textit{agg.row}/\textit{agg.col}
compute over all elements in every row/column and outputs a vector.
\noindent \textbf{Groupby}: \textit{groupby} splits the elements of a matrix
into groups, applies \textit{agg} to each group and outputs a vector;
\textit{groupby.row} splits rows into groups and applies \textit{agg.col}
to each group; \textit{groupby.col} splits columns into groups and applies
\textit{agg.row} to each group.
\noindent \textbf{Inner product} is a generalized matrix multiplication
that replaces multiplication and addition with two functions.
\noindent \textbf{Cumulative operation} performs computation cumulatively
on the elements in rows or columns and outputs matrices with the same
shape as the input matrices. Special cases in R are \textit{cumsum} and
\textit{cumprod}.
FlashR overrides a large number of matrix functions in the R \textit{base}
package with GenOps to scale and parallelize
existing R code with little/no modification. Table \ref{tbl:Rfuns} shows
a small subset of R matrix operations overridden by FlashR and their
implementations with GenOps.
\begin{table}
\begin{center}
\caption{Some of the R matrix functions implemented with GenOps.}
\vspace{-10pt}
\footnotesize
\begin{tabular}{|l|l|l|}
\hline
Function & Implementation with GenOps \\
\hline
$C=A+B$ & $C=mapply(A, B, ``+")$ \\
$C=pmin(A,B)$ & $C=mapply(A, B, ``pmin")$ \\
$C=sqrt(A)$ & $C=sapply(A, ``sqrt")$ \\
\hline
$c=sum(A)$ & $c=agg(A, ``+")$ \\
$C=rowSums(A)$ & $C=agg.row(A, ``+")$ \\
$c=any(A)$ & $c=agg(A, ``|")$ \\
\hline
$C=unique(A)$ & $C=groupby(A, ``uniq")$ \\
$C=table(A)$ & $C=groupby(A, ``count")$ \\
\hline
$C=A \%*\% B$ & integers: $C=inner.prod(A, B, ``*", ``+")$ \\
& floating-points: BLAS \\
& sparse matrices: SpMM \cite{SEM_SpMM} \\
\hline
\end{tabular}
\normalsize
\label{tbl:Rfuns}
\end{center}
\end{table}
FlashR provides a set of functions for matrix creation, element access
and execution tuning (Table \ref{tbl:utility}). Like GenOps, FlashR avoids
data movement in most of these matrix operations. For example, transpose
of a matrix only needs to change data access from the original matrix in
the subsequent
matrix operations \cite{Guibas78}; reading columns from a tall matrix outputs
a new matrix that indicates the columns to be accessed from the original matrix;
writing to a matrix outputs a \textit{virtual matrix} (see Section
\ref{sec:lazyeval}) that constructs the modified matrix on the fly.
FlashR provides functions for tuning the execution of lazily evaluated operations.
\textit{materialize} forces FlashR to perform actual computation.
\textit{set.cache} informs FlashR to save the computation results
of a matrix during computation. \textit{as.vector} and \textit{as.matrix}
convert FlashR vectors and matrices to R vectors and matrices, which potentially
force FlashR to perform computation.
\begin{table}
\begin{center}
\caption{Some of the miscellaneous functions in FlashR for matrix creation,
matrix access and execution tuning.}
\vspace{-10pt}
\footnotesize
\begin{tabular}{|l|l|l|}
\hline
Function & Description \\
\hline
$runif.matrix$ & Create a uniformly random matrix \\
$rnorm.matrix$ & Create a matrix under a normal distribution \\
\hline
$load.dense$ & Read a dense matrix from text files. \\
\hline
$dim$ & Get the dimension information of a matrix\\
$length$ & Get the number of elements in a matrix\\
\hline
$t$ & Matrix transpose \\
$rbind$ & Concatenate matrices by rows \\
$[]$ & Get rows/columns/elements from a matrix \\
$[]\gets$ & Set rows/columns/elements from a matrix \\
\hline
%$fm.conv.layout$ & Convert the data layout of a matrix \\
$materialize$ & Materialize a \textit{virtual matrix} \\
$set.cache$ & Set to cache materialized data \\
$as.vector$ & Convert to an R vector \\
$as.matrix$ & Convert to an R matrix \\
\hline
\end{tabular}
\normalsize
\label{tbl:utility}
\end{center}
\end{table}
\subsubsection{Examples} \label{sec:apps}
We showcase some classic algorithms to illustrate the programming interface
of FlashR.
\begin{figure}
\begin{minted}[mathescape,
fontsize=\footnotesize,
frame=single,
tabsize=2,
]{R}
# `X' is the data matrix, whose rows are data points.
# `y' stores the labels of data points.
logistic.regression <- function(X,y) {
grad <- function(X,y,w)
(t(X)%*%(1/(1+exp(-X%*%t(w)))-y))/length(y)
cost <- function(X,y,w)
sum(y*(-X%*%t(w))+log(1+exp(X%*%t(w))))/length(y)
theta <- matrix(rep(0, num.features), nrow=1)
for (i in 1:max.iters) {
g <- grad(X, y, theta)
l <- cost(X, y, theta)
eta <- 1
delta <- 0.5 * (-g) %*% t(g)
# Convert it to an R value for the while loop.
l2 <- as.vector(cost(X, y, theta+eta*(-g)))
while (l2 < as.vector(l)+delta*eta)
eta <- eta * 0.2
theta <- theta + (-g) * eta
}
}
\end{minted}
\vspace{-10pt}
\caption{A simplified implementation of logistic regression using
gradient descent with line search.}
\label{logistic}
\vspace{-5pt}
\end{figure}
Logistic regression is a commonly used classification algorithm. We implement
this algorithm for binary-class problems and use gradient descent with
line search to minimize the \textit{cost} function. This implementation
solely uses the R \textit{base} functions overridden by FlashR (Figure
\ref{logistic}) and can be executed in the existing R framework.
\begin{figure}
\centering
\begin{minted}[mathescape,
fontsize=\footnotesize,
frame=single,
tabsize=2,
]{R}
# X is the data matrix. C is cluster centers.
kmeans <- function(X,C) {
I <- NULL
num.moves > nrow(X)
while (num.moves > 0) {
D <- inner.prod(X, t(C), "euclidean", "+")
old.I <- I
I <- agg.row(D, "which.min")
# Inform FlashR to save data during computation.
I <- set.cache(I, TRUE)
CNT <- groupby.row(rep.int(1, nrow(I)), I, "+")
C <- sweep(groupby.row(X, I, "+"), 2, CNT, "/")
if (!is.null(old.I))
num.moves <- as.vector(sum(old.I != I))
}
}
\end{minted}
\vspace{-10pt}
\caption{A simplified implementation of k-means.}
\label{fig:kmeans}
\vspace{-10pt}
\end{figure}
Figure \ref{fig:kmeans} implements k-means, a popular clustering algorithm
\cite{kmeans}, with GenOps. It uses \textit{inner.prod} to
compute the Euclidean distance between data points and cluster centers
and outputs a matrix whose rows represent the distances to centers.
It uses \textit{agg.row} to find the closest cluster for each data point.
It then uses \textit{groupby.row} to count
the number of data points in each cluster and compute cluster centers.
\subsection{Dense matrices}
FlashR optimizes for dense matrices that are rectangular---with
a longer and shorter dimension---because of their frequent occurrence
in machine learning and statistics. Dense matrices are optimized for
all types of storage, including NUMA memory and SSDs.
\subsubsection{Tall-and-skinny (TAS) matrices}
A data matrix may contain a large number of samples with a few features
(tall-and-skinny),
or a large number of features with a few samples (wide-and-short).
We use similar strategies to optimize these two types of matrices. FlashR
supports both row-major and column-major layouts (Figure \ref{fig:den_mat}(a)
and (b)), which allows FlashR to transpose matrices without a copy.
We store vectors as a one-column TAS matrix.
\begin{figure}
\centering
\includegraphics[scale=0.5]{FlashMatrix_figs/dense_matrix2.pdf}
\vspace{-5pt}
\caption{The format of a tall dense matrix.}
\label{fig:den_mat}
\vspace{-12pt}
\end{figure}
A TAS matrix is partitioned physically into I/O-partitions (Figure
\ref{fig:den_mat}). We refer to the dimension that is partitioned as
the \textit{partition dimension}. All elements in an I/O-partition are stored
contiguously regardless of the data layout. All I/O-partitions
have the same number of rows regardless of the number of columns.
The number of rows in an I/O-partition is $2^i$, where
$i \in \mathbb{N}$. This produces column-major TAS
matrices whose data are well aligned in memory to encourage CPU vectorization.
FlashR stores the I/O partitions of an in-memory matrix in
fixed-size memory chunks (e.g., 64MB) across NUMA nodes.
I/O partitions from different matrices may have different sizes. By storing
I/O partitions in fixed-size memory chunks shared among all in-memory matrices,
FlashR can easily recycle memory and reduce memory allocation overhead.
%Because all in-memory
%matrices are distributed across NUMA nodes in the same fashion,
%FlashR schedules threads to perform computation on I/O partitions
%in the memory close to the processors to increase the memory bandwidth.
FlashR stores an SSD-based matrix as a SAFS file \cite{safs}.
An I/O partition is accessed asynchronously with direct I/O to bypass the Linux
page cache for better I/O performance. We rely on SAFS to map the data of
a matrix evenly across SSDs. By default, we use
a hash function to map data to fully utilize the bandwidth of all SSDs
even if we access only a subset of columns from a TAS matrix.
\subsubsection{Block matrices} \label{sec:block_mat}
FlashR stores a tall matrix as a \textit{block matrix}
(Figure \ref{fig:den_mat}(c)) comprised of TAS blocks with $32$ columns each,
except the last block. Each block is stored as a separate TAS matrix.
We decompose a matrix operation
on a block matrix into operations on individual TAS matrices to take advantage
of the optimizations on TAS matrices and reduce data movement.
Coupled with the I/O partitioning on TAS matrices, this strategy enables
2D-partitioning on a dense matrix and each partition fits in main memory.
%\dz{Should we explain GenOps on block matrices in more details?}
%\subsection{Sparse matrices}
%FlashR supports sparse matrices and optimizes sparse matrices of different
%shapes differently.
%For large sparse matrices that arise from graphs (e.g., social networks
%and Web graphs), which have a large number of rows and columns, FlashR integrates
%with our prior work \cite{SEM_SpMM} that stores sparse matrices
%in a compact format on SSDs and performs sparse matrix multiplication
%in semi-external memory, i.e., we keep the sparse matrix on SSDs and
%the dense matrix or part of the dense matrix in memory.
%Because many graph algorithms can be formulated with sparse matrix multiplication
%\cite{linear_algebra}, we can express these algorithms in FlashR. In contrast,
%for sparse matrices with many rows and few columns or with many columns
%and few rows, FlashR stores the sparse matrices with the coordinate format
%(COO). These sparse matrices can be used in sparse random projection
%\cite{sparse_proj} or to store categorial values for computation.
%FlashR keeps these sparse matrices in memory.
\subsection{Parallelize matrix operations}
When executing matrix operations in parallel, FlashR aims at achieving
good I/O performance and load balancing as well as reducing remote memory access
in a NUMA machine. FlashR evaluates a matrix operation with
a single pass over input data.
For good load balancing and I/O performance, FlashR uses a global task scheduler
to dispatch I/O-partitions to threads sequentially and dynamically. Initially,
the scheduler assigns
multiple contiguous I/O-partitions to a thread. The thread reads them in
a single I/O asynchronously. The number of contiguous I/O-partitions
assigned to a thread is determined by the block size of SAFS.
As the computation nears an end, the scheduler dispatches single I/O-partitions.
The scheduler dispatches I/O-partitions sequentially to increase contiguity
on SSD. When FlashR writes data to SSDs,
contiguity makes it easier for the file system to merge
writes from multiple threads, which helps to sustain write throughput and reduces
write amplification \cite{ripq}.
\begin{figure*}
\centering
\includegraphics[scale=0.5]{FlashMatrix_figs/Parallelize.pdf}
\vspace{-4pt}
\caption{Data flow for the GenOps in Table \ref{tbl:genops} on tall matrices
with 1D partitioning.}
\label{fig:parallel}
\vspace{-8pt}
\end{figure*}
Parallelization strategies in FlashR are based on the matrix operations
and matrix shape because matrix operations have various data dependencies
(Figure \ref{fig:parallel}).
%Currently, FlashR parallelizes matrix operations
%ith 1D partitioning, because most of machine learning datasets have many more
%amples than features.
\noindent \textbf{Operations (a, b, c, d)}: a partition $i$ of the output matrix solely depends
on partitions $i$ of the input matrices. This simplifies
parallelization. FlashR assigns partitions $i$ of all matrices to
the same thread to avoid remote memory access. There is no data sharing
among threads.
\noindent \textbf{Operations (e, f)}: a partition
$i$ of the output matrix still solely depends on a partition
$i$ of the input matrix $A$, but the input matrix
$B$ is shared by all threads. Because $B$ is
read-only, computation does not require synchronization.
$B$ is generally small and FlashR keeps it in memory.
\noindent \textbf{Operations (g, h, i)}: the output matrix contains the aggregation
over all partitions of the input matrices. To parallelize these operations,
each thread maintains a local buffer for partial aggregation results.
FlashR combines all partial results at the end of
the computation.
%The local buffers may grow large in
%\textit{groupby.row} because the size of partial aggregation
%results is determined by the number of different keys in
%the input data. As such, during computation, \textit{groupby.row} merges
%partial aggregation in a local buffer to a global buffer when the size of
%a local buffer reaches a threshold.
\noindent \textbf{Cumulative operations (j)}:
a partition $i$ of the output matrix depends on a partition $i$ of
the input matrix as well as a partition $i-1$ of the output matrix.
Executing this operation in parallel typically requires two passes over
the input data \cite{Ladner1980}. To reduce I/O and fuse this operation
with others (Section \ref{sec:materialize}), FlashR performs
this operation with a single scan over input by taking advantage
of sequential task dispatching and asynchronous I/O. FlashR maintains
and shares a current global accumulated result and a small set of local
accumulated results among all threads. If the data that a partition $i$
depends on is ready, a thread computes this partition. Otherwise,
a thread moves to the next partition $i+1$. If the number
of pending partitions reaches a threshold, a thread sleeps
and waits for all dependency data to become available.
%\dz{NUMA optimizations.}
%dispatch computation tasks in a NUMA-aware fashion.
%\dz{Most of computation requires to access partitions of a matrix sequentially.
%This allows us to virtualize some operations that we don't know the size of
%output in advance. e.g., read text input and filter operation.}
%For a block matrix with many TAS matrices, we parallelize the computation and
%I/O access differently. One of the goals is to reduce memory consumption.
%Instead of getting the row/column range from all TAS matrices before performing
%computation, we read the row/column range from some TAS matrices first, perform
%computation and move on to the next TAS matrices in the same range. This is
%very helpful if the computation is aggregation.
\subsection{Lazy evaluation}\label{sec:lazyeval}
In practice, FlashR almost never evaluates a single matrix operation alone.
Instead, it evaluates matrix operations, such as GenOps,
lazily and constructs directed acyclic graphs (DAG) to represent computation.
Lazy evaluation is essential to achieve substantial performance for a sequence
of matrix operations in a deep memory hierarchy. FlashR grows each DAG as large
as possible and evaluates all matrix operations inside a DAG
in a single parallel execution to increase the ratio of computation to I/O.
With lazily evaluation, matrix operations
output \textit{virtual matrices} that represent the computation result,
instead of storing data physically. In the current implementation,
the only operations that are not lazily evaluated are the ones that
load data from external sources, such as \textit{load.dense}, and the ones
that output matrices
with the size depending on data of the input matrices, such as \textit{unique}
and \textit{table}. An operation on a \textit{block matrix} may output
a block \textit{virtual matrix}.
%\dz{There are two types of virtual matrices. Most of the virtual matrices
%are reporducible (each access returns the same value). The computation in
%the other type of virtual matrices may not be reproduciable and thus, we
%store the computation result once data is generated (e.g., random matrix).}
Some of the matrix operations output matrices with
a different \textit{partition dimension} size than the input matrices and,
in general, forms the edge nodes of a DAG. We denote these matrices as
\textit{sink matrices}. Operations, such as aggregation and groupby,
output \textit{sink matrices}. \textit{Sink matrices} tend to be small and,
once materialized, store results in memory.
%The maximum size of a sink matrix from an aggregation
%is $\sqrt{N}$ for $N$ elements in the input matrix and for groupby
%$k \times \sqrt{N}$ for $k$ groups, where $k$ is usually a small number.
%For most of machine learning and data analysis tasks, the output of inner product
%of a wide matrix with a tall matrix is small because
%the long dimension of these matrices is much larger than the short dimension.
Figure \ref{fig:dag} (a) shows an example of DAG that represents the k-means
computation in a single iteration (Figure \ref{fig:kmeans}).
A DAG comprises a set of
matrix nodes (rectangles) and computation nodes (ellipses). The majority of
matrix nodes are virtual matrices (dashed line rectangles).
In this example, only the input matrix \textit{X} has materialized data.
A computation node references a GenOp and input matrices and
may contain some immutable computation state, such as scalar variables and
small matrices.
FlashR stops constructing a DAG and starts to materialize the computation
in the DAG when encountering the following functions: \textit{(i)}
\textit{materialize} to materialize a virtual matrix;
\textit{(ii)} \textit{as.vector} and \textit{as.matrix} to convert
to R objects; \textit{(iii)} access to individual elements of a sink matrix;
\textit{(iv)} \textit{unique} and \textit{table},
whose output size depends on input data. The first two cases
give users the opportunity to control DAG materialization for better speed,
while the last two cases are implicit DAG materialization to simplify programming.
\subsection{DAG materialization}\label{sec:materialize}
When computation is triggered, we evaluate all operations in a DAG
to increase the ratio of computation to I/O. FlashR fuses all operations in
a DAG into a single parallel execution to reduce data
movement in the memory hierarchy. This data-driven,
operation fusion allows out-of-core problems to approach in-memory speed.
By default, FlashR saves the computation results of all sink matrices of
the DAG in memory and discards the data of non-sink matrices on the fly.
Because sink matrices tend to be small, this rule leads to small memory
consumption. In exceptional cases, especially for iterative algorithms,
it is helpful to save some non-sink matrices to avoid redundant computation
and I/O across iterations. We allow users to
set a flag on any virtual matrix with \textit{set.cache} to
cache data in memory or on SSDs during computation, similar to caching
a resilient distributed dataset (RDD) in Spark \cite{spark}.
Figure \ref{fig:kmeans} shows an example that we cache $I$, a vector of
partition Ids assigned to each data point, for the next iteration.
\subsubsection{Reduce data movement}
FlashR performs memory hierarchy aware execution, when evaluating operations
in a DAG, to reduce data movement
between SSDs and RAM, between NUMA nodes as well as between RAM and CPU cache.
\begin{figure}
\centering
\includegraphics[scale=0.6]{FlashMatrix_figs/kmeans.pdf}
\vspace{-4pt}
\caption{(a) Matrix operations are lazily evaluated to form
a directed-acyclic graph (DAG); (b) The data flow in DAG materialization
with two levels of partitioning: matrix X on SSDs is first partitioned
and is read to memory in I/O partitions; an I/O partition is further
split into processor cache (Pcache) partitions; once a Pcache partition
is materialized, it is passed to the next GenOp to reduce CPU cache misses. }
\label{fig:dag}
\vspace{-8pt}
\end{figure}
FlashR materializes matrix partitions separately in a DAG in most cases.
This is possible because all matrices in a DAG except sink matrices
share the same \textit{partition dimension} and the same I/O partition size.
As illustrated in Figure \ref{fig:dag} (b), a partition $i$ of a virtual
matrix requires data only from partitions
$i$ of the parent matrices. All DAG operations in a partition are processed by
the same thread so that all data required by the computations are stored and
accessed in the memory close to the processor to increase the memory bandwidth
in a NUMA machine.
FlashR uses two-level partitioning on dense matrices to reduce data movement
between SSDs and CPU (Figure \ref{fig:dag} (b)). It reads data on SSDs in
I/O partitions and assigns these partitions to a thread as a parallel task.
It further splits I/O-partitions into processor cache (Pcache) partitions
at runtime. Each thread materializes one Pcache-partition at a time from
a matrix. Regular tall matrices are divided into TAS matrices and matrix
operations are converted to running on these TAS matrices instead. As such,
a Pcache-partition is sufficiently small to fit in the CPU L1/L2 cache.
To reduce CPU cache pollution and reduce data movement between CPU and memory,
a thread performs depth-first traversal in a DAG and evaluates matrix operations
in the order that they are traversed. Each time, a thread performs a matrix
operation on a Pcache partition of a matrix and passes the Pcache partition to
the subsequent matrix operation, instead of materializing the next Pcache partition.
This ensures that a Pcache partition resides in the CPU cache when the next
matrix operation consumes it. In each thread, all intermediate matrices have
only one Pcache partition materialized at any time.
To further reduce CPU cache pollution, FlashR recycles memory buffers used
by Pcache partitions in the CPU cache. FlashR maintains a counter on each
Pcache partition. When the counter indicates the partition has been used
by all subsequent matrix operations, the memory buffer of the partition is
recycled and used to store the output of
the next matrix operation. As such, the next matrix operation writes
its output data in the memory that is already in CPU cache.