feat(queue): erweiterte Queue mit Cron-Jobs + Tasks, Prioritäten, Delete/Priority API
This commit is contained in:
@@ -135,14 +135,52 @@ public class DashboardController(
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the cron queue / pending tasks.
|
||||
/// Returns aggregated queue: cron jobs + open tasks (merged, sorted by priority).
|
||||
/// </summary>
|
||||
[HttpGet("queue")]
|
||||
public async Task<List<QueueItem>> GetQueue()
|
||||
public async Task<List<QueueItem>> GetQueue(CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
return await gateway.GetQueueAsync();
|
||||
// Fetch cron jobs and open tasks concurrently
|
||||
var cronTask = gateway.GetQueueAsync();
|
||||
var tasksTask = taskRepo.GetAllAsync(ct);
|
||||
|
||||
await Task.WhenAll(cronTask, tasksTask);
|
||||
|
||||
var cronJobs = cronTask.Result;
|
||||
var openTasks = tasksTask.Result
|
||||
.Where(t => !string.Equals(t.State, "Done", StringComparison.OrdinalIgnoreCase))
|
||||
.ToList();
|
||||
|
||||
var merged = new List<QueueItem>();
|
||||
|
||||
// Map cron jobs (already in QueueItem format from gateway)
|
||||
merged.AddRange(cronJobs);
|
||||
|
||||
// Map open tasks to QueueItems
|
||||
foreach (var t in openTasks)
|
||||
{
|
||||
var priority = NormalizePriority(t.Priority);
|
||||
merged.Add(new QueueItem(
|
||||
"task-" + t.Id.ToString(),
|
||||
t.Title,
|
||||
t.State,
|
||||
priority,
|
||||
"task",
|
||||
"--"
|
||||
));
|
||||
}
|
||||
|
||||
// Sort: high priority first, then medium, then low
|
||||
var priorityOrder = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase)
|
||||
{
|
||||
["high"] = 0,
|
||||
["medium"] = 1,
|
||||
["low"] = 2
|
||||
};
|
||||
|
||||
return merged.OrderBy(q => priorityOrder.GetValueOrDefault(q.Priority, 99)).ToList();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -151,6 +189,115 @@ public class DashboardController(
|
||||
}
|
||||
}
|
||||
|
||||
private static string NormalizePriority(string priority)
|
||||
{
|
||||
return priority.ToLowerInvariant() switch
|
||||
{
|
||||
"high" or "critical" or "urgent" => "high",
|
||||
"low" or "minor" => "low",
|
||||
_ => "medium"
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes a queue item: cron jobs are deleted via gateway, tasks are set to Done.
|
||||
/// </summary>
|
||||
[HttpDelete("queue/{id}")]
|
||||
public async Task<ActionResult> DeleteQueueItem(string id, [FromQuery] string? source, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (string.Equals(source, "cron", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
var ok = await gateway.DeleteCronJobAsync(id);
|
||||
if (!ok)
|
||||
return StatusCode(502, new { error = "Gateway could not delete cron job" });
|
||||
return NoContent();
|
||||
}
|
||||
else if (string.Equals(source, "task", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
// Extract the actual GUID from the prefixed id ("task-{guid}")
|
||||
if (!id.StartsWith("task-"))
|
||||
return BadRequest(new { error = "Invalid task id format" });
|
||||
|
||||
var guidStr = id["task-".Length..];
|
||||
if (!Guid.TryParse(guidStr, out var guid))
|
||||
return BadRequest(new { error = "Invalid task id" });
|
||||
|
||||
var task = await taskRepo.GetByIdAsync(guid, ct);
|
||||
if (task is null)
|
||||
return NotFound(new { error = "Task not found" });
|
||||
|
||||
// Set task status to Done instead of deleting
|
||||
task.State = "Done";
|
||||
await taskRepo.UpdateAsync(task, ct);
|
||||
await activityRepo.AddAsync(new ActivityEvent
|
||||
{
|
||||
Type = "task",
|
||||
Message = $"Task \"{task.Title}\" completed via queue"
|
||||
}, ct);
|
||||
|
||||
return NoContent();
|
||||
}
|
||||
|
||||
// Default: try cron
|
||||
var deleted = await gateway.DeleteCronJobAsync(id);
|
||||
if (!deleted)
|
||||
return NotFound(new { error = "Queue item not found" });
|
||||
return NoContent();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Delete queue item failed for {Id}", id);
|
||||
return StatusCode(500, new { error = "Internal error" });
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Changes the priority of a queue item (only for tasks; cron jobs are ignored).
|
||||
/// Cycles: high → medium → low → high.
|
||||
/// </summary>
|
||||
[HttpPut("queue/{id}/priority")]
|
||||
public async Task<ActionResult> ChangeQueuePriority(string id, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!id.StartsWith("task-"))
|
||||
return Ok(new { status = "ignored", reason = "Cron job priorities are managed by the gateway" });
|
||||
|
||||
var guidStr = id["task-".Length..];
|
||||
if (!Guid.TryParse(guidStr, out var guid))
|
||||
return BadRequest(new { error = "Invalid task id" });
|
||||
|
||||
var task = await taskRepo.GetByIdAsync(guid, ct);
|
||||
if (task is null)
|
||||
return NotFound(new { error = "Task not found" });
|
||||
|
||||
// Cycle priority: high → medium → low → high
|
||||
task.Priority = task.Priority.ToLowerInvariant() switch
|
||||
{
|
||||
"high" => "Medium",
|
||||
"medium" => "Low",
|
||||
"low" => "High",
|
||||
_ => "Medium"
|
||||
};
|
||||
|
||||
await taskRepo.UpdateAsync(task, ct);
|
||||
await activityRepo.AddAsync(new ActivityEvent
|
||||
{
|
||||
Type = "task",
|
||||
Message = $"Task \"{task.Title}\" priority → {task.Priority}"
|
||||
}, ct);
|
||||
|
||||
return Ok(new { status = "ok", priority = task.Priority });
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Change queue priority failed for {Id}", id);
|
||||
return StatusCode(500, new { error = "Internal error" });
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the current model and provider for a specific agent session.
|
||||
/// Calls session_status with the agent's session key.
|
||||
|
||||
@@ -47,7 +47,10 @@ public sealed record DashboardStatus(
|
||||
public sealed record QueueItem(
|
||||
string Id,
|
||||
string Name,
|
||||
string Status
|
||||
string Status,
|
||||
string Priority,
|
||||
string Source,
|
||||
string WaitTime
|
||||
);
|
||||
|
||||
public sealed record AgentModelInfo(
|
||||
|
||||
@@ -639,7 +639,28 @@ public sealed class OpenClawGatewayClient(HttpClient httpClient, IConfiguration
|
||||
var status = j["state"]?["lastStatus"]?.GetValue<string>()
|
||||
?? j["status"]?.GetValue<string>()
|
||||
?? "unknown";
|
||||
items.Add(new QueueItem(id, name, status));
|
||||
|
||||
// Calculate waitTime from nextRun if available
|
||||
var waitTime = "--";
|
||||
var nextRunStr = j["nextRun"]?.GetValue<string>()
|
||||
?? j["next_run"]?.GetValue<string>()
|
||||
?? j["scheduledAt"]?.GetValue<string>();
|
||||
if (nextRunStr is not null && DateTimeOffset.TryParse(nextRunStr, out var nextRun))
|
||||
{
|
||||
var diff = nextRun - DateTimeOffset.UtcNow;
|
||||
if (diff.TotalMinutes < 0)
|
||||
waitTime = "now";
|
||||
else if (diff.TotalMinutes < 1)
|
||||
waitTime = "<1m";
|
||||
else if (diff.TotalMinutes < 60)
|
||||
waitTime = $"{(int)diff.TotalMinutes}m";
|
||||
else if (diff.TotalHours < 24)
|
||||
waitTime = $"{(int)diff.TotalHours}h";
|
||||
else
|
||||
waitTime = $"{(int)diff.TotalDays}d";
|
||||
}
|
||||
|
||||
items.Add(new QueueItem(id, name, status, "medium", "cron", waitTime));
|
||||
}
|
||||
return items;
|
||||
}
|
||||
@@ -649,6 +670,19 @@ public sealed class OpenClawGatewayClient(HttpClient httpClient, IConfiguration
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<bool> DeleteCronJobAsync(string id)
|
||||
{
|
||||
try
|
||||
{
|
||||
var result = await InvokeToolAsync("cron", new { action = "delete", id });
|
||||
return result is not null;
|
||||
}
|
||||
catch
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<DashboardStatus> GetStatusAsync()
|
||||
{
|
||||
var gatewayOk = false;
|
||||
|
||||
Reference in New Issue
Block a user