Merged
20 changes: 18 additions & 2 deletions .github/workflows/ci.yml
@@ -50,10 +50,24 @@ jobs:
# a prebuilt standalone `haybarn-unittest` — no C++ build. The worker queries a
# real cluster, so a single-node OpenSearch (Apache-2.0, security disabled for
# the test) runs as a service container. See ci/README.md.
#
# Transport matrix: the same suite runs over each transport the vgi extension
# supports, selected by ci/run-integration.sh's TRANSPORT env var (which
# changes what the .test files ATTACH as the worker LOCATION):
# subprocess worker spawned over stdio (the binary path)
# http worker started with --http, LOCATION = http://127.0.0.1:<port>
# unix worker started with --unix <sock>, LOCATION = unix://<sock>
# The OpenSearch service container + seed step run for ALL transports (the
# worker queries the live cluster on every leg). See ci/README.md for the
# per-transport notes.
integration:
name: SQL end-to-end (haybarn)
name: SQL E2E (${{ matrix.transport }})
needs: resolve-haybarn
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
transport: [subprocess, http, unix]
services:
opensearch:
image: opensearchproject/opensearch:2.17.0
@@ -104,5 +118,7 @@ jobs:
echo "HAYBARN_UNITTEST=$UNITTEST" >> "$GITHUB_ENV"
echo "VGI_ELASTICSEARCH_WORKER=$PWD/vgi-elasticsearch-worker" >> "$GITHUB_ENV"

- name: Run extension integration suite
- name: Run extension integration suite (${{ matrix.transport }})
run: ci/run-integration.sh
env:
TRANSPORT: ${{ matrix.transport }}
50 changes: 45 additions & 5 deletions ci/README.md
@@ -25,10 +25,49 @@ extension from the Haybarn community channel:
pass-through here; `require-env` and everything else pass through untouched.
4. **Run** — [`run-integration.sh`](run-integration.sh) waits for the cluster,
builds + runs the repo's `seed` binary to bulk-load the fixed test index,
stages the preprocessed tree, points `VGI_ELASTICSEARCH_WORKER` at the built
worker binary, warms the extension cache once (`INSTALL vgi FROM community`),
then runs the suite in a single `haybarn-unittest` invocation. Any failed
assertion exits non-zero and fails the job.
resolves the worker LOCATION for the selected transport (see below), stages
the preprocessed tree, warms the extension cache once (`INSTALL vgi FROM
community`), then runs the suite in a single `haybarn-unittest` invocation.
Any failed assertion exits non-zero and fails the job.

## Transport matrix (subprocess / http / unix)

The SQL E2E runs the **full** suite over each transport the `vgi` extension
supports, selected by `run-integration.sh`'s `TRANSPORT` env var (the CI
`integration` job is a `transport: [subprocess, http, unix]` matrix). The
extension picks the transport from the ATTACH `LOCATION`, so the *same* suite
exercises a different transport just by changing what `VGI_ELASTICSEARCH_WORKER`
resolves to:

- **subprocess** (default) — `VGI_ELASTICSEARCH_WORKER` = the built worker
binary; the extension spawns it over stdin/stdout.
- **http** — the script starts `<worker> --http`, which prints `PORT:<n>` on
stdout once listening; the script parses that and sets
`VGI_ELASTICSEARCH_WORKER = http://127.0.0.1:<port>` (the **bare**
`scheme://host:port`, no path — the extension POSTs each RPC method at
`<LOCATION>/<method>`). The HTTP worker-RPC rides DuckDB's `httpfs` HTTP
client, so for the http leg **only** the script injects `INSTALL httpfs FROM
core; LOAD httpfs;` after each `LOAD vgi;` in the staged tests.
- **unix** — the script starts `<worker> --unix <sock>`, which prints
`UNIX:<path>` once listening, and sets `VGI_ELASTICSEARCH_WORKER =
unix://<sock>`.
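
The mapping in the list above can be sketched as a small POSIX shell function. This is an illustration only: `resolve_location` is a hypothetical name, not a function in `run-integration.sh`, and it assumes the worker's handshake line has already been read from its stdout. For the unix leg it takes the path from the `UNIX:<path>` line, whereas the real script reuses the socket path it chose itself.

```shell
# Hypothetical helper (not part of run-integration.sh): map a transport and the
# worker's handshake line to the LOCATION the .test files ATTACH.
resolve_location() {
  transport=$1; worker_bin=$2; handshake=${3:-}
  case "$transport" in
    subprocess)
      # The extension spawns the binary itself, so LOCATION is the path.
      printf '%s\n' "$worker_bin" ;;
    http)
      # Accept only "PORT:<digits>"; LOCATION is the bare scheme://host:port.
      case "$handshake" in PORT:*) ;; *) return 1 ;; esac
      port=${handshake#PORT:}
      case "$port" in ''|*[!0-9]*) return 1 ;; esac
      printf 'http://127.0.0.1:%s\n' "$port" ;;
    unix)
      # Accept only "UNIX:<absolute path>".
      case "$handshake" in UNIX:/*) ;; *) return 1 ;; esac
      printf 'unix://%s\n' "${handshake#UNIX:}" ;;
    *)
      return 2 ;;
  esac
}
```

For example, `resolve_location http ./vgi-elasticsearch-worker 'PORT:8123'` prints `http://127.0.0.1:8123`; a handshake line that does not match returns non-zero instead of producing a malformed LOCATION.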

For http/unix the worker is launched **by the script** (not DuckDB) and
trap-killed on exit. The OpenSearch service container + seed step run for **all**
transports — the worker queries the live cluster on every leg.

**No tests are gated.** The `es_search` streaming table function works over the
stateless HTTP transport because its scan state already externalizes the scan
position: the PIT id plus the last hit's `search_after` sort values are plain
gob-encodable fields the framework snapshots into the HTTP continuation token
each tick, and the worker resumes from them (one page per `Process` tick). See
the "EXTERNALIZED scan state" comments in `internal/esworker/functions.go`.

**Silent-skip guard.** The DuckDB/Haybarn sqllogictest runner *skips* (exit 0,
not fail) any test whose error message contains an `"HTTP"`-flavoured substring,
so a broken http leg would otherwise report "All tests were skipped" and go
GREEN having run nothing. `run-integration.sh` fails the leg if the runner
reports every test skipped — never trust a green http leg without it.
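
A minimal sketch of that guard, assuming the runner's exit code and its captured output are already available. `check_run` is a hypothetical name used for illustration; the script performs the same two checks inline.

```shell
# Hypothetical helper (not part of run-integration.sh): decide pass/fail from
# the runner's exit code and its captured log file.
check_run() {
  rc=$1; log=$2
  # A non-zero exit from the runner is always a failure.
  [ "$rc" -eq 0 ] || return "$rc"
  # Exit 0 with every test skipped is a fake pass, so reject it too.
  if grep -q 'All tests were skipped' "$log"; then
    return 1
  fi
  return 0
}

# Demo: a log that reports only skips is rejected even though rc is 0.
demo_log=$(mktemp)
printf 'All tests were skipped\n' > "$demo_log"
check_run 0 "$demo_log" || echo "skipped run rejected"
rm -f "$demo_log"
```

Checking the log text as well as the exit code is the point: on the http leg the exit code alone cannot tell a real pass from a run in which nothing executed.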

## The cluster

@@ -49,7 +88,8 @@ HAYBARN_UNITTEST=/path/to/haybarn-unittest \
VGI_ELASTICSEARCH_WORKER="$PWD/vgi-elasticsearch-worker" \
VGI_ES_TEST_URL="http://127.0.0.1:9209" \
VGI_ES_TEST_INDEX="vgi_es_e2e" \
ci/run-integration.sh
TRANSPORT=subprocess \
ci/run-integration.sh # or TRANSPORT=http / TRANSPORT=unix
make os-down
```

189 changes: 177 additions & 12 deletions ci/run-integration.sh
@@ -6,20 +6,46 @@
# and the signed community `vgi` extension — no C++ build from source. See
# ci/README.md.
#
# Multi-transport: the same suite runs over whichever transport the TRANSPORT
# env var selects, by changing what `VGI_ELASTICSEARCH_WORKER` resolves to (the
# vgi extension picks the transport from the ATTACH LOCATION string):
#
# subprocess (default) VGI_ELASTICSEARCH_WORKER = the stdio worker binary
# -> extension spawns it over stdin/stdout.
# http start `<worker> --http` (prints "PORT:<n>"), parse the
# port, VGI_ELASTICSEARCH_WORKER = http://127.0.0.1:<port>.
# (The extension POSTs each RPC method at <LOCATION>/<method>,
# e.g. /catalog_attach; the SDK mounts them at the root, so
# LOCATION is the BARE scheme://host:port — no path suffix.)
# unix start `<worker> --unix <sock>` (prints "UNIX:<path>"),
# VGI_ELASTICSEARCH_WORKER = unix://<sock>. The socket path is
# ${TMPDIR:-/tmp}/es.<pid>.sock.
#
# The es worker queries a real Elasticsearch/OpenSearch cluster (PIT +
# search_after), so the suite needs a running cluster. In CI that cluster is a
# single-node OpenSearch *service container*; locally `make test-sql` boots one
# in Docker. Either way the cluster URL + index name are provided via
# VGI_ES_TEST_URL + VGI_ES_TEST_INDEX, and this script builds + runs the repo's
# `seed` binary to bulk-load the fixed test index before running the suite
# (mirroring `make test-sql`).
# search_after) on EVERY transport, so the suite always needs a running cluster.
# In CI that cluster is a single-node OpenSearch *service container*; locally
# `make test-sql` boots one in Docker. Either way the cluster URL + index name
# are provided via VGI_ES_TEST_URL + VGI_ES_TEST_INDEX, and this script builds +
# runs the repo's `seed` binary to bulk-load the fixed test index before running
# the suite (mirroring `make test-sql`). For http/unix the worker is started by
# this script (not DuckDB) and all started processes are trap-killed on exit.
#
# The es_search streaming table function works over the stateless HTTP transport
# because its scan state carries an explicit gob-encodable cursor (PIT id + the
# last hit's search_after sort values) that the framework snapshots into the
# continuation token each tick — see the "EXTERNALIZED scan state" comments in
# internal/esworker/functions.go. No tests are gated.
#
# Required environment:
# HAYBARN_UNITTEST path to the haybarn-unittest binary
# VGI_ELASTICSEARCH_WORKER worker LOCATION the .test files ATTACH (the built
# Go worker binary the vgi extension spawns over stdio)
# VGI_ELASTICSEARCH_WORKER for TRANSPORT=subprocess: the worker LOCATION the
# .test files ATTACH (the built Go worker binary,
# spawned over stdio). For http/unix this is
# OVERRIDDEN by this script, but the binary it points
# at is reused to launch the out-of-band server, so it
# must still be the worker path.
# VGI_ES_TEST_URL base URL of the OpenSearch/Elasticsearch cluster
# Optional:
# TRANSPORT subprocess (default) | http | unix
# VGI_ES_TEST_INDEX index to seed + query (default: vgi_es_e2e)
# SEED_COUNT number of documents to seed (default: 25)
# STAGE scratch dir for the preprocessed tree (default: mktemp)
@@ -29,13 +55,38 @@ set -euo pipefail
: "${VGI_ELASTICSEARCH_WORKER:?worker LOCATION (the built Go worker binary)}"
: "${VGI_ES_TEST_URL:?base URL of the OpenSearch/Elasticsearch cluster}"

TRANSPORT="${TRANSPORT:-subprocess}"
case "$TRANSPORT" in
subprocess|http|unix) ;;
*) echo "ERROR: unknown TRANSPORT='$TRANSPORT' (expected subprocess|http|unix)" >&2; exit 2 ;;
esac

export VGI_ES_TEST_INDEX="${VGI_ES_TEST_INDEX:-vgi_es_e2e}"
SEED_COUNT="${SEED_COUNT:-25}"

HERE="$(cd "$(dirname "$0")" && pwd)"
REPO="$(cd "$HERE/.." && pwd)"
STAGE="${STAGE:-$(mktemp -d)}"

# The worker binary the subprocess transport ATTACHes to is also the binary we
# launch out-of-band for http/unix. Capture it before we possibly overwrite
# VGI_ELASTICSEARCH_WORKER with a URL.
WORKER_BIN="$VGI_ELASTICSEARCH_WORKER"

# Collected PIDs and paths to clean up on exit (the optional out-of-band worker).
WORKER_PID=""
UNIX_SOCK=""
cleanup() {
# Preserve the script's exit status: this runs on EXIT, so its own last
# command must not clobber the real exit code (a bare `[ -n "$x" ]` that is
# false returns 1 and would turn a green run red).
local rc=$?
if [ -n "$WORKER_PID" ]; then kill "$WORKER_PID" 2>/dev/null || true; wait "$WORKER_PID" 2>/dev/null || true; fi
if [ -n "$UNIX_SOCK" ]; then rm -f "$UNIX_SOCK"; fi
return "$rc"
}
trap cleanup EXIT

# --- Wait for the cluster to answer, then seed the fixed test index ----------
echo "Waiting for OpenSearch/Elasticsearch at $VGI_ES_TEST_URL ..."
READY=""
@@ -57,13 +108,100 @@ echo "Building seeder ..."
echo "Seeding index '$VGI_ES_TEST_INDEX' ($SEED_COUNT docs) ..."
"$SEED_BIN" --url "$VGI_ES_TEST_URL" --index "$VGI_ES_TEST_INDEX" --count "$SEED_COUNT"

# --- Per-transport: resolve VGI_ELASTICSEARCH_WORKER (the ATTACH LOCATION) ----
# subprocess keeps the binary path (extension spawns stdio). http/unix start the
# worker out-of-band and hand the extension a URL.
case "$TRANSPORT" in
subprocess)
echo "Transport: subprocess/stdio — VGI_ELASTICSEARCH_WORKER=$VGI_ELASTICSEARCH_WORKER"
;;

http)
# Start the worker in --http mode; it prints "PORT:<n>" once listening.
WORKER_PORT_FILE="$(mktemp)"
echo "Transport: http — starting '$WORKER_BIN --http' ..."
"$WORKER_BIN" --http >"$WORKER_PORT_FILE" 2>/dev/null &
WORKER_PID=$!
WPORT=""
for _ in $(seq 1 50); do
WPORT="$(sed -n 's/^PORT:\([0-9][0-9]*\)$/\1/p' "$WORKER_PORT_FILE" 2>/dev/null | head -1)"
[ -n "$WPORT" ] && break
# Bail early if the worker died.
kill -0 "$WORKER_PID" 2>/dev/null || { echo "ERROR: http worker exited before reporting a port" >&2; cat "$WORKER_PORT_FILE" >&2 || true; exit 1; }
sleep 0.2
done
rm -f "$WORKER_PORT_FILE"
if [ -z "$WPORT" ]; then
echo "ERROR: http worker did not report a port" >&2
exit 1
fi
# The extension treats the LOCATION as a base and POSTs each RPC method at
# <LOCATION>/<method> (e.g. /catalog_attach). The SDK mounts those methods
# at the server root (empty prefix), so the LOCATION must be the bare
# scheme://host:port with NO path. Appending /vgi would make every method
# 404 — which the runner silently skips as an error "matching 'HTTP'".
export VGI_ELASTICSEARCH_WORKER="http://127.0.0.1:$WPORT"
echo "HTTP worker listening on $VGI_ELASTICSEARCH_WORKER (pid $WORKER_PID)"
;;

unix)
# Start the worker on an AF_UNIX socket; it prints "UNIX:<path>" once
# listening. idleTimeout is disabled (we own the process lifecycle).
UNIX_SOCK="${TMPDIR:-/tmp}/es.$$.sock"
rm -f "$UNIX_SOCK"
WORKER_OUT_FILE="$(mktemp)"
echo "Transport: unix — starting '$WORKER_BIN --unix $UNIX_SOCK' ..."
"$WORKER_BIN" --unix "$UNIX_SOCK" >"$WORKER_OUT_FILE" 2>/dev/null &
WORKER_PID=$!
READY=""
for _ in $(seq 1 50); do
if grep -q '^UNIX:' "$WORKER_OUT_FILE" 2>/dev/null && [ -S "$UNIX_SOCK" ]; then
READY=1; break
fi
kill -0 "$WORKER_PID" 2>/dev/null || { echo "ERROR: unix worker exited before the socket was ready" >&2; cat "$WORKER_OUT_FILE" >&2 || true; exit 1; }
sleep 0.2
done
rm -f "$WORKER_OUT_FILE"
if [ -z "$READY" ]; then
echo "ERROR: unix worker did not report a ready socket at $UNIX_SOCK" >&2
exit 1
fi
export VGI_ELASTICSEARCH_WORKER="unix://$UNIX_SOCK"
echo "Unix worker listening on $VGI_ELASTICSEARCH_WORKER (pid $WORKER_PID)"
;;
esac

# --- Stage the preprocessed tests -------------------------------------------
echo "Staging preprocessed tests into $STAGE ..."
mkdir -p "$STAGE/test/sql"
for f in "$REPO"/test/sql/*.test; do
awk -f "$HERE/preprocess-require.awk" "$f" > "$STAGE/test/sql/$(basename "$f")"
done

# The vgi extension drives the HTTP transport through DuckDB's HTTPUtil, which
# is only registered once the `httpfs` extension is loaded. The .test files only
# `LOAD vgi`, so without httpfs the worker-RPC POSTs fail with an
# "HTTP"-flavoured error (which the runner then silently skips). For the http
# transport only, inject an explicit signed `INSTALL httpfs FROM core; LOAD
# httpfs;` after each `LOAD vgi;` in the staged tests (subprocess/unix do not
# use the HTTP client, so they need nothing extra).
if [ "$TRANSPORT" = "http" ]; then
echo "Transport http: injecting 'LOAD httpfs' (required for the worker HTTP RPC) ..."
for f in "$STAGE"/test/sql/*.test; do
awk '
{ print }
/^LOAD[ \t]+vgi;[ \t]*$/ {
print "";
print "statement ok";
print "INSTALL httpfs FROM core;";
print "";
print "statement ok";
print "LOAD httpfs;";
}
' "$f" > "$f.tmp" && mv "$f.tmp" "$f"
done
fi

cd "$STAGE"

# Warm the extension cache once: vgi from the signed community channel. A miss
@@ -81,7 +219,34 @@ EOF
"$HAYBARN_UNITTEST" "test/_warm.test" >/dev/null 2>&1 || echo "::warning::extension warm step did not fully succeed"
rm -f "$STAGE/test/_warm.test"

# Run the whole suite in one invocation, streaming the runner's native
# sqllogictest report. Any failed assertion exits non-zero and fails the job.
echo "Running suite (worker: $VGI_ELASTICSEARCH_WORKER) ..."
"$HAYBARN_UNITTEST" "test/sql/*"
# Run the whole suite in one invocation, capturing the runner's native
# sqllogictest report so we can both stream it AND guard against a silent skip.
#
# IMPORTANT: the DuckDB/Haybarn sqllogictest runner SKIPS (not fails, exit 0) a
# test whose error message matches a built-in network-error allowlist that
# includes the substring "HTTP". So a broken HTTP transport would otherwise show
# "All tests were skipped" and the job would go GREEN having run nothing — a
# fake pass. We detect that and fail explicitly. A real run prints
# "All tests passed (N assertions ...)".
echo "Running suite (transport: $TRANSPORT, worker: $VGI_ELASTICSEARCH_WORKER) ..."
RUN_LOG="$STAGE/run.log"
set +e
"$HAYBARN_UNITTEST" "test/sql/*" 2>&1 | tee "$RUN_LOG"
RUN_RC="${PIPESTATUS[0]}"
set -e

if [ "$RUN_RC" -ne 0 ]; then
echo "ERROR: suite failed (transport: $TRANSPORT, rc=$RUN_RC)" >&2
exit "$RUN_RC"
fi

# Guard against the silent-skip fake-pass (see comment above). If every test was
# skipped — and none ran — treat it as a failure for this transport, surfacing
# the skip reason the runner reported.
if grep -q 'All tests were skipped' "$RUN_LOG"; then
echo "ERROR: every test was SKIPPED on transport '$TRANSPORT' (the runner's" >&2
echo " built-in network-error skip swallowed the real error). This is" >&2
echo " NOT a pass. Skip reason reported by the runner:" >&2
grep -A3 'Skipped tests for the following reasons' "$RUN_LOG" >&2 || true
exit 1
fi
18 changes: 17 additions & 1 deletion cmd/vgi-elasticsearch-worker/main.go
@@ -3,7 +3,8 @@
// Command vgi-elasticsearch-worker is a VGI worker that queries an
// Elasticsearch/OpenSearch index as a SQL table, using Point-In-Time +
// search_after for consistent, resumable deep pagination. It speaks the VGI
// protocol over stdio (or HTTP with --http).
// protocol over stdio (or HTTP with --http, or the AF_UNIX launcher transport
// with --unix <path>).
package main

import (
@@ -17,12 +18,18 @@ import (
)

func main() {
// Accept --http for HTTP transport and --unix for the AF_UNIX launcher
// transport; default is stdio. Unknown launcher flags are tolerated (the
// VGI extension varies argv to key its worker cache), so we filter to flags
// we actually define before parsing.
httpMode := flag.Bool("http", false, "Run as an HTTP server instead of stdio")
unixPath := flag.String("unix", "", "Serve the AF_UNIX launcher transport on this socket path instead of stdio")
logFlags := vgi.RegisterLoggingFlags(flag.CommandLine)
_ = flag.CommandLine.Parse(filterKnownFlags(os.Args[1:], map[string]bool{
"log-level": true,
"log-format": true,
"log-logger": true,
"unix": true,
}))
if err := logFlags.Apply(); err != nil {
log.Fatalf("logging flags: %v", err)
@@ -43,6 +50,15 @@ func main() {
}
return
}
if *unixPath != "" {
// AF_UNIX launcher transport: serve on the given socket path. The SDK
// prints "UNIX:<path>" once listening; idleTimeout=0 disables the
// self-shutdown timer (the launcher/CI owns the process lifecycle).
if err := w.RunUnix(*unixPath, 0); err != nil {
log.Fatal(err)
}
return
}
w.RunStdio()
}
