Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/iii-hq/agentos/llms.txt

Use this file to discover all available pages before exploring further.

Overview

Workers are processes that connect to the iii-engine over WebSocket and register functions. Every AgentOS capability—agents, tools, memory, security, workflows—is implemented as a worker.
Worker = Process + Functions + TriggersA worker registers functions and binds them to triggers (HTTP, queue, cron, pubsub). Functions call each other via trigger() regardless of language.

Architecture

All workers follow this pattern:
┌─────────────────────────────────────────────────┐
│              iii-engine                         │
│   WebSocket: ws://localhost:49134               │
│   REST API: http://localhost:3111               │
└─────────────────────────────────────────────────┘
         ▲          ▲          ▲
         │          │          │
    ┌────┴───┐  ┌───┴────┐  ┌─┴───────┐
    │ Rust   │  │ TypeS  │  │ Python  │
    │ Worker │  │ Worker │  │ Worker  │
    └────────┘  └────────┘  └─────────┘
Workers register functions like agent::chat, tool::file_read, memory::store and the engine routes calls between them.

Creating a Rust Worker

Rust workers are used for performance-critical operations (agent core, security, memory, LLM routing).

Example: Agent Core Worker

From crates/agent-core/src/main.rs:
crates/my-worker/src/main.rs
use iii_sdk::iii::III;
use iii_sdk::error::IIIError;
use serde_json::{json, Value};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();

    // 1. Connect to iii-engine
    let iii = III::new("ws://localhost:49134");

    // 2. Register function: agent::chat
    let iii_clone = iii.clone();
    iii.register_function_with_description(
        "agent::chat",
        "Process a message through the agent loop",
        move |input: Value| {
            let iii = iii_clone.clone();
            async move {
                let req: ChatRequest = serde_json::from_value(input)
                    .map_err(|e| IIIError::Handler(e.to_string()))?;
                agent_chat(&iii, req).await
            }
        },
    );

    // 3. Register function: agent::list_tools
    let iii_clone = iii.clone();
    iii.register_function_with_description(
        "agent::list_tools",
        "List tools available to an agent",
        move |input: Value| {
            let iii = iii_clone.clone();
            async move {
                let agent_id = input["agentId"].as_str().unwrap_or("default");
                list_tools(&iii, agent_id).await
            }
        },
    );

    // 4. Register trigger: bind agent::chat to queue
    iii.register_trigger(
        "queue",
        "agent::chat",
        json!({ "topic": "agent.inbox" })
    )?;

    tracing::info!("agent-core worker started");
    tokio::signal::ctrl_c().await?;
    iii.shutdown_async().await;
    Ok(())
}

async fn agent_chat(iii: &III, req: ChatRequest) -> Result<Value, IIIError> {
    // Call other functions
    let config: Value = iii.trigger("state::get", json!({
        "scope": "agents",
        "key": req.agent_id,
    })).await?;

    let memories: Value = iii.trigger("memory::recall", json!({
        "agentId": req.agent_id,
        "query": req.message,
        "limit": 20,
    })).await.unwrap_or(json!([]));

    // Process message...
    Ok(json!({ "content": "response" }))
}

Cargo.toml

crates/my-worker/Cargo.toml
[package]
name = "agentos-my-worker"
version = "0.0.1"
edition = "2024"

[dependencies]
iii-sdk = "0.4"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tracing = "0.1"
tracing-subscriber = "0.3"

Add to Workspace

In root Cargo.toml:
Cargo.toml
[workspace]
members = [
    "crates/agent-core",
    "crates/my-worker",  # Add here
    # ...
]

Run the Worker

cargo run --release -p agentos-my-worker

Creating a TypeScript Worker

TypeScript workers are used for rapid iteration (API, tools, workflows, channels).

Example: Tools Worker

From src/tools.ts:
src/my-worker.ts
import { init } from "iii-sdk";

const ENGINE_URL = "ws://localhost:49134";

// 1. Initialize worker
const { registerFunction, registerTrigger, trigger, triggerVoid } = init(
  ENGINE_URL,
  { workerName: "my-worker" }
);

// 2. Register function: tool::file_read
registerFunction(
  {
    id: "tool::file_read",
    description: "Read file contents with path containment",
    metadata: { category: "tool" },
  },
  async ({ path, maxBytes }: { path: string; maxBytes?: number }) => {
    const resolved = resolve(WORKSPACE_ROOT, path);
    assertPathContained(resolved);

    const content = await readFile(resolved, "utf-8");
    const limited = maxBytes ? content.slice(0, maxBytes) : content;
    return { content: limited, path: resolved, size: content.length };
  }
);

// 3. Register function: tool::file_write
registerFunction(
  {
    id: "tool::file_write",
    description: "Write file with path containment",
    metadata: { category: "tool" },
  },
  async ({ path, content }: { path: string; content: string }) => {
    const resolved = resolve(WORKSPACE_ROOT, path);
    assertPathContained(resolved);

    await writeFile(resolved, content, "utf-8");
    return { written: true, path: resolved, size: content.length };
  }
);

// 4. Call other functions
async function processFile(path: string) {
  // Call file_read
  const data: any = await trigger("tool::file_read", { path }, 30_000);
  
  // Call security scan
  const scan: any = await trigger("security::scan_content", {
    content: data.content
  }, 10_000);
  
  if (scan.safe) {
    return data;
  } else {
    throw new Error("Content failed security scan");
  }
}

console.log("my-worker started");

package.json

Ensure iii-sdk is in dependencies:
package.json
{
  "name": "agentos",
  "type": "module",
  "dependencies": {
    "iii-sdk": "^0.4.0"
  },
  "devDependencies": {
    "tsx": "^4.0.0",
    "typescript": "^5.0.0"
  }
}

Run the Worker

# Development (with watch mode)
npx tsx --watch src/my-worker.ts

# Production
node --loader tsx src/my-worker.ts

Creating a Python Worker

Python workers are used for ML operations (embeddings, inference).

Example: Embedding Worker

From workers/embedding/main.py:
workers/my-worker/main.py
import asyncio
import os
from iii_sdk import III

# 1. Initialize worker
iii = III(
    "ws://localhost:49134",
    worker_name="my-worker",
)

model = None

def get_model():
    global model
    if model is None:
        try:
            from sentence_transformers import SentenceTransformer
            model_name = os.getenv("EMBEDDING_MODEL", "all-MiniLM-L6-v2")
            model = SentenceTransformer(model_name)
        except ImportError:
            model = "fallback"
    return model

# 2. Register function: embedding::generate
@iii.function(id="embedding::generate", description="Generate text embeddings")
async def generate_embedding(input):
    text = input.get("text", "")
    batch = input.get("batch")

    m = get_model()

    if m == "fallback":
        if batch:
            return {"embeddings": [_hash_embed(t) for t in batch], "dim": 128}
        return {"embedding": _hash_embed(text), "dim": 128}

    if batch:
        embeddings = m.encode(batch, normalize_embeddings=True)
        return {
            "embeddings": [e.tolist() for e in embeddings],
            "dim": embeddings.shape[1],
        }

    embedding = m.encode([text], normalize_embeddings=True)[0]
    return {"embedding": embedding.tolist(), "dim": len(embedding)}

# 3. Register function: embedding::similarity
@iii.function(id="embedding::similarity", description="Compute cosine similarity")
async def compute_similarity(input):
    a = input.get("a", [])
    b = input.get("b", [])

    if len(a) != len(b) or not a:
        return {"similarity": 0.0}

    dot = sum(x * y for x, y in zip(a, b))
    norm_a = sum(x * x for x in a) ** 0.5
    norm_b = sum(x * x for x in b) ** 0.5
    denom = norm_a * norm_b

    return {"similarity": dot / denom if denom > 0 else 0.0}

def _hash_embed(text: str, dim: int = 128) -> list:
    import math
    words = text.lower().split()
    vec = [0.0] * dim
    for word in words:
        h = hash(word) & 0xFFFFFFFF
        for i in range(dim):
            vec[i] += math.sin(h * (i + 1)) / max(len(words), 1)
    norm = sum(v * v for v in vec) ** 0.5
    if norm > 0:
        vec = [v / norm for v in vec]
    return vec

async def main():
    print("my-worker started")
    try:
        await asyncio.Event().wait()
    except KeyboardInterrupt:
        await iii.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

requirements.txt

workers/my-worker/requirements.txt
iii-sdk>=0.4.0
sentence-transformers>=2.0.0
torch>=2.0.0

Run the Worker

pip install -r workers/my-worker/requirements.txt
python workers/my-worker/main.py

Worker Registration Patterns

Pattern 1: Simple Function

iii.register_function_with_description(
    "my::simple",
    "A simple function",
    |input: Value| async move {
        Ok(json!({ "result": "success" }))
    },
);

Pattern 2: Function with State

let state = Arc::new(Mutex::new(HashMap::new()));
let state_clone = state.clone();

iii.register_function_with_description(
    "my::with_state",
    "Function with shared state",
    move |input: Value| {
        let state = state_clone.clone();
        async move {
            let mut s = state.lock().unwrap();
            s.insert("key", "value");
            Ok(json!({ "state": s.len() }))
        }
    },
);

Pattern 3: Function Calling Other Functions

let iii_clone = iii.clone();
iii.register_function_with_description(
    "my::composite",
    "Calls other functions",
    move |input: Value| {
        let iii = iii_clone.clone();
        async move {
            let result1 = iii.trigger("tool::file_read", json!({ "path": "/tmp/test" })).await?;
            let result2 = iii.trigger("security::scan", json!({ "content": result1 })).await?;
            Ok(result2)
        }
    },
);

Trigger Types

HTTP Trigger

Expose a function via REST API:
iii.register_trigger(
    "http",
    "my::http_handler",
    json!({
        "api_path": "my/endpoint",
        "http_method": "POST"
    })
)?;
Accessible at: POST http://localhost:3111/my/endpoint

Queue Trigger

Process messages from a queue:
iii.register_trigger(
    "queue",
    "my::queue_handler",
    json!({ "topic": "my.queue" })
)?;

Cron Trigger

Run on a schedule:
iii.register_trigger(
    "cron",
    "my::scheduled",
    json!({ "expression": "0 */5 * * * *" })  // Every 5 minutes
)?;

PubSub Trigger

Subscribe to events:
iii.register_trigger(
    "pubsub",
    "my::event_handler",
    json!({ "topic": "agent.lifecycle" })
)?;

Testing Workers

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_function() {
        let input = json!({ "key": "value" });
        let result = my_function(input).await.unwrap();
        assert_eq!(result["status"], "ok");
    }
}
cargo test -p agentos-my-worker

Best Practices

1

Use Descriptive IDs

Function IDs should follow namespace::action format:
  • agent::chat, agent::create, agent::delete
  • tool::file_read, tool::file_write
  • memory::store, memory::recall
2

Handle Errors Gracefully

Always return structured errors:
try {
  const result = await riskyOperation();
  return { success: true, data: result };
} catch (err: any) {
  return { success: false, error: err.message };
}
3

Set Timeouts

Always specify timeouts when calling other functions:
// Good
await trigger("tool::web_fetch", { url }, 30_000);

// Bad (no timeout)
await trigger("tool::web_fetch", { url });
4

Add Metadata

Include metadata for categorization:
registerFunction(
  {
    id: "my::function",
    description: "Clear description",
    metadata: { category: "my-category", version: "1.0" }
  },
  handler
);
5

Log Important Events

Use structured logging:
tracing::info!("agent-core worker started");
tracing::warn!("rate limit exceeded", agent_id = %agent_id);

Next Steps

Creating Tools

Build tools that agents can call

Creating Agents

Create agent templates

Testing

Test your workers

API Reference

Full worker API documentation