views: 114
answers: 5

Is there a way to speed up this 1D convolution? I tried to make the dy pass cache-efficient, but compiling with g++ and -O3 gave worse performance.

I am convolving with [-1, 0, 1] in both directions. This is not homework.

#include<iostream>
#include<cstdlib>
#include<sys/time.h>

void print_matrix( int height, int width, float *matrix){
  for (int j=0; j < height; j++){
    for (int i=0; i < width; i++){
      std::cout << matrix[j * width + i] << ",";
    }
    std::cout << std::endl;
  }
}

void fill_matrix( int height, int width, float *matrix){
  for (int j=0; j < height; j++){
    for (int i=0; i < width; i++){
      matrix[j * width + i] = ((float)rand() / (float)RAND_MAX);
    }
  }
}

#define RESTRICT __restrict__

void dx_matrix( int height, int width, float * RESTRICT in_matrix,  float * RESTRICT out_matrix, float *min, float *max){
  //init min,max
  *min = *max = -1.F * in_matrix[0] + in_matrix[1]; 

    for (int j=0; j < height; j++){
      float* row = in_matrix + j * width;
      for (int i=1; i < width-1; i++){
        float res = -1.F * row[i-1] + row[i+1]; /* -1.F * value + 0.F * value + 1.F * value; */ 
        if (res > *max ) *max = res;
        if (res < *min ) *min = res;
        out_matrix[j * width + i] = res;
      }
    }
}

void dy_matrix( int height, int width, float * RESTRICT in_matrix,  float * RESTRICT out_matrix, float *min, float *max){
  //init min,max
  *min = *max = -1.F * in_matrix[0] + in_matrix[ width + 1]; 

  for (int j=1; j < height-1; j++){
      for (int i=0; i < width; i++){
        float res = -1.F * in_matrix[ (j-1) * width + i] + in_matrix[ (j+1) * width + i] ;
        if (res > *max ) *max = res;
        if (res < *min ) *min = res;
        out_matrix[j * width + i] =  res;
      }
    }
}

double now (void)                                                                                          
{                                                                                                                    
  struct timeval tv;                                                                                               
  gettimeofday(&tv, NULL);                                                                                         
  return (double)tv.tv_sec + (double)tv.tv_usec / 1000000.0;
}


int main(int argc, char **argv){

  int width, height;
  float *in_matrix;
  float *out_matrix;

  if(argc < 3){
    std::cout << "usage: " << argv[0] << " width height" << std::endl;
    return -1;
  }

  srand(123);

  width = atoi(argv[1]);
  height = atoi(argv[2]);

  std::cout << "Width:"<< width << " Height:" << height << std::endl;

  if (width < 3){
    std::cout << "Width too short " << std::endl;
    return -1;
  }
  if (height < 3){
    std::cout << "Height too short " << std::endl;
    return -1;
  }

  in_matrix = (float *) malloc( height * width * sizeof(float));
  out_matrix = (float *) malloc( height * width * sizeof(float));

  fill_matrix(height, width, in_matrix);
  //print_matrix(height, width, in_matrix);

  float min, max;

  double a = now();
  dx_matrix(height, width, in_matrix, out_matrix, &min, &max);
  std::cout << "dx min:" << min << " max:" << max << std::endl;

  dy_matrix(height, width, in_matrix, out_matrix, &min, &max);
  double b = now();
  std::cout << "dy min:" << min << " max:" << max << std::endl;
  std::cout << "time: " << b-a << " sec" << std::endl;


  return 0;
}
+1  A: 

Well, the compiler might be taking care of these, but here are a couple of small things:

a) Why are you multiplying by -1.F? Why not just subtract? For instance:

float res = -1.F * row[i-1] + row[i+1];

could just be:

float res = row[i+1] - row[i-1];

b) This:

if (res > *max ) *max = res;
if (res < *min ) *min = res;

can be made into

if (res > *max ) *max = res;
else if (res < *min ) *min = res;

and similarly in other places. If the first is true, the second can't be, so let's not check it.

Addition:

Here's another thing. To minimize your multiplications, change

for (int j=1; j < height-1; j++){
  for (int i=0; i < width; i++){
    float res = -1.F * in_matrix[ (j-1) * width + i] + in_matrix[ (j+1) * width + i] ;

to

int h = 0;   // h tracks (j-1) * width
int width2 = 2 * width;
for (int j=1; j < height-1; j++, h += width){
  for (int i=h; i < h + width; i++){
    float res = in_matrix[i + width2] - in_matrix[i];

and at the end of the loop

    out_matrix[i + width] =  res;
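
Putting those pieces together, the full dy loop (a sketch assembled from the snippets above, keeping the min/max updates from the original code) would read:

int h = 0;                       // h tracks (j-1) * width
int width2 = 2 * width;
for (int j=1; j < height-1; j++, h += width){
  for (int i=h; i < h + width; i++){
    float res = in_matrix[i + width2] - in_matrix[i];
    if (res > *max) *max = res;
    if (res < *min) *min = res;
    out_matrix[i + width] = res;
  }
}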

You can do similar things in other places, but hopefully you get the idea. Also, there is a minor bug,

*min = *max = -1.F * in_matrix[0] + in_matrix[ width + 1 ];

should be just in_matrix[ width ] at the end.

Justin Peel
+1 good catches
fabrizioM
Actually, the "else" could make things slower ! If the compiler is smart, in the absence of 'else' it will generate fcmov instructions, which are reasonably fast, but the 'else' will create a potentially unpredictable slow branch. But that presumes that the compiler is smart enough to use fcmov in the first place, and that has to be checked by reviewing the assembly listing.
One could also get rid of "in_matrix[i+width2]" by keeping a second index for i+width2 and incrementing it along with i in the loop (remember to reset it each row). "inc/++" is very cheap.
Pasi Savolainen
+1  A: 

First of all, I would rewrite the dy loop to get rid of "[ (j-1) * width + i]" and "in_matrix[ (j+1) * width + i]", and do something like:

float *p, *q, *out;
p = &in_matrix[(j-1)*width];
q = &in_matrix[(j+1)*width];
out = &out_matrix[j*width];
for (int i=0; i < width; i++){
  float res = -1.F * p[i] + q[i];
  if (res > *max) *max = res;
  if (res < *min) *min = res;
  out[i] = res;
}

But that is a trivial optimization that the compiler may already be doing for you.

It will be slightly faster to do "q[i]-p[i]" instead of "-1.f*p[i]+q[i]", but, again, the compiler may be smart enough to do that behind your back.

The whole thing would benefit considerably from SSE2 and multithreading. I'd bet on at least a 3x speedup from SSE2 right away. Multithreading can be added using OpenMP and it will only take a few lines of code.
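
To give a sense of what an SSE2 version might look like, here is a sketch of the dy pass (the name dy_matrix_sse2 is just illustrative; it assumes height >= 3, which main already enforces). Each iteration subtracts four floats at a time and tracks min/max in vector registers:

#include <algorithm>
#include <emmintrin.h>

void dy_matrix_sse2(int height, int width,
                    const float* in_matrix, float* out_matrix,
                    float* min, float* max)
{
  // Seed the running min/max with the first value the loop computes.
  __m128 vmin = _mm_set1_ps(in_matrix[2 * width] - in_matrix[0]);
  __m128 vmax = vmin;

  for (int j = 1; j < height - 1; j++) {
    const float* p = in_matrix + (j - 1) * width;  // row above
    const float* q = in_matrix + (j + 1) * width;  // row below
    float* o = out_matrix + j * width;

    int i = 0;
    for (; i + 4 <= width; i += 4) {               // four floats per step
      __m128 res = _mm_sub_ps(_mm_loadu_ps(q + i), _mm_loadu_ps(p + i));
      vmin = _mm_min_ps(vmin, res);
      vmax = _mm_max_ps(vmax, res);
      _mm_storeu_ps(o + i, res);
    }
    for (; i < width; i++) {                       // scalar tail
      float res = q[i] - p[i];
      o[i] = res;
      vmin = _mm_min_ss(vmin, _mm_set_ss(res));    // fold into lane 0
      vmax = _mm_max_ss(vmax, _mm_set_ss(res));
    }
  }

  // Horizontal reduction of the four vector lanes.
  float t[4];
  _mm_storeu_ps(t, vmin);
  *min = std::min(std::min(t[0], t[1]), std::min(t[2], t[3]));
  _mm_storeu_ps(t, vmax);
  *max = std::max(std::max(t[0], t[1]), std::max(t[2], t[3]));
}

The dx pass can be vectorized the same way, since the loads at i-1 and i+1 are just unaligned loads. Multithreading on top of this means splitting the outer j loop across threads (for example with #pragma omp parallel for), keeping per-thread min/max and combining them at the end.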

+1 same as the other post. I think the compiler optimizes the matrix accesses
fabrizioM
A: 

The compiler might notice this, but you are creating/freeing a lot of variables on the stack as you go in and out of the scope braces {}. Instead of:

for (int j=0; j < height; j++){ 
      float* row = in_matrix + j * width; 
      for (int i=1; i < width-1; i++){ 
        float res = -1.F * row[i-1] + row[i+1];

How about:

int i, j;
float *row;
float res;

for (j=0; j < height; j++){ 
      row = in_matrix + j * width; 
      for (i=1; i < width-1; i++){ 
        res = -1.F * row[i-1] + row[i+1];
No one in particular
No, they will go into registers.
fabrizioM
+1  A: 

Use local variables for computing the min and max. Every time you do this:

if (res > *max ) *max = res;
if (res < *min ) *min = res;

max and min have to get written to memory. Adding restrict on the pointers would help (indicating the writes are independent), but an even better way would be something like

//Setup
float tempMin = ...
float tempMax = ...
...
    // Inner loop
    tempMin = (res < tempMin) ? res : tempMin;
    tempMax = (res > tempMax) ? res : tempMax;
...
// End
*min = tempMin;
*max = tempMax;
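
Filled in concretely, a sketch of dx_matrix along these lines (assuming the question's RESTRICT macro is in scope, and seeding the accumulators with the first value the loop computes) might look like:

void dx_matrix( int height, int width, float * RESTRICT in_matrix, float * RESTRICT out_matrix, float *min, float *max){
  // Seed with the first value computed below (j=0, i=1).
  float tempMin = in_matrix[2] - in_matrix[0];
  float tempMax = tempMin;

  for (int j=0; j < height; j++){
    float* row = in_matrix + j * width;
    float* out = out_matrix + j * width;
    for (int i=1; i < width-1; i++){
      float res = row[i+1] - row[i-1];
      tempMin = (res < tempMin) ? res : tempMin;
      tempMax = (res > tempMax) ? res : tempMax;
      out[i] = res;
    }
  }

  *min = tempMin;   // write back once instead of on every update
  *max = tempMax;
}

Keeping the accumulators in locals also gives the compiler a better chance of turning the ternaries into conditional moves rather than branches, which ties in with the fcmov comment above.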
celion
+1  A: 

Profiling this with -O3 and -O2, using both the clang and g++ compilers on OS X, I found that:

30% of the time was spent filling the initial matrix

  matrix[j * width + i] = ((float)rand() / (float)RAND_MAX) ;

40% of the time was spent in dx_matrix, on the line:

  out_matrix[j * width + i] = row[i+1] -row[i-1];

About 9% of the time was spent in the conditionals in dx_matrix. I moved them into a separate loop to see if that helped, but it didn't change much.

Shark gave the suggestion that this could be improved through the use of SSE instructions.

Interestingly only about 19% of the time was spent in the dy_matrix routine.

This was run on a 10k by 10k matrix (about 1.6 seconds total).

Note that your results may differ with a different compiler, OS, etc.

Michael Anderson
Interesting, how did you get that information? Shark? Steps to reproduce?
fabrizioM
Shark is a profiling tool for OS X. I'm not sure what the direct equivalents under other OSes are, but you might try cachegrind from the valgrind suite if you're on Linux, or gprof. I simply ran "shark -i -1 ./a.out" and then opened the resulting profile.
Michael Anderson