hello world

This commit is contained in:
kuberwastaken 2026-04-01 01:20:27 +05:30
commit c99507ca1e
84 changed files with 54252 additions and 0 deletions

View file

@ -0,0 +1,25 @@
[package]
name = "cc-query"
version.workspace = true
edition.workspace = true
[dependencies]
cc-core = { workspace = true }
cc-api = { workspace = true }
cc-tools = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
futures = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
anyhow = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
async-trait = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
parking_lot = { workspace = true }
tokio-util = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }

View file

@ -0,0 +1,205 @@
// AgentTool: spawn a sub-agent to handle a complex sub-task.
//
// Lives in cc-query (not cc-tools) to avoid a circular dependency:
// cc-tools would need cc-query, but cc-query already needs cc-tools.
//
// The AgentTool creates a nested query loop with its own context, enabling
// the model to delegate complex work to specialized sub-agents. Each sub-agent:
// - Runs its own agentic loop
// - Has access to all tools (except AgentTool itself, preventing infinite recursion)
// - Returns its final output as the tool result
use async_trait::async_trait;
use cc_api::client::ClientConfig;
use cc_api::AnthropicClient;
use cc_core::types::Message;
use cc_tools::{PermissionLevel, Tool, ToolContext, ToolResult};
use serde::Deserialize;
use serde_json::{json, Value};
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
use crate::{run_query_loop, QueryConfig, QueryOutcome};
/// Marker type carrying the sub-agent behavior; all logic lives in its
/// `Tool` trait implementation.
pub struct AgentTool;
/// Deserialized input payload for an Agent tool call.
#[derive(Debug, Deserialize)]
struct AgentInput {
    /// Short description of the agent's task (used for logging).
    description: String,
    /// The complete task prompt to send as the first user message.
    prompt: String,
    /// Optional: which tools to make available (defaults to all minus AgentTool).
    #[serde(default)]
    tools: Option<Vec<String>>,
    /// Optional: system prompt override for the sub-agent.
    #[serde(default)]
    system_prompt: Option<String>,
    /// Optional: max turns for the sub-agent (default 10).
    #[serde(default)]
    max_turns: Option<u32>,
    /// Optional: model override for this sub-agent.
    #[serde(default)]
    model: Option<String>,
}
#[async_trait]
impl Tool for AgentTool {
    /// Stable tool name exposed to the model.
    fn name(&self) -> &str {
        cc_core::constants::TOOL_NAME_AGENT
    }

    /// Model-facing description of when to reach for this tool.
    fn description(&self) -> &str {
        "Launch a new agent to handle complex, multi-step tasks autonomously. \
        The agent runs its own agentic loop with access to tools and returns \
        its final result. Use this to delegate sub-tasks, run parallel \
        workstreams, or handle tasks that require many tool calls."
    }

    fn permission_level(&self) -> PermissionLevel {
        // The agent inherits parent permissions; no extra level required.
        PermissionLevel::None
    }

    /// JSON schema for `AgentInput`; `description` and `prompt` are required,
    /// everything else is optional.
    fn input_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "description": {
                    "type": "string",
                    "description": "Short description of the agent's task (3-5 words)"
                },
                "prompt": {
                    "type": "string",
                    "description": "The complete task for the agent to perform"
                },
                "tools": {
                    "type": "array",
                    "items": { "type": "string" },
                    "description": "List of tool names to make available. Defaults to all tools."
                },
                "system_prompt": {
                    "type": "string",
                    "description": "Optional system prompt override for the sub-agent"
                },
                "max_turns": {
                    "type": "number",
                    "description": "Maximum number of turns for the sub-agent (default 10)"
                },
                "model": {
                    "type": "string",
                    "description": "Optional model to use for this agent"
                }
            },
            "required": ["description", "prompt"]
        })
    }

    /// Parse input, build a dedicated client and tool set, run a nested query
    /// loop to completion, and return the sub-agent's final text as the
    /// tool result.
    async fn execute(&self, input: Value, ctx: &ToolContext) -> ToolResult {
        let params: AgentInput = match serde_json::from_value(input) {
            Ok(p) => p,
            Err(e) => return ToolResult::error(format!("Invalid input: {}", e)),
        };
        info!(description = %params.description, "Spawning sub-agent");
        // Resolve API key from environment.
        let api_key = match std::env::var("ANTHROPIC_API_KEY")
            .ok()
            .filter(|k| !k.is_empty())
        {
            Some(k) => k,
            None => {
                return ToolResult::error(
                    "ANTHROPIC_API_KEY not set cannot spawn sub-agent".to_string(),
                )
            }
        };
        // Dedicated Anthropic client for the sub-agent.
        let client = match AnthropicClient::new(ClientConfig {
            api_key,
            ..Default::default()
        }) {
            Ok(c) => Arc::new(c),
            Err(e) => return ToolResult::error(format!("Failed to create client: {}", e)),
        };
        // Build the tool list for the sub-agent.
        // Always exclude AgentTool itself to prevent unbounded recursion.
        // NOTE(review): the recursion filter only applies on the default path;
        // an explicit `tools` list containing TOOL_NAME_AGENT is passed
        // through unfiltered — confirm that is intentional.
        let all = cc_tools::all_tools();
        let agent_tools: Vec<Box<dyn Tool>> = if let Some(ref allowed) = params.tools {
            all.into_iter()
                .filter(|t| allowed.contains(&t.name().to_string()))
                .collect()
        } else {
            all.into_iter()
                .filter(|t| t.name() != cc_core::constants::TOOL_NAME_AGENT)
                .collect()
        };
        // Resolve model: explicit non-empty override, else DEFAULT_MODEL.
        // (The parent context's model is not consulted here.)
        let model = params
            .model
            .filter(|m| !m.is_empty())
            .unwrap_or_else(|| cc_core::constants::DEFAULT_MODEL.to_string());
        let system_prompt = params.system_prompt.unwrap_or_else(|| {
            "You are a specialized AI agent helping with a specific sub-task. \
            Complete the task thoroughly and return your findings."
                .to_string()
        });
        let query_config = QueryConfig {
            model,
            max_tokens: cc_core::constants::DEFAULT_MAX_TOKENS,
            max_turns: params.max_turns.unwrap_or(10),
            system_prompt: Some(system_prompt),
            append_system_prompt: None,
            thinking_budget: None,
            temperature: None,
        };
        // Run the sub-agent loop. A fresh CancellationToken means the parent
        // cannot cancel this sub-agent through its own token.
        let mut messages = vec![Message::user(params.prompt)];
        let cancel = CancellationToken::new();
        let outcome = run_query_loop(
            client.as_ref(),
            &mut messages,
            &agent_tools,
            ctx,
            &query_config,
            ctx.cost_tracker.clone(),
            None, // no event forwarding for sub-agents
            cancel,
        )
        .await;
        match outcome {
            // Normal completion: surface the final assistant text.
            QueryOutcome::EndTurn { message, usage } => {
                let text = message.get_all_text();
                debug!(
                    description = %params.description,
                    output_tokens = usage.output_tokens,
                    "Sub-agent completed"
                );
                ToolResult::success(text)
            }
            // Truncated output is still returned, with an explicit marker.
            QueryOutcome::MaxTokens { partial_message, .. } => {
                let text = partial_message.get_all_text();
                ToolResult::success(format!(
                    "{}\n\n[Note: Agent hit max_tokens limit]",
                    text
                ))
            }
            QueryOutcome::Cancelled => {
                ToolResult::error("Sub-agent was cancelled".to_string())
            }
            QueryOutcome::Error(e) => {
                ToolResult::error(format!("Sub-agent error: {}", e))
            }
        }
    }
}

View file

@ -0,0 +1,410 @@
//! AutoDream: automatic memory consolidation daemon
//!
//! Background memory consolidation. Fires a consolidation prompt as a forked
//! subagent when the time gate passes AND enough sessions have accumulated.
//!
//! Gate order (cheapest first):
//! 1. Time: hours since last_consolidated_at >= min_hours (one stat)
//! 2. Sessions: transcript count with mtime > last_consolidated_at >= min_sessions
//! 3. Lock: no other process mid-consolidation (stale after 1 hour)
use std::path::PathBuf;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::fs;
use anyhow::Result;
use serde::{Deserialize, Serialize};
// Scan throttle: when time-gate passes but session-gate doesn't, the lock
// mtime doesn't advance, so the time-gate keeps passing every turn.
pub const SESSION_SCAN_INTERVAL_SECS: u64 = 10 * 60; // 10 minutes
/// GrowthBook-sourced scheduling config (with defaults)
///
/// Both thresholds gate `AutoDream::should_consolidate`: the time gate uses
/// `min_hours`, the session gate uses `min_sessions`.
#[derive(Debug, Clone)]
pub struct AutoDreamConfig {
    /// Minimum hours between consolidations (default: 24)
    pub min_hours: f64,
    /// Minimum new-session count to trigger (default: 5)
    pub min_sessions: usize,
}
impl Default for AutoDreamConfig {
    // Defaults mirror the documented gate values: 24 hours / 5 sessions.
    fn default() -> Self {
        Self {
            min_hours: 24.0,
            min_sessions: 5,
        }
    }
}
/// Persisted state written to `.consolidation_state.json`
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ConsolidationState {
    /// Unix timestamp (seconds) of last successful consolidation.
    /// `None` means never consolidated.
    pub last_consolidated_at: Option<u64>,
    /// ETag / opaque lock token reserved for future distributed locking.
    /// Not currently written by any code in this module.
    pub lock_etag: Option<String>,
}
/// Core AutoDream logic; owns path state, delegates I/O to async methods.
pub struct AutoDream {
    /// Scheduling thresholds (time + session gates).
    config: AutoDreamConfig,
    /// Directory holding memory files; also hosts the lock and state files.
    memory_dir: PathBuf,
    /// Directory of session transcripts scanned by the session gate.
    conversations_dir: PathBuf,
    /// `<memory_dir>/.consolidation_lock`
    lock_file: PathBuf,
    /// `<memory_dir>/.consolidation_state.json`
    state_file: PathBuf,
}
impl AutoDream {
    /// Create an `AutoDream` with the default schedule (24h / 5 sessions).
    ///
    /// `memory_dir` hosts the memory files plus the lock/state files;
    /// `conversations_dir` is scanned by the session gate.
    pub fn new(memory_dir: PathBuf, conversations_dir: PathBuf) -> Self {
        // Delegate so lock/state path construction lives in exactly one place
        // (previously duplicated verbatim between new() and with_config()).
        Self::with_config(AutoDreamConfig::default(), memory_dir, conversations_dir)
    }

    /// Construct with explicit config (for testing / feature-flag overrides).
    pub fn with_config(
        config: AutoDreamConfig,
        memory_dir: PathBuf,
        conversations_dir: PathBuf,
    ) -> Self {
        let lock_file = memory_dir.join(".consolidation_lock");
        let state_file = memory_dir.join(".consolidation_state.json");
        Self {
            config,
            memory_dir,
            conversations_dir,
            lock_file,
            state_file,
        }
    }

    // -------------------------------------------------------------------------
    // Gate checks
    // -------------------------------------------------------------------------

    /// Check all gates cheapest-first. Returns `true` if consolidation should run.
    pub async fn should_consolidate(&self, state: &ConsolidationState) -> Result<bool> {
        // Gate 1: Time gate (cheapest one arithmetic check)
        if !self.time_gate_passes(state) {
            return Ok(false);
        }
        // Gate 2: Session gate (directory scan)
        if !self.session_gate_passes(state).await? {
            return Ok(false);
        }
        // Gate 3: Lock gate (no other process mid-consolidation)
        if !self.lock_gate_passes().await? {
            return Ok(false);
        }
        Ok(true)
    }

    /// Pure arithmetic: has `min_hours` elapsed since the last consolidation?
    fn time_gate_passes(&self, state: &ConsolidationState) -> bool {
        let now_secs = now_secs();
        match state.last_consolidated_at {
            None => true, // Never consolidated → always pass
            Some(last) => {
                let hours_elapsed = (now_secs.saturating_sub(last)) as f64 / 3600.0;
                hours_elapsed >= self.config.min_hours
            }
        }
    }

    /// Count transcripts modified since the last consolidation; pass once
    /// `min_sessions` is reached (the scan early-exits at that point).
    async fn session_gate_passes(&self, state: &ConsolidationState) -> Result<bool> {
        let last_secs = state.last_consolidated_at.unwrap_or(0);
        let mut count = 0usize;
        // Missing conversations dir → nothing new to consolidate.
        if !self.conversations_dir.exists() {
            return Ok(false);
        }
        let mut dir = fs::read_dir(&self.conversations_dir).await?;
        while let Some(entry) = dir.next_entry().await? {
            let metadata = entry.metadata().await?;
            if let Ok(mtime) = metadata.modified() {
                let mtime_secs = mtime
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or(Duration::ZERO)
                    .as_secs();
                if mtime_secs > last_secs {
                    count += 1;
                    if count >= self.config.min_sessions {
                        return Ok(true);
                    }
                }
            }
        }
        Ok(false)
    }

    /// Pass when no lock file exists, or when the existing lock is stale.
    async fn lock_gate_passes(&self) -> Result<bool> {
        if !self.lock_file.exists() {
            return Ok(true);
        }
        // Stale lock (>1 hour) is treated as released
        match fs::metadata(&self.lock_file).await {
            Ok(meta) => {
                if let Ok(mtime) = meta.modified() {
                    let age_secs = SystemTime::now()
                        .duration_since(mtime)
                        .unwrap_or(Duration::ZERO)
                        .as_secs();
                    Ok(age_secs > 3600)
                } else {
                    // Cannot stat mtime → conservative: gate passes (treat as stale)
                    Ok(true)
                }
            }
            Err(_) => Ok(true), // File disappeared between exists() and metadata()
        }
    }

    // -------------------------------------------------------------------------
    // Lock management
    // -------------------------------------------------------------------------

    /// Write a timestamp to the lock file, creating it if absent.
    pub async fn acquire_lock(&self) -> Result<()> {
        if let Some(parent) = self.lock_file.parent() {
            fs::create_dir_all(parent).await?;
        }
        fs::write(&self.lock_file, now_secs().to_string()).await?;
        Ok(())
    }

    /// Remove the lock file. No-op if it doesn't exist.
    pub async fn release_lock(&self) -> Result<()> {
        // Single remove_file call instead of exists() + remove_file: avoids
        // the TOCTOU race where the file vanishes between check and removal.
        match fs::remove_file(&self.lock_file).await {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(e.into()),
        }
    }

    // -------------------------------------------------------------------------
    // State persistence
    // -------------------------------------------------------------------------

    /// Stamp `last_consolidated_at = now` and persist.
    pub async fn update_state(&self, state: &mut ConsolidationState) -> Result<()> {
        state.last_consolidated_at = Some(now_secs());
        let json = serde_json::to_string_pretty(state)?;
        if let Some(parent) = self.state_file.parent() {
            fs::create_dir_all(parent).await?;
        }
        fs::write(&self.state_file, json).await?;
        Ok(())
    }

    /// Load persisted state; returns `Default` on any error (missing file, parse failure).
    pub async fn load_state(&self) -> ConsolidationState {
        match fs::read_to_string(&self.state_file).await {
            Ok(data) => serde_json::from_str(&data).unwrap_or_default(),
            Err(_) => ConsolidationState::default(),
        }
    }

    // -------------------------------------------------------------------------
    // Prompt construction
    // -------------------------------------------------------------------------

    /// Build the consolidation prompt for the forked subagent.
    /// The raw string is flush-left on purpose: indentation would leak into
    /// the prompt text.
    pub fn consolidation_prompt(&self) -> String {
        format!(
            r#"# Dream: Memory Consolidation
You are performing a dream a reflective pass over your memory files. Synthesize what you have learned recently into durable, well-organized memories so that future sessions can orient quickly.
Memory directory: `{memory_dir}`
Session transcripts: `{conv_dir}` (large JSONL files grep narrowly, do not read whole files)
---
## Phase 1 Orient
- `ls` the memory directory to see what already exists
- Read `MEMORY.md` to understand the current index
- Skim existing topic files so you improve them rather than creating duplicates
## Phase 2 Gather recent signal
Look for new information worth persisting:
1. **Daily logs** (`logs/YYYY/MM/YYYY-MM-DD.md`) if present
2. **Existing memories that drifted** facts that contradict what you see now
3. **Transcript search** grep narrowly for specific terms:
`grep -rn "<narrow term>" {conv_dir}/ --include="*.jsonl" | tail -50`
Do not exhaustively read transcripts. Look only for things you already suspect matter.
## Phase 3 Consolidate
For each thing worth remembering, write or update a memory file. Focus on:
- Merging new signal into existing topic files rather than creating near-duplicates
- Converting relative dates to absolute dates
- Deleting contradicted facts
## Phase 4 Prune and index
Update `MEMORY.md` so it stays under 200 lines and ~25 KB. It is an **index**, not a dump.
Each entry: `- [Title](file.md) one-line hook`
- Remove pointers to stale, wrong, or superseded memories
- Shorten verbose entries; move detail into topic files
- Add pointers to newly important memories
- Resolve contradictions
---
Return a brief summary of what you consolidated, updated, or pruned. If nothing changed, say so.
**Tool constraints for this run:** Use only read-only Bash commands (ls, find, grep, cat, stat, wc, head, tail). Anything that writes, redirects to a file, or modifies state will be denied.
"#,
            memory_dir = self.memory_dir.display(),
            conv_dir = self.conversations_dir.display(),
        )
    }
}
// -------------------------------------------------------------------------
// Helpers
// -------------------------------------------------------------------------
/// Seconds since the Unix epoch; clamps to 0 if the system clock reads
/// earlier than the epoch (mirrors the `unwrap_or(Duration::ZERO)` intent).
fn now_secs() -> u64 {
    UNIX_EPOCH.elapsed().map_or(0, |since_epoch| since_epoch.as_secs())
}
// -------------------------------------------------------------------------
// Tests
// -------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build an AutoDream rooted in a temp dir. The memory/ and
    /// conversations/ subdirs are NOT created on disk here; tests create
    /// whatever they need.
    fn make_dream(tmp: &TempDir) -> AutoDream {
        let mem = tmp.path().join("memory");
        let conv = tmp.path().join("conversations");
        AutoDream::new(mem, conv)
    }

    // --- time_gate_passes ---
    #[test]
    fn test_time_gate_never_consolidated() {
        let tmp = TempDir::new().unwrap();
        let dream = make_dream(&tmp);
        let state = ConsolidationState::default();
        assert!(dream.time_gate_passes(&state), "no prior consolidation → gate passes");
    }

    #[test]
    fn test_time_gate_recent_consolidation() {
        let tmp = TempDir::new().unwrap();
        let dream = AutoDream::with_config(
            AutoDreamConfig { min_hours: 24.0, min_sessions: 5 },
            tmp.path().join("memory"),
            tmp.path().join("conversations"),
        );
        let state = ConsolidationState {
            last_consolidated_at: Some(now_secs()), // just now
            lock_etag: None,
        };
        assert!(!dream.time_gate_passes(&state), "just consolidated → gate blocked");
    }

    #[test]
    fn test_time_gate_old_consolidation() {
        let tmp = TempDir::new().unwrap();
        let dream = AutoDream::with_config(
            AutoDreamConfig { min_hours: 24.0, min_sessions: 5 },
            tmp.path().join("memory"),
            tmp.path().join("conversations"),
        );
        // 25 hours ago
        let old = now_secs().saturating_sub(25 * 3600);
        let state = ConsolidationState {
            last_consolidated_at: Some(old),
            lock_etag: None,
        };
        assert!(dream.time_gate_passes(&state), "consolidated 25h ago → gate passes");
    }

    // --- lock_gate_passes (sync-friendly via tokio::test) ---
    #[tokio::test]
    async fn test_lock_gate_no_lock_file() {
        let tmp = TempDir::new().unwrap();
        let dream = make_dream(&tmp);
        assert!(dream.lock_gate_passes().await.unwrap());
    }

    #[tokio::test]
    async fn test_lock_gate_fresh_lock_blocks() {
        let tmp = TempDir::new().unwrap();
        let dream = make_dream(&tmp);
        std::fs::create_dir_all(&dream.memory_dir).unwrap();
        // Lock content is irrelevant; only the file's mtime is consulted.
        std::fs::write(&dream.lock_file, "12345").unwrap();
        // Fresh file → gate blocked
        assert!(!dream.lock_gate_passes().await.unwrap());
    }

    // --- consolidation_prompt sanity ---
    #[test]
    fn test_consolidation_prompt_contains_paths() {
        let tmp = TempDir::new().unwrap();
        let dream = make_dream(&tmp);
        let prompt = dream.consolidation_prompt();
        assert!(prompt.contains("MEMORY.md"));
        assert!(prompt.contains("Memory Consolidation"));
        assert!(prompt.contains("Phase 1"));
        assert!(prompt.contains("Phase 4"));
    }

    // --- update_state / load_state round-trip ---
    #[tokio::test]
    async fn test_state_round_trip() {
        let tmp = TempDir::new().unwrap();
        let dream = make_dream(&tmp);
        std::fs::create_dir_all(&dream.memory_dir).unwrap();
        let mut state = ConsolidationState::default();
        dream.update_state(&mut state).await.unwrap();
        assert!(state.last_consolidated_at.is_some());
        let loaded = dream.load_state().await;
        assert_eq!(loaded.last_consolidated_at, state.last_consolidated_at);
    }

    // --- acquire_lock / release_lock ---
    #[tokio::test]
    async fn test_acquire_release_lock() {
        let tmp = TempDir::new().unwrap();
        let dream = make_dream(&tmp);
        dream.acquire_lock().await.unwrap();
        assert!(dream.lock_file.exists());
        dream.release_lock().await.unwrap();
        assert!(!dream.lock_file.exists());
    }
}

View file

@ -0,0 +1,290 @@
// Auto-compact service for cc-query.
//
// When the conversation context window fills up (~90%+), we automatically
// summarise older messages to free space. This mirrors the TypeScript
// autoCompact / compact service behaviour.
//
// Strategy:
// 1. Keep the last KEEP_RECENT_MESSAGES messages verbatim.
// 2. Ask the model to summarise everything before those messages.
// 3. Replace the head of the conversation with a single synthetic
// <compact-summary> user message, followed by the recent tail.
//
// The summary is generated in a single non-agentic API call so it doesn't
// trigger another compaction recursively.
use cc_api::{ApiMessage, CreateMessageRequest, StreamAccumulator, StreamEvent, StreamHandler, SystemPrompt};
use cc_core::error::ClaudeError;
use cc_core::types::{Message, Role};
use serde_json::Value;
use std::sync::Arc;
use tracing::{debug, info, warn};
// ---------------------------------------------------------------------------
// Constants (mirrors TypeScript autoCompact.ts)
// ---------------------------------------------------------------------------
/// We target keeping this many context tokens free after compaction.
const AUTOCOMPACT_BUFFER_TOKENS: u64 = 13_000;
/// Start warning when this many tokens remain in the context window.
const WARNING_THRESHOLD_BUFFER_TOKENS: u64 = 20_000;
/// Fraction of the context window at which auto-compact triggers.
const AUTOCOMPACT_TRIGGER_FRACTION: f64 = 0.90;
/// How many recent messages to preserve verbatim after compaction.
const KEEP_RECENT_MESSAGES: usize = 10;
/// Max consecutive auto-compact failures before giving up (circuit breaker).
const MAX_CONSECUTIVE_FAILURES: u32 = 3;
// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------
/// Tracks auto-compact state across turns.
#[derive(Debug, Default, Clone)]
pub struct AutoCompactState {
    /// Total compactions performed this session.
    pub compaction_count: u32,
    /// Consecutive failures (reset on success).
    pub consecutive_failures: u32,
    /// Whether the circuit breaker is open (too many failures).
    /// Set by `on_failure`; no visible code path ever clears it.
    pub disabled: bool,
}
impl AutoCompactState {
    /// Record a successful compaction: bump the session total and clear the
    /// consecutive-failure streak.
    pub fn on_success(&mut self) {
        self.compaction_count += 1;
        self.consecutive_failures = 0;
    }

    /// Record a failed compaction. Once the streak reaches
    /// `MAX_CONSECUTIVE_FAILURES` the circuit breaker opens and auto-compact
    /// stays off for the rest of the session.
    pub fn on_failure(&mut self) {
        self.consecutive_failures += 1;
        let breaker_tripped = self.consecutive_failures >= MAX_CONSECUTIVE_FAILURES;
        if breaker_tripped {
            self.disabled = true;
            warn!(
                failures = self.consecutive_failures,
                "Auto-compact circuit breaker opened disabling for this session"
            );
        }
    }
}
/// Token-usage state relative to the context window.
/// See `calculate_token_warning_state` for how it is derived.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TokenWarningState {
    /// Plenty of space left.
    Ok,
    /// Getting close warn the user.
    Warning,
    /// Critical compact now.
    Critical,
}
// ---------------------------------------------------------------------------
// Threshold helpers
// ---------------------------------------------------------------------------
/// Return the effective context-window size in tokens for the given model.
/// These are approximate; the API enforces the real limits server-side.
pub fn context_window_for_model(model: &str) -> u64 {
    // Model-name substrings that identify the 200k-token families; anything
    // else falls back to the conservative 100k window.
    const LARGE_WINDOW_MARKERS: [&str; 5] =
        ["opus-4", "sonnet-4", "haiku-4", "claude-3-5", "claude-3.5"];
    match LARGE_WINDOW_MARKERS.iter().any(|marker| model.contains(marker)) {
        true => 200_000,
        false => 100_000,
    }
}
/// Determine token-warning state given current input token count and model.
pub fn calculate_token_warning_state(input_tokens: u64, model: &str) -> TokenWarningState {
let window = context_window_for_model(model);
let remaining = window.saturating_sub(input_tokens);
if remaining <= WARNING_THRESHOLD_BUFFER_TOKENS as u64 {
TokenWarningState::Warning
} else {
TokenWarningState::Ok
}
}
/// Return `true` when auto-compaction should fire.
///
/// Fires once the input reaches `AUTOCOMPACT_TRIGGER_FRACTION` of the model's
/// context window, unless the failure circuit breaker has disabled it.
pub fn should_auto_compact(input_tokens: u64, model: &str, state: &AutoCompactState) -> bool {
    let window = context_window_for_model(model);
    let threshold = (window as f64 * AUTOCOMPACT_TRIGGER_FRACTION) as u64;
    !state.disabled && input_tokens >= threshold
}
// ---------------------------------------------------------------------------
// Core compaction logic
// ---------------------------------------------------------------------------
/// Summarise `messages[..split_at]` using the Anthropic API and return a
/// new conversation consisting of a single summary message followed by
/// `messages[split_at..]`.
async fn summarise_head(
    client: &cc_api::AnthropicClient,
    messages: &[Message],
    split_at: usize,
    model: &str,
) -> Result<Vec<Message>, ClaudeError> {
    // Nothing to summarise → return the conversation unchanged.
    if split_at == 0 {
        return Ok(messages.to_vec());
    }
    let head = &messages[..split_at];
    // Build a transcript string for the summarisation prompt.
    // Messages whose text content is empty (e.g. tool-only turns) are skipped.
    let mut transcript = String::new();
    for msg in head {
        let role_label = match msg.role {
            Role::User => "Human",
            Role::Assistant => "Assistant",
        };
        let text = msg.get_all_text();
        if !text.is_empty() {
            transcript.push_str(&format!("{}: {}\n\n", role_label, text));
        }
    }
    let summarise_prompt = format!(
        "Please create a comprehensive yet concise summary of the conversation transcript \
        below. The summary will be used as context for continuing the conversation, so \
        include all important decisions, code changes, findings, and context that would be \
        needed to continue seamlessly.\n\n\
        Focus on:\n\
        - Key decisions made and their rationale\n\
        - Code or files that were created/modified\n\
        - Important findings or conclusions\n\
        - The current state of any ongoing tasks\n\
        - Any constraints or requirements discovered\n\n\
        <transcript>\n{}\n</transcript>",
        transcript
    );
    let api_msgs = vec![ApiMessage {
        role: "user".to_string(),
        content: Value::String(summarise_prompt),
    }];
    // Single non-agentic request (no tools), so this call can never trigger
    // another compaction recursively. 4096 output tokens cap the summary size.
    let request = CreateMessageRequest::builder(model, 4096)
        .messages(api_msgs)
        .system(SystemPrompt::Text(
            "You are a helpful assistant that creates concise conversation summaries. \
            Be thorough but concise. Preserve technical details, file names, and code snippets \
            that would be important for continuing the work."
                .to_string(),
        ))
        .build();
    // Use a null handler since we just want the final accumulated message.
    let handler: Arc<dyn StreamHandler> = Arc::new(cc_api::streaming::NullStreamHandler);
    let mut rx = client.create_message_stream(request, handler).await?;
    // Drain stream events into the accumulator until the message completes.
    let mut acc = StreamAccumulator::new();
    while let Some(evt) = rx.recv().await {
        acc.on_event(&evt);
        if matches!(evt, StreamEvent::MessageStop) {
            break;
        }
    }
    let (summary_msg, _usage, _stop) = acc.finish();
    let summary_text = summary_msg.get_all_text();
    if summary_text.is_empty() {
        return Err(ClaudeError::Other("Compact summary was empty".to_string()));
    }
    // Build the new conversation: one synthetic <compact-summary> user message
    // carrying the summary, followed by the preserved tail messages.
    let compact_notice = Message::user(format!(
        "<compact-summary>\n\
        The conversation history has been automatically compacted to stay within context limits.\n\
        The following is a summary of the previous conversation:\n\n\
        {}\n\
        </compact-summary>",
        summary_text
    ));
    let mut new_messages = vec![compact_notice];
    new_messages.extend_from_slice(&messages[split_at..]);
    Ok(new_messages)
}
/// Compact `messages` in-place, replacing the head with a summary.
/// Returns the new messages vector on success.
///
/// Compaction only fires when there is more than one message beyond the
/// preserved tail of `KEEP_RECENT_MESSAGES`; otherwise the conversation is
/// returned unchanged.
pub async fn compact_conversation(
    client: &cc_api::AnthropicClient,
    messages: &[Message],
    model: &str,
) -> Result<Vec<Message>, ClaudeError> {
    let total = messages.len();
    // split_at > 1 ⇔ total > KEEP_RECENT_MESSAGES + 1 (the original guard).
    match total.checked_sub(KEEP_RECENT_MESSAGES) {
        Some(split_at) if split_at > 1 => {
            info!(
                total,
                split_at,
                keep = KEEP_RECENT_MESSAGES,
                "Compacting conversation"
            );
            summarise_head(client, messages, split_at, model).await
        }
        _ => {
            debug!(
                total,
                "Too few messages to compact keeping everything"
            );
            Ok(messages.to_vec())
        }
    }
}
/// Auto-compact `messages` if needed. Updates `state` in place.
/// Returns `Some(new_messages)` if compaction ran, `None` otherwise.
pub async fn auto_compact_if_needed(
    client: &cc_api::AnthropicClient,
    messages: &[Message],
    input_tokens: u64,
    model: &str,
    state: &mut AutoCompactState,
) -> Option<Vec<Message>> {
    // Threshold + circuit-breaker check; bail out quietly when below it.
    if !should_auto_compact(input_tokens, model, state) {
        return None;
    }
    info!(
        input_tokens,
        model,
        compaction_count = state.compaction_count,
        "Auto-compact triggered"
    );
    match compact_conversation(client, messages, model).await {
        // Failure feeds the circuit breaker and leaves messages untouched.
        Err(e) => {
            warn!(error = %e, "Auto-compact failed");
            state.on_failure();
            None
        }
        Ok(compacted) => {
            state.on_success();
            info!(
                original_count = messages.len(),
                new_count = compacted.len(),
                "Auto-compact complete"
            );
            Some(compacted)
        }
    }
}

View file

@ -0,0 +1,173 @@
//! Coordinator mode: multi-worker agent orchestration
use crate::*;
pub const COORDINATOR_ENV_VAR: &str = "CLAUDE_CODE_COORDINATOR_MODE";

/// True when the coordinator env var is set to anything other than
/// the empty string, `"0"`, or `"false"`. Unset (or non-unicode) → false.
pub fn is_coordinator_mode() -> bool {
    matches!(
        std::env::var(COORDINATOR_ENV_VAR),
        Ok(value) if !value.is_empty() && value != "0" && value != "false"
    )
}
/// System prompt sections injected when coordinator mode is active
pub fn coordinator_system_prompt() -> &'static str {
    // Raw string is flush-left on purpose: any indentation would become part
    // of the returned prompt text.
    r#"
## Coordinator Mode
You are operating as an orchestrator for parallel worker agents.
### Your Role
- Orchestrate workers using the Agent tool to spawn parallel subagents
- Use SendMessage to continue communication with running workers
- Use TaskStop to cancel workers that are no longer needed
- Synthesize findings across workers before presenting to the user
- Answer directly when the question doesn't need delegation
### Task Workflow
1. **Research Phase**: Spawn workers to gather information in parallel
2. **Synthesis Phase**: Collect and merge worker findings
3. **Implementation Phase**: Delegate implementation tasks to specialized workers
4. **Verification Phase**: Spawn verification workers to validate results
### Worker Guidelines
- Worker prompts must be fully self-contained (workers cannot see your conversation)
- Always synthesize findings before spawning follow-up workers
- Workers have access to all standard tools + MCP + skills
- Use TaskCreate/TaskUpdate to track parallel work
### Internal Tools (do not delegate to workers)
- Agent, SendMessage, TaskStop (coordination only)
"#
}
/// Tools that should NOT be passed to worker agents
pub const INTERNAL_COORDINATOR_TOOLS: &[&str] = &[
    "Agent",
    "SendMessage",
    "TaskStop",
];

/// Get the user context injected for coordinator sessions
///
/// Lists the delegable tools (internal coordination tools filtered out) and,
/// when any are connected, the MCP servers on a second line.
pub fn coordinator_user_context(available_tools: &[String], mcp_servers: &[String]) -> String {
    let mut delegable: Vec<&str> = Vec::with_capacity(available_tools.len());
    for tool in available_tools {
        if !INTERNAL_COORDINATOR_TOOLS.contains(&tool.as_str()) {
            delegable.push(tool.as_str());
        }
    }
    let tool_list = delegable.join(", ");
    let mcp_section = match mcp_servers {
        [] => String::new(),
        servers => format!("\nConnected MCP servers: {}", servers.join(", ")),
    };
    format!("Available worker tools: {}{}\n", tool_list, mcp_section)
}
/// Check if session mode matches current coordinator setting, returns warning if mismatched
///
/// On a mismatch, flips the environment variable so the process matches the
/// mode the session was created in (`stored_coordinator`) and returns a
/// warning describing the switch. Returns `None` when already in sync.
pub fn match_session_mode(stored_coordinator: bool) -> Option<String> {
    let current = is_coordinator_mode();
    if stored_coordinator == current {
        return None;
    }
    // Fix: the env var was previously (re)set from `current`, which was a
    // no-op — the process never actually switched modes despite the warning.
    // Set it from the *stored* mode so behavior matches the message.
    if stored_coordinator {
        std::env::set_var(COORDINATOR_ENV_VAR, "1");
    } else {
        std::env::remove_var(COORDINATOR_ENV_VAR);
    }
    Some(format!(
        "Session was created in {} mode, switching to match.",
        if stored_coordinator { "coordinator" } else { "standard" }
    ))
}
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE(review): these tests mutate process-global environment state and
    // may race each other under the default parallel test runner — consider
    // running them serially. Behavior left unchanged here.

    #[test]
    fn test_is_coordinator_mode_unset() {
        std::env::remove_var(COORDINATOR_ENV_VAR);
        assert!(!is_coordinator_mode());
    }

    #[test]
    fn test_is_coordinator_mode_set_to_one() {
        std::env::set_var(COORDINATOR_ENV_VAR, "1");
        assert!(is_coordinator_mode());
        std::env::remove_var(COORDINATOR_ENV_VAR);
    }

    #[test]
    fn test_is_coordinator_mode_set_to_false() {
        std::env::set_var(COORDINATOR_ENV_VAR, "false");
        assert!(!is_coordinator_mode());
        std::env::remove_var(COORDINATOR_ENV_VAR);
    }

    #[test]
    fn test_is_coordinator_mode_set_to_zero() {
        std::env::set_var(COORDINATOR_ENV_VAR, "0");
        assert!(!is_coordinator_mode());
        std::env::remove_var(COORDINATOR_ENV_VAR);
    }

    #[test]
    fn test_coordinator_user_context_filters_internal_tools() {
        let tools = vec![
            "Bash".to_string(),
            "Agent".to_string(),
            "SendMessage".to_string(),
            "TaskStop".to_string(),
            "Read".to_string(),
        ];
        let ctx = coordinator_user_context(&tools, &[]);
        assert!(ctx.contains("Bash"));
        assert!(ctx.contains("Read"));
        assert!(!ctx.contains("Agent"));
        assert!(!ctx.contains("SendMessage"));
        assert!(!ctx.contains("TaskStop"));
    }

    #[test]
    fn test_coordinator_user_context_mcp_servers() {
        let tools = vec!["Bash".to_string()];
        let mcps = vec!["filesystem".to_string(), "git".to_string()];
        let ctx = coordinator_user_context(&tools, &mcps);
        assert!(ctx.contains("filesystem"));
        assert!(ctx.contains("git"));
    }

    #[test]
    fn test_match_session_mode_no_change_needed() {
        std::env::remove_var(COORDINATOR_ENV_VAR);
        // current = false, stored = false → no warning
        assert!(match_session_mode(false).is_none());
    }

    #[test]
    fn test_match_session_mode_switches_to_coordinator() {
        std::env::remove_var(COORDINATOR_ENV_VAR);
        // current = false, stored = true → should flip and warn
        let msg = match_session_mode(true);
        assert!(msg.is_some());
        assert!(msg.unwrap().contains("coordinator"));
        // Clean up
        std::env::remove_var(COORDINATOR_ENV_VAR);
    }

    #[test]
    fn test_coordinator_system_prompt_content() {
        let prompt = coordinator_system_prompt();
        assert!(prompt.contains("Coordinator Mode"));
        assert!(prompt.contains("orchestrator"));
        assert!(prompt.contains("Research Phase"));
        assert!(prompt.contains("Synthesis Phase"));
    }
}

View file

@ -0,0 +1,114 @@
// cron_scheduler: background task that fires cron-scheduled prompts.
//
// Runs as a long-lived tokio task. Every minute it checks the global CRON_STORE
// (in cc-tools) for tasks whose cron expression matches the current wall-clock
// minute. Matching tasks are fired by spawning a sub-query loop, exactly like
// the AgentTool does for sub-agents.
//
// One-shot tasks (recurring=false) are automatically removed from the store
// by `pop_due_tasks` after they are returned.
use crate::{QueryConfig, QueryOutcome, run_query_loop};
use cc_core::types::Message;
use cc_tools::Tool;
use cc_tools::ToolContext;
use chrono::Timelike;
use std::sync::Arc;
use tokio::time::{Duration, sleep};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
/// Start the background cron scheduler.
///
/// Returns immediately; the scheduler runs as a detached tokio task.
/// Call `cancel.cancel()` to stop it gracefully.
pub fn start_cron_scheduler(
    client: Arc<cc_api::AnthropicClient>,
    tools: Arc<Vec<Box<dyn Tool>>>,
    tool_ctx: ToolContext,
    query_config: QueryConfig,
    cancel: CancellationToken,
) {
    // `run_scheduler_loop` is already a future, so hand it straight to the
    // runtime rather than wrapping it in an extra `async move` block.
    let scheduler = run_scheduler_loop(client, tools, tool_ctx, query_config, cancel);
    tokio::spawn(scheduler);
}
/// Main scheduler loop: wake at each minute boundary and fire due tasks.
///
/// Exits only when `cancel` is triggered. Every due task runs in its own
/// detached tokio task, so a long-running task never delays the next tick
/// (which also means overlapping runs of the same recurring task are
/// possible — TODO confirm that is acceptable).
async fn run_scheduler_loop(
    client: Arc<cc_api::AnthropicClient>,
    tools: Arc<Vec<Box<dyn Tool>>>,
    tool_ctx: ToolContext,
    query_config: QueryConfig,
    cancel: CancellationToken,
) {
    info!("Cron scheduler started");
    loop {
        // Sleep until the next whole-minute boundary (±1s tolerance).
        let now = chrono::Local::now();
        let secs_into_minute = now.second() as u64;
        let nanos_ms = now.nanosecond() as u64 / 1_000_000;
        // How many ms until the next minute starts? Use saturating sub to avoid underflow.
        // (Exactly on a boundary this computes a full 60s sleep, which is
        // correct: the tick for the current minute has already fired.)
        let ms_to_next_minute = (60u64.saturating_sub(secs_into_minute))
            .saturating_mul(1_000)
            .saturating_sub(nanos_ms)
            .max(1); // always sleep at least 1ms
        // Race the sleep against cancellation so shutdown is prompt.
        tokio::select! {
            _ = sleep(Duration::from_millis(ms_to_next_minute)) => {}
            _ = cancel.cancelled() => {
                info!("Cron scheduler stopped");
                return;
            }
        }
        let tick_time = chrono::Local::now();
        debug!(time = %tick_time.format("%H:%M"), "Cron scheduler tick");
        // Find tasks due at this minute.
        // Per the module header, `pop_due_tasks` also removes one-shot
        // (non-recurring) tasks from the store as it returns them.
        let due = cc_tools::cron::pop_due_tasks(&tick_time).await;
        for task in due {
            info!(id = %task.id, cron = %task.cron, "Firing cron task");
            // Clone the shared handles this task's future needs to own.
            let client = client.clone();
            let tools = tools.clone();
            let tool_ctx = tool_ctx.clone();
            let query_config = query_config.clone();
            let cost_tracker = tool_ctx.cost_tracker.clone();
            // Cloning a CancellationToken shares the same token: cancelling
            // the scheduler also cancels any in-flight task queries.
            let cancel_child = cancel.clone();
            let task_id = task.id.clone();
            tokio::spawn(async move {
                // Each firing starts a fresh one-message conversation seeded
                // with the task's stored prompt.
                let mut messages = vec![Message::user(task.prompt.clone())];
                let outcome = run_query_loop(
                    client.as_ref(),
                    &mut messages,
                    &tools,
                    &tool_ctx,
                    &query_config,
                    cost_tracker,
                    None, // background — no UI event channel
                    cancel_child,
                )
                .await;
                // Background task: outcomes are only logged, never surfaced.
                match outcome {
                    QueryOutcome::EndTurn { .. } => {
                        info!(id = %task_id, "Cron task completed");
                    }
                    QueryOutcome::Error(e) => {
                        error!(id = %task_id, error = %e, "Cron task failed");
                    }
                    QueryOutcome::MaxTokens { .. } => {
                        info!(id = %task_id, "Cron task hit max tokens");
                    }
                    QueryOutcome::Cancelled => {
                        debug!(id = %task_id, "Cron task cancelled");
                    }
                }
            });
        }
    }
}

View file

@ -0,0 +1,636 @@
// cc-query: The core agentic query loop.
//
// This crate implements the main conversation loop that:
// 1. Sends messages to the Anthropic API
// 2. Processes streaming responses
// 3. Detects tool-use requests and dispatches them
// 4. Feeds tool results back to the model
// 5. Handles auto-compact when the context window fills up
// 6. Manages stop conditions (end_turn, max_turns, cancellation)
pub mod agent_tool;
pub mod auto_dream;
pub mod compact;
pub mod coordinator;
pub mod cron_scheduler;
pub use agent_tool::AgentTool;
pub use cron_scheduler::start_cron_scheduler;
pub use compact::{
AutoCompactState, TokenWarningState, auto_compact_if_needed, calculate_token_warning_state,
compact_conversation, context_window_for_model, should_auto_compact,
};
use cc_api::{
ApiMessage, ApiToolDefinition, CreateMessageRequest, StreamAccumulator, StreamEvent,
StreamHandler, SystemPrompt, ThinkingConfig,
};
use cc_core::config::Config;
use cc_core::cost::CostTracker;
use cc_core::error::ClaudeError;
use cc_core::types::{ContentBlock, Message, ToolResultContent, UsageInfo};
use cc_tools::{Tool, ToolContext, ToolResult};
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------
/// Outcome of a single query-loop run.
///
/// Returned by [`run_query_loop`] once the loop terminates. A `tool_use`
/// stop never surfaces here — tool calls are executed inside the loop and
/// the conversation continues.
#[derive(Debug)]
pub enum QueryOutcome {
    /// The model finished its turn (end_turn stop reason).
    EndTurn { message: Message, usage: UsageInfo },
    /// The model hit max_tokens.
    MaxTokens { partial_message: Message, usage: UsageInfo },
    /// The conversation was cancelled by the user.
    Cancelled,
    /// An unrecoverable error occurred.
    Error(ClaudeError),
}
/// Configuration for a single query-loop invocation.
#[derive(Clone)]
pub struct QueryConfig {
    /// Model identifier sent with every API request.
    pub model: String,
    /// Per-response output-token cap passed to the request builder.
    pub max_tokens: u32,
    /// Maximum number of loop iterations before the loop force-stops.
    pub max_turns: u32,
    /// Optional custom system prompt (goes into the cacheable section).
    pub system_prompt: Option<String>,
    /// Optional text appended after the dynamic-boundary marker.
    pub append_system_prompt: Option<String>,
    /// Extended-thinking token budget; `None` leaves thinking disabled.
    pub thinking_budget: Option<u32>,
    /// Sampling temperature.
    /// NOTE(review): stored but never applied by `run_query_loop`'s request
    /// builder — confirm whether that is intentional.
    pub temperature: Option<f32>,
}
impl Default for QueryConfig {
    fn default() -> Self {
        // Defaults come from the shared constants crate so that every entry
        // point (CLI, sub-agents, cron tasks) agrees on them; all optional
        // knobs start disabled.
        Self {
            model: cc_core::constants::DEFAULT_MODEL.to_string(),
            max_tokens: cc_core::constants::DEFAULT_MAX_TOKENS,
            max_turns: cc_core::constants::MAX_TURNS_DEFAULT,
            system_prompt: None,
            append_system_prompt: None,
            thinking_budget: None,
            temperature: None,
        }
    }
}
impl QueryConfig {
    /// Derive a query config from the application `Config`, taking the
    /// effective model and token limit and leaving every other field at
    /// its [`Default`] value.
    pub fn from_config(cfg: &Config) -> Self {
        let mut query_config = Self::default();
        query_config.model = cfg.effective_model().to_string();
        query_config.max_tokens = cfg.effective_max_tokens();
        query_config
    }
}
/// Events emitted by the query loop for the TUI to render.
///
/// Sent over an unbounded channel; senders ignore send errors, so a UI
/// that drops its receiver simply stops observing the loop.
#[derive(Debug, Clone)]
pub enum QueryEvent {
    /// A stream event from the API.
    Stream(StreamEvent),
    /// A tool is about to be executed.
    ToolStart { tool_name: String, tool_id: String },
    /// A tool has finished executing.
    ToolEnd { tool_name: String, tool_id: String, result: String, is_error: bool },
    /// The model finished a turn.
    TurnComplete { turn: u32, stop_reason: String },
    /// An informational status message.
    Status(String),
    /// An error.
    Error(String),
}
// ---------------------------------------------------------------------------
// Query loop
// ---------------------------------------------------------------------------
/// Run the agentic query loop.
///
/// This sends the conversation to the API, handles tool calls in a loop, and
/// returns when the model issues an end_turn or an error/limit is hit.
///
/// `messages` is mutated in place: each assistant reply and each tool-result
/// user message is appended, and auto-compaction may replace the whole
/// history with a summarised one. Events sent on `event_tx` are best-effort;
/// a dropped receiver is silently ignored.
pub async fn run_query_loop(
    client: &cc_api::AnthropicClient,
    messages: &mut Vec<Message>,
    tools: &[Box<dyn Tool>],
    tool_ctx: &ToolContext,
    config: &QueryConfig,
    cost_tracker: Arc<CostTracker>,
    event_tx: Option<mpsc::UnboundedSender<QueryEvent>>,
    cancel_token: tokio_util::sync::CancellationToken,
) -> QueryOutcome {
    let mut turn = 0u32;
    let mut compact_state = compact::AutoCompactState::default();
    loop {
        turn += 1;
        // Turn budget guard: give up gracefully instead of looping forever.
        // Note this reports EndTurn (not an error) with default usage.
        if turn > config.max_turns {
            info!(turns = turn, "Max turns reached");
            if let Some(ref tx) = event_tx {
                let _ = tx.send(QueryEvent::Status(format!(
                    "Reached maximum turn limit ({})",
                    config.max_turns
                )));
            }
            // Return the last assistant message if any
            let last_msg = messages
                .last()
                .cloned()
                .unwrap_or_else(|| Message::assistant("Max turns reached."));
            return QueryOutcome::EndTurn {
                message: last_msg,
                usage: UsageInfo::default(),
            };
        }
        // Check for cancellation
        if cancel_token.is_cancelled() {
            return QueryOutcome::Cancelled;
        }
        // Build API request
        let api_messages: Vec<ApiMessage> = messages.iter().map(ApiMessage::from).collect();
        let api_tools: Vec<ApiToolDefinition> = tools
            .iter()
            .map(|t| ApiToolDefinition::from(&t.to_definition()))
            .collect();
        let system = build_system_prompt(config);
        let mut req_builder = CreateMessageRequest::builder(&config.model, config.max_tokens)
            .messages(api_messages)
            .system(system)
            .tools(api_tools);
        // Only enable extended thinking if an explicit budget was provided.
        if let Some(budget) = config.thinking_budget {
            req_builder = req_builder.thinking(ThinkingConfig::enabled(budget));
        }
        let request = req_builder.build();
        // Create a stream handler that forwards to the event channel
        let handler: Arc<dyn StreamHandler> = if let Some(ref tx) = event_tx {
            let tx = tx.clone();
            Arc::new(ChannelStreamHandler { tx })
        } else {
            Arc::new(cc_api::streaming::NullStreamHandler)
        };
        // Send to API
        debug!(turn, model = %config.model, "Sending API request");
        let mut stream_rx = match client.create_message_stream(request, handler).await {
            Ok(rx) => rx,
            Err(e) => {
                error!(error = %e, "API request failed");
                return QueryOutcome::Error(e);
            }
        };
        // Accumulate the streamed response, racing against cancellation.
        let mut accumulator = StreamAccumulator::new();
        loop {
            tokio::select! {
                _ = cancel_token.cancelled() => {
                    return QueryOutcome::Cancelled;
                }
                event = stream_rx.recv() => {
                    match event {
                        Some(evt) => {
                            accumulator.on_event(&evt);
                            match &evt {
                                // Stream errors are logged but do not abort
                                // the turn; the accumulator keeps whatever
                                // content arrived. NOTE(review): the
                                // overloaded case is flagged but no retry
                                // is implemented — confirm intended.
                                StreamEvent::Error { error_type, message } => {
                                    if error_type == "overloaded_error" {
                                        warn!("API overloaded, should retry");
                                    }
                                    error!(error_type, message, "Stream error");
                                }
                                StreamEvent::MessageStop => break,
                                _ => {}
                            }
                        }
                        None => break, // Stream ended
                    }
                }
            }
        }
        let (assistant_msg, usage, stop_reason) = accumulator.finish();
        // Track costs
        cost_tracker.add_usage(
            usage.input_tokens,
            usage.output_tokens,
            usage.cache_creation_input_tokens,
            usage.cache_read_input_tokens,
        );
        // Append assistant message to conversation
        messages.push(assistant_msg.clone());
        // A missing stop reason is treated as a normal end of turn.
        let stop = stop_reason.as_deref().unwrap_or("end_turn");
        // Auto-compact: if context is near-full, summarise older messages now
        // (before the next turn's API call would fail with prompt-too-long).
        if stop == "end_turn" || stop == "tool_use" {
            if let Some(new_msgs) = compact::auto_compact_if_needed(
                client,
                messages,
                usage.input_tokens,
                &config.model,
                &mut compact_state,
            )
            .await
            {
                // The compacted history wholesale replaces the original.
                *messages = new_msgs;
                if let Some(ref tx) = event_tx {
                    let _ = tx.send(QueryEvent::Status(
                        "Context compacted to stay within limits.".to_string(),
                    ));
                }
            }
        }
        if let Some(ref tx) = event_tx {
            let _ = tx.send(QueryEvent::TurnComplete {
                turn,
                stop_reason: stop.to_string(),
            });
        }
        // Helper for firing the Stop hook. Written as a macro (not a
        // closure) so it can borrow `tool_ctx` and `.await` inside each
        // match arm without fighting async-closure lifetimes.
        macro_rules! fire_stop_hook {
            ($msg:expr) => {{
                let stop_ctx = cc_core::hooks::HookContext {
                    event: "Stop".to_string(),
                    tool_name: None,
                    tool_input: None,
                    tool_output: Some($msg.get_all_text()),
                    is_error: None,
                    session_id: Some(tool_ctx.session_id.clone()),
                };
                cc_core::hooks::run_hooks(
                    &tool_ctx.config.hooks,
                    cc_core::config::HookEvent::Stop,
                    &stop_ctx,
                    &tool_ctx.working_dir,
                )
                .await;
            }};
        }
        match stop {
            "end_turn" => {
                fire_stop_hook!(assistant_msg);
                return QueryOutcome::EndTurn {
                    message: assistant_msg,
                    usage,
                };
            }
            // max_tokens does NOT fire the Stop hook — the turn is
            // incomplete. NOTE(review): confirm that asymmetry is intended.
            "max_tokens" => {
                return QueryOutcome::MaxTokens {
                    partial_message: assistant_msg,
                    usage,
                };
            }
            "tool_use" => {
                // Extract tool calls and execute them
                let tool_blocks = assistant_msg.get_tool_use_blocks();
                if tool_blocks.is_empty() {
                    // Shouldn't happen but treat as end_turn
                    return QueryOutcome::EndTurn {
                        message: assistant_msg,
                        usage,
                    };
                }
                let mut result_blocks: Vec<ContentBlock> = Vec::new();
                // Tools run sequentially, in the order the model requested.
                for block in tool_blocks {
                    if let ContentBlock::ToolUse { id, name, input } = block {
                        if let Some(ref tx) = event_tx {
                            let _ = tx.send(QueryEvent::ToolStart {
                                tool_name: name.clone(),
                                tool_id: id.clone(),
                            });
                        }
                        // Fire PreToolUse hooks (blocking hooks can cancel execution)
                        let hooks = &tool_ctx.config.hooks;
                        let hook_ctx = cc_core::hooks::HookContext {
                            event: "PreToolUse".to_string(),
                            tool_name: Some(name.clone()),
                            tool_input: Some(input.clone()),
                            tool_output: None,
                            is_error: None,
                            session_id: Some(tool_ctx.session_id.clone()),
                        };
                        let pre_outcome = cc_core::hooks::run_hooks(
                            hooks,
                            cc_core::config::HookEvent::PreToolUse,
                            &hook_ctx,
                            &tool_ctx.working_dir,
                        )
                        .await;
                        // A blocked tool is reported to the model as an
                        // error result rather than executed.
                        let result = if let cc_core::hooks::HookOutcome::Blocked(reason) = pre_outcome {
                            warn!(tool = name, reason = %reason, "PreToolUse hook blocked execution");
                            cc_tools::ToolResult::error(format!("Blocked by hook: {}", reason))
                        } else {
                            execute_tool(&name, &input, tools, tool_ctx).await
                        };
                        // Fire PostToolUse hooks
                        // (These run even when the tool was blocked above, so
                        // observers always see a paired Pre/Post — TODO
                        // confirm that is the intended contract.)
                        let post_ctx = cc_core::hooks::HookContext {
                            event: "PostToolUse".to_string(),
                            tool_name: Some(name.clone()),
                            tool_input: Some(input.clone()),
                            tool_output: Some(result.content.clone()),
                            is_error: Some(result.is_error),
                            session_id: Some(tool_ctx.session_id.clone()),
                        };
                        cc_core::hooks::run_hooks(
                            hooks,
                            cc_core::config::HookEvent::PostToolUse,
                            &post_ctx,
                            &tool_ctx.working_dir,
                        )
                        .await;
                        if let Some(ref tx) = event_tx {
                            let _ = tx.send(QueryEvent::ToolEnd {
                                tool_name: name.clone(),
                                tool_id: id.clone(),
                                result: result.content.clone(),
                                is_error: result.is_error,
                            });
                        }
                        result_blocks.push(ContentBlock::ToolResult {
                            tool_use_id: id.clone(),
                            content: ToolResultContent::Text(result.content),
                            is_error: if result.is_error { Some(true) } else { None },
                        });
                    }
                }
                // Append tool results as a user message
                messages.push(Message::user_blocks(result_blocks));
                // Continue the loop to send results back to the model
                continue;
            }
            "stop_sequence" => {
                fire_stop_hook!(assistant_msg);
                return QueryOutcome::EndTurn {
                    message: assistant_msg,
                    usage,
                };
            }
            other => {
                warn!(stop_reason = other, "Unknown stop reason, treating as end_turn");
                fire_stop_hook!(assistant_msg);
                return QueryOutcome::EndTurn {
                    message: assistant_msg,
                    usage,
                };
            }
        }
    }
}
/// Execute a single tool invocation.
async fn execute_tool(
name: &str,
input: &Value,
tools: &[Box<dyn Tool>],
ctx: &ToolContext,
) -> ToolResult {
let tool = tools.iter().find(|t| t.name() == name);
match tool {
Some(tool) => {
debug!(tool = name, "Executing tool");
tool.execute(input.clone(), ctx).await
}
None => {
warn!(tool = name, "Unknown tool requested");
ToolResult::error(format!("Unknown tool: {}", name))
}
}
}
/// Build the system prompt from config.
///
/// All default content (capabilities, safety guidelines, dynamic-boundary
/// marker, etc.) is assembled centrally by
/// `cc_core::system_prompt::build_system_prompt`; this function only maps
/// the two configurable `QueryConfig` fields onto `SystemPromptOptions`:
///
/// - `system_prompt` → `custom_system_prompt` (added to cacheable block)
/// - `append_system_prompt` → `append_system_prompt` (added after boundary)
fn build_system_prompt(config: &QueryConfig) -> SystemPrompt {
    use cc_core::system_prompt::{OutputStyle, SystemPromptOptions};
    let options = SystemPromptOptions {
        custom_system_prompt: config.system_prompt.clone(),
        append_system_prompt: config.append_system_prompt.clone(),
        output_style: OutputStyle::Default,
        // Everything else keeps its default: auto-detected prefix, no
        // working directory or memory content (callers inject those via
        // append if needed), additive (non-replacing) mode, and
        // coordinator mode off.
        ..Default::default()
    };
    SystemPrompt::Text(cc_core::system_prompt::build_system_prompt(&options))
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::*;
    use cc_api::SystemPrompt;

    /// Build a `QueryConfig` varying only the two prompt fields; every
    /// other knob is pinned so assertions stay deterministic.
    fn make_config(sys: Option<&str>, append: Option<&str>) -> QueryConfig {
        QueryConfig {
            model: "claude-sonnet-4-6".to_string(),
            max_tokens: 4096,
            max_turns: 10,
            system_prompt: sys.map(String::from),
            append_system_prompt: append.map(String::from),
            thinking_budget: None,
            temperature: None,
        }
    }

    // ---- build_system_prompt tests ------------------------------------------

    #[test]
    fn test_system_prompt_default_when_empty() {
        // The default prompt (no custom system prompt set) should include the
        // Claude Code attribution and standard sections.
        let cfg = make_config(None, None);
        let prompt = build_system_prompt(&cfg);
        if let SystemPrompt::Text(text) = prompt {
            assert!(
                text.contains("Claude Code") || text.contains("Claude agent"),
                "Default prompt should contain attribution: {}",
                text
            );
            assert!(
                text.contains(cc_core::system_prompt::SYSTEM_PROMPT_DYNAMIC_BOUNDARY),
                "Default prompt must contain the dynamic boundary marker"
            );
        } else {
            panic!("Expected SystemPrompt::Text");
        }
    }

    #[test]
    fn test_system_prompt_with_custom() {
        // A custom system prompt is injected into the cacheable section as
        // <custom_instructions>; the default sections are still present.
        let cfg = make_config(Some("You are a code reviewer."), None);
        let prompt = build_system_prompt(&cfg);
        if let SystemPrompt::Text(text) = prompt {
            assert!(
                text.contains("You are a code reviewer."),
                "Custom prompt text should appear in the output"
            );
            assert!(
                text.contains("Claude Code") || text.contains("Claude agent"),
                "Default attribution should still be present"
            );
        } else {
            panic!("Expected SystemPrompt::Text");
        }
    }

    #[test]
    fn test_system_prompt_with_append() {
        // Appended text lands after the dynamic boundary.
        let cfg = make_config(Some("Base prompt."), Some("Additional context."));
        let prompt = build_system_prompt(&cfg);
        if let SystemPrompt::Text(text) = prompt {
            assert!(text.contains("Base prompt."));
            assert!(text.contains("Additional context."));
            // append_system_prompt appears after the boundary
            let boundary_pos = text
                .find(cc_core::system_prompt::SYSTEM_PROMPT_DYNAMIC_BOUNDARY)
                .expect("boundary must exist");
            let append_pos = text.find("Additional context.").unwrap();
            assert!(
                append_pos > boundary_pos,
                "Appended text must appear after the dynamic boundary"
            );
        } else {
            panic!("Expected SystemPrompt::Text");
        }
    }

    #[test]
    fn test_system_prompt_append_only() {
        // When only append is set, default sections are present plus the
        // appended text after the dynamic boundary.
        let cfg = make_config(None, Some("Appended text."));
        let prompt = build_system_prompt(&cfg);
        if let SystemPrompt::Text(text) = prompt {
            assert!(
                text.contains("Appended text."),
                "Appended text must appear in the prompt"
            );
            let boundary_pos = text
                .find(cc_core::system_prompt::SYSTEM_PROMPT_DYNAMIC_BOUNDARY)
                .expect("boundary must exist");
            let append_pos = text.find("Appended text.").unwrap();
            assert!(
                append_pos > boundary_pos,
                "Appended text must appear after the dynamic boundary"
            );
        } else {
            panic!("Expected SystemPrompt::Text");
        }
    }

    // ---- QueryConfig tests --------------------------------------------------

    #[test]
    fn test_query_config_clone() {
        // QueryConfig derives Clone; verify fields survive the copy.
        let cfg = make_config(Some("test"), Some("append"));
        let cloned = cfg.clone();
        assert_eq!(cloned.model, "claude-sonnet-4-6");
        assert_eq!(cloned.max_tokens, 4096);
        assert_eq!(cloned.system_prompt, Some("test".to_string()));
    }

    // ---- QueryOutcome variant tests -----------------------------------------

    #[test]
    fn test_query_outcome_debug() {
        // Ensure the enum variants can be created and debug-formatted
        let outcome = QueryOutcome::Cancelled;
        let s = format!("{:?}", outcome);
        assert!(s.contains("Cancelled"));
        let err_outcome = QueryOutcome::Error(cc_core::error::ClaudeError::RateLimit);
        let s2 = format!("{:?}", err_outcome);
        assert!(s2.contains("Error"));
    }
}
/// Stream handler that forwards events to an unbounded channel.
///
/// Used by [`run_query_loop`] when a UI event channel is attached: every
/// raw API stream event is wrapped in [`QueryEvent::Stream`] and forwarded.
struct ChannelStreamHandler {
    // Receiver side lives with the UI; send failures (receiver dropped)
    // are deliberately ignored.
    tx: mpsc::UnboundedSender<QueryEvent>,
}
impl StreamHandler for ChannelStreamHandler {
    fn on_event(&self, event: &StreamEvent) {
        // Clone is required: the handler only borrows the event.
        let _ = self.tx.send(QueryEvent::Stream(event.clone()));
    }
}
// ---------------------------------------------------------------------------
// Single-shot query (non-looping, for simple one-off calls)
// ---------------------------------------------------------------------------
/// Run a single (non-agentic) query — no tool loop, just one API call.
///
/// Streams the response to completion and returns the assembled assistant
/// message. No tools are offered, usage is discarded (no cost tracker), and
/// cancellation is not supported.
///
/// # Errors
/// Returns the underlying [`ClaudeError`] if the API request itself fails.
/// NOTE(review): stream-level `Error` events are accumulated but not
/// surfaced as `Err` here — confirm that is intended.
pub async fn run_single_query(
    client: &cc_api::AnthropicClient,
    messages: Vec<Message>,
    config: &QueryConfig,
) -> Result<Message, ClaudeError> {
    let api_messages: Vec<ApiMessage> = messages.iter().map(ApiMessage::from).collect();
    let system = build_system_prompt(config);
    let request = CreateMessageRequest::builder(&config.model, config.max_tokens)
        .messages(api_messages)
        .system(system)
        .build();
    // No UI to feed, so events are dropped by the null handler.
    let handler: Arc<dyn StreamHandler> = Arc::new(cc_api::streaming::NullStreamHandler);
    let mut rx = client.create_message_stream(request, handler).await?;
    let mut acc = StreamAccumulator::new();
    while let Some(evt) = rx.recv().await {
        acc.on_event(&evt);
        if matches!(evt, StreamEvent::MessageStop) {
            break;
        }
    }
    let (msg, _usage, _stop) = acc.finish();
    Ok(msg)
}