views:

448

answers:

5

I have a simulation that has a huge aggregate and combine step right in the middle. I prototyped this process using plyr's ddply() function which works great for a huge percentage of my needs. But I need this aggregation step to be faster since I have to run 10K simulations. I'm already scaling the simulations in parallel but if this one step were faster I could greatly decrease the number of nodes I need.

Here's a reasonable simplification of what I am trying to do:

library(Hmisc)

# Set up some example data
year <-    sample(1970:2008, 1e6, rep=T)
state <-   sample(1:50, 1e6, rep=T)
group1 <-  sample(1:6, 1e6, rep=T)
group2 <-  sample(1:3, 1e6, rep=T)
myFact <-  rnorm(100, 15, 1e6)
weights <- rnorm(1e6)
myDF <- data.frame(year, state, group1, group2, myFact, weights)

# this is the step I want to make faster
system.time(aggregateDF <- ddply(myDF, c("year", "state", "group1", "group2"),
                     function(df) wtd.mean(df$myFact, weights=df$weights)
                                 )
           )

All tips or suggestions are appreciated!

+2  A: 

Are you using the latest version of plyr (note: this hasn't made it to all the CRAN mirrors yet)? If so, you could just run this in parallel.

Here's the llply example, but the same should apply to ddply:

  x <- seq_len(20)
  wait <- function(i) Sys.sleep(0.1)
  system.time(llply(x, wait))
  #  user  system elapsed 
  # 0.007   0.005   2.005 

  library(doMC)
  registerDoMC(2) 
  system.time(llply(x, wait, .parallel = TRUE))
  #  user  system elapsed 
  # 0.020   0.011   1.038 

Edit:

Well, other looping approaches are worse, so this probably requires either (a) C/C++ code or (b) a more fundamental rethinking of how you're doing it. I didn't even try using by() because that's very slow in my experience.

groups <- unique(myDF[,c("year", "state", "group1", "group2")])
system.time(
aggregateDF <- do.call("rbind", lapply(1:nrow(groups), function(i) {
   df.tmp <- myDF[myDF$year==groups[i,"year"] & myDF$state==groups[i,"state"] & myDF$group1==groups[i,"group1"] & myDF$group2==groups[i,"group2"],]
   cbind(groups[i,], wtd.mean(df.tmp$myFact, weights=df.tmp$weights))
}))
)

aggregateDF <- data.frame()
system.time(
for(i in 1:nrow(groups)) {
   df.tmp <- myDF[myDF$year==groups[i,"year"] & myDF$state==groups[i,"state"] & myDF$group1==groups[i,"group1"] & myDF$group2==groups[i,"group2"],]
   aggregateDF <- rbind(aggregateDF, data.frame(cbind(groups[i,], wtd.mean(df.tmp$myFact, weights=df.tmp$weights))))
}
)
Shane
JD Long
+10  A: 

Instead of the normal R data frame, you can use a immutable data frame which returns pointers to the original when you subset and can be much faster:

idf <- idata.frame(myDF)
system.time(aggregateDF <- ddply(idf, c("year", "state", "group1", "group2"),
   function(df) wtd.mean(df$myFact, weights=df$weights)))

#    user  system elapsed 
# 18.032   0.416  19.250 

If I was to write a plyr function customised exactly to this situation, I'd do something like this:

system.time({
  ids <- id(myDF[c("year", "state", "group1", "group2")], drop = TRUE)
  data <- as.matrix(myDF[c("myFact", "weights")])
  indices <- plyr:::split_indices(seq_len(nrow(data)), ids, n = attr(ids, "n"))

  fun <- function(rows) {
    weighted.mean(data[rows, 1], data[rows, 2])
  }
  values <- vapply(indices, fun, numeric(1))

  labels <- myDF[match(seq_len(attr(ids, "n")), ids), 
    c("year", "state", "group1", "group2")]
  aggregateDF <- cbind(labels, values)
})

# user  system elapsed 
# 2.04    0.29    2.33 

It's so much faster because it avoids copying the data, only extracting the subset needed for each computation when it's computed. Switching the data to matrix form gives another speed boost because matrix subsetting is much faster than data frame subsetting.

hadley
`idata.frame` was added in plyr 1.0.
hadley
I had messed around with making indexes and such with data.table and had all but given up on that idea. I was hoping for 50% improvement. This far exceeds my expectations.
JD Long
having a little trouble making this run right... But I'm learning as I go... I had changed data to myDF but not sure where the issue is..
JD Long
the code above seems to be missing the creation of the matrix 'data' (if I'm reading this right) possibly a data <- as.matrix(myDF[5:6]) at the top?
JD Long
Ooops, yes, fixed now.
hadley
+4  A: 

I would profile with base R

g <- with(myDF, paste(year, state, group1, group2))
x <- with(myDF, c(tapply(weights * myFact, g, sum) / tapply(weights, g, sum)))
aggregateDF <- myDF[match(names(x), g), c("year", "state", "group1", "group2")]
aggregateDF$V1 <- x

On my machine it takes 5sec compare to 67sec with original code.

EDIT Just found another speed up with rowsum function:

g <- with(myDF, paste(year, state, group1, group2))
X <- with(myDF, rowsum(data.frame(a=weights*myFact, b=weights), g))
x <- X$a/X$b
aggregateDF2 <- myDF[match(rownames(X), g), c("year", "state", "group1", "group2")]
aggregateDF2$V1 <- x

It takes 3sec!

Marek
Second one takes 5 seconds on my computer, so plyr is still narrowly beating base ;) (Plus it orders the rows correctly)
hadley
But thanks for the pointer to `rowsum` - it's so hard to keep up with the plethora of aggregation functions in base R.
hadley
I knew there had to be a tapply way of doing this as well but I was struggling to figure it out. I generally have this struggle with the apply family.
JD Long
@hadley Agree. Some time ago I found replacement for `apply(X,1,which.max)` in `col.max`. I wonder how many other functions this type exists (like `pmax`/`pmin`), optimized for matrices objects by using `.Internal` level.
Marek
@Marek: See http://4dpiecharts.com/2010/09/14/which-functions-in-r-base-call-internal-code/
Richie Cotton
@Richie It's exactly what I plan to do in the weekend :) And the simplicity of this code is outstanding
Marek
+3  A: 

I usually use an index vector with tapply when the function being applied has multiple vector args:

system.time(tapply(1:nrow(myDF), myDF[c('year', 'state', 'group1', 'group2')], function(s) weighted.mean(myDF$myFact[s], myDF$weights[s])))
# user  system elapsed 
# 1.36    0.08    1.44 

I use a simple wrapper which is equivalent but hides the mess:

tmapply(list(myDF$myFact, myDF$weights), myDF[c('year', 'state', 'group1', 'group2')], weighted.mean)
Charles
that's very nifty to see that done in base R. Thank you!
JD Long
Just to add: `as.data.frame(as.table(RESULTS))` it's easy way to create `data.frame` from output.
Marek
+2  A: 

Further 2x speedup and more concise code:

library(data.table)
dtb <- data.table(myDF, key="year, state, group1, group2")
system.time( 
  res <- dtb[, weighted.mean(myFact, weights), by=list(year, state, group1, group2)] 
)
#   user  system elapsed 
#  0.950   0.050   1.007 

My first post, so please be nice ;)

datasmurf
Good post! Thanks for the answer. To be consistent with the other methods, however, the step that creates the data table and index should be inside of the system.time() step.
JD Long
Indeed, but it remains the fastest though. It would be nice to have an option in ddply to operate on data.tables or use data.tables under the hood (I just discovered data.table by looking for solutions to the very same problem, but I would prefer a more ddply-like syntax for this case).
datasmurf