Skip to content

Commit 60b59ef

Browse files
committed
Add Python SDK support for WorkOS Pipes
1 parent af7a29d commit 60b59ef

File tree

7 files changed

+319
-0
lines changed

7 files changed

+319
-0
lines changed

tests/test_pipes.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import pytest
2+
3+
from tests.utils.syncify import syncify
4+
from workos.pipes import AsyncPipes, Pipes
5+
6+
7+
@pytest.mark.sync_and_async(Pipes, AsyncPipes)
8+
class TestPipes:
9+
@pytest.fixture
10+
def mock_access_token(self):
11+
return {
12+
"object": "access_token",
13+
"access_token": "test_access_token_123",
14+
"expires_at": "2026-01-09T12:00:00.000Z",
15+
"scopes": ["read:users", "write:users"],
16+
"missing_scopes": [],
17+
}
18+
19+
def test_get_access_token_success_with_expiry(
20+
self,
21+
module_instance,
22+
mock_access_token,
23+
capture_and_mock_http_client_request,
24+
):
25+
response_body = {
26+
"active": True,
27+
"access_token": mock_access_token,
28+
}
29+
request_kwargs = capture_and_mock_http_client_request(
30+
module_instance._http_client, response_body, 200
31+
)
32+
33+
result = syncify(
34+
module_instance.get_access_token(
35+
provider="test-provider",
36+
user_id="user_123",
37+
)
38+
)
39+
40+
assert request_kwargs["url"].endswith("data-integrations/test-provider/token")
41+
assert request_kwargs["method"] == "post"
42+
assert request_kwargs["json"]["user_id"] == "user_123"
43+
assert result.active is True
44+
assert result.access_token.access_token == mock_access_token["access_token"]
45+
assert result.access_token.scopes == mock_access_token["scopes"]
46+
47+
def test_get_access_token_success_without_expiry(
48+
self,
49+
module_instance,
50+
capture_and_mock_http_client_request,
51+
):
52+
response_body = {
53+
"active": True,
54+
"access_token": {
55+
"object": "access_token",
56+
"access_token": "test_token",
57+
"expires_at": None,
58+
"scopes": ["read"],
59+
"missing_scopes": [],
60+
},
61+
}
62+
capture_and_mock_http_client_request(
63+
module_instance._http_client, response_body, 200
64+
)
65+
66+
result = syncify(
67+
module_instance.get_access_token(
68+
provider="test-provider",
69+
user_id="user_123",
70+
)
71+
)
72+
73+
assert result.active is True
74+
assert result.access_token.expires_at is None
75+
76+
def test_get_access_token_with_organization_id(
77+
self,
78+
module_instance,
79+
mock_access_token,
80+
capture_and_mock_http_client_request,
81+
):
82+
response_body = {
83+
"active": True,
84+
"access_token": mock_access_token,
85+
}
86+
request_kwargs = capture_and_mock_http_client_request(
87+
module_instance._http_client, response_body, 200
88+
)
89+
90+
syncify(
91+
module_instance.get_access_token(
92+
provider="test-provider",
93+
user_id="user_123",
94+
organization_id="org_456",
95+
)
96+
)
97+
98+
assert request_kwargs["json"]["organization_id"] == "org_456"
99+
100+
def test_get_access_token_without_organization_id(
101+
self,
102+
module_instance,
103+
mock_access_token,
104+
capture_and_mock_http_client_request,
105+
):
106+
response_body = {
107+
"active": True,
108+
"access_token": mock_access_token,
109+
}
110+
request_kwargs = capture_and_mock_http_client_request(
111+
module_instance._http_client, response_body, 200
112+
)
113+
114+
syncify(
115+
module_instance.get_access_token(
116+
provider="test-provider",
117+
user_id="user_123",
118+
)
119+
)
120+
121+
assert "organization_id" not in request_kwargs["json"]
122+
123+
def test_get_access_token_not_installed(
124+
self,
125+
module_instance,
126+
capture_and_mock_http_client_request,
127+
):
128+
response_body = {
129+
"active": False,
130+
"error": "not_installed",
131+
}
132+
capture_and_mock_http_client_request(
133+
module_instance._http_client, response_body, 200
134+
)
135+
136+
result = syncify(
137+
module_instance.get_access_token(
138+
provider="test-provider",
139+
user_id="user_123",
140+
)
141+
)
142+
143+
assert result.active is False
144+
assert result.error == "not_installed"
145+
146+
def test_get_access_token_needs_reauthorization(
147+
self,
148+
module_instance,
149+
capture_and_mock_http_client_request,
150+
):
151+
response_body = {
152+
"active": False,
153+
"error": "needs_reauthorization",
154+
}
155+
capture_and_mock_http_client_request(
156+
module_instance._http_client, response_body, 200
157+
)
158+
159+
result = syncify(
160+
module_instance.get_access_token(
161+
provider="test-provider",
162+
user_id="user_123",
163+
)
164+
)
165+
166+
assert result.active is False
167+
assert result.error == "needs_reauthorization"

workos/_base_client.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from workos.fga import FGAModule
1111
from workos.mfa import MFAModule
1212
from workos.organization_domains import OrganizationDomainsModule
13+
from workos.pipes import PipesModule
1314
from workos.organizations import OrganizationsModule
1415
from workos.passwordless import PasswordlessModule
1516
from workos.portal import PortalModule
@@ -101,6 +102,10 @@ def organization_domains(self) -> OrganizationDomainsModule: ...
101102
@abstractmethod
102103
def passwordless(self) -> PasswordlessModule: ...
103104

105+
@property
106+
@abstractmethod
107+
def pipes(self) -> PipesModule: ...
108+
104109
@property
105110
@abstractmethod
106111
def portal(self) -> PortalModule: ...

workos/async_client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from workos.organizations import AsyncOrganizations
1111
from workos.organization_domains import AsyncOrganizationDomains
1212
from workos.passwordless import PasswordlessModule
13+
from workos.pipes import AsyncPipes
1314
from workos.portal import PortalModule
1415
from workos.sso import AsyncSSO
1516
from workos.user_management import AsyncUserManagement
@@ -102,6 +103,12 @@ def passwordless(self) -> PasswordlessModule:
102103
"Passwordless APIs are not yet supported in the async client."
103104
)
104105

106+
@property
107+
def pipes(self) -> AsyncPipes:
108+
if not getattr(self, "_pipes", None):
109+
self._pipes = AsyncPipes(self._http_client)
110+
return self._pipes
111+
105112
@property
106113
def portal(self) -> PortalModule:
107114
raise NotImplementedError(

workos/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from workos.organizations import Organizations
99
from workos.organization_domains import OrganizationDomains
1010
from workos.passwordless import Passwordless
11+
from workos.pipes import Pipes
1112
from workos.portal import Portal
1213
from workos.sso import SSO
1314
from workos.webhooks import Webhooks
@@ -102,6 +103,12 @@ def passwordless(self) -> Passwordless:
102103
self._passwordless = Passwordless(self._http_client)
103104
return self._passwordless
104105

106+
@property
107+
def pipes(self) -> Pipes:
108+
if not getattr(self, "_pipes", None):
109+
self._pipes = Pipes(self._http_client)
110+
return self._pipes
111+
105112
@property
106113
def portal(self) -> Portal:
107114
if not getattr(self, "_portal", None):

workos/pipes.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
from typing import Dict, Optional, Protocol
2+
3+
from workos.types.pipes import (
4+
GetAccessTokenFailureResponse,
5+
GetAccessTokenResponse,
6+
GetAccessTokenSuccessResponse,
7+
)
8+
from workos.typing.sync_or_async import SyncOrAsync
9+
from workos.utils.http_client import AsyncHTTPClient, SyncHTTPClient
10+
from workos.utils.request_helper import REQUEST_METHOD_POST
11+
12+
13+
class PipesModule(Protocol):
14+
"""Protocol defining the Pipes module interface."""
15+
16+
def get_access_token(
17+
self,
18+
*,
19+
provider: str,
20+
user_id: str,
21+
organization_id: Optional[str] = None,
22+
) -> SyncOrAsync[GetAccessTokenResponse]:
23+
"""Retrieve an access token for a third-party provider.
24+
25+
Kwargs:
26+
provider (str): The third-party provider identifier
27+
user_id (str): The WorkOS user ID
28+
organization_id (str, optional): The WorkOS organization ID
29+
30+
Returns:
31+
GetAccessTokenResponse: Success response with token or failure response with error
32+
"""
33+
...
34+
35+
36+
class Pipes(PipesModule):
37+
"""Sync implementation of the Pipes module."""
38+
39+
_http_client: SyncHTTPClient
40+
41+
def __init__(self, http_client: SyncHTTPClient):
42+
self._http_client = http_client
43+
44+
def get_access_token(
45+
self,
46+
*,
47+
provider: str,
48+
user_id: str,
49+
organization_id: Optional[str] = None,
50+
) -> GetAccessTokenResponse:
51+
json_data: Dict[str, str] = {"user_id": user_id}
52+
if organization_id is not None:
53+
json_data["organization_id"] = organization_id
54+
55+
response = self._http_client.request(
56+
f"data-integrations/{provider}/token",
57+
method=REQUEST_METHOD_POST,
58+
json=json_data,
59+
)
60+
61+
if response.get("active") is True:
62+
return GetAccessTokenSuccessResponse.model_validate(response)
63+
return GetAccessTokenFailureResponse.model_validate(response)
64+
65+
66+
class AsyncPipes(PipesModule):
67+
"""Async implementation of the Pipes module."""
68+
69+
_http_client: AsyncHTTPClient
70+
71+
def __init__(self, http_client: AsyncHTTPClient):
72+
self._http_client = http_client
73+
74+
async def get_access_token(
75+
self,
76+
*,
77+
provider: str,
78+
user_id: str,
79+
organization_id: Optional[str] = None,
80+
) -> GetAccessTokenResponse:
81+
json_data: Dict[str, str] = {"user_id": user_id}
82+
if organization_id is not None:
83+
json_data["organization_id"] = organization_id
84+
85+
response = await self._http_client.request(
86+
f"data-integrations/{provider}/token",
87+
method=REQUEST_METHOD_POST,
88+
json=json_data,
89+
)
90+
91+
if response.get("active") is True:
92+
return GetAccessTokenSuccessResponse.model_validate(response)
93+
return GetAccessTokenFailureResponse.model_validate(response)

workos/types/pipes/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from workos.types.pipes.pipes import (
2+
AccessToken as AccessToken,
3+
GetAccessTokenFailureResponse as GetAccessTokenFailureResponse,
4+
GetAccessTokenResponse as GetAccessTokenResponse,
5+
GetAccessTokenSuccessResponse as GetAccessTokenSuccessResponse,
6+
)

workos/types/pipes/pipes.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from datetime import datetime
2+
from typing import Literal, Optional, Sequence, Union
3+
4+
from workos.types.workos_model import WorkOSModel
5+
6+
7+
class AccessToken(WorkOSModel):
8+
"""Represents an OAuth access token for a third-party provider."""
9+
10+
object: Literal["access_token"]
11+
access_token: str
12+
expires_at: Optional[datetime] = None
13+
scopes: Sequence[str]
14+
missing_scopes: Sequence[str]
15+
16+
17+
class GetAccessTokenSuccessResponse(WorkOSModel):
18+
"""Successful response containing the access token."""
19+
20+
active: Literal[True]
21+
access_token: AccessToken
22+
23+
24+
class GetAccessTokenFailureResponse(WorkOSModel):
25+
"""Failed response indicating why the token couldn't be retrieved."""
26+
27+
active: Literal[False]
28+
error: Literal["not_installed", "needs_reauthorization"]
29+
30+
31+
GetAccessTokenResponse = Union[
32+
GetAccessTokenSuccessResponse,
33+
GetAccessTokenFailureResponse,
34+
]

0 commit comments

Comments
 (0)