The DigitalKin Python SDK is a toolkit for building and managing modules within the DigitalKin agentic mesh. Use it to create custom Tools and Archetypes that communicate over gRPC, register with a service mesh, and scale independently.
## Features

- Async-native gRPC module system — every module is a gRPC server built on `grpcio` with full async support
- Typed module contracts — Pydantic models for Input, Output, Setup, and Secret schemas with protocol-based trigger dispatch
- Module-to-module communication — tools and archetypes discover each other via the registry and exchange requests over gRPC
- Tool resolution — archetypes dynamically resolve and invoke tool modules at runtime
- Admission queue & backpressure — built-in request admission with configurable concurrency limits
- Healthcheck protocols — automatic ping, services, and status healthcheck triggers registered on every module
- Profiling — optional `[profiling]` extra with asyncio-inspector, pyinstrument, viztracer, and yappi
- Batched history writes — efficient storage writes for conversation history
- TaskIQ integration — optional distributed task execution backed by RabbitMQ and Redis (`[taskiq]` extra)
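The admission-queue feature is, at its core, a bounded-concurrency pattern: requests beyond the configured limit wait for a slot instead of overwhelming the module. A minimal stdlib sketch of that idea follows; the `AdmissionQueue` class and all names in it are invented for illustration and are not the SDK's API.

```python
import asyncio


class AdmissionQueue:
    """Hypothetical sketch of request admission with a concurrency limit."""

    def __init__(self, max_concurrent: int) -> None:
        self._slots = asyncio.Semaphore(max_concurrent)
        self.active = 0   # requests currently being handled
        self.peak = 0     # highest concurrency observed

    async def run(self, handler, *args):
        async with self._slots:  # waits when all slots are taken: backpressure
            self.active += 1
            self.peak = max(self.peak, self.active)
            try:
                return await handler(*args)
            finally:
                self.active -= 1


async def handle_request(i: int) -> int:
    await asyncio.sleep(0.01)  # simulate I/O-bound work
    return i * 2


async def main() -> tuple[list[int], int]:
    queue = AdmissionQueue(max_concurrent=4)
    results = await asyncio.gather(*(queue.run(handle_request, i) for i in range(10)))
    return results, queue.peak


results, peak = asyncio.run(main())
print(results, peak)
```

All ten requests complete in order, but `peak` never exceeds the four configured slots: later requests queue on the semaphore until a slot frees up.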
## Installation

```shell
# With uv (recommended)
uv add digitalkin

# With pip
pip install digitalkin
```

Optional extras:

```shell
# Distributed task execution (RabbitMQ + Redis)
uv add "digitalkin[taskiq]"

# Async profiling tools
uv add "digitalkin[profiling]"
```

## Quick start

Define the typed input and output contracts:

```python
from pydantic import BaseModel

from digitalkin.models.module.base_types import DataModel, DataTrigger


class MessageInput(DataTrigger):
    protocol: str = "message"
    content: str


class InputModel(DataModel[MessageInput]):
    root: MessageInput


class MessageOutput(DataTrigger):
    protocol: str = "message"
    reply: str


class OutputModel(DataModel[MessageOutput]):
    root: MessageOutput
```

Implement the archetype and register a trigger handler for the `message` protocol:

```python
from digitalkin import ArchetypeModule, ModuleContext, TriggerHandler
from digitalkin.models.module.setup_types import SetupModel


class MyArchetype(ArchetypeModule[InputModel, OutputModel, SetupModel, BaseModel]):
    async def initialize(self, context: ModuleContext, setup_data: SetupModel) -> None:
        pass

    async def cleanup(self, context: ModuleContext) -> None:
        pass


@MyArchetype.register
class MessageTrigger(TriggerHandler[InputModel, SetupModel, OutputModel]):
    protocol = "message"
    input_format = InputModel
    output_format = OutputModel

    def __init__(self, context: ModuleContext) -> None:
        super().__init__(context)

    async def handle(
        self,
        input_data: InputModel,
        setup_data: SetupModel,
        context: ModuleContext,
    ) -> None:
        output = OutputModel(root=MessageOutput(reply=f"Echo: {input_data.root.content}"))
        await self.send_message(context, output)
```

Start the module as a gRPC server:

```python
import asyncio

from digitalkin.grpc_servers.module_server import ModuleServer


async def main() -> None:
    server = ModuleServer(MyArchetype)
    await server.start_async()
    await server.await_termination()


asyncio.run(main())
```

## TaskIQ integration

TaskIQ integration lets a module scale out heavy CPU tasks by distributing requests to stateless worker instances.
- Decoupled Scalability: RabbitMQ brokers messages, letting producers and consumers scale independently.
- Reliability: Durable queues, acknowledgements, and dead-lettering ensure tasks aren't lost.
- Concurrency Control: TaskIQ's worker pool manages parallel execution without custom schedulers.
- Flexibility: Built-in retries, exponential backoff, and a Redis result backend for resilient workflows.
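In production the brokering is handled by RabbitMQ and Redis via the `[taskiq]` extra. As a rough in-process analogy only, the producer/worker-pool pattern that TaskIQ distributes across machines can be sketched with a stdlib `asyncio.Queue`; every name below is invented for illustration.

```python
import asyncio


async def worker(name: str, queue: asyncio.Queue, results: list) -> None:
    # Each worker pulls tasks until the producer signals shutdown with None.
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        results.append((name, item * item))  # simulate CPU work: square the input
        queue.task_done()


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    # Worker pool: consumers scale independently of the producer.
    workers = [asyncio.create_task(worker(f"w{i}", queue, results)) for i in range(3)]
    for n in range(8):      # producer enqueues tasks
        await queue.put(n)
    for _ in workers:       # one shutdown sentinel per worker
        await queue.put(None)
    await queue.join()      # wait until every task has been acknowledged
    await asyncio.gather(*workers)
    return results


results = asyncio.run(main())
print(sorted(r for _, r in results))
```

The sentinel-per-worker shutdown and `task_done()` acknowledgements mirror, in miniature, the acknowledgement and durable-queue semantics that RabbitMQ provides across processes.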
To enable RabbitMQ streaming:

```shell
sudo rabbitmq-plugins enable rabbitmq_stream
```

Then start a worker:

```shell
task start-taskiq
```

## Development

```shell
git clone --recurse-submodules https://github.com/DigitalKin-ai/digitalkin.git
cd digitalkin
task setup-dev
source .venv/bin/activate
```

Common tasks:

```shell
task linter                 # Format + lint (ruff) + type check (mypy)
task check                  # Linter + mypy + tests
task run-tests              # Run pytest via Docker
task build-package          # Build distribution
task bump-version -- patch|minor|major
task docs-serve             # Serve docs locally (mkdocs)
task docs-build             # Build docs
task generate-certificates  # Generate mTLS certs for gRPC
task start-taskiq           # Start TaskIQ worker
task clean                  # Remove build artifacts + __pycache__
task clean-all              # Above + remove .venv
```

## Release

- Update code and commit changes (following the conventional branch/commit standard).
- Use `task bump-version -- major|minor|patch` to commit the new version.
- Use the GitHub "Create Release" workflow to publish the new version.
- The workflow automatically publishes to Test PyPI and PyPI.
## License

This project is licensed under the terms specified in the LICENSE file.

For more information, visit our Documentation or report issues on our Issues page.