views:

541

answers:

8

I'm writing a sparse matrix solver using the Gauss-Seidel method. By profiling, I've determined that about half of my program's time is spent inside the solver. The performance-critical part is as follows:

size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
for (size_t y = 1; y < d_ny - 1; ++y) {
    for (size_t x = 1; x < d_nx - 1; ++x) {
        d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
        ++ic; ++iw; ++ie; ++is; ++in;
    }
    ic += 2; iw += 2; ie += 2; is += 2; in += 2;
}

All arrays involved are of float type. Actually, they are not arrays but objects with an overloaded [] operator, which (I think) should be optimized away, but is defined as follows:

inline float &operator[](size_t i) { return d_cells[i]; }
inline float const &operator[](size_t i) const { return d_cells[i]; }

For d_nx = d_ny = 128, this can be run about 3500 times per second on an Intel i7 920. This means that the inner loop body runs 3500 * 128 * 128 = 57 million times per second. Since only some simple arithmetic is involved, that strikes me as a low number for a 2.66 GHz processor.

Maybe it's not limited by CPU power, but by memory bandwidth? Well, one 128 * 128 float array eats 65 kB, so all 6 arrays should easily fit into the CPU's L3 cache (which is 8 MB). Assuming that nothing is cached in registers, I count 15 memory accesses in the inner loop body. On a 64-bits system this is 120 bytes per iteration, so 57 million * 120 bytes = 6.8 GB/s. The L3 cache runs at 2.66 GHz, so it's the same order of magnitude. My guess is that memory is indeed the bottleneck.

To speed this up, I've attempted the following:

  • Compile with g++ -O3. (Well, I'd been doing this from the beginning.)

  • Parallelizing over 4 cores using OpenMP pragmas. I have to change to the Jacobi algorithm to avoid reads from and writes to the same array. This requires that I do twice as many iterations, leading to a net result of about the same speed.

  • Fiddling with implementation details of the loop body, such as using pointers instead of indices. No effect.

What's the best approach to speed this guy up? Would it help to rewrite the inner body in assembly (I'd have to learn that first)? Should I run this on the GPU instead (which I know how to do, but it's such a hassle)? Any other bright ideas?

(N.B. I do take "no" for an answer, as in: "it can't be done significantly faster, because...")

Update: as requested, here's a full program:

#include <iostream>
#include <cstdlib>
#include <cstring>

using namespace std;

size_t d_nx = 128, d_ny = 128;
float *d_x, *d_b, *d_w, *d_e, *d_s, *d_n;

void step() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
            d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}

void solve(size_t iters) {
    for (size_t i = 0; i < iters; ++i) {
        step();
    }
}

void clear(float *a) {
    memset(a, 0, d_nx * d_ny * sizeof(float));
}

int main(int argc, char **argv) {
    size_t n = d_nx * d_ny;
    d_x = new float[n]; clear(d_x);
    d_b = new float[n]; clear(d_b);
    d_w = new float[n]; clear(d_w);
    d_e = new float[n]; clear(d_e);
    d_s = new float[n]; clear(d_s);
    d_n = new float[n]; clear(d_n);
    solve(atoi(argv[1]));
    cout << d_x[0] << endl; // prevent the thing from being optimized away
}

I compile and run it as follows:

$ g++ -o gstest -O3 gstest.cpp
$ time ./gstest 8000
0

real    0m1.052s
user    0m1.050s
sys     0m0.010s

(It does 8000 instead of 3500 iterations per second because my "real" program does a lot of other stuff too. But it's representative.)

Update 2: I've been told that unititialized values may not be representative because NaN and Inf values may slow things down. Now clearing the memory in the example code. It makes no difference for me in execution speed, though.

+1  A: 

Could you create a mini-app (VC++ 2008 here) so we can try it? Trying to figure a way to optimize just by looking at it doesn't make sense, as there are very complex calculations in this one.

Poni
A good idea, I'll get on it, but this should be a comment and not an answer.
Thomas
Sorry, I'm new here, and couldn't (still can't) see the "add comment" button below your question.
Poni
Fyi, you're not allowed to leave comments until you have 50 rep.
James Kolpack
Code has been added. Say sensible things and rep will flow your way! ;)
Thomas
+1  A: 

I think you're right about memory being a bottleneck. It's a pretty simple loop with just some simple arithmetic per iteration. the ic, iw, ie, is, and in indices seem to be on opposite sides of the matrix so i'm guessing that there's a bunch of cache misses there.

miked
+5  A: 

Hello!

I think I've managed to optimize it, here's a code, create a new project in VC++, add this code and simply compile under "Release".

#include <iostream>
#include <cstdlib>
#include <cstring>

#define _WIN32_WINNT 0x0400
#define WIN32_LEAN_AND_MEAN
#include <windows.h>

#include <conio.h>

using namespace std;

size_t d_nx = 128, d_ny = 128;
float *d_x, *d_b, *d_w, *d_e, *d_s, *d_n;

void step_original() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
            d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}
void step_new() {
    //size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    float
        *d_b_ic,
        *d_w_ic,
        *d_e_ic,
        *d_x_ic,
        *d_x_iw,
        *d_x_ie,
        *d_x_is,
        *d_x_in,
        *d_n_ic,
        *d_s_ic;

    d_b_ic = d_b;
    d_w_ic = d_w;
    d_e_ic = d_e;
    d_x_ic = d_x;
    d_x_iw = d_x;
    d_x_ie = d_x;
    d_x_is = d_x;
    d_x_in = d_x;
    d_n_ic = d_n;
    d_s_ic = d_s;

    for (size_t y = 1; y < d_ny - 1; ++y)
    {
        for (size_t x = 1; x < d_nx - 1; ++x)
        {
            /*d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];*/
            *d_x_ic = *d_b_ic
                - *d_w_ic * *d_x_iw - *d_e_ic * *d_x_ie
                - *d_s_ic * *d_x_is - *d_n_ic * *d_x_in;
            //++ic; ++iw; ++ie; ++is; ++in;
            d_b_ic++;
            d_w_ic++;
            d_e_ic++;
            d_x_ic++;
            d_x_iw++;
            d_x_ie++;
            d_x_is++;
            d_x_in++;
            d_n_ic++;
            d_s_ic++;
        }
        //ic += 2; iw += 2; ie += 2; is += 2; in += 2;
        d_b_ic += 2;
        d_w_ic += 2;
        d_e_ic += 2;
        d_x_ic += 2;
        d_x_iw += 2;
        d_x_ie += 2;
        d_x_is += 2;
        d_x_in += 2;
        d_n_ic += 2;
        d_s_ic += 2;
    }
}

void solve_original(size_t iters) {
    for (size_t i = 0; i < iters; ++i) {
        step_original();
    }
}
void solve_new(size_t iters) {
    for (size_t i = 0; i < iters; ++i) {
        step_new();
    }
}

void clear(float *a) {
    memset(a, 0, d_nx * d_ny * sizeof(float));
}

int main(int argc, char **argv) {
    size_t n = d_nx * d_ny;
    d_x = new float[n]; clear(d_x);
    d_b = new float[n]; clear(d_b);
    d_w = new float[n]; clear(d_w);
    d_e = new float[n]; clear(d_e);
    d_s = new float[n]; clear(d_s);
    d_n = new float[n]; clear(d_n);

    if(argc < 3)
        printf("app.exe (x)iters (o/n)algo\n");

    bool bOriginalStep = (argv[2][0] == 'o');
    size_t iters = atoi(argv[1]);

    /*printf("Press any key to start!");
    _getch();
    printf(" Running speed test..\n");*/

    __int64 freq, start, end, diff;
    if(!::QueryPerformanceFrequency((LARGE_INTEGER*)&freq))
        throw "Not supported!";
    freq /= 1000000; // microseconds!
    {
        ::QueryPerformanceCounter((LARGE_INTEGER*)&start);
        if(bOriginalStep)
            solve_original(iters);
        else
            solve_new(iters);
        ::QueryPerformanceCounter((LARGE_INTEGER*)&end);
        diff = (end - start) / freq;
    }
    printf("Speed (%s)\t\t: %u\n", (bOriginalStep ? "original" : "new"), diff);
    //_getch();


    //cout << d_x[0] << endl; // prevent the thing from being optimized away
}

Run it like this:

app.exe 10000 o

app.exe 10000 n

"o" means old code, yours.

"n" is mine, the new one.

My results: Speed (original):

1515028

1523171

1495988

Speed (new):

966012

984110

1006045

Improvement of about 30%.

The logic behind: You've been using index counters to access/manipulate. I use pointers. While running, breakpoint at a certain calculation code line in VC++'s debugger, and press F8. You'll get the disassembler window. The you'll see the produced opcodes (assembly code).

Anyway, look:

int *x = ...;

x[3] = 123;

This tells the PC to put the pointer x at a register (say EAX). The add it (3 * sizeof(int)). Only then, set the value to 123.

The pointers approach is much better as you can understand, because we cut the adding process, actually we handle it ourselves, thus able to optimize as needed.

I hope this helps.

Sidenote to stackoverflow.com's staff: Great website, I hope I've heard of it long ago!

Poni
Hey! I thought I tried that. But it seems I overlooked something. g++ even manages to make it twice as fast as the originial! Still, not as much as I'd hoped... I'm open for more upgrades...
Thomas
I might look into it later on. Twice as fast sounds great, if you will do it faster please let me know how.
Poni
What's the final aim? If I'd know you goal then there could be another way. Anyway, is this going to have static values of some sort? If so, I'd make a pre-calculated table/array with the values - you can't get any faster than that...
Poni
It's in a fluid simulation. I'm not actually doing 5000 iterations each second for the same linear system, but I'm doing 50 or so to solve for each time step, and then the values change.
Thomas
Combining this with Eamon's trick and a `float old_x_ic;` inside the y-loop that's used to store the `d_x[ic]` value for the next iteration when it becomes `d_x[iw]` (ditching `iw` entirely from the inner loop) cuts another 10% or so off the time for me, by the way.
Rex Kerr
+3  A: 

For one thing, there seems to be a pipelining issue here. The loop reads from the value in d_x that has just been written to, but apparently it has to wait for that write to complete. Just rearranging the order of the computation, doing something useful while it's waiting, makes it almost twice as fast:

d_x[ic] = d_b[ic]
                        - d_e[ic] * d_x[ie]
    - d_s[ic] * d_x[is] - d_n[ic] * d_x[in]
    - d_w[ic] * d_x[iw] /* d_x[iw] has just been written to, process this last */;

It was Eamon Nerbonne who figured this out. Many upvotes to him! I would never have guessed.

Thomas
Quite welcome ;-)
Eamon Nerbonne
Similarly, adding the "restrict" keyword on d_x might help also. This tells the compiler that d_e, etc. are separate arrays, so the store to d_x doesn't affect the reads from d_e the next time through the loop.
celion
+1  A: 

Poni's answer looks like the right one to me.

I just want to point out that in this type of problem, you often gain benefits from memory locality. Right now, the b,w,e,s,n arrays are all at separate locations in memory. If you could not fit the problem in L3 cache (mostly in L2), then this would be bad, and a solution of this sort would be helpful:

size_t d_nx = 128, d_ny = 128;
float *d_x;

struct D { float b,w,e,s,n; };
D *d;

void step() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
            d_x[ic] = d[ic].b
                - d[ic].w * d_x[iw] - d[ic].e * d_x[ie]
                - d[ic].s * d_x[is] - d[ic].n * d_x[in];
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}
void solve(size_t iters) { for (size_t i = 0; i < iters; ++i) step(); }
void clear(float *a) { memset(a, 0, d_nx * d_ny * sizeof(float)); }

int main(int argc, char **argv) {
    size_t n = d_nx * d_ny;
    d_x = new float[n]; clear(d_x);
    d = new D[n]; memset(d,0,n * sizeof(D));
    solve(atoi(argv[1]));
    cout << d_x[0] << endl; // prevent the thing from being optimized away
}

For example, this solution at 1280x1280 is a little less than 2x faster than Poni's solution (13s vs 23s in my test--your original implementation is then 22s), while at 128x128 it's 30% slower (7s vs. 10s--your original is 10s).

(Iterations were scaled up to 80000 for the base case, and 800 for the 100x larger case of 1280x1280.)

Rex Kerr
+1  A: 

I'm no expert on the subject, but I've seen that there are several academic papers on improving the cache usage of the Gauss-Seidel method.

Another possible optimization is the use of the red-black variant, where points are updated in two sweeps in a chessboard-like pattern. In this way, all updates in a sweep are independent and can be parallelized.

Roberto Bonvallet
A: 

I suggest putting in some prefetch statements and also researching "data oriented design":

void step_original() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    float dw_ic, dx_ic, db_ic, de_ic, dn_ic, ds_ic;
    float dx_iw, dx_is, dx_ie, dx_in, de_ic, db_ic;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
// Perform the prefetch
// Sorting these statements by array may increase speed;
//    although sorting by index name may increase speed too.
            db_ic = d_b[ic];
            dw_ic = d_w[ic];
            dx_iw = d_x[iw];
            de_ic = d_e[ic];
            dx_ie = d_x[ie];
            ds_ic = d_s[ic];
            dx_is = d_x[is];
            dn_ic = d_n[ic];
            dx_in = d_x[in];
// Calculate
            d_x[ic] = db_ic
                - dw_ic * dx_iw - de_ic * dx_ie
                - ds_ic * dx_is - dn_ic * dx_in;
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}

This differs from your second method since the values are copied to local temporary variables before the calculation is performed.

Thomas Matthews
Did you actually try this out? It isn't any faster for me.
Rex Kerr
+4  A: 

Couple of ideas:

  1. Use SIMD. You could load 4 floats at a time from each array into a SIMD register (e.g. SSE on Intel, VMX on PowerPC). The disadvantage of this is that some of the d_x values will be "stale" so your convergence rate will suffer (but not as bad as a jacobi iteration); it's hard to say whether the speedup offsets it.

  2. Use SOR. It's simple, doesn't add much computation, and can improve your convergence rate quite well, even for a relatively conservative relaxation value (say 1.5).

  3. Use conjugate gradient. If this is for the projection step of a fluid simulation (i.e. enforcing non-compressability), you should be able to apply CG and get a much better convergence rate. A good preconditioner helps even more.

  4. Use a specialized solver. If the linear system arises from the Poisson equation, you can do even better than conjugate gradient using an FFT-based methods.

If you can explain more about what the system you're trying to solve looks like, I can probably give some more advice on #3 and #4.

celion
Great answer!You seem to be clairvoyant. It's indeed a Poisson solver for the pressure of an incompressible fluid. However, due to the boundary conditions on the free surface, the matrix is not symmetric, although it is diagonally dominant. CG can only be used on symmetric matrices, right?I've briefly looked into FFT methods but my irregular domain seems to make them useless, too.
Thomas
SOR makes much more of a difference than I thought! I can now get away with 10 iterations where I previously needed 50 or so, even though the individual iterations become 20% slower. I'll see if I can add some code to optimize the relaxation factor automatically. Lesson learned: work smarter, not harder :)
Thomas
I was having flashbacks to my CFD class in grad school :) I'm a little rusty on my non-symmetric solvers, but you should check out GMRES and/or biconjugate gradients - either one should outperform SOR. And once you've got that working, you might be able to precondition the system by approximating it as symmetric and using an FFT solver. As for the irregular domain, I've never done anything with them, but I seem to recall "domain decomposition" methods that will let you break up your domain into easier ones, solve those, then recombine to get the full answer.
celion
Also, do you have a link to what you're working on? Now I'm curious :)
celion
Since you seem to know a lot about this stuff, could you also have a look at my follow-up question? Thanks! http://stackoverflow.com/questions/2392932/why-does-the-speed-of-this-sor-solver-depend-on-the-input What I'm working on is a realtime 2D incompressible free-surface fluid simulator. I don't have an URL or something, yet, sorry...
Thomas
You can implement the boundary conditions symmetrically, and you can use SOR as a preconditioner for any Krylov method, but it is more effective as a smoother for multigrid (which may itself be a preconditioner for a Krylov method). These things become important on anisotropic meshes and at higher resolution, such as when trying to resolve a boundary layer. A standard multigrid should converge in 1 or 2 iterations (independent of mesh resolution) for this problem.
Jed