DigitalKin Python SDK

The DigitalKin Python SDK lets you build and manage modules within the DigitalKin agentic mesh. Create custom Tools and Archetypes that communicate over gRPC, register with a service mesh, and scale independently.

Features

  • Async-native gRPC module system — every module is a gRPC server built on grpcio with full async support
  • Typed module contracts — Pydantic models for Input, Output, Setup, and Secret schemas with protocol-based trigger dispatch
  • Module-to-module communication — tools and archetypes discover each other via the registry and exchange requests over gRPC
  • Tool resolution — archetypes dynamically resolve and invoke tool modules at runtime
  • Admission queue & backpressure — built-in request admission with configurable concurrency limits
  • Healthcheck protocols — automatic ping, services, and status healthcheck triggers registered on every module
  • Profiling — optional [profiling] extra with asyncio-inspector, pyinstrument, viztracer, and yappi
  • Batched history writes — efficient storage writes for conversation history
  • TaskIQ integration — optional distributed task execution backed by RabbitMQ and Redis ([taskiq] extra)
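
To illustrate what request admission with a concurrency limit means in practice, here is a minimal standalone sketch built only on asyncio. It is not the SDK's implementation: the AdmissionQueue class and its max_concurrent and max_waiting parameters are hypothetical names chosen for this example. Requests beyond the running limit wait for a slot, and requests beyond the waiting limit are rejected right away, which is the backpressure signal.

```python
import asyncio


class AdmissionQueue:
    """Hypothetical admission gate (illustration only, not the SDK's class).

    At most `max_concurrent` requests run at once and at most
    `max_waiting` requests may wait for a free slot.
    """

    def __init__(self, max_concurrent: int, max_waiting: int) -> None:
        self._slots = asyncio.Semaphore(max_concurrent)
        self._max_waiting = max_waiting
        self._waiting = 0

    async def run(self, coro_fn, *args):
        # Backpressure: reject immediately when every slot is busy
        # and the waiting room is already full.
        if self._slots.locked() and self._waiting >= self._max_waiting:
            raise RuntimeError("admission queue full")
        self._waiting += 1
        try:
            await self._slots.acquire()
        finally:
            self._waiting -= 1
        try:
            return await coro_fn(*args)
        finally:
            self._slots.release()


async def demo() -> tuple[int, int]:
    """Submit 5 requests to a gate with 2 slots and 1 waiting place."""
    gate = AdmissionQueue(max_concurrent=2, max_waiting=1)
    running = 0
    peak = 0

    async def work(i: int) -> int:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # stand-in for real request handling
        running -= 1
        return i

    results = await asyncio.gather(
        *(gate.run(work, i) for i in range(5)), return_exceptions=True
    )
    rejected = sum(isinstance(r, RuntimeError) for r in results)
    return peak, rejected


if __name__ == "__main__":
    peak, rejected = asyncio.run(demo())
    print(f"peak concurrency: {peak}, rejected requests: {rejected}")
```

Rejecting early instead of queueing without bound keeps memory use predictable and lets callers retry elsewhere. How the SDK itself exposes these limits should be checked in its documentation.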

Installation

# With uv (recommended)
uv add digitalkin

# With pip
pip install digitalkin

Optional extras:

# Distributed task execution (RabbitMQ + Redis)
uv add "digitalkin[taskiq]"

# Async profiling tools
uv add "digitalkin[profiling]"

Quick Start

1. Define your models

from pydantic import BaseModel
from digitalkin.models.module.base_types import DataModel, DataTrigger


class MessageInput(DataTrigger):
    protocol: str = "message"
    content: str


class InputModel(DataModel[MessageInput]):
    root: MessageInput


class MessageOutput(DataTrigger):
    protocol: str = "message"
    reply: str


class OutputModel(DataModel[MessageOutput]):
    root: MessageOutput

2. Create a module and trigger

from pydantic import BaseModel  # also imported in step 1; repeated so this snippet stands alone

from digitalkin import ArchetypeModule, ModuleContext, TriggerHandler
from digitalkin.models.module.setup_types import SetupModel


class MyArchetype(ArchetypeModule[InputModel, OutputModel, SetupModel, BaseModel]):
    async def initialize(self, context: ModuleContext, setup_data: SetupModel) -> None:
        pass

    async def cleanup(self, context: ModuleContext) -> None:
        pass


@MyArchetype.register
class MessageTrigger(TriggerHandler[InputModel, SetupModel, OutputModel]):
    protocol = "message"
    input_format = InputModel
    output_format = OutputModel

    def __init__(self, context: ModuleContext) -> None:
        super().__init__(context)

    async def handle(
        self,
        input_data: InputModel,
        setup_data: SetupModel,
        context: ModuleContext,
    ) -> None:
        output = OutputModel(root=MessageOutput(reply=f"Echo: {input_data.root.content}"))
        await self.send_message(context, output)
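
The idea behind registering a trigger under a protocol name can be sketched without the SDK. The example below is an assumption-laden illustration, not how digitalkin implements it: HandlerRegistry, Message, and EchoHandler are hypothetical names. A class decorator stores each handler under its protocol attribute, and incoming data is routed to the handler whose protocol matches.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Message:
    """Hypothetical stand-in for a trigger payload."""

    protocol: str
    content: str


class HandlerRegistry:
    """Hypothetical registry mapping a protocol name to a handler class."""

    def __init__(self) -> None:
        self._handlers: dict[str, type] = {}

    def register(self, handler_cls: type) -> type:
        # Used as a class decorator; the handler declares its own protocol.
        self._handlers[handler_cls.protocol] = handler_cls
        return handler_cls

    async def dispatch(self, message: Message) -> str:
        # Route the message to the handler registered for its protocol.
        handler_cls = self._handlers.get(message.protocol)
        if handler_cls is None:
            raise KeyError(f"no handler for protocol {message.protocol!r}")
        return await handler_cls().handle(message)


registry = HandlerRegistry()


@registry.register
class EchoHandler:
    protocol = "message"

    async def handle(self, message: Message) -> str:
        return f"Echo: {message.content}"


if __name__ == "__main__":
    print(asyncio.run(registry.dispatch(Message("message", "hello"))))
```

Keying handlers by a plain string keeps the wire format simple: the sender only needs to set the protocol field, and adding a new trigger means registering one more class.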

3. Run the server

import asyncio
from digitalkin.grpc_servers.module_server import ModuleServer

async def main() -> None:
    server = ModuleServer(MyArchetype)
    await server.start_async()
    await server.await_termination()

asyncio.run(main())

TaskIQ with RabbitMQ

TaskIQ integration lets a module scale CPU-heavy workloads by distributing requests to stateless worker instances.

  • Decoupled Scalability: RabbitMQ brokers messages, letting producers and consumers scale independently.
  • Reliability: Durable queues, acknowledgements, and dead-lettering ensure tasks aren't lost.
  • Concurrency Control: TaskIQ's worker pool manages parallel execution without custom schedulers.
  • Flexibility: Built-in retries, exponential backoff, and a Redis result backend for resilient workflows.
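
For readers unfamiliar with retries and exponential backoff, the following sketch shows the general technique in plain asyncio. It does not use TaskIQ and is not how TaskIQ or this SDK implements retries. The names backoff_delays and run_with_retries and their parameters are hypothetical.

```python
import asyncio


def backoff_delays(base: float, factor: float, retries: int, cap: float) -> list[float]:
    """Delay before each retry: base * factor**attempt, limited to `cap`."""
    return [min(cap, base * factor**attempt) for attempt in range(retries)]


async def run_with_retries(task, *, retries: int = 3, base: float = 0.01,
                           factor: float = 2.0, cap: float = 1.0):
    """Call the coroutine function `task`, retrying up to `retries` times."""
    delays = backoff_delays(base, factor, retries, cap)
    for attempt in range(retries + 1):
        try:
            return await task()
        except Exception:
            # Out of retries: surface the last error to the caller.
            if attempt == retries:
                raise
            await asyncio.sleep(delays[attempt])


if __name__ == "__main__":
    attempts = {"n": 0}

    async def flaky() -> str:
        # Fails twice, then succeeds, to exercise the retry loop.
        attempts["n"] += 1
        if attempts["n"] < 3:
            raise ConnectionError("transient failure")
        return "done"

    print(asyncio.run(run_with_retries(flaky)), "after", attempts["n"], "attempts")
```

Capping the delay prevents long waits after many failures. Production setups often add random jitter as well so that many workers do not retry at the same instant.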

To enable RabbitMQ streaming:

sudo rabbitmq-plugins enable rabbitmq_stream
task start-taskiq

Development

Prerequisites

  • Python 3.10+
  • uv — modern Python package management
  • Task — task runner

Setup

git clone --recurse-submodules https://github.com/DigitalKin-ai/digitalkin.git
cd digitalkin

task setup-dev
source .venv/bin/activate

Common Tasks

task linter               # Format + lint (ruff) + type check (mypy)
task check                # Linter + mypy + tests
task run-tests            # Run pytest via Docker
task build-package        # Build distribution
task bump-version -- patch|minor|major

task docs-serve           # Serve docs locally (mkdocs)
task docs-build           # Build docs

task generate-certificates  # Generate mTLS certs for gRPC
task start-taskiq           # Start TaskIQ worker

task clean                # Remove build artifacts + __pycache__
task clean-all            # Above + remove .venv

Publishing Process

  1. Update the code and commit your changes, following the conventional branch and commit standards.
  2. Run task bump-version -- major|minor|patch to commit the new version.
  3. Use the GitHub "Create Release" workflow to publish the new version.
  4. The workflow automatically publishes to Test PyPI and PyPI.
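
As a reminder of what the three bump levels mean, here is a small sketch assuming the project follows semantic versioning (MAJOR.MINOR.PATCH). The bump function is hypothetical and only mirrors the arithmetic. It is not the code behind task bump-version.

```python
def bump(version: str, part: str) -> str:
    """Return `version` with the given part incremented (semantic versioning)."""
    major, minor, patch = (int(piece) for piece in version.split("."))
    if part == "major":
        # Breaking change: reset minor and patch.
        return f"{major + 1}.0.0"
    if part == "minor":
        # Backward-compatible feature: reset patch.
        return f"{major}.{minor + 1}.0"
    if part == "patch":
        # Backward-compatible fix.
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown part: {part!r}")


if __name__ == "__main__":
    for part in ("patch", "minor", "major"):
        print(part, "->", bump("1.4.2", part))
```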

License

This project is licensed under the terms specified in the LICENSE file.


For more information, visit our Documentation or report issues on our Issues page.