Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@
- Type conversion support in GForce expressions (e.g., `sum(as.numeric(x))` will use GForce, saving the need to coerce `x` in a setup step) [#2934](https://github.com/Rdatatable/data.table/issues/2934)
- Arithmetic operation support in GForce (e.g., `max(x) - min(x)` will use GForce on both `max(x)` and `min(x)`, saving the need to do the subtraction in a follow-up step) [#3815](https://github.com/Rdatatable/data.table/issues/3815)

4. `median()` is now faster when used in `by=` grouping operations. Thanks @ben-schwen for the suggestion and implementation.
```r
set.seed(1)
DT = data.table(g = sample(1e4, 1e7, TRUE), v = rnorm(1e7))
microbenchmark::microbenchmark(
"master" = `[.data.table`(DT, j=median(v), by=g),
"1.18.0" = data.table:::`[.data.table`(DT, j=median(v), by=g)
)
# Unit: milliseconds
# expr min lq mean median uq max neval
# master 85.11445 92.76479 101.6840 99.22233 105.7872 146.8693 100
# 1.18.0 217.07793 232.08915 250.6945 244.30433 264.9017 321.9682 100
```

### BUG FIXES

1. `fread()` with `skip=0` and `(header=TRUE|FALSE)` no longer skips the first row when it has fewer fields than subsequent rows, [#7463](https://github.com/Rdatatable/data.table/issues/7463). Thanks @emayerhofer for the report and @ben-schwen for the fix.
Expand Down
10 changes: 10 additions & 0 deletions inst/tests/tests.Rraw
Original file line number Diff line number Diff line change
Expand Up @@ -21484,3 +21484,13 @@ dt = data.table(a=1:4, b=1:2)
test(2362.51, optimize=0:2, dt[, c(list()), b, verbose=TRUE], data.table(b=integer(0L)), output="GForce FALSE")
test(2362.52, optimize=0:2, dt[, c(lapply(.SD, sum), list()), b, verbose=TRUE], output=out)
test(2362.53, optimize=0:2, dt[, list(lapply(.SD, sum), list()), b, verbose=TRUE], output="GForce FALSE")

set.seed(1)
dt = data.table(g=sample(1:2, 1e4, TRUE), i=sample(1e4), d=rnorm(1e4))
test(2363.1, optimize=0:2, dt[, median(i), by=g][, .(g, V1=as.numeric(V1))])
test(2363.2, optimize=0:2, dt[, median(d), by=g][, .(g, V1=as.numeric(V1))])
if (test_bit64) {
dt[, dd := as.integer64(sample(i))]
# test omitted to clarify intended bit64 behavior
# test(2363.3, dt[, median(dd), by=g])
}
3 changes: 3 additions & 0 deletions src/data.table.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ SEXP bmerge(SEXP idt, SEXP xdt, SEXP icolsArg, SEXP xcolsArg,
double dquickselect(double *x, int n);
double iquickselect(int *x, int n);
double i64quickselect(int64_t *x, int n);
double dfloyd_rivest(double *x, int n);
double ifloyd_rivest(int *x, int n);
double i64floyd_rivest(int64_t *x, int n);

// fread.c
double wallclock(void);
Expand Down
69 changes: 47 additions & 22 deletions src/gsumm.c
Original file line number Diff line number Diff line change
Expand Up @@ -880,43 +880,68 @@ SEXP gmedian(SEXP x, SEXP narmArg) {
const bool nosubset = irowslen==-1;
switch(TYPEOF(x)) {
case REALSXP: {
double *subd = REAL(PROTECT(allocVector(REALSXP, maxgrpn))); // allocate once upfront and reuse
int64_t *xi64 = (int64_t *)REAL(x);
double *xd = REAL(x);
for (int i=0; i<ngrp; ++i) {
int thisgrpsize = grpsize[i], nacount=0;
for (int j=0; j<thisgrpsize; ++j) {
int k = ff[i]+j-1;
if (isunsorted) k = oo[k]-1;
k = nosubset ? k : (irows[k]==NA_INTEGER ? NA_INTEGER : irows[k]-1);
if (k==NA_INTEGER || (isInt64 ? xi64[k]==NA_INTEGER64 : ISNAN(xd[k]))) nacount++;
else subd[j-nacount] = xd[k];
#pragma omp parallel num_threads(getDTthreads(ngrp, true))
{
double *thread_subd = malloc(maxgrpn * sizeof(double));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to allocate outside of parallel and use R alloc, so the memory usage can be tracked by apps that track memory usage using R API.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, will do.

if (!thread_subd) error(_("Unable to allocate thread-local buffer in gmedian")); // # nocov
#pragma omp for
for (int i=0; i<ngrp; ++i) {
int thisgrpsize = grpsize[i], nacount=0;
for (int j=0; j<thisgrpsize; ++j) {
int k = ff[i]+j-1;
if (isunsorted) k = oo[k]-1;
k = nosubset ? k : (irows[k]==NA_INTEGER ? NA_INTEGER : irows[k]-1);
if (k==NA_INTEGER || (isInt64 ? xi64[k]==NA_INTEGER64 : ISNAN(xd[k]))) nacount++;
else thread_subd[j-nacount] = xd[k];
}
thisgrpsize -= nacount; // all-NA is returned as NA_REAL via n==0 case inside *quickselect
if (nacount && !narm) {
ansd[i] = NA_REAL;
} else {
// Use Floyd-Rivest for groups larger than 100, quickselect for smaller
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to put 100 as a parameter, so we can explore impact of this code path branches by options

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could do. Like all of these parameters, it should be tuned.

ansd[i] = isInt64 ?
(thisgrpsize > 100 ? i64floyd_rivest((int64_t *)thread_subd, thisgrpsize) : i64quickselect((int64_t *)thread_subd, thisgrpsize)) :
(thisgrpsize > 100 ? dfloyd_rivest(thread_subd, thisgrpsize) : dquickselect(thread_subd, thisgrpsize));
}
}
thisgrpsize -= nacount; // all-NA is returned as NA_REAL via n==0 case inside *quickselect
ansd[i] = (nacount && !narm) ? NA_REAL : (isInt64 ? i64quickselect((void *)subd, thisgrpsize) : dquickselect(subd, thisgrpsize));
free(thread_subd);
}}
break;
case LGLSXP: case INTSXP: {
int *subi = INTEGER(PROTECT(allocVector(INTSXP, maxgrpn)));
int *xi = INTEGER(x);
for (int i=0; i<ngrp; i++) {
const int thisgrpsize = grpsize[i];
int nacount=0;
for (int j=0; j<thisgrpsize; ++j) {
int k = ff[i]+j-1;
if (isunsorted) k = oo[k]-1;
if (nosubset ? xi[k]==NA_INTEGER : (irows[k]==NA_INTEGER || (k=irows[k]-1,xi[k]==NA_INTEGER))) nacount++;
else subi[j-nacount] = xi[k];
#pragma omp parallel num_threads(getDTthreads(ngrp, true))
{
int *thread_subi = malloc(maxgrpn * sizeof(int));
if (!thread_subi) error(_("Unable to allocate thread-local buffer in gmedian")); // # nocov
#pragma omp for
for (int i=0; i<ngrp; i++) {
const int thisgrpsize = grpsize[i];
int nacount=0;
for (int j=0; j<thisgrpsize; ++j) {
int k = ff[i]+j-1;
if (isunsorted) k = oo[k]-1;
if (nosubset ? xi[k]==NA_INTEGER : (irows[k]==NA_INTEGER || (k=irows[k]-1,xi[k]==NA_INTEGER))) nacount++;
else thread_subi[j-nacount] = xi[k];
}
if (nacount && !narm) {
ansd[i] = NA_REAL;
} else {
const int validsize = thisgrpsize-nacount;
// Use Floyd-Rivest for groups larger than 100, quickselect for smaller
ansd[i] = validsize > 100 ? ifloyd_rivest(thread_subi, validsize) : iquickselect(thread_subi, validsize);
}
}
ansd[i] = (nacount && !narm) ? NA_REAL : iquickselect(subi, thisgrpsize-nacount);
free(thread_subi);
}}
break;
default:
error(_("Type '%s' is not supported by GForce %s. Either add the prefix %s or turn off GForce optimization using options(datatable.optimize=1)"), type2char(TYPEOF(x)), "median (gmedian)", "stats::median(.)");
}
if (!isInt64) copyMostAttrib(x, ans);
// else the integer64 class needs to be dropped since double is always returned by gmedian
UNPROTECT(2); // ans, subd|subi
UNPROTECT(1); // ans
return ans;
}

Expand Down
83 changes: 83 additions & 0 deletions src/quickselect.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,86 @@ double i64quickselect(int64_t *x, int n)
int64_t a, b;
BODY(i64swap);
}

// Floyd-Rivest selection algorithm
// https://en.wikipedia.org/wiki/Floyd%E2%80%93Rivest_algorithm

#undef DEFINE_FLOYD_RIVEST_SELECT_FN
#define DEFINE_FLOYD_RIVEST_SELECT_FN(SELECT_FN, TYPE, SWAP) \
static void SELECT_FN(TYPE *x, unsigned long left, unsigned long right, unsigned long k) { \
while (right > left) { \
if (right - left > 600) { \
unsigned long n = right - left + 1; \
unsigned long i = k - left + 1; \
double z = log((double)n); \
double s = 0.5 * exp(2.0 * z / 3.0); \
double sd = 0.5 * sqrt(z * s * (n - s) / n); \
long delta = (long)i - (long)(n / 2); \
if (delta < 0) sd = -sd; \
else if (delta == 0) sd = 0.0; \
double new_left = (double)k - (double)i * s / n + sd; \
double new_right = (double)k + (double)(n - i) * s / n + sd; \
unsigned long newL = (unsigned long)MAX((long)left, (long)new_left); \
unsigned long newR = (unsigned long)MIN((long)right, (long)new_right); \
SELECT_FN(x, newL, newR, k); \
} \
TYPE pivot = x[k]; \
unsigned long i = left, j = right; \
SWAP(x + left, x + k); \
if (x[right] > pivot) { \
SWAP(x + right, x + left); \
} \
while (i < j) { \
SWAP(x + i, x + j); \
i++; j--; \
while (x[i] < pivot) i++; \
while (x[j] > pivot) j--; \
} \
if (x[left] == pivot) { \
SWAP(x + left, x + j); \
} else { \
j++; \
SWAP(x + j, x + right); \
} \
if (j <= k) left = j + 1; \
if (k <= j) right = j - 1; \
} \
}

DEFINE_FLOYD_RIVEST_SELECT_FN(dfloyd_rivest_select, double, dswap)
DEFINE_FLOYD_RIVEST_SELECT_FN(ifloyd_rivest_select, int, iswap)
DEFINE_FLOYD_RIVEST_SELECT_FN(i64floyd_rivest_select, int64_t, i64swap)

#undef FLOYD_RIVEST_BODY
#define FLOYD_RIVEST_BODY(TYPE, SELECTFN) \
if (n == 0) return NA_REAL; \
unsigned long med = n / 2 - (n % 2 == 0); \
SELECTFN(x, 0, n - 1, med); \
a = x[med]; \
if (n % 2 == 1) { \
return (double)a; \
} else { \
b = x[med + 1]; \
for (unsigned long i = med + 2; i < (unsigned long)n; i++) { \
if (x[i] < b) b = x[i]; \
} \
return ((double)a + (double)b) / 2.0; \
}

double dfloyd_rivest(double *x, int n)
{
double a, b;
FLOYD_RIVEST_BODY(double, dfloyd_rivest_select);
}

double ifloyd_rivest(int *x, int n)
{
int a, b;
FLOYD_RIVEST_BODY(int, ifloyd_rivest_select);
}

double i64floyd_rivest(int64_t *x, int n)
{
int64_t a, b;
FLOYD_RIVEST_BODY(int64_t, i64floyd_rivest_select);
}
Loading