Skip to content

Commit e253dc4

Browse files
rustyconover and claude committed
feat(copy): on_secrets hook to forward CREATE SECRET creds to COPY writers/readers
CopyToFunction/CopyFromFunction had a @final on_bind with no seam to request secrets, so a worker writing to / reading from secret-backed cloud storage (S3/GCS/HTTP) could not receive the caller's credentials. Add an overridable on_secrets(params) hook, called from the @final on_bind on both base classes. An author calls params.secrets.get(type, scope=...) — typically scoping by the COPY path — and the framework's existing two-phase secret bind resolves it from the caller's SecretManager and surfaces it on params.secrets at write/close (TO) / read (FROM) time. Default is a no-op, so existing formats are unaffected. Add SecretLinesCopyToFunction / SecretLinesCopyFromFunction fixtures + unit tests.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent f9e15f2 commit e253dc4

6 files changed

Lines changed: 280 additions & 7 deletions

File tree

tests/test_copy_to_function.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,14 @@
99

1010
import pyarrow as pa
1111

12-
from vgi._test_fixtures.copy_to import ExampleLinesCopyToArgs, ExampleLinesCopyToFunction
12+
from vgi._test_fixtures.copy_to import (
13+
ExampleLinesCopyToArgs,
14+
ExampleLinesCopyToFunction,
15+
SecretLinesCopyToArgs,
16+
SecretLinesCopyToFunction,
17+
)
1318
from vgi._test_fixtures.worker import ExampleCatalog
19+
from vgi.table_function import ResolvedSecrets, SecretsAccessor
1420

1521
SCHEMA = pa.schema([("a", pa.int64()), ("b", pa.string())])
1622

@@ -100,6 +106,81 @@ def test_close_empty_input_with_header_writes_header_only() -> None:
100106
assert _read(out_name) == "a,b\n"
101107

102108

109+
# ---------------------------------------------------------------------------
110+
# Secret forwarding (CopyToFunction.on_secrets hook)
111+
# ---------------------------------------------------------------------------
112+
113+
114+
def _secret_params(store: _Store, secrets: ResolvedSecrets) -> types.SimpleNamespace:
115+
"""A TableBufferingParams-shaped stub carrying resolved secrets for write/close."""
116+
bind_call = types.SimpleNamespace(input_schema=SCHEMA)
117+
init_call = types.SimpleNamespace(bind_call=bind_call)
118+
return types.SimpleNamespace(storage=store, init_call=init_call, execution_id=b"x", secrets=secrets)
119+
120+
121+
def test_on_secrets_requests_destination_scoped_secret() -> None:
122+
"""on_secrets() registers a pending lookup scoped to the COPY destination path."""
123+
accessor = SecretsAccessor(None) # nothing resolved yet → first-call behavior
124+
params = types.SimpleNamespace(
125+
args=SecretLinesCopyToArgs(), # default secret_type='vgi_example'
126+
bind_call=types.SimpleNamespace(copy_to=types.SimpleNamespace(file_path="s3://bucket/out.bin")),
127+
secrets=accessor,
128+
)
129+
SecretLinesCopyToFunction.on_secrets(params)
130+
# The hook must have asked the framework for a scoped two-phase resolution.
131+
assert accessor.needs_resolution
132+
pending = accessor.pending_lookups
133+
assert len(pending) == 1
134+
assert pending[0].secret_type == "vgi_example"
135+
assert pending[0].scope == "s3://bucket/out.bin"
136+
137+
138+
def test_close_forwards_resolved_secret_api_key() -> None:
139+
"""close() writes the resolved (destination-scoped) secret's api_key + row count."""
140+
store = _Store()
141+
out_name = _tmp_path()
142+
secrets = ResolvedSecrets(
143+
{
144+
"writer_creds": {
145+
"type": pa.scalar("vgi_example"),
146+
"scope": pa.scalar(out_name),
147+
"api_key": pa.scalar("WRITER_KEY"),
148+
}
149+
}
150+
)
151+
params = _secret_params(store, secrets)
152+
opts = SecretLinesCopyToArgs()
153+
154+
SecretLinesCopyToFunction.write(
155+
batch=pa.record_batch({"a": [1, 2], "b": ["x", "y"]}, schema=SCHEMA),
156+
options=opts,
157+
file_path=out_name,
158+
params=params,
159+
)
160+
n = SecretLinesCopyToFunction.close(options=opts, file_path=out_name, params=params)
161+
assert n == 2
162+
assert _read(out_name) == "api_key=WRITER_KEY\nrows=2\n"
163+
164+
165+
def test_close_writes_none_when_secret_absent() -> None:
166+
"""A genuinely missing secret resolves to 'NONE' (silent miss, not an error)."""
167+
store = _Store()
168+
out_name = _tmp_path()
169+
params = _secret_params(store, ResolvedSecrets())
170+
opts = SecretLinesCopyToArgs()
171+
n = SecretLinesCopyToFunction.close(options=opts, file_path=out_name, params=params)
172+
assert n == 0
173+
assert _read(out_name) == "api_key=NONE\nrows=0\n"
174+
175+
176+
def test_catalog_advertises_secret_lines_out_format() -> None:
177+
"""The secret-forwarding writer is advertised like any other COPY TO format."""
178+
formats = ExampleCatalog().copy_from_formats(attach_opaque_data=b"", transaction_opaque_data=None)
179+
by = {(f.direction, f.format_name): f for f in formats}
180+
assert ("to", "secret_lines_out") in by
181+
assert by[("to", "secret_lines_out")].handler == "secret_lines_writer"
182+
183+
103184
def test_catalog_advertises_copy_to_format() -> None:
104185
"""The example catalog advertises example_lines_out with direction='to'."""
105186
formats = ExampleCatalog().copy_from_formats(attach_opaque_data=b"", transaction_opaque_data=None)

vgi/_test_fixtures/copy_from.py

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
if TYPE_CHECKING:
2828
from vgi_rpc.rpc import OutputCollector
2929

30-
from vgi.table_function import ProcessParams
30+
from vgi.table_function import BindParams, ProcessParams
3131

32-
__all__ = ["ExampleLinesCopyFromFunction"]
32+
__all__ = ["ExampleLinesCopyFromFunction", "SecretLinesCopyFromFunction"]
3333

3434

3535
@dataclass(slots=True, frozen=True, kw_only=True)
@@ -97,3 +97,59 @@ def read(
9797
raw = [None if v == options.null_string else v for v in columns[idx]]
9898
arrays.append(pa.array(raw, type=pa.string()).cast(field.type))
9999
out.emit(pa.RecordBatch.from_arrays(arrays, schema=expected_schema))
100+
101+
102+
@dataclass(slots=True, frozen=True, kw_only=True)
103+
class SecretLinesCopyFromArgs:
104+
"""Options for the ``secret_lines_in`` COPY format."""
105+
106+
secret_type: Annotated[
107+
str,
108+
Arg("secret_type", default="vgi_example", doc="Secret type to fetch, scoped by the source path"),
109+
] = "vgi_example"
110+
111+
112+
class SecretLinesCopyFromFunction(CopyFromFunction[SecretLinesCopyFromArgs]):
113+
"""COPY ... FROM reader that forwards a ``CREATE SECRET`` credential.
114+
115+
Exercises the COPY-FROM secret-bind hook (:meth:`CopyFromFunction.on_secrets`):
116+
it requests the ``secret_type`` secret scoped to the source path during bind,
117+
and :meth:`read` emits a single VARCHAR row holding the resolved secret's
118+
``api_key`` (or ``NONE``) — so a test can assert that the caller's secret
119+
reached the reader for a secret-backed cloud source.
120+
"""
121+
122+
COPY_FROM_FORMAT: ClassVar[str] = "secret_lines_in"
123+
COPY_FROM_COMMENT: ClassVar[str | None] = "Reader that forwards a CREATE SECRET credential (test fixture)"
124+
125+
class Meta:
126+
name = "secret_lines_reader"
127+
description = "Emit the resolved secret's api_key as a single VARCHAR row"
128+
categories = ["copy", "test", "secret"]
129+
tags = {"category": "copy_from", "stability": "test"}
130+
131+
@classmethod
132+
def on_secrets(cls, params: BindParams[SecretLinesCopyFromArgs]) -> None:
133+
"""Request the source-scoped secret; framework two-phase resolves it."""
134+
cf = params.bind_call.copy_from
135+
scope = cf.file_path if cf is not None else None
136+
params.secrets.get(params.args.secret_type, scope=scope)
137+
138+
@classmethod
139+
def read(
140+
cls,
141+
*,
142+
path: str,
143+
options: SecretLinesCopyFromArgs,
144+
expected_schema: pa.Schema,
145+
params: ProcessParams[SecretLinesCopyFromArgs],
146+
out: OutputCollector,
147+
) -> None:
148+
"""Emit one row carrying the forwarded secret's api_key (or NONE)."""
149+
secret = params.secrets.for_scope_of_type(path, options.secret_type) or {}
150+
api_key = secret.get("api_key")
151+
if api_key is not None and hasattr(api_key, "as_py"):
152+
api_key = api_key.as_py()
153+
value = "NONE" if api_key is None else str(api_key)
154+
arrays = [pa.array([value], type=pa.string()).cast(expected_schema.field(0).type)]
155+
out.emit(pa.RecordBatch.from_arrays(arrays, schema=expected_schema))

vgi/_test_fixtures/copy_to.py

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,13 @@
2929

3030
if TYPE_CHECKING:
3131
from vgi.table_buffering_function import TableBufferingParams
32+
from vgi.table_function import BindParams
3233

33-
__all__ = ["ExampleLinesCopyToFunction", "ExampleLinesOrderedCopyToFunction"]
34+
__all__ = [
35+
"ExampleLinesCopyToFunction",
36+
"ExampleLinesOrderedCopyToFunction",
37+
"SecretLinesCopyToFunction",
38+
]
3439

3540
_SHARD_NS = b"copy_to_shard"
3641

@@ -158,3 +163,75 @@ class Meta:
158163
categories = ["copy", "test"]
159164
tags = {"category": "copy_to", "stability": "test"}
160165
sink_order_dependent = True # ordered COPY TO → single-thread sink
166+
167+
168+
_SECRET_NS = b"copy_to_secret_shard"
169+
170+
171+
@dataclass(slots=True, frozen=True, kw_only=True)
172+
class SecretLinesCopyToArgs:
173+
"""Options for the ``secret_lines_out`` COPY format."""
174+
175+
secret_type: Annotated[
176+
str,
177+
Arg("secret_type", default="vgi_example", doc="Secret type to fetch, scoped by the destination path"),
178+
] = "vgi_example"
179+
180+
181+
class SecretLinesCopyToFunction(CopyToFunction[SecretLinesCopyToArgs]):
182+
"""COPY ... TO writer that forwards a ``CREATE SECRET`` credential.
183+
184+
Exercises the COPY-TO secret-bind hook (:meth:`CopyToFunction.on_secrets`):
185+
it requests the ``secret_type`` secret scoped to the destination path during
186+
bind, and ``close()`` writes the resolved secret's ``api_key`` (or ``NONE``)
187+
plus the row count into the destination — so a test can assert that the
188+
caller's secret reached the writer for a secret-backed cloud write.
189+
"""
190+
191+
COPY_TO_FORMAT: ClassVar[str] = "secret_lines_out"
192+
COPY_TO_COMMENT: ClassVar[str | None] = "Writer that forwards a CREATE SECRET credential (test fixture)"
193+
194+
class Meta:
195+
name = "secret_lines_writer"
196+
description = "Write the resolved secret's api_key + row count to the destination"
197+
categories = ["copy", "test", "secret"]
198+
tags = {"category": "copy_to", "stability": "test"}
199+
200+
@classmethod
201+
def on_secrets(cls, params: BindParams[SecretLinesCopyToArgs]) -> None:
202+
"""Request the destination-scoped secret; framework two-phase resolves it."""
203+
cf = params.bind_call.copy_to
204+
scope = cf.file_path if cf is not None else None
205+
# Scoped lookup → longest-prefix match against the caller's CREATE SECRETs.
206+
params.secrets.get(params.args.secret_type, scope=scope)
207+
208+
@classmethod
209+
def write(
210+
cls,
211+
*,
212+
batch: pa.RecordBatch,
213+
options: SecretLinesCopyToArgs,
214+
file_path: str,
215+
params: TableBufferingParams[SecretLinesCopyToArgs],
216+
) -> None:
217+
"""Record this shard's row count (cross-process-safe append)."""
218+
params.storage.state_append(_SECRET_NS, b"", str(batch.num_rows).encode())
219+
220+
@classmethod
221+
def close(
222+
cls,
223+
*,
224+
options: SecretLinesCopyToArgs,
225+
file_path: str,
226+
params: TableBufferingParams[SecretLinesCopyToArgs],
227+
) -> int:
228+
"""Write the forwarded secret's api_key + total row count, once."""
229+
secret = params.secrets.for_scope_of_type(file_path, options.secret_type) or {}
230+
api_key = secret.get("api_key")
231+
if api_key is not None and hasattr(api_key, "as_py"):
232+
api_key = api_key.as_py()
233+
total = sum(int(blob) for _id, blob in params.storage.state_log_scan(_SECRET_NS, b"", after_id=-1))
234+
with open(file_path, "w", encoding="utf-8") as fh:
235+
fh.write(f"api_key={'NONE' if api_key is None else api_key}\n")
236+
fh.write(f"rows={total}\n")
237+
return total

vgi/_test_fixtures/worker.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,12 @@
5959
SlowCancellableFunction,
6060
SlowCancellableInOutFunction,
6161
)
62-
from vgi._test_fixtures.copy_from import ExampleLinesCopyFromFunction
63-
from vgi._test_fixtures.copy_to import ExampleLinesCopyToFunction, ExampleLinesOrderedCopyToFunction
62+
from vgi._test_fixtures.copy_from import ExampleLinesCopyFromFunction, SecretLinesCopyFromFunction
63+
from vgi._test_fixtures.copy_to import (
64+
ExampleLinesCopyToFunction,
65+
ExampleLinesOrderedCopyToFunction,
66+
SecretLinesCopyToFunction,
67+
)
6468
from vgi._test_fixtures.nest_tensor import NestTensorFunction, UnnestTensorFunction, UnnestTensorRowsFunction
6569
from vgi._test_fixtures.scalar import (
6670
AddValuesFunction,
@@ -355,9 +359,11 @@ def _build_enum_stats() -> dict[str, ColumnStatisticsInput]:
355359
SlowCancellableBufferingFunction,
356360
# CopyFromFunction - custom COPY ... FROM format reader
357361
ExampleLinesCopyFromFunction,
362+
SecretLinesCopyFromFunction,
358363
# CopyToFunction - custom COPY ... TO format writer
359364
ExampleLinesCopyToFunction,
360365
ExampleLinesOrderedCopyToFunction,
366+
SecretLinesCopyToFunction,
361367
# TableFunctionGenerator - generate output without input
362368
ConstantColumnsFunction,
363369
SlowCancellableFunction,

vgi/copy_from_function.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,32 @@ def on_bind(cls, params: BindParams[TArgs]) -> BindResponse:
9898
f"{cls.__name__} is a COPY FROM format reader; invoke it via "
9999
f"COPY <table> FROM '<path>' (FORMAT {fmt}), not as a table function."
100100
)
101+
# Forward credentials for secret-backed cloud sources (S3/GCS/HTTP/...)
102+
# via the framework's two-phase secret bind. on_bind is @final (the output
103+
# schema is fixed to the COPY target), so on_secrets is the seam.
104+
cls.on_secrets(params)
101105
# ``expected_schema`` is transparently a ``pa.Schema`` here — the
102106
# ArrowType(binary) annotation only governs the wire encoding.
103107
return BindResponse(output_schema=cf.expected_schema)
104108

109+
@classmethod
110+
def on_secrets(cls, params: BindParams[TArgs]) -> None:
111+
"""Request the credentials this reader needs to reach its source.
112+
113+
Override to forward ``CREATE SECRET`` values to :meth:`read` for
114+
secret-backed cloud sources. Call ``params.secrets.get(secret_type,
115+
scope=..., name=...)`` — typically scoping by the source path
116+
(``params.bind_call.copy_from.file_path``) so DuckDB resolves the
117+
longest-prefix-matching secret. The framework issues a two-phase bind retry
118+
to resolve every requested secret from the caller's secret store, then
119+
surfaces the resolved values on ``params.secrets`` (a
120+
:class:`ResolvedSecrets`) at :meth:`read` time. Pass ``required=True`` to
121+
``get()`` to make a missing secret fail the bind.
122+
123+
Default: request nothing (no secrets forwarded).
124+
"""
125+
# Default no-op. Subclasses override to call params.secrets.get(...).
126+
105127
@final
106128
@classmethod
107129
def initial_state(cls, params: ProcessParams[TArgs]) -> _CopyFromState:

vgi/copy_to_function.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,40 @@ class CopyToFunction[TArgs](TableBufferingFunction[TArgs, None]):
9696
@final
9797
@classmethod
9898
def on_bind(cls, params: BindParams[TArgs]) -> BindResponse:
99-
"""A sink produces no rows — bind to an empty output schema."""
99+
"""A sink produces no rows — bind to an empty output schema.
100+
101+
``on_bind`` is ``@final`` (a writer has no output schema to compute), but
102+
a cloud writer still needs credentials. The seam is :meth:`on_secrets`,
103+
called here so a subclass can request ``CREATE SECRET`` values via the
104+
framework's two-phase secret bind without overriding ``on_bind`` itself.
105+
"""
106+
cls.on_secrets(params)
100107
return BindResponse(output_schema=pa.schema([]))
101108

109+
@classmethod
110+
def on_secrets(cls, params: BindParams[TArgs]) -> None:
111+
"""Request the credentials this writer needs to reach its destination.
112+
113+
Override to forward ``CREATE SECRET`` values to :meth:`write` / :meth:`close`
114+
for secret-backed cloud writes (S3/GCS/HTTP/...). Call
115+
``params.secrets.get(secret_type, scope=..., name=...)`` — typically scoping
116+
by the destination path (:meth:`copy_to_path` / ``params.bind_call.copy_to``)
117+
so DuckDB resolves the longest-prefix-matching secret. The framework issues a
118+
two-phase bind retry to resolve every requested secret from the caller's
119+
secret store, then surfaces the resolved values on ``params.secrets`` (a
120+
:class:`ResolvedSecrets`) at :meth:`write` / :meth:`close` time. Requested
121+
secrets that don't exist resolve to "not found" rather than an error — pass
122+
``required=True`` to ``get()`` to make a missing secret fail the bind.
123+
124+
The destination path is available without an ``init_call`` here via
125+
``params.bind_call.copy_to.file_path`` (use :meth:`copy_to_path` only at
126+
write/close time, where an ``init_call`` exists).
127+
128+
Default: request nothing (no secrets forwarded), so existing writers that
129+
never touched credentials are unaffected.
130+
"""
131+
# Default no-op. Subclasses override to call params.secrets.get(...).
132+
102133
@final
103134
@classmethod
104135
def copy_to_path(cls, params: TableBufferingParams[TArgs]) -> str:

0 commit comments

Comments
 (0)