• Home
  • About
    • Anirban photo
    • About Me
    • Email
  • Blog Posts
    • Writings
    • Tags
  • Skill Set

Development of data.table.threads following a rubric

I’m writing this blog post to briefly discuss the improvements I made to my data.table.threads code by following this rubric that Toby created for a course at NAU.

Before diving into the lessons learned, I think it would be good to share the before/after versions of the codebase. Here is what the code looked like: A) before applying the points from the rubric:

# [Before] Benchmark a fixed set of data.table functions on a random numeric
# data.table of rowCount x colCount values using `threadCount` data.table threads.
# Returns a data.frame with columns threadCount, expr and medianTime (one row per
# benchmarked expression).
# NOTE(review): setDTthreads() changes the global thread setting and is never
# restored here. summary() is called without a `unit`, so (per the microbenchmark
# docs) the time unit is chosen automatically per call; medianTime values from
# different calls may therefore be in different units. Confirm.
runBenchmarks <- function(rowCount, colCount, threadCount)
{
    setDTthreads(threadCount)
    dt <- data.table(matrix(runif(rowCount * colCount), nrow = rowCount, ncol = colCount))
    # ifelse() on a scalar condition; picks the singular/plural label for the message
    threadLabel <- ifelse(threadCount == 1, "thread", "threads")
    cat(sprintf("Running benchmarks with %d %s, %d rows, and %d columns.\n", getDTthreads(), threadLabel, rowCount, colCount))
    
    # Each named expression is timed `times` (100) times.
    # NOTE(review): setorder() reorders dt in place, so later repetitions sort
    # already-sorted data; fcoalesce reads dt[[2]], so colCount must be >= 2.
    benchmarks <- microbenchmark(
      forder = setorder(dt, V1),
      GForce_sum = dt[, .(sum(V1))],
      subsetting = dt[dt[[1]] > 0.5, ],
      frollmean = frollmean(dt[[1]], 10),
      fcoalesce = fcoalesce(dt[[1]], dt[[2]]),
      between = dt[dt[[1]] %between% c(0.4, 0.6)],
      fifelse = fifelse(dt[[1]] > 0.5, dt[[1]], 0),
      nafill = nafill(dt[[1]], type = "const", fill = 0),
      CJ = CJ(sample(rowCount, size = min(rowCount, 5)), sample(colCount, size = min(colCount, 5))),
      times = 100
    )
    
    # Keep only the median runtime of each expression, named by expression
    benchmark_summary <- summary(benchmarks)
    medianTime <- benchmark_summary$median
    names(medianTime) <- benchmark_summary$expr
    
    return(data.frame(threadCount = threadCount, expr = names(medianTime), medianTime = medianTime))
  }  
  
  # [Before] Run runBenchmarks() once for every thread count from 1 up to the
  # value getDTthreads() reports after setDTthreads(0). Returns the row-bound
  # results tagged with class "data_table_threads_benchmark".
  findOptimalThreadCount <- function(rowCount, colCount) {
    
    # setDTthreads(0) asks data.table for all available threads, so the next
    # getDTthreads() call gives the upper bound of the sweep
    setDTthreads(0)
    maxThreads <- getDTthreads()
    results <- list()
    
    # The list is grown one element per thread count
    for (threadCount in 1:maxThreads) {
      results[[threadCount]] <- runBenchmarks(rowCount, colCount, threadCount)
    }
    
    result.list <- do.call(rbind, results)
    # NOTE(review): this replaces the class vector entirely, so the object is no
    # longer tagged as a data.frame. The later version prepends the class instead.
    class(result.list) <- "data_table_threads_benchmark"
    result.list
}

# [Before] S3 print method: for each benchmarked expression, print the fastest
# median runtime and the thread count that achieved it.
print.data_table_threads_benchmark <- function(x, ...)
{
  # NOTE(review): the median column is stored here under the name meanTime, so
  # the medianTime ~ expr formula below cannot find it in df. This appears to
  # error unless a medianTime object exists in an enclosing environment.
  df <- data.frame(expr = x$expr, threadCount = x$threadCount, meanTime = x$medianTime)
  
  fastestMedianTime <- aggregate(medianTime ~ expr, data = df, FUN = min)
  # NOTE(review): the two %in% tests are applied independently. A row whose time
  # equals the minimum of a *different* expr also matches, so the thread counts
  # may not line up one-to-one with fastestMedianTime.
  bestPerformingThreadCount <- df[df$expr %in% fastestMedianTime$expr & df$medianTime %in% fastestMedianTime$medianTime, "threadCount"]
  results <- data.frame(expr = fastestMedianTime$expr, medianTime = fastestMedianTime$medianTime, threadCount = bestPerformingThreadCount)
  
  cat(sprintf("%-20s %-23s %-12s\n", "data.table function", "Fastest runtime (median)", "Thread count"))
  # cat() separates the 29 dashes with spaces
  cat(rep("-", 29), "\n")
  
  for(i in seq_len(nrow(results))) 
  {
    cat(sprintf("%-20s %-23f %-12d\n", results$expr[i], results$medianTime[i], results$threadCount[i]))
  }
}
  
# [Before] S3 plot method: per data.table function, plot measured speedup
# (relative to one thread) against thread count, together with an ideal line
# (speedup == threads) and a sub-optimal line (ending at half the ideal). Marks
# the best-performing and the recommended thread counts.
plot.data_table_threads_benchmark <- function(x, ...)
{
    df <- x
    rownames(df) <- NULL
    # NOTE(review): not grouped by expr. This relies on rows being laid out one
    # thread count at a time with the same expr order in each block, so that the
    # recycled threadCount == 1 medians line up with the right rows. Confirm.
    df$speedup <- df$medianTime[df$threadCount == 1] / df$medianTime
    
    setDT(df)
    maxSpeedup <- df[, .(threadCount = threadCount[which.max(speedup)], speedup = max(speedup)), by = expr]
    # NOTE(review): the reference lines and x-axis use the *current* global
    # getDTthreads() value, not the thread counts present in the benchmark data.
    idealSpeedup <- seq(1, getDTthreads())
    idealSpeedupData <- data.frame(threadCount = 1:getDTthreads(), speedup = idealSpeedup)
    subOptimalSpeedupData <- data.frame(threadCount = seq(1, getDTthreads(), length.out = getDTthreads()), speedup = seq(1, getDTthreads()/2, length.out = getDTthreads()))
    
    # Recommended point per expr: the measured point furthest above the
    # sub-optimal line. The subtraction aligns the two vectors by position only.
    closestPoints <- data.frame()
    for(i in unique(df$expr)) {
      dfSubset <- df[df$expr == i, ]
      suboptimalSubset <- subOptimalSpeedupData[subOptimalSpeedupData$threadCount %in% dfSubset$threadCount, ]
      closestPoint <- dfSubset[which.max(dfSubset$speedup - suboptimalSubset$speedup), ]
      closestPoints <- rbind(closestPoints, closestPoint)
    }
    
    ggplot(df, aes(x = threadCount, y = speedup)) +
      geom_line(aes(linetype = "Measured")) +
      geom_line(data = idealSpeedupData, aes(x = threadCount, y = speedup, linetype = "Ideal"), color = "black") +
      geom_line(data = subOptimalSpeedupData, aes(x = threadCount, y = speedup, linetype = "Sub-optimal"), color = "black") +
      geom_point(data = closestPoints, aes(x = threadCount, y = speedup, shape = "Recommended"), color = "black", size = 2) +
      geom_point(data = maxSpeedup, aes(x = threadCount, y = speedup, shape = "Best performing"), color = "red", size = 2) +
      geom_text(data = closestPoints, aes(label = threadCount), vjust = -0.5, size = 4, na.rm = TRUE) +
      geom_text(data = maxSpeedup, aes(label = threadCount), vjust = -0.5, size = 4, na.rm = TRUE) +
      geom_ribbon(aes(ymin = speedup - 0.3, ymax = speedup + 0.3), alpha = 0.5) +
      facet_wrap(. ~ expr) +
      coord_equal() +
      labs(x = "Threads", y = "Speedup", title = "data.table functions", linetype = "Legend") +
      theme(plot.title = element_text(hjust = 0.5)) +
      scale_x_continuous(breaks = 1:getDTthreads(), labels = 1:getDTthreads()) +
      scale_linetype_manual(values = c("Measured" = "solid", "Ideal" = "dashed", "Sub-optimal" = "dotted"), guide = "legend") +
      scale_shape_manual(values = c("Recommended" = 16, "Best performing" = 19)) +
      guides(linetype = guide_legend(override.aes = list(fill = NA), title = "Speedup"), shape = guide_legend(override.aes = list(fill = NA), title = "Thread count"))
}

B) and finally, after applying the points from the rubric:

runBenchmarks <- function(rowCount, colCount, threadCount, times = 100L)
{
  # Benchmark a fixed set of data.table functions on a random numeric
  # data.table using `threadCount` data.table threads.
  #
  # rowCount:    number of rows of the random data.table.
  # colCount:    number of columns; must be >= 2 because fcoalesce reads column 2.
  # threadCount: thread count passed to setDTthreads() for this run.
  # times:       repetitions per expression, forwarded to microbenchmark().
  #
  # Returns a data.table with columns threadCount, expr and medianTime (the
  # median runtime in milliseconds), one row per benchmarked expression.
  stopifnot(colCount >= 2)

  # setDTthreads() returns the previous setting; restore it on exit so calling
  # this function does not leave the caller's thread count changed.
  previousThreads <- setDTthreads(threadCount)
  on.exit(setDTthreads(previousThreads), add = TRUE)

  dt <- data.table(matrix(runif(rowCount * colCount), nrow = rowCount, ncol = colCount))

  # Report (and pluralise) the thread count data.table actually uses
  activeThreads <- getDTthreads()
  threadLabel <- if (activeThreads == 1) "thread" else "threads"
  cat(sprintf("Running benchmarks with %d %s, %d rows, and %d columns.\n",
              activeThreads, threadLabel, rowCount, colCount))

  benchmarks <- microbenchmark(
    # NOTE: setorder() reorders dt in place, so repetitions after the first
    # sort already-sorted data.
    forder = setorder(dt, V1),
    GForce_sum = dt[, .(sum(V1))],
    subsetting = dt[dt[[1]] > 0.5, ],
    frollmean = frollmean(dt[[1]], 10),
    fcoalesce = fcoalesce(dt[[1]], dt[[2]]),
    between = dt[dt[[1]] %between% c(0.4, 0.6)],
    fifelse = fifelse(dt[[1]] > 0.5, dt[[1]], 0),
    nafill = nafill(dt[[1]], type = "const", fill = 0),
    CJ = CJ(sample(rowCount, size = min(rowCount, 5)), sample(colCount, size = min(colCount, 5))),
    times = times
  )

  # Use a fixed unit. Without one, summary() chooses a unit automatically for
  # each call, so medians from different thread counts would not be comparable.
  benchmarkSummary <- summary(benchmarks, unit = "ms")

  data.table(threadCount = threadCount,
             expr = benchmarkSummary$expr,
             medianTime = benchmarkSummary$median)
}

findOptimalThreadCount <- function(rowCount, colCount)
{
  # Benchmark every thread count from 1 up to the maximum data.table reports
  # after setDTthreads(0), and combine the per-thread results.
  #
  # rowCount, colCount: size of the random data.table passed to runBenchmarks().
  #
  # Returns a data.table (columns threadCount, expr, medianTime) whose class is
  # prefixed with "data_table_threads_benchmark" for the print/plot methods.

  # setDTthreads(0) asks data.table for all available threads, so the next
  # getDTthreads() call gives the upper bound of the sweep.
  setDTthreads(0)
  maxThreads <- getDTthreads()

  results <- lapply(seq_len(maxThreads), function(threadCount) {
    runBenchmarks(rowCount, colCount, threadCount)
  })

  results.dt <- rbindlist(results)
  # setattr() sets the class by reference, keeping the data.table classes after ours
  setattr(results.dt, "class", c("data_table_threads_benchmark", class(results.dt)))
  results.dt
}

print.data_table_threads_benchmark <- function(x, ...)
{
  # Print, for each benchmarked data.table function, the fastest median runtime
  # and the thread count(s) that achieved it. Returns `x` invisibly, as print
  # methods conventionally do.

  fastestMedianTime <- x[, .(medianTime = min(medianTime)), by = expr]
  # Join back on (expr, medianTime) to recover the thread count of each minimum;
  # ties produce one row per tied thread count.
  results <- x[fastestMedianTime, on = .(expr, medianTime), .(expr, threadCount, medianTime)]

  # The second header label is 24 characters, so the column is 24 wide.
  header <- sprintf("%-20s %-24s %-12s", "data.table function", "Fastest runtime (median)", "Thread count")
  cat(header, "\n", sep = "")
  cat(strrep("-", nchar(header)), "\n", sep = "")

  # sprintf() is vectorised, producing one formatted line per result row
  cat(sprintf("%-20s %-24f %-12d\n", results$expr, results$medianTime, results$threadCount), sep = "")

  invisible(x)
}

plot.data_table_threads_benchmark <- function(x, ...)
{
  # Plot, per data.table function, the measured speedup (relative to one thread)
  # against thread count. Also draws an ideal line (speedup == threads) and a
  # sub-optimal line (rising linearly to half the ideal at the largest thread
  # count), and marks the best-performing and recommended thread counts.
  # Returns the ggplot object.

  # Work on a copy: `:=` modifies by reference and would otherwise add plotting
  # columns to the caller's benchmark object.
  x <- copy(x)

  # Take the thread range from the benchmark data itself, not from the current
  # global getDTthreads() setting, which may differ from the one benchmarked.
  maxThreads <- max(x$threadCount)
  referenceThreads <- seq_len(maxThreads)

  x[, `:=`(
    speedup = medianTime[threadCount == 1] / medianTime,
    type = "Measured"
  ), by = expr]

  bestPerforming <- x[, .(threadCount = threadCount[which.max(speedup)],
                          speedup = max(speedup),
                          type = "Best performing"), by = expr]

  idealSpeedup <- x[, .(threadCount = referenceThreads,
                        speedup = as.numeric(referenceThreads),
                        type = "Ideal",
                        medianTime = NA_real_), by = expr]

  subOptimalSpeedup <- x[, .(threadCount = referenceThreads,
                             speedup = seq(1, maxThreads / 2, length.out = maxThreads),
                             type = "Sub-optimal",
                             medianTime = NA_real_), by = expr]

  combinedLineData <- rbindlist(list(idealSpeedup, subOptimalSpeedup, x), use.names = TRUE, fill = TRUE)

  # Recommended point per function: the measured point lying furthest above the
  # sub-optimal line. Measured and reference speedups are matched by a join on
  # (expr, threadCount) rather than by row position.
  gapAboveSubOptimal <- x[subOptimalSpeedup, on = .(expr, threadCount), nomatch = 0L,
                          .(expr, threadCount, speedup, gap = speedup - i.speedup)]
  recommended <- gapAboveSubOptimal[, .SD[which.max(gap)], by = expr][
    , .(expr, threadCount, speedup, type = "Recommended")]

  combinedPointData <- rbindlist(list(bestPerforming, recommended), use.names = TRUE)

  # Per-function speedup range, shaded by the ribbon
  x[, `:=`(
    speedupLower = min(speedup),
    speedupUpper = max(speedup)
  ), by = expr]

  ggplot(x, aes(x = threadCount, y = speedup)) +
    # NOTE: ggplot2 >= 3.4.0 prefers `linewidth` over `size` for lines
    geom_line(data = combinedLineData, aes(color = type), size = 1) +
    geom_point(data = combinedPointData, aes(color = type), size = 3) +
    geom_text(data = combinedPointData, aes(label = threadCount), vjust = -0.5, size = 4, na.rm = TRUE) +
    geom_ribbon(aes(ymin = speedupLower, ymax = speedupUpper), alpha = 0.5) +
    facet_wrap(. ~ expr) +
    coord_equal() +
    labs(x = "Threads", y = "Speedup", title = "data.table functions") +
    theme(plot.title = element_text(hjust = 0.5)) +
    scale_x_continuous(breaks = referenceThreads, labels = referenceThreads) +
    scale_color_manual(values = c("Measured" = "black", "Ideal" = "#f79494", "Sub-optimal" = "#93c4e0", "Recommended" = "#93c4e0", "Best performing" = "#f79494")) +
    guides(color = guide_legend(override.aes = list(fill = NA), title = "Type"))
}

The rubric gave me several takeaways, so I compiled a list of the actions I took while following it:

  • Removed the distinction between lines and points by type in the plot and legend (solid/dashed/dotted linetypes for lines and shapes 16/19 for points), and used color only. (Rubric point: Using multiple visual properties for the same data variable can be potentially confusing)
  • Refactored the plot method to call geom_line, geom_point, and geom_text only once each (apart from other changes to the scales to accommodate them). (Rubric point: Multiple geoms should be avoided in a ggplot when you could use a single geom with a bigger data set)
  • Used more informative names and followed consistent casing. (Rubric point: Use more informative names that are specific to your application)
  • Removed repetition by putting the terms ‘Speedup’ and ‘Thread Count’ in the plot legend’s titles instead of in the values.
  • Using threadCount and speedup instead of x and y for column names in the data.table that I’m plotting, towards more informative names.

Exceptions:

  • The rubric suggests avoiding single-letter variable names. However, x is the default parameter name for the S3 plot method (signature plot(x, y, ...)), and likewise print(x, ...) for print. To avoid adding an unnecessary step/variable like dt <- x, I’m continuing to use x in my plot method. Toby said this was okay when I brought it up.

Todo:

  • Using constants on the right-hand side (value) of aes that will result in a legend.