diff --git a/BotSharp.sln b/BotSharp.sln index ad95f29e8..6abc5b47b 100644 --- a/BotSharp.sln +++ b/BotSharp.sln @@ -157,6 +157,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Core.A2A", "src\In EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Plugin.MultiTenancy", "src\Plugins\BotSharp.Plugin.MultiTenancy\BotSharp.Plugin.MultiTenancy.csproj", "{562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Plugin.RabbitMQ", "src\Plugins\BotSharp.Plugin.RabbitMQ\BotSharp.Plugin.RabbitMQ.csproj", "{8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -669,6 +671,14 @@ Global {562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}.Release|Any CPU.Build.0 = Release|Any CPU {562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}.Release|x64.ActiveCfg = Release|Any CPU {562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}.Release|x64.Build.0 = Release|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|x64.ActiveCfg = Debug|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|x64.Build.0 = Debug|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|Any CPU.Build.0 = Release|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|x64.ActiveCfg = Release|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|x64.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -745,6 +755,7 @@ Global {13223C71-9EAC-9835-28ED-5A4833E6F915} = {53E7CD86-0D19-40D9-A0FA-AB4613837E89} {E8D01281-D52A-BFF4-33DB-E35D91754272} = {E29DC6C4-5E57-48C5-BCB0-6B8F84782749} {562DD0C6-DAC8-02CC-C1DD-D43DF186CE76} = {51AFE054-AE99-497D-A593-69BAEFB5106F} + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5} = {64264688-0F5C-4AB0-8F2B-B59B717CCE00} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {A9969D89-C98B-40A5-A12B-FC87E55B3A19} diff --git a/Directory.Packages.props b/Directory.Packages.props index 1c198a828..96897fb92 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -10,6 +10,7 @@ + diff --git a/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs b/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs index 75c0985a8..e0411801e 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs @@ -1,3 +1,5 @@ +using System.Text.Json; + namespace BotSharp.Abstraction.Agents.Models; public class AgentRule @@ -8,6 +10,58 @@ public class AgentRule [JsonPropertyName("disabled")] public bool Disabled { get; set; } - [JsonPropertyName("criteria")] - public string Criteria { get; set; } = string.Empty; + [JsonPropertyName("rule_criteria")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public AgentRuleCriteria? RuleCriteria { get; set; } + + [JsonPropertyName("rule_action")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public AgentRuleAction? RuleAction { get; set; } +} + +public class AgentRuleCriteria : AgentRuleConfigBase +{ + /// + /// Criteria + /// + [JsonPropertyName("criteria_text")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string CriteriaText { get; set; } = string.Empty; + + /// + /// Adaptive configuration for rule criteria. + /// This flexible JSON document can store any criteria-specific configuration. + /// The structure depends on the criteria executor + /// + [JsonPropertyName("config")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public override JsonDocument? Config { get; set; } +} + +public class AgentRuleAction : AgentRuleConfigBase +{ + /// + /// Adaptive configuration for rule actions. + /// This flexible JSON document can store any action-specific configuration. + /// The structure depends on the action type: + /// - For "Http" action: contains http_context with base_url, relative_url, method, etc. + /// - For "MessageQueue" action: contains mq_config with topic_name, routing_key, etc. + /// - For custom actions: can contain any custom configuration structure + /// + [JsonPropertyName("config")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public override JsonDocument? Config { get; set; } } + +public class AgentRuleConfigBase +{ + [JsonPropertyName("name")] + public virtual string Name { get; set; } + + [JsonPropertyName("disabled")] + public virtual bool Disabled { get; set; } + + [JsonPropertyName("config")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public virtual JsonDocument? Config { get; set; } +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs new file mode 100644 index 000000000..4df43dd0e --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs @@ -0,0 +1,21 @@ +namespace BotSharp.Abstraction.Infrastructures.MessageQueues; + +/// +/// Abstract interface for message queue consumers. +/// Implement this interface to create consumers that are independent of MQ products (e.g., RabbitMQ, Kafka, Azure Service Bus). +/// +public interface IMQConsumer : IDisposable +{ + /// + /// Gets the consumer config + /// + object Config { get; } + + /// + /// Handles the received message from the queue. + /// + /// The consumer channel identifier + /// The message data as string + /// True if the message was handled successfully, false otherwise + Task HandleMessageAsync(string channel, string data); +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs new file mode 100644 index 000000000..672e539c1 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs @@ -0,0 +1,31 @@ +using BotSharp.Abstraction.Infrastructures.MessageQueues.Models; + +namespace BotSharp.Abstraction.Infrastructures.MessageQueues; + +public interface IMQService : IDisposable +{ + /// + /// Subscribe a consumer to the message queue. + /// The consumer will be initialized with the appropriate MQ-specific infrastructure. + /// + /// Unique identifier for the consumer + /// The consumer implementing IMQConsumer interface + /// Task representing the async subscription operation + Task SubscribeAsync(string key, IMQConsumer consumer); + + /// + /// Unsubscribe a consumer from the message queue. + /// + /// Unique identifier for the consumer + /// Task representing the async unsubscription operation + Task UnsubscribeAsync(string key); + + /// + /// Publish payload to message queue + /// + /// + /// + /// + /// + Task PublishAsync(T payload, MQPublishOptions options); +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs new file mode 100644 index 000000000..cd66be1fd --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs @@ -0,0 +1,51 @@ +using Microsoft.Extensions.Logging; + +namespace BotSharp.Abstraction.Infrastructures.MessageQueues; + +/// +/// Abstract base class for RabbitMQ consumers. +/// Implements IMQConsumer to allow other projects to define consumers independently of RabbitMQ. +/// The RabbitMQ-specific infrastructure is handled by RabbitMQService. +/// +public abstract class MQConsumerBase : IMQConsumer +{ + protected readonly IServiceProvider _services; + protected readonly ILogger _logger; + private bool _disposed = false; + + /// + /// Gets the consumer config for this consumer. + /// Override this property to customize exchange, queue and routing configuration. + /// + public abstract object Config { get; } + + protected MQConsumerBase( + IServiceProvider services, + ILogger logger) + { + _services = services; + _logger = logger; + } + + /// + /// Handles the received message from the queue. + /// + /// The consumer channel identifier + /// The message data as string + /// True if the message was handled successfully, false otherwise + public abstract Task HandleMessageAsync(string channel, string data); + + public void Dispose() + { + if (_disposed) + { + return; + } + + var consumerName = GetType().Name; + _logger.LogWarning($"Disposing consumer: {consumerName}"); + _disposed = true; + GC.SuppressFinalize(this); + } +} + diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs new file mode 100644 index 000000000..b08a5a054 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs @@ -0,0 +1,7 @@ +namespace BotSharp.Abstraction.Infrastructures.MessageQueues; + +public class MessageQueueSettings +{ + public bool Enabled { get; set; } + public string Provider { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs new file mode 100644 index 000000000..e940aff01 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs @@ -0,0 +1,14 @@ +namespace BotSharp.Abstraction.Infrastructures.MessageQueues.Models; + +public class MQMessage +{ + public MQMessage(T payload, string messageId) + { + Payload = payload; + MessageId = messageId; + } + + public T Payload { get; set; } + public string MessageId { get; set; } + public DateTime CreateDate { get; set; } = DateTime.UtcNow; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs new file mode 100644 index 000000000..dead523be --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs @@ -0,0 +1,40 @@ +using System.Text.Json; + +namespace BotSharp.Abstraction.Infrastructures.MessageQueues.Models; + +/// +/// Configuration options for publishing messages to a message queue. +/// These options are MQ-product agnostic and can be adapted by different implementations. +/// +public class MQPublishOptions +{ + /// + /// The topic name (exchange in RabbitMQ, topic in Kafka/Azure Service Bus). + /// + public string TopicName { get; set; } = string.Empty; + + /// + /// The routing key (partition key in some MQ systems, used for message routing). + /// + public string RoutingKey { get; set; } = string.Empty; + + /// + /// Delay in milliseconds before the message is delivered. + /// + public long DelayMilliseconds { get; set; } + + /// + /// Optional unique identifier for the message. + /// + public string? MessageId { get; set; } + + /// + /// Additional arguments for the publish configuration (MQ-specific). + /// + public Dictionary Arguments { get; set; } = []; + + /// + /// Json serializer options + /// + public JsonSerializerOptions? JsonOptions { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs new file mode 100644 index 000000000..9e9817890 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs @@ -0,0 +1,13 @@ +using BotSharp.Abstraction.Hooks; +using BotSharp.Abstraction.Rules.Models; + +namespace BotSharp.Abstraction.Rules.Hooks; + +public interface IRuleTriggerHook : IHookBase +{ + Task BeforeRuleCriteriaExecuted(Agent agent, IRuleTrigger trigger, RuleCriteriaContext context) => Task.CompletedTask; + Task AfterRuleCriteriaExecuted(Agent agent, IRuleTrigger trigger, RuleCriteriaResult result) => Task.CompletedTask; + + Task BeforeRuleActionExecuted(Agent agent, IRuleTrigger trigger, RuleActionContext context) => Task.CompletedTask; + Task AfterRuleActionExecuted(Agent agent, IRuleTrigger trigger, RuleActionResult result) => Task.CompletedTask; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs index bebac5d6f..9c2bf03d9 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs @@ -1,5 +1,29 @@ +using BotSharp.Abstraction.Agents.Models; +using BotSharp.Abstraction.Conversations.Models; +using BotSharp.Abstraction.Rules.Models; +using BotSharp.Abstraction.Rules.Options; + namespace BotSharp.Abstraction.Rules; +/// +/// Base interface for rule actions that can be executed by the RuleEngine +/// public interface IRuleAction { -} + /// + /// The unique name of the rule action provider + /// + string Name { get; } + + /// + /// Execute the rule action + /// + /// The agent that triggered the rule + /// The rule trigger + /// The action context + /// The action execution result + Task ExecuteAsync( + Agent agent, + IRuleTrigger trigger, + RuleActionContext context); +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs index bc5022911..f94e0b36f 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs @@ -1,5 +1,11 @@ +using BotSharp.Abstraction.Rules.Models; + namespace BotSharp.Abstraction.Rules; public interface IRuleCriteria { + string Provider { get; } + + Task ValidateAsync(Agent agent, IRuleTrigger trigger, RuleCriteriaContext context) + => Task.FromResult(new RuleCriteriaResult()); } diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionContext.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionContext.cs new file mode 100644 index 000000000..534130c63 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionContext.cs @@ -0,0 +1,10 @@ +using System.Text.Json; + +namespace BotSharp.Abstraction.Rules.Models; + +public class RuleActionContext +{ + public string Text { get; set; } = string.Empty; + public Dictionary Parameters { get; set; } = []; + public JsonSerializerOptions? JsonOptions { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionResult.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionResult.cs new file mode 100644 index 000000000..ffdaeab2e --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionResult.cs @@ -0,0 +1,46 @@ +namespace BotSharp.Abstraction.Rules.Models; + +/// +/// Result of a rule action execution +/// +public class RuleActionResult +{ + /// + /// Whether the action executed successfully + /// + public bool Success { get; set; } + + /// + /// The conversation ID if a new conversation was created + /// + public string? ConversationId { get; set; } + + /// + /// Response content from the action + /// + public string? Response { get; set; } + + /// + /// Error message if the action failed + /// + public string? ErrorMessage { get; set; } + + public static RuleActionResult Succeeded(string? response = null) + { + return new RuleActionResult + { + Success = true, + Response = response + }; + } + + public static RuleActionResult Failed(string errorMessage) + { + return new RuleActionResult + { + Success = false, + ErrorMessage = errorMessage + }; + } +} + diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaContext.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaContext.cs new file mode 100644 index 000000000..709e7be3e --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaContext.cs @@ -0,0 +1,10 @@ +using System.Text.Json; + +namespace BotSharp.Abstraction.Rules.Models; + +public class RuleCriteriaContext +{ + public string Text { get; set; } = string.Empty; + public Dictionary Parameters { get; set; } = []; + public JsonSerializerOptions? JsonOptions { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaResult.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaResult.cs new file mode 100644 index 000000000..fc1df6ceb --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaResult.cs @@ -0,0 +1,19 @@ +namespace BotSharp.Abstraction.Rules.Models; + +public class RuleCriteriaResult +{ + /// + /// Whether the criteria executed successfully + /// + public bool Success { get; set; } + + /// + /// Response content from the action + /// + public bool IsValid { get; set; } + + /// + /// Error message if the criteria failed + /// + public string? ErrorMessage { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleCriteriaOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleCriteriaOptions.cs new file mode 100644 index 000000000..27b5167f4 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleCriteriaOptions.cs @@ -0,0 +1,39 @@ +using System.Text.Json; + +namespace BotSharp.Abstraction.Rules.Options; + +public class RuleCriteriaOptions : CriteriaExecuteOptions +{ + /// + /// Criteria execution provider + /// + public string Provider { get; set; } = "botsharp-rule"; +} + +public class CriteriaExecuteOptions +{ + /// + /// Code processor provider + /// + public string? CodeProcessor { get; set; } + + /// + /// Code script name + /// + public string? CodeScriptName { get; set; } + + /// + /// Argument name as an input key to the code script + /// + public string? ArgumentName { get; set; } + + /// + /// Json arguments as an input value to the code script + /// + public JsonDocument? ArgumentContent { get; set; } + + /// + /// Custom parameters + /// + public Dictionary Parameters { get; set; } = []; +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs index 068052b0b..abba98115 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs @@ -1,3 +1,4 @@ +using BotSharp.Abstraction.Repositories.Filters; using System.Text.Json; namespace BotSharp.Abstraction.Rules.Options; @@ -5,22 +6,12 @@ namespace BotSharp.Abstraction.Rules.Options; public class RuleTriggerOptions { /// - /// Code processor provider + /// Filter agents /// - public string? CodeProcessor { get; set; } + public AgentFilter? AgentFilter { get; set; } /// - /// Code script name + /// Json serializer options /// - public string? CodeScriptName { get; set; } - - /// - /// Argument name as an input key to the code script - /// - public string? ArgumentName { get; set; } - - /// - /// Json arguments as an input value to the code script - /// - public JsonDocument? ArgumentContent { get; set; } + public JsonSerializerOptions? JsonOptions { get; set; } } diff --git a/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs b/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs index a36516c8a..9efdf6f42 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs @@ -65,4 +65,42 @@ public static class ObjectExtensions return null; } } + + public static T? TryGetValueOrDefault(this IDictionary dict, string key, T? defaultValue = default, JsonSerializerOptions? jsonOptions = null) + { + return dict.TryGetValue(key, out var value, jsonOptions) + ? value! + : defaultValue; + } + + public static bool TryGetValue(this IDictionary dict, string key, out T? result, JsonSerializerOptions? jsonOptions = null) + { + result = default; + + if (!dict.TryGetValue(key, out var value) || value is null) + { + return false; + } + + if (value is T t) + { + result = t; + return true; + } + + if (value is JsonElement je) + { + try + { + result = je.Deserialize(jsonOptions); + return true; + } + catch + { + return false; + } + } + + return false; + } } diff --git a/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs b/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs new file mode 100644 index 000000000..e9c180120 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs @@ -0,0 +1,7 @@ +namespace BotSharp.Core.Rules.Constants; + +public static class RuleConstant +{ + public const string DEFAULT_CRITERIA_PROVIDER = "BotSharp-code-script"; + public const string DEFAULT_ACTION_NAME = "BotSharp-chat"; +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs b/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs new file mode 100644 index 000000000..feb314ef7 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs @@ -0,0 +1,42 @@ +using BotSharp.Core.Rules.Models; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; + +namespace BotSharp.Core.Rules.Controllers; + +[Authorize] +[ApiController] +public class RuleController : ControllerBase +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + private readonly IRuleEngine _ruleEngine; + + public RuleController( + IServiceProvider services, + ILogger logger, + IRuleEngine ruleEngine) + { + _services = services; + _logger = logger; + _ruleEngine = ruleEngine; + } + + [HttpPost("/rule/trigger/action")] + public async Task RunAction([FromBody] RuleTriggerActionRequest request) + { + if (request == null) + { + return BadRequest(new { Success = false, Error = "Request cannnot be empty." }); + } + + var trigger = _services.GetServices().FirstOrDefault(x => x.Name.IsEqualTo(request.TriggerName)); + if (trigger == null) + { + return BadRequest(new { Success = false, Error = "Unable to find rule trigger." }); + } + + var result = await _ruleEngine.Triggered(trigger, request.Text, request.States, request.Options); + return Ok(new { Success = true }); + } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs b/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs index 5f68b722d..53b748ea5 100644 --- a/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs +++ b/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs @@ -1,17 +1,3 @@ -using BotSharp.Abstraction.Agents.Models; -using BotSharp.Abstraction.Coding; -using BotSharp.Abstraction.Coding.Contexts; -using BotSharp.Abstraction.Coding.Enums; -using BotSharp.Abstraction.Coding.Models; -using BotSharp.Abstraction.Coding.Settings; -using BotSharp.Abstraction.Coding.Utils; -using BotSharp.Abstraction.Conversations; -using BotSharp.Abstraction.Hooks; -using BotSharp.Abstraction.Models; -using BotSharp.Abstraction.Repositories.Filters; -using BotSharp.Abstraction.Rules.Options; -using BotSharp.Abstraction.Utilities; -using Microsoft.Extensions.Logging; using System.Data; using System.Text.Json; @@ -21,16 +7,13 @@ public class RuleEngine : IRuleEngine { private readonly IServiceProvider _services; private readonly ILogger _logger; - private readonly CodingSettings _codingSettings; public RuleEngine( IServiceProvider services, - ILogger logger, - CodingSettings codingSettings) + ILogger logger) { _services = services; _logger = logger; - _codingSettings = codingSettings; } public async Task> Triggered(IRuleTrigger trigger, string text, IEnumerable? states = null, RuleTriggerOptions? options = null) @@ -39,7 +22,7 @@ public async Task> Triggered(IRuleTrigger trigger, string te // Pull all user defined rules var agentService = _services.GetRequiredService(); - var agents = await agentService.GetAgents(new AgentFilter + var agents = await agentService.GetAgents(options?.AgentFilter ?? new AgentFilter { Pager = new Pagination { @@ -51,154 +34,210 @@ public async Task> Triggered(IRuleTrigger trigger, string te var filteredAgents = agents.Items.Where(x => x.Rules.Exists(r => r.TriggerName.IsEqualTo(trigger.Name) && !x.Disabled)).ToList(); foreach (var agent in filteredAgents) { - // Code trigger - if (options != null) + var rule = agent.Rules.FirstOrDefault(x => x.TriggerName.IsEqualTo(trigger.Name) && !x.Disabled); + if (rule == null) { - var isTriggered = await TriggerCodeScript(agent, trigger.Name, options); - if (!isTriggered) + continue; + } + + // Criteria validation + if (!string.IsNullOrEmpty(rule.RuleCriteria?.Name) && !rule.RuleCriteria.Disabled) + { + var criteriaResult = await ExecuteCriteriaAsync(agent, rule, trigger, rule.RuleCriteria?.Name, text, states, options); + if (criteriaResult?.IsValid == false) { + _logger.LogWarning("Criteria validation failed for agent {AgentId} with trigger {TriggerName}", agent.Id, trigger.Name); continue; } } - - var convService = _services.GetRequiredService(); - var conv = await convService.NewConversation(new Conversation - { - Channel = trigger.Channel, - Title = text, - AgentId = agent.Id - }); - - var message = new RoleDialogModel(AgentRole.User, text); - - var allStates = new List + + // Execute action + if (rule.RuleAction?.Disabled == true) { - new("channel", trigger.Channel) - }; + continue; + } - if (states != null) + var actionName = !string.IsNullOrEmpty(rule?.RuleAction?.Name) ? rule.RuleAction.Name : RuleConstant.DEFAULT_ACTION_NAME; + var actionResult = await ExecuteActionAsync(agent, rule, trigger, actionName, text, states, options); + if (actionResult?.Success == true && !string.IsNullOrEmpty(actionResult.ConversationId)) { - allStates.AddRange(states); + newConversationIds.Add(actionResult.ConversationId); } - - await convService.SetConversationId(conv.Id, allStates); - - await convService.SendMessage(agent.Id, - message, - null, - msg => Task.CompletedTask); - - await convService.SaveStates(); - newConversationIds.Add(conv.Id); } return newConversationIds; } - #region Private methods - private async Task TriggerCodeScript(Agent agent, string triggerName, RuleTriggerOptions options) + + #region Criteria + private async Task ExecuteCriteriaAsync( + Agent agent, + AgentRule rule, + IRuleTrigger trigger, + string? criteriaProvider, + string text, + IEnumerable? states, + RuleTriggerOptions? triggerOptions) { - if (string.IsNullOrWhiteSpace(agent?.Id)) - { - return false; - } + var result = new RuleCriteriaResult(); - var provider = options.CodeProcessor ?? BuiltInCodeProcessor.PyInterpreter; - var processor = _services.GetServices().FirstOrDefault(x => x.Provider.IsEqualTo(provider)); - if (processor == null) + try { - _logger.LogWarning($"Unable to find code processor: {provider}."); - return false; - } + var criteria = _services.GetServices() + .FirstOrDefault(x => x.Provider == criteriaProvider); - var agentService = _services.GetRequiredService(); - var scriptName = options.CodeScriptName ?? $"{triggerName}_rule.py"; - var codeScript = await agentService.GetAgentCodeScript(agent.Id, scriptName, scriptType: AgentCodeScriptType.Src); + if (criteria == null) + { + return result; + } - var msg = $"rule trigger ({triggerName}) code script ({scriptName}) in agent ({agent.Name}) => args: {options.ArgumentContent?.RootElement.GetRawText()}."; - if (codeScript == null || string.IsNullOrWhiteSpace(codeScript.Content)) - { - _logger.LogWarning($"Unable to find {msg}."); - return false; - } + var context = new RuleCriteriaContext + { + Text = text, + Parameters = BuildContextParameters(rule.RuleCriteria?.Config, states), + JsonOptions = triggerOptions?.JsonOptions + }; - try - { - var hooks = _services.GetHooks(agent.Id); + _logger.LogInformation("Start execution rule criteria {CriteriaProvider} for agent {AgentId} with trigger {TriggerName}", + criteria.Provider, agent.Id, trigger.Name); - var arguments = BuildArguments(options.ArgumentName, options.ArgumentContent); - var context = new CodeExecutionContext + var hooks = _services.GetHooks(agent.Id); + foreach (var hook in hooks) { - CodeScript = codeScript, - Arguments = arguments - }; + await hook.BeforeRuleCriteriaExecuted(agent, trigger, context); + } + + // Execute criteria + context.Parameters ??= []; + result = await criteria.ValidateAsync(agent, trigger, context); foreach (var hook in hooks) { - await hook.BeforeCodeExecution(agent, context); + await hook.AfterRuleCriteriaExecuted(agent, trigger, result); } - var (useLock, useProcess, timeoutSeconds) = CodingUtil.GetCodeExecutionConfig(_codingSettings); - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds)); - var response = processor.Run(codeScript.Content, options: new() + return result; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error executing rule criteria {CriteriaProvider} for agent {AgentId}", criteriaProvider ?? string.Empty, agent.Id); + return result; + } + } + #endregion + + + #region Action + private async Task ExecuteActionAsync( + Agent agent, + AgentRule rule, + IRuleTrigger trigger, + string? actionName, + string text, + IEnumerable? states, + RuleTriggerOptions? triggerOptions) + { + try + { + // Get all registered rule actions + var actions = _services.GetServices(); + + // Find the matching action + var action = actions.FirstOrDefault(x => x.Name.IsEqualTo(actionName)); + + if (action == null) { - ScriptName = scriptName, - Arguments = arguments, - UseLock = useLock, - UseProcess = useProcess - }, cancellationToken: cts.Token); + var errorMsg = $"No rule action {actionName} is found"; + _logger.LogWarning(errorMsg); + return RuleActionResult.Failed(errorMsg); + } - var codeResponse = new CodeExecutionResponseModel + var context = new RuleActionContext { - CodeProcessor = processor.Provider, - CodeScript = codeScript, - Arguments = arguments.DistinctBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value ?? string.Empty), - ExecutionResult = response + Text = text, + Parameters = BuildContextParameters(rule.RuleAction?.Config, states), + JsonOptions = triggerOptions?.JsonOptions }; + _logger.LogInformation("Start execution rule action {ActionName} for agent {AgentId} with trigger {TriggerName}", + action.Name, agent.Id, trigger.Name); + + var hooks = _services.GetHooks(agent.Id); foreach (var hook in hooks) { - await hook.AfterCodeExecution(agent, codeResponse); + await hook.BeforeRuleActionExecuted(agent, trigger, context); } - if (response == null || !response.Success) - { - _logger.LogWarning($"Failed to handle {msg}"); - return false; - } + // Execute action + context.Parameters ??= []; + var result = await action.ExecuteAsync(agent, trigger, context); - bool result; - LogLevel logLevel; - if (response.Result.IsEqualTo("true")) - { - logLevel = LogLevel.Information; - result = true; - } - else + foreach (var hook in hooks) { - logLevel = LogLevel.Warning; - result = false; + await hook.AfterRuleActionExecuted(agent, trigger, result); } - _logger.Log(logLevel, $"Code script execution result ({response}) from {msg}"); return result; } catch (Exception ex) { - _logger.LogError(ex, $"Error when handling {msg}"); - return false; + _logger.LogError(ex, "Error executing rule action {ActionName} for agent {AgentId}", actionName, agent.Id); + return RuleActionResult.Failed(ex.Message); + } + } + #endregion + + + #region Private methods + private Dictionary BuildContextParameters(JsonDocument? config, IEnumerable? states) + { + var dict = new Dictionary(); + + if (config != null) + { + dict = ConvertToDictionary(config); + } + + if (!states.IsNullOrEmpty()) + { + foreach (var state in states!) + { + dict[state.Key] = state.Value; + } } + + return dict; } - private List BuildArguments(string? name, JsonDocument? args) + private static Dictionary ConvertToDictionary(JsonDocument doc) { - var keyValues = new List(); - if (args != null) + var dict = new Dictionary(); + + foreach (var prop in doc.RootElement.EnumerateObject()) { - keyValues.Add(new KeyValue(name ?? "trigger_args", args.RootElement.GetRawText())); + dict[prop.Name] = prop.Value.ValueKind switch + { + JsonValueKind.String => prop.Value.GetString(), + JsonValueKind.Number when prop.Value.TryGetDecimal(out decimal decimalValue) => decimalValue, + JsonValueKind.Number when prop.Value.TryGetDouble(out double doubleValue) => doubleValue, + JsonValueKind.Number when prop.Value.TryGetInt32(out int intValue) => intValue, + JsonValueKind.Number when prop.Value.TryGetInt64(out long longValue) => longValue, + JsonValueKind.Number when prop.Value.TryGetDateTime(out DateTime dateTimeValue) => dateTimeValue, + JsonValueKind.Number when prop.Value.TryGetDateTimeOffset(out DateTimeOffset dateTimeOffsetValue) => dateTimeOffsetValue, + JsonValueKind.Number when prop.Value.TryGetGuid(out Guid guidValue) => guidValue, + JsonValueKind.Number => prop.Value.GetDouble(), + JsonValueKind.True => true, + JsonValueKind.False => false, + JsonValueKind.Null => null, + JsonValueKind.Undefined => null, + JsonValueKind.Array => prop.Value, + JsonValueKind.Object => prop.Value, + _ => prop.Value + }; } - return keyValues; + + return dict; + #endregion } - #endregion } diff --git a/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs new file mode 100644 index 000000000..1c5e81d71 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs @@ -0,0 +1,11 @@ +namespace BotSharp.Core.Rules.Models; + +public class RuleMessagePayload +{ + public string AgentId { get; set; } + public string TriggerName { get; set; } + public string Channel { get; set; } + public string Text { get; set; } + public Dictionary States { get; set; } + public DateTime Timestamp { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs new file mode 100644 index 000000000..0abea08b0 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs @@ -0,0 +1,18 @@ +using System.Text.Json.Serialization; + +namespace BotSharp.Core.Rules.Models; + +public class RuleTriggerActionRequest +{ + [JsonPropertyName("trigger_name")] + public string TriggerName { get; set; } + + [JsonPropertyName("text")] + public string Text { get; set; } = string.Empty; + + [JsonPropertyName("states")] + public IEnumerable? States { get; set; } + + [JsonPropertyName("options")] + public RuleTriggerOptions? Options { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs b/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs index 56e1fb8ae..5b8ade6b0 100644 --- a/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs +++ b/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs @@ -1,4 +1,6 @@ using BotSharp.Core.Rules.Engines; +using BotSharp.Core.Rules.Services.Actions; +using BotSharp.Core.Rules.Services.Criteria; namespace BotSharp.Core.Rules; @@ -17,5 +19,12 @@ public class RulesPlugin : IBotSharpPlugin public void RegisterDI(IServiceCollection services, IConfiguration config) { services.AddScoped(); + services.AddScoped(); + + // Register rule actions + services.AddScoped(); + services.AddScoped(); + services.AddScoped(); + services.AddScoped(); } } diff --git a/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/ChatRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/ChatRuleAction.cs new file mode 100644 index 000000000..4c599a3e9 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/ChatRuleAction.cs @@ -0,0 +1,72 @@ +namespace BotSharp.Core.Rules.Services.Actions; + +public sealed class ChatRuleAction : IRuleAction +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + + public ChatRuleAction( + IServiceProvider services, + ILogger logger) + { + _services = services; + _logger = logger; + } + + public string Name => RuleConstant.DEFAULT_ACTION_NAME; + + public async Task ExecuteAsync( + Agent agent, + IRuleTrigger trigger, + RuleActionContext context) + { + using var scope = _services.CreateScope(); + var sp = scope.ServiceProvider; + + try + { + var channel = trigger.Channel; + var convService = sp.GetRequiredService(); + var conv = await convService.NewConversation(new Conversation + { + Channel = channel, + Title = context.Text, + AgentId = agent.Id + }); + + var message = new RoleDialogModel(AgentRole.User, context.Text); + + var allStates = new List + { + new("channel", channel) + }; + + if (!context.Parameters.IsNullOrEmpty()) + { + var states = context.Parameters.Select(x => new MessageState(x.Key, x.Value)); + allStates.AddRange(states); + } + + await convService.SetConversationId(conv.Id, allStates); + await convService.SendMessage(agent.Id, + message, + null, + msg => Task.CompletedTask); + + await convService.SaveStates(); + + _logger.LogInformation("Chat rule action executed successfully for agent {AgentId}, conversation {ConversationId}", agent.Id, conv.Id); + + return new RuleActionResult + { + Success = true, + ConversationId = conv.Id + }; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error when sending chat via rule action for agent {AgentId} and trigger {TriggerName}", agent.Id, trigger.Name); + return RuleActionResult.Failed(ex.Message); + } + } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/FunctionCallRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/FunctionCallRuleAction.cs new file mode 100644 index 000000000..811db8124 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/FunctionCallRuleAction.cs @@ -0,0 +1,44 @@ +using BotSharp.Abstraction.Functions; + +namespace BotSharp.Core.Rules.Services.Actions; + +public sealed class FunctionCallRuleAction : IRuleAction +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + + public FunctionCallRuleAction( + IServiceProvider services, + ILogger logger) + { + _services = services; + _logger = logger; + } + + public string Name => "BotSharp-function-call"; + + public async Task ExecuteAsync( + Agent agent, + IRuleTrigger trigger, + RuleActionContext context) + { + var funcName = context.Parameters.TryGetValueOrDefault("function_name", string.Empty); + var func = _services.GetServices().FirstOrDefault(x => x.Name.IsEqualTo(funcName)); + + if (func == null) + { + var errorMsg = $"Unable to find function '{funcName}' when running action {agent.Name}-{trigger.Name}"; + _logger.LogWarning(errorMsg); + return RuleActionResult.Failed(errorMsg); + } + + var funcArg = context.Parameters.TryGetValueOrDefault("function_argument") ?? new(); + await func.Execute(funcArg); + + return new RuleActionResult + { + Success = true, + Response = funcArg?.RichContent?.Message?.Text ?? funcArg?.Content + }; + } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/HttpRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/HttpRuleAction.cs new file mode 100644 index 000000000..5e60e6c3f --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/HttpRuleAction.cs @@ -0,0 +1,194 @@ +using System.Net.Mime; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Web; + +namespace BotSharp.Core.Rules.Services.Actions; + +public sealed class HttpRuleAction : IRuleAction +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + private readonly IHttpClientFactory _httpClientFactory; + + private readonly JsonSerializerOptions _defaultJsonOptions = new() + { + PropertyNameCaseInsensitive = true, + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + AllowTrailingCommas = true, + ReferenceHandler = ReferenceHandler.IgnoreCycles + }; + + public HttpRuleAction( + IServiceProvider services, + ILogger logger, + IHttpClientFactory httpClientFactory) + { + _services = services; + _logger = logger; + _httpClientFactory = httpClientFactory; + } + + public string Name => "BotSharp-http"; + + public async Task ExecuteAsync( + Agent agent, + IRuleTrigger trigger, + RuleActionContext context) + { + try + { + var httpMethod = GetHttpMethod(context); + if (httpMethod == null) + { + var errorMsg = $"HTTP method is not supported in agent rule {agent.Name}-{trigger.Name}"; + _logger.LogWarning(errorMsg); + return RuleActionResult.Failed(errorMsg); + } + + // Build the full URL + var fullUrl = BuildUrl(context); + + using var client = _httpClientFactory.CreateClient(); + + // Add headers + AddHttpHeaders(client, context); + + // Create request + var request = new HttpRequestMessage(httpMethod, fullUrl); + + // Add request body if provided + var requestBodyStr = GetHttpRequestBody(context); + if (!string.IsNullOrEmpty(requestBodyStr)) + { + request.Content = new StringContent(requestBodyStr, Encoding.UTF8, MediaTypeNames.Application.Json); + } + + _logger.LogInformation("Executing HTTP rule action for agent {AgentId}, URL: {Url}, Method: {Method}", + agent.Id, fullUrl, httpMethod); + + // Send request + var response = await client.SendAsync(request); + var responseContent = await response.Content.ReadAsStringAsync(); + + if (response.IsSuccessStatusCode) + { + _logger.LogInformation("HTTP rule action executed successfully for agent {AgentId}, Status: {StatusCode}", + agent.Id, response.StatusCode); + + return new RuleActionResult + { + Success = true, + Response = responseContent + }; + } + else + { + var errorMsg = $"HTTP request failed with status code {response.StatusCode}: {responseContent}"; + _logger.LogWarning(errorMsg); + return RuleActionResult.Failed(errorMsg); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error executing HTTP rule action for agent {AgentId} and trigger {TriggerName}", + agent.Id, trigger.Name); + return RuleActionResult.Failed(ex.Message); + } + } + + private string BuildUrl(RuleActionContext context) + { + var url = context.Parameters.TryGetValueOrDefault("http_url"); + if (string.IsNullOrEmpty(url)) + { + throw new ArgumentNullException("Unable to find http_url in context"); + } + + // Fill in placeholders in url + foreach (var param in context.Parameters) + { + var value = param.Value?.ToString(); + if (string.IsNullOrEmpty(value)) + { + continue; + } + url = url.Replace($"{{{param.Key}}}", value); + } + + // Add query parameters + var queryParams = context.Parameters.TryGetValueOrDefault>("http_query_params"); + if (!queryParams.IsNullOrEmpty()) + { + var builder = new UriBuilder(url); + var query = HttpUtility.ParseQueryString(builder.Query); + + // Add new query params + foreach (var kv in queryParams!.Where(x => x.Value != null)) + { + query[kv.Key] = kv.Value!; + } + + // Assign merged query back + builder.Query = query.ToString(); + url = builder.ToString(); + } + + return url; + } + + private HttpMethod? GetHttpMethod(RuleActionContext context) + { + var method = context.Parameters.TryGetValueOrDefault("http_method", string.Empty); + var innerMethod = method?.Trim()?.ToUpper(); + HttpMethod? matchMethod = null; + + switch (innerMethod) + { + case "GET": + matchMethod = HttpMethod.Get; + break; + case "POST": + matchMethod = HttpMethod.Post; + break; + case "DELETE": + matchMethod = HttpMethod.Delete; + break; + case "PUT": + matchMethod = HttpMethod.Put; + break; + case "PATCH": + matchMethod = HttpMethod.Patch; + break; + default: + break; + + } + + return matchMethod; + } + + private void AddHttpHeaders(HttpClient client, RuleActionContext context) + { + var headerParams = context.Parameters.TryGetValueOrDefault>("http_headers"); + if (!headerParams.IsNullOrEmpty()) + { + foreach (var header in headerParams!) + { + client.DefaultRequestHeaders.TryAddWithoutValidation(header.Key, header.Value); + } + } + } + + private string? GetHttpRequestBody(RuleActionContext context) + { + var body = context.Parameters.GetValueOrDefault("http_request_body"); + if (body == null) + { + return null; + } + + return JsonSerializer.Serialize(body, context.JsonOptions ?? _defaultJsonOptions); + } +} + diff --git a/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/MessageQueueRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/MessageQueueRuleAction.cs new file mode 100644 index 000000000..2f819c6f5 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/MessageQueueRuleAction.cs @@ -0,0 +1,129 @@ +using BotSharp.Core.Rules.Models; + +namespace BotSharp.Core.Rules.Services.Actions; + +public sealed class MessageQueueRuleAction : IRuleAction +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + + public MessageQueueRuleAction( + IServiceProvider services, + ILogger logger) + { + _services = services; + _logger = logger; + } + + public string Name => "BotSharp-message-queue"; + + public async Task ExecuteAsync( + Agent agent, + IRuleTrigger trigger, + RuleActionContext context) + { + try + { + // Get message queue service + var mqService = _services.GetService(); + if (mqService == null) + { + var errorMsg = "Message queue service is not configured. Please ensure a message queue provider (e.g., RabbitMQ) is registered."; + _logger.LogWarning(errorMsg); + return RuleActionResult.Failed(errorMsg); + } + + // Create message payload + var payload = new RuleMessagePayload + { + AgentId = agent.Id, + TriggerName = trigger.Name, + Channel = trigger.Channel, + Text = context.Text, + Timestamp = DateTime.UtcNow, + States = context.Parameters + }; + + // Publish message to queue + var mqOptions = BuildMQPublishOptions(context); + var success = await mqService.PublishAsync(payload, mqOptions); + + if (success) + { + _logger.LogInformation("MessageQueue rule action executed successfully for agent {AgentId}", agent.Id); + return new RuleActionResult + { + Success = true, + Response = $"Message published to queue: {mqOptions.TopicName}-{mqOptions.RoutingKey}" + }; + } + else + { + var errorMsg = $"Failed to publish message to queue {mqOptions.TopicName}-{mqOptions.RoutingKey}"; + _logger.LogWarning(errorMsg); + return RuleActionResult.Failed(errorMsg); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error executing MessageQueue rule action for agent {AgentId} and trigger {TriggerName}", + agent.Id, trigger.Name); + return RuleActionResult.Failed(ex.Message); + } + } + + private MQPublishOptions BuildMQPublishOptions(RuleActionContext context) + { + var topicName = context.Parameters.TryGetValueOrDefault("mq_topic_name", string.Empty); + var routingKey = context.Parameters.TryGetValueOrDefault("mq_routing_key", string.Empty); + var delayMilliseconds = ParseDelay(context); + + return new MQPublishOptions + { + TopicName = topicName, + RoutingKey = routingKey, + DelayMilliseconds = delayMilliseconds, + JsonOptions = context.JsonOptions + }; + } + + private long ParseDelay(RuleActionContext context) + { + var qty = (double)context.Parameters.TryGetValueOrDefault("mq_delay_qty", 0); + if (qty == 0) + { + qty = context.Parameters.TryGetValueOrDefault("mq_delay_qty", 0.0); + } + + if (qty <= 0) + { + return 0L; + } + + var unit = context.Parameters.TryGetValueOrDefault("mq_delay_unit", string.Empty) ?? string.Empty; + unit = unit.ToLower(); + + var milliseconds = 0L; + switch (unit) + { + case "second": + case "seconds": + milliseconds = (long)TimeSpan.FromSeconds(qty).TotalMilliseconds; + break; + case "minute": + case "minutes": + milliseconds = (long)TimeSpan.FromMinutes(qty).TotalMilliseconds; + break; + case "hour": + case "hours": + milliseconds = (long)TimeSpan.FromHours(qty).TotalMilliseconds; + break; + case "day": + case "days": + milliseconds = (long)TimeSpan.FromDays(qty).TotalMilliseconds; + break; + } + + return milliseconds; + } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Services/Criteria/CodeScriptRuleCriteria.cs b/src/Infrastructure/BotSharp.Core.Rules/Services/Criteria/CodeScriptRuleCriteria.cs new file mode 100644 index 000000000..ad3489bc6 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Services/Criteria/CodeScriptRuleCriteria.cs @@ -0,0 +1,130 @@ +using System.Text.Json; + +namespace BotSharp.Core.Rules.Services.Criteria; + +public class CodeScriptRuleCriteria : IRuleCriteria +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + private readonly CodingSettings _codingSettings; + + public CodeScriptRuleCriteria( + IServiceProvider services, + ILogger logger, + CodingSettings codingSettings) + { + _services = services; + _logger = logger; + _codingSettings = codingSettings; + } + + public string Provider => RuleConstant.DEFAULT_CRITERIA_PROVIDER; + + public async Task ValidateAsync(Agent agent, IRuleTrigger trigger, RuleCriteriaContext context) + { + var result = new RuleCriteriaResult(); + + if (string.IsNullOrWhiteSpace(agent?.Id)) + { + return result; + } + + var provider = context.Parameters.TryGetValueOrDefault("code_processor") ?? BuiltInCodeProcessor.PyInterpreter; + var processor = _services.GetServices().FirstOrDefault(x => x.Provider.IsEqualTo(provider)); + if (processor == null) + { + _logger.LogWarning($"Unable to find code processor: {provider}."); + return result; + } + + var agentService = _services.GetRequiredService(); + var scriptName = context.Parameters.TryGetValueOrDefault("code_script_name") ?? $"{trigger.Name}_rule.py"; + var codeScript = await agentService.GetAgentCodeScript(agent.Id, scriptName, scriptType: AgentCodeScriptType.Src); + + var msg = $"rule trigger ({trigger.Name}) code script ({scriptName}) in agent ({agent.Name})."; + + if (codeScript == null || string.IsNullOrWhiteSpace(codeScript.Content)) + { + _logger.LogWarning($"Unable to find {msg}."); + return result; + } + + try + { + var hooks = _services.GetHooks(agent.Id); + + var argName = context.Parameters.TryGetValueOrDefault("argument_name"); + var argValue = context.Parameters.TryGetValueOrDefault("argument_value"); + var arguments = BuildArguments(argName, argValue); + var codeExeContext = new CodeExecutionContext + { + CodeScript = codeScript, + Arguments = arguments + }; + + foreach (var hook in hooks) + { + await hook.BeforeCodeExecution(agent, codeExeContext); + } + + var (useLock, useProcess, timeoutSeconds) = CodingUtil.GetCodeExecutionConfig(_codingSettings); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds)); + var response = processor.Run(codeScript.Content, options: new() + { + ScriptName = scriptName, + Arguments = arguments, + UseLock = useLock, + UseProcess = useProcess + }, cancellationToken: cts.Token); + + var codeResponse = new CodeExecutionResponseModel + { + CodeProcessor = processor.Provider, + CodeScript = codeScript, + Arguments = arguments.DistinctBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value ?? string.Empty), + ExecutionResult = response + }; + + foreach (var hook in hooks) + { + await hook.AfterCodeExecution(agent, codeResponse); + } + + if (response == null || !response.Success) + { + _logger.LogWarning($"Failed to handle {msg}"); + return result; + } + + LogLevel logLevel; + if (response.Result.IsEqualTo("true")) + { + logLevel = LogLevel.Information; + result.Success = true; + result.IsValid = true; + } + else + { + logLevel = LogLevel.Warning; + } + + _logger.Log(logLevel, $"Code script execution result ({response}) from {msg}"); + return result; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when handling {msg}"); + return result; + } + } + + private List BuildArguments(string? name, JsonElement? args) + { + var keyValues = new List(); + if (args != null) + { + keyValues.Add(new KeyValue(name ?? "trigger_args", args.Value.GetRawText())); + } + return keyValues; + } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Using.cs b/src/Infrastructure/BotSharp.Core.Rules/Using.cs index a4353c960..2d1dc6844 100644 --- a/src/Infrastructure/BotSharp.Core.Rules/Using.cs +++ b/src/Infrastructure/BotSharp.Core.Rules/Using.cs @@ -1,5 +1,7 @@ global using Microsoft.Extensions.Configuration; global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Logging; +global using System.Text; global using BotSharp.Abstraction.Agents.Enums; global using BotSharp.Abstraction.Plugins; @@ -8,4 +10,24 @@ global using BotSharp.Abstraction.Instructs; global using BotSharp.Abstraction.Instructs.Models; -global using BotSharp.Abstraction.Rules; \ No newline at end of file +global using BotSharp.Abstraction.Agents.Models; +global using BotSharp.Abstraction.Conversations; + +global using BotSharp.Abstraction.Infrastructures.MessageQueues; +global using BotSharp.Abstraction.Infrastructures.MessageQueues.Models; +global using BotSharp.Abstraction.Models; +global using BotSharp.Abstraction.Repositories.Filters; +global using BotSharp.Abstraction.Rules; +global using BotSharp.Abstraction.Rules.Options; +global using BotSharp.Abstraction.Rules.Models; +global using BotSharp.Abstraction.Rules.Hooks; +global using BotSharp.Abstraction.Utilities; +global using BotSharp.Abstraction.Coding; +global using BotSharp.Abstraction.Coding.Contexts; +global using BotSharp.Abstraction.Coding.Enums; +global using BotSharp.Abstraction.Coding.Models; +global using BotSharp.Abstraction.Coding.Utils; +global using BotSharp.Abstraction.Coding.Settings; +global using BotSharp.Abstraction.Hooks; + +global using BotSharp.Core.Rules.Constants; \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs b/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs new file mode 100644 index 000000000..5c84fcb63 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs @@ -0,0 +1,18 @@ +using BotSharp.Abstraction.Infrastructures.MessageQueues; +using Microsoft.Extensions.Configuration; + +namespace BotSharp.Core.Messaging; + +public class MessagingPlugin : IBotSharpPlugin +{ + public string Id => "52a0aa30-4820-42a9-9cae-df0be81bad2b"; + public string Name => "Messaging"; + public string Description => "Provides message queue services."; + + public void RegisterDI(IServiceCollection services, IConfiguration config) + { + var mqSettings = new MessageQueueSettings(); + config.Bind("MessageQueue", mqSettings); + services.AddSingleton(mqSettings); + } +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs index 52fd719fd..366157418 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs +++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs @@ -1,4 +1,3 @@ -using BotSharp.Abstraction.Agents.Models; using BotSharp.Abstraction.Rules; namespace BotSharp.OpenAPI.Controllers; @@ -18,9 +17,15 @@ public IEnumerable GetRuleTriggers() }).OrderBy(x => x.TriggerName); } - [HttpGet("/rule/formalization")] - public async Task GetFormalizedRuleDefinition([FromBody] AgentRule rule) + [HttpGet("/rule/criteria-providers")] + public async Task> GetRuleCriteriaProviders() { - return "{}"; + return _services.GetServices().Select(x => x.Provider).OrderBy(x => x); + } + + [HttpGet("/rule/actions")] + public async Task> GetRuleActions() + { + return _services.GetServices().Select(x => x.Name).OrderBy(x => x); } } diff --git a/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs b/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs index be189898e..8e29bb1bb 100644 --- a/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs +++ b/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs @@ -84,7 +84,7 @@ private async Task SendRequest(string url, GraphQueryRequest r } catch (Exception ex) { - _logger.LogError(ex, $"Error when fetching Lessen GLM response (Endpoint: {url})."); + _logger.LogError(ex, $"Error when fetching {Provider} Graph db response (Endpoint: {url})."); return result; } } diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs index 4205fdc46..d031f4a96 100644 --- a/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs +++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs @@ -1,4 +1,5 @@ using BotSharp.Abstraction.Agents.Models; +using System.Text.Json; namespace BotSharp.Plugin.MongoStorage.Models; @@ -7,7 +8,8 @@ public class AgentRuleMongoElement { public string TriggerName { get; set; } = default!; public bool Disabled { get; set; } - public string Criteria { get; set; } = default!; + public AgentRuleCriteriaMongoModel? RuleCriteria { get; set; } + public AgentRuleActionMongoModel? RuleAction { get; set; } public static AgentRuleMongoElement ToMongoElement(AgentRule rule) { @@ -15,7 +17,8 @@ public static AgentRuleMongoElement ToMongoElement(AgentRule rule) { TriggerName = rule.TriggerName, Disabled = rule.Disabled, - Criteria = rule.Criteria + RuleCriteria = AgentRuleCriteriaMongoModel.ToMongoModel(rule.RuleCriteria), + RuleAction = AgentRuleActionMongoModel.ToMongoModel(rule.RuleAction) }; } @@ -25,7 +28,88 @@ public static AgentRule ToDomainElement(AgentRuleMongoElement rule) { TriggerName = rule.TriggerName, Disabled = rule.Disabled, - Criteria = rule.Criteria + RuleCriteria = AgentRuleCriteriaMongoModel.ToDomainModel(rule.RuleCriteria), + RuleAction = AgentRuleActionMongoModel.ToDomainModel(rule.RuleAction) }; } } + +[BsonIgnoreExtraElements(Inherited = true)] +public class AgentRuleCriteriaMongoModel : AgentRuleConfigMongoModel +{ + public string CriteriaText { get; set; } + + public static AgentRuleCriteriaMongoModel? ToMongoModel(AgentRuleCriteria? criteria) + { + if (criteria == null) + { + return null; + } + + return new AgentRuleCriteriaMongoModel + { + Name = criteria.Name, + CriteriaText = criteria.CriteriaText, + Disabled = criteria.Disabled, + Config = criteria.Config != null ? BsonDocument.Parse(criteria.Config.RootElement.GetRawText()) : null + }; + } + + public static AgentRuleCriteria? ToDomainModel(AgentRuleCriteriaMongoModel? criteria) + { + if (criteria == null) + { + return null; + } + + return new AgentRuleCriteria + { + Name = criteria.Name, + CriteriaText = criteria.CriteriaText, + Disabled = criteria.Disabled, + Config = criteria.Config != null ? JsonDocument.Parse(criteria.Config.ToJson()) : null + }; + } +} + +[BsonIgnoreExtraElements(Inherited = true)] +public class AgentRuleActionMongoModel : AgentRuleConfigMongoModel +{ + public static AgentRuleActionMongoModel? ToMongoModel(AgentRuleAction? action) + { + if (action == null) + { + return null; + } + + return new AgentRuleActionMongoModel + { + Name = action.Name, + Disabled = action.Disabled, + Config = action.Config != null ? BsonDocument.Parse(action.Config.RootElement.GetRawText()) : null + }; + } + + public static AgentRuleAction? ToDomainModel(AgentRuleActionMongoModel? action) + { + if (action == null) + { + return null; + } + + return new AgentRuleAction + { + Name = action.Name, + Disabled = action.Disabled, + Config = action.Config != null ? JsonDocument.Parse(action.Config.ToJson()) : null + }; + } +} + +[BsonIgnoreExtraElements(Inherited = true)] +public class AgentRuleConfigMongoModel +{ + public string Name { get; set; } + public bool Disabled { get; set; } + public BsonDocument? Config { get; set; } +} \ No newline at end of file diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs index 300700a64..83b2f1658 100644 --- a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs +++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs @@ -634,8 +634,7 @@ public async Task> TruncateConversation(string conversationId, stri continue; } - var values = state.Values.Where(x => x.MessageId != messageId) - .Where(x => x.UpdateTime < refTime) + var values = state.Values.Where(x => x.MessageId != messageId && x.UpdateTime < refTime) .ToList(); if (values.Count == 0) continue; diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj b/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj new file mode 100644 index 000000000..4a8f3ff20 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj @@ -0,0 +1,22 @@ + + + + $(TargetFramework) + enable + $(LangVersion) + $(BotSharpVersion) + $(GeneratePackageOnBuild) + $(GenerateDocumentationFile) + $(SolutionDir)packages + + + + + + + + + + + + diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs new file mode 100644 index 000000000..81c7de270 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs @@ -0,0 +1,73 @@ +using Microsoft.Extensions.ObjectPool; +using RabbitMQ.Client; + +namespace BotSharp.Plugin.RabbitMQ.Connections; + +public class RabbitMQChannelPool +{ + private readonly ObjectPool _pool; + private readonly ILogger _logger; + private readonly int _tryLimit = 3; + + public RabbitMQChannelPool( + IServiceProvider services, + IRabbitMQConnection mqConnection) + { + _logger = services.GetRequiredService().CreateLogger(); + var poolProvider = new DefaultObjectPoolProvider(); + var policy = new ChannelPoolPolicy(mqConnection.Connection); + _pool = poolProvider.Create(policy); + } + + public IChannel Get() + { + var count = 0; + var channel = _pool.Get(); + + while (count < _tryLimit && channel.IsClosed) + { + channel.Dispose(); + channel = _pool.Get(); + count++; + } + + if (channel.IsClosed) + { + _logger.LogWarning($"No open channel from the pool after {_tryLimit} retries."); + } + + return channel; + } + + public void Return(IChannel channel) + { + if (channel.IsOpen) + { + _pool.Return(channel); + } + else + { + channel.Dispose(); + } + } +} + +internal class ChannelPoolPolicy : IPooledObjectPolicy +{ + private readonly IConnection _connection; + + public ChannelPoolPolicy(IConnection connection) + { + _connection = connection; + } + + public IChannel Create() + { + return _connection.CreateChannelAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + } + + public bool Return(IChannel obj) + { + return true; + } +} \ No newline at end of file diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs new file mode 100644 index 000000000..989c0a7b7 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs @@ -0,0 +1,13 @@ +using System.Collections.Concurrent; + +namespace BotSharp.Plugin.RabbitMQ.Connections; + +public static class RabbitMQChannelPoolFactory +{ + private static readonly ConcurrentDictionary _poolDict = new(); + + public static RabbitMQChannelPool GetChannelPool(IServiceProvider services, IRabbitMQConnection rabbitMQConnection) + { + return _poolDict.GetOrAdd(rabbitMQConnection.Connection.ToString()!, key => new RabbitMQChannelPool(services, rabbitMQConnection)); + } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs new file mode 100644 index 000000000..dac9e8c07 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs @@ -0,0 +1,154 @@ +using Polly; +using Polly.Retry; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using System.Threading; + +namespace BotSharp.Plugin.RabbitMQ.Connections; + +public class RabbitMQConnection : IRabbitMQConnection +{ + private readonly RabbitMQSettings _settings; + private readonly IConnectionFactory _connectionFactory; + private readonly SemaphoreSlim _lock = new(initialCount: 1, maxCount: 1); + private readonly ILogger _logger; + private readonly int _retryCount = 5; + + private IConnection _connection; + private bool _disposed = false; + + public RabbitMQConnection( + RabbitMQSettings settings, + ILogger logger) + { + _settings = settings; + _logger = logger; + _connectionFactory = new ConnectionFactory + { + HostName = settings.HostName, + Port = settings.Port, + UserName = settings.UserName, + Password = settings.Password, + VirtualHost = settings.VirtualHost, + ConsumerDispatchConcurrency = 1, + AutomaticRecoveryEnabled = true, + HandshakeContinuationTimeout = TimeSpan.FromSeconds(20) + }; + } + + public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed; + + public IConnection Connection => _connection; + + public async Task CreateChannelAsync() + { + if (!IsConnected) + { + throw new InvalidOperationException("Rabbit MQ is not connectioned."); + } + return await _connection.CreateChannelAsync(); + } + + public async Task ConnectAsync() + { + await _lock.WaitAsync(); + + try + { + if (IsConnected) + { + return true; + } + + var policy = BuildRetryPolicy(); + await policy.Execute(async () => + { + _connection = await _connectionFactory.CreateConnectionAsync(); + }); + + if (IsConnected) + { + _connection.ConnectionShutdownAsync += OnConnectionShutdownAsync; + _connection.CallbackExceptionAsync += OnCallbackExceptionAsync; + _connection.ConnectionBlockedAsync += OnConnectionBlockedAsync; + _logger.LogInformation($"Rabbit MQ client connection success. host: {_connection.Endpoint.HostName}, port: {_connection.Endpoint.Port}, localPort:{_connection.LocalPort}"); + return true; + } + _logger.LogError("Rabbit MQ client connection error."); + return false; + } + finally + { + _lock.Release(); + } + + } + + private RetryPolicy BuildRetryPolicy() + { + return Policy.Handle().WaitAndRetry( + _retryCount, + retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), + (ex, time) => + { + _logger.LogError(ex, $"RabbitMQ cannot build connection: after {time.TotalSeconds:n1}s"); + }); + } + + private Task OnConnectionShutdownAsync(object sender, ShutdownEventArgs e) + { + if (_disposed) + { + return Task.CompletedTask; + } + + _logger.LogError($"Rabbit MQ connection is shutdown. {e}."); + return Task.CompletedTask; + } + + private Task OnCallbackExceptionAsync(object sender, CallbackExceptionEventArgs e) + { + if (_disposed) + { + return Task.CompletedTask; + } + + _logger.LogError($"Rabbit MQ connection throw exception. Trying to reconnect, {e.Exception}."); + return Task.CompletedTask; + } + + private Task OnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs e) + { + if (_disposed) + { + return Task.CompletedTask; + } + + _logger.LogError($"Rabbit MQ connection is shutdown. Trying to reconnect, {e.Reason}."); + return Task.CompletedTask; + } + + + public void Dispose() + { + if (_disposed) + { + return; + } + + _logger.LogWarning("Start disposing Rabbit MQ connection."); + + try + { + _connection.Dispose(); + _disposed = true; + _logger.LogWarning("Disposed Rabbit MQ connection."); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when disposing Rabbit MQ connection"); + } + + GC.SuppressFinalize(this); + } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs new file mode 100644 index 000000000..36af0df90 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs @@ -0,0 +1,24 @@ +namespace BotSharp.Plugin.RabbitMQ.Consumers; + +public class DummyMessageConsumer : MQConsumerBase +{ + public override object Config => new RabbitMQConsumerConfig + { + ExchangeName = "my.exchange", + QueueName = "dummy.queue", + RoutingKey = "my.routing" + }; + + public DummyMessageConsumer( + IServiceProvider services, + ILogger logger) + : base(services, logger) + { + } + + public override async Task HandleMessageAsync(string channel, string data) + { + _logger.LogCritical($"Received delayed dummy message data: {data}"); + return await Task.FromResult(true); + } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs new file mode 100644 index 000000000..f6040dcd7 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs @@ -0,0 +1,25 @@ +namespace BotSharp.Plugin.RabbitMQ.Consumers; + +public class ScheduledMessageConsumer : MQConsumerBase +{ + public override object Config => new RabbitMQConsumerConfig + { + ExchangeName = "my.exchange", + QueueName = "scheduled.queue", + RoutingKey = "my.routing" + }; + + public ScheduledMessageConsumer( + IServiceProvider services, + ILogger logger) + : base(services, logger) + { + } + + public override async Task HandleMessageAsync(string channel, string data) + { + _logger.LogCritical($"Received delayed scheduled message data: {data}"); + return await Task.FromResult(true); + } +} + diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs new file mode 100644 index 000000000..802e4fa1b --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs @@ -0,0 +1,89 @@ +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; + +namespace BotSharp.Plugin.RabbitMQ.Controllers; + +[Authorize] +[ApiController] +public class RabbitMQController : ControllerBase +{ + private readonly IServiceProvider _services; + private readonly IMQService _mqService; + private readonly ILogger _logger; + + public RabbitMQController( + IServiceProvider services, + IMQService mqService, + ILogger logger) + { + _services = services; + _mqService = mqService; + _logger = logger; + } + + /// + /// Publish a scheduled message to be delivered after a delay + /// + /// The scheduled message request + [HttpPost("/message-queue/publish")] + public async Task PublishScheduledMessage([FromBody] PublishScheduledMessageRequest request) + { + if (request == null) + { + return BadRequest(new PublishMessageResponse { Success = false, Error = "Request body is required." }); + } + + try + { + var payload = new ScheduledMessagePayload + { + Name = request.Name ?? "Hello" + }; + + var success = await _mqService.PublishAsync( + payload, + options: new() + { + TopicName = "my.exchange", + RoutingKey = "my.routing", + DelayMilliseconds = request.DelayMilliseconds ?? 10000, + MessageId = request.MessageId + }); + return Ok(new { Success = success }); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to publish scheduled message"); + return StatusCode(StatusCodes.Status500InternalServerError, + new PublishMessageResponse { Success = false, Error = ex.Message }); + } + } + + /// + /// Unsubscribe a consumer + /// + /// + /// + [HttpPost("/message-queue/unsubscribe/consumer")] + public async Task UnSubscribeConsuer([FromBody] UnsubscribeConsumerRequest request) + { + if (request == null) + { + return BadRequest(new { Success = false, Error = "Request body is required." }); + } + + try + { + var success = await _mqService.UnsubscribeAsync(request.Name); + return Ok(new { Success = success }); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Failed to unsubscribe consumer {request.Name}"); + return StatusCode(StatusCodes.Status500InternalServerError, + new { Success = false, Error = ex.Message }); + } + } +} + diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs new file mode 100644 index 000000000..cb89c2976 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs @@ -0,0 +1,11 @@ +using RabbitMQ.Client; + +namespace BotSharp.Plugin.RabbitMQ.Interfaces; + +public interface IRabbitMQConnection : IDisposable +{ + bool IsConnected { get; } + IConnection Connection { get; } + Task CreateChannelAsync(); + Task ConnectAsync(); +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs new file mode 100644 index 000000000..ad655b795 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs @@ -0,0 +1,31 @@ +namespace BotSharp.Plugin.RabbitMQ.Models; + +/// +/// Request model for publishing a scheduled message +/// +public class PublishScheduledMessageRequest +{ + public string? Name { get; set; } + + public long? DelayMilliseconds { get; set; } + + public string? MessageId { get; set; } +} + + +/// +/// Response model for publish operations +/// +public class PublishMessageResponse +{ + /// + /// Whether the message was successfully published + /// + public bool Success { get; set; } + + /// + /// Error message if publish failed + /// + public string? Error { get; set; } +} + diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs new file mode 100644 index 000000000..93754d455 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs @@ -0,0 +1,24 @@ +namespace BotSharp.Plugin.RabbitMQ.Models; + +internal class RabbitMQConsumerConfig +{ + /// + /// The exchange name (topic in some MQ systems). + /// + internal string ExchangeName { get; set; } = "rabbitmq.exchange"; + + /// + /// The queue name (subscription in some MQ systems). + /// + internal string QueueName { get; set; } = "rabbitmq.queue"; + + /// + /// The routing key (filter in some MQ systems). + /// + internal string RoutingKey { get; set; } = "rabbitmq.routing"; + + /// + /// Additional arguments for the consumer configuration. + /// + internal Dictionary Arguments { get; set; } = new(); +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs new file mode 100644 index 000000000..2180fb2d7 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs @@ -0,0 +1,9 @@ +namespace BotSharp.Plugin.RabbitMQ.Models; + +/// +/// Payload for scheduled/delayed messages +/// +public class ScheduledMessagePayload +{ + public string Name { get; set; } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs new file mode 100644 index 000000000..509d432b2 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs @@ -0,0 +1,6 @@ +namespace BotSharp.Plugin.RabbitMQ.Models; + +public class UnsubscribeConsumerRequest +{ + public string Name { get; set; } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs new file mode 100644 index 000000000..ff45dfe48 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs @@ -0,0 +1,56 @@ +using BotSharp.Plugin.RabbitMQ.Services; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.Configuration; + +namespace BotSharp.Plugin.RabbitMQ; + +public class RabbitMQPlugin : IBotSharpAppPlugin +{ + public string Id => "3f93407f-3c37-4e25-be28-142a2da9b514"; + public string Name => "RabbitMQ"; + public string Description => "Handle AI messages in RabbitMQ."; + public string IconUrl => "https://icon-library.com/images/message-queue-icon/message-queue-icon-13.jpg"; + + public void RegisterDI(IServiceCollection services, IConfiguration config) + { + var settings = new RabbitMQSettings(); + config.Bind("RabbitMQ", settings); + services.AddSingleton(settings); + + var mqSettings = new MessageQueueSettings(); + config.Bind("MessageQueue", mqSettings); + + if (mqSettings.Enabled && mqSettings.Provider.IsEqualTo("RabbitMQ")) + { + services.AddSingleton(); + services.AddSingleton(); + } + } + + public void Configure(IApplicationBuilder app) + { +#if DEBUG + var sp = app.ApplicationServices; + var mqSettings = sp.GetRequiredService(); + + if (mqSettings.Enabled && mqSettings.Provider.IsEqualTo("RabbitMQ")) + { + var mqService = sp.GetRequiredService(); + var loggerFactory = sp.GetRequiredService(); + + // Create and subscribe the consumer using the abstract interface + var scheduledConsumer = new ScheduledMessageConsumer(sp, loggerFactory.CreateLogger()); + mqService.SubscribeAsync(nameof(ScheduledMessageConsumer), scheduledConsumer) + .ConfigureAwait(false) + .GetAwaiter() + .GetResult(); + + var dummyConsumer = new DummyMessageConsumer(sp, loggerFactory.CreateLogger()); + mqService.SubscribeAsync(nameof(DummyMessageConsumer), dummyConsumer) + .ConfigureAwait(false) + .GetAwaiter() + .GetResult(); + } +#endif + } +} \ No newline at end of file diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs new file mode 100644 index 000000000..0117bad14 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs @@ -0,0 +1,318 @@ +using Polly; +using Polly.Retry; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using System.Collections.Concurrent; + +namespace BotSharp.Plugin.RabbitMQ.Services; + +public class RabbitMQService : IMQService +{ + private readonly IRabbitMQConnection _mqConnection; + private readonly IServiceProvider _services; + private readonly ILogger _logger; + + private readonly int _retryCount = 5; + private bool _disposed = false; + private static readonly ConcurrentDictionary _consumers = []; + + public RabbitMQService( + IRabbitMQConnection mqConnection, + IServiceProvider services, + ILogger logger) + { + _mqConnection = mqConnection; + _services = services; + _logger = logger; + } + + public async Task SubscribeAsync(string key, IMQConsumer consumer) + { + if (_consumers.ContainsKey(key)) + { + _logger.LogWarning($"Consumer with key '{key}' is already subscribed."); + return false; + } + + var registration = await CreateConsumerRegistrationAsync(consumer); + if (registration != null && _consumers.TryAdd(key, registration)) + { + var config = consumer.Config as RabbitMQConsumerConfig ?? new(); + _logger.LogInformation($"Consumer '{key}' subscribed to queue '{config.QueueName}'."); + return true; + } + + return false; + } + + public async Task UnsubscribeAsync(string key) + { + if (!_consumers.TryRemove(key, out var registration)) + { + return false; + } + + try + { + if (registration.Channel != null) + { + registration.Channel.Dispose(); + } + registration.Consumer.Dispose(); + _logger.LogInformation($"Consumer '{key}' unsubscribed."); + return true; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error unsubscribing consumer '{key}'."); + return false; + } + } + + private async Task CreateConsumerRegistrationAsync(IMQConsumer consumer) + { + try + { + var channel = await CreateChannelAsync(consumer); + + var config = consumer.Config as RabbitMQConsumerConfig ?? new(); + var registration = new ConsumerRegistration(consumer, channel); + + var asyncConsumer = new AsyncEventingBasicConsumer(channel); + asyncConsumer.ReceivedAsync += async (sender, eventArgs) => + { + await ConsumeEventAsync(registration, eventArgs); + }; + + await channel.BasicConsumeAsync( + queue: config.QueueName, + autoAck: false, + consumer: asyncConsumer); + + _logger.LogWarning($"RabbitMQ consuming queue '{config.QueueName}'."); + return registration; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when register consumer in RabbitMQ."); + return null; + } + } + + private async Task CreateChannelAsync(IMQConsumer consumer) + { + if (!_mqConnection.IsConnected) + { + await _mqConnection.ConnectAsync(); + } + + var config = consumer.Config as RabbitMQConsumerConfig ?? new(); + var channel = await _mqConnection.CreateChannelAsync(); + _logger.LogWarning($"Created RabbitMQ channel {channel.ChannelNumber} for queue '{config.QueueName}'"); + + var args = new Dictionary + { + ["x-delayed-type"] = "direct" + }; + + if (config.Arguments != null) + { + foreach (var kvp in config.Arguments) + { + args[kvp.Key] = kvp.Value; + } + } + + await channel.ExchangeDeclareAsync( + exchange: config.ExchangeName, + type: "x-delayed-message", + durable: true, + autoDelete: false, + arguments: args); + + await channel.QueueDeclareAsync( + queue: config.QueueName, + durable: true, + exclusive: false, + autoDelete: false); + + await channel.QueueBindAsync( + queue: config.QueueName, + exchange: config.ExchangeName, + routingKey: config.RoutingKey); + + return channel; + } + + private async Task ConsumeEventAsync(ConsumerRegistration registration, BasicDeliverEventArgs eventArgs) + { + var data = string.Empty; + var config = registration.Consumer.Config as RabbitMQConsumerConfig ?? new(); + + try + { + data = Encoding.UTF8.GetString(eventArgs.Body.Span); + _logger.LogInformation($"Message received on '{config.QueueName}', id: {eventArgs.BasicProperties?.MessageId}, data: {data}"); + + var isHandled = await registration.Consumer.HandleMessageAsync(config.QueueName, data); + if (registration.Channel?.IsOpen == true) + { + if (isHandled) + { + await registration.Channel.BasicAckAsync(eventArgs.DeliveryTag, multiple: false); + } + else + { + await registration.Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: false); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error consuming message on queue '{config.QueueName}': {data}"); + if (registration.Channel?.IsOpen == true) + { + await registration.Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: false); + } + } + } + + public async Task PublishAsync(T payload, MQPublishOptions options) + { + try + { + if (options == null) + { + return false; + } + + if (!_mqConnection.IsConnected) + { + await _mqConnection.ConnectAsync(); + } + + var isPublished = false; + var policy = BuildRetryPolicy(); + await policy.Execute(async () => + { + var channelPool = RabbitMQChannelPoolFactory.GetChannelPool(_services, _mqConnection); + var channel = channelPool.Get(); + + try + { + var args = new Dictionary + { + ["x-delayed-type"] = "direct" + }; + + if (!options.Arguments.IsNullOrEmpty()) + { + foreach (var kvp in options.Arguments) + { + args[kvp.Key] = kvp.Value; + } + } + + await channel.ExchangeDeclareAsync( + exchange: options.TopicName, + type: "x-delayed-message", + durable: true, + autoDelete: false, + arguments: args); + + var messageId = options.MessageId ?? Guid.NewGuid().ToString(); + var message = new MQMessage(payload, messageId); + var body = ConvertToBinary(message, options.JsonOptions); + var properties = new BasicProperties + { + MessageId = messageId, + DeliveryMode = DeliveryModes.Persistent, + Headers = new Dictionary + { + ["x-delay"] = options.DelayMilliseconds + } + }; + + await channel.BasicPublishAsync( + exchange: options.TopicName, + routingKey: options.RoutingKey, + mandatory: true, + basicProperties: properties, + body: body); + + isPublished = true; + } + catch (Exception) + { + throw; + } + finally + { + channelPool.Return(channel); + } + }); + + return isPublished; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when RabbitMQ publish message."); + return false; + } + } + + private RetryPolicy BuildRetryPolicy() + { + return Policy.Handle().WaitAndRetry( + _retryCount, + retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), + (ex, time) => + { + _logger.LogError(ex, $"RabbitMQ publish error: after {time.TotalSeconds:n1}s"); + }); + } + + private static byte[] ConvertToBinary(T data, JsonSerializerOptions? jsonOptions = null) + { + var jsonStr = JsonSerializer.Serialize(data, jsonOptions); + var body = Encoding.UTF8.GetBytes(jsonStr); + return body; + } + + public void Dispose() + { + if (_disposed) + { + return; + } + + _logger.LogWarning($"Disposing {nameof(RabbitMQService)}"); + + foreach (var item in _consumers) + { + if (item.Value.Channel != null) + { + item.Value.Channel.Dispose(); + } + item.Value.Consumer.Dispose(); + } + + _disposed = true; + GC.SuppressFinalize(this); + } + + /// + /// Internal class to track consumer registrations with their RabbitMQ channels. + /// + private class ConsumerRegistration + { + public IMQConsumer Consumer { get; } + public IChannel? Channel { get; } + + public ConsumerRegistration(IMQConsumer consumer, IChannel? channel) + { + Consumer = consumer; + Channel = channel; + } + } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs new file mode 100644 index 000000000..0e61b5c71 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs @@ -0,0 +1,10 @@ +namespace BotSharp.Plugin.RabbitMQ.Settings; + +public class RabbitMQSettings +{ + public string HostName { get; set; } = "localhost"; + public int Port { get; set; } = 5672; + public string UserName { get; set; } = "guest"; + public string Password { get; set; } = "guest"; + public string VirtualHost { get; set; } = "/"; +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs new file mode 100644 index 000000000..0a8a8c3a5 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs @@ -0,0 +1,38 @@ +global using System; +global using System.Collections.Generic; +global using System.Text; +global using System.Linq; +global using System.Text.Json; +global using System.Net.Mime; +global using System.Threading.Tasks; +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Logging; +global using BotSharp.Abstraction.Agents; +global using BotSharp.Abstraction.Conversations; +global using BotSharp.Abstraction.Plugins; +global using BotSharp.Abstraction.Conversations.Models; +global using BotSharp.Abstraction.Functions; +global using BotSharp.Abstraction.Agents.Models; +global using BotSharp.Abstraction.Agents.Enums; +global using BotSharp.Abstraction.Files.Enums; +global using BotSharp.Abstraction.Files.Models; +global using BotSharp.Abstraction.Files.Converters; +global using BotSharp.Abstraction.Files; +global using BotSharp.Abstraction.MLTasks; +global using BotSharp.Abstraction.Utilities; +global using BotSharp.Abstraction.Agents.Settings; +global using BotSharp.Abstraction.Functions.Models; +global using BotSharp.Abstraction.Repositories; +global using BotSharp.Abstraction.Settings; +global using BotSharp.Abstraction.Messaging; +global using BotSharp.Abstraction.Messaging.Models.RichContent; +global using BotSharp.Abstraction.Options; +global using BotSharp.Abstraction.Models; +global using BotSharp.Abstraction.Infrastructures.MessageQueues; +global using BotSharp.Abstraction.Infrastructures.MessageQueues.Models; + +global using BotSharp.Plugin.RabbitMQ.Settings; +global using BotSharp.Plugin.RabbitMQ.Models; +global using BotSharp.Plugin.RabbitMQ.Interfaces; +global using BotSharp.Plugin.RabbitMQ.Consumers; +global using BotSharp.Plugin.RabbitMQ.Connections; \ No newline at end of file diff --git a/src/WebStarter/WebStarter.csproj b/src/WebStarter/WebStarter.csproj index 9374d95fd..be332a38e 100644 --- a/src/WebStarter/WebStarter.csproj +++ b/src/WebStarter/WebStarter.csproj @@ -42,6 +42,7 @@ + diff --git a/src/WebStarter/appsettings.json b/src/WebStarter/appsettings.json index 39587b64e..b3c2b35e9 100644 --- a/src/WebStarter/appsettings.json +++ b/src/WebStarter/appsettings.json @@ -1006,6 +1006,7 @@ "Language": "en" } }, + "A2AIntegration": { "Enabled": true, "DefaultTimeoutSeconds": 30, @@ -1018,12 +1019,27 @@ } ] }, + + "MessageQueue": { + "Enabled": false, + "Provider": "RabbitMQ" + }, + + "RabbitMQ": { + "HostName": "localhost", + "Port": 5672, + "UserName": "guest", + "Password": "guest", + "VirtualHost": "/" + }, + "PluginLoader": { "Assemblies": [ "BotSharp.Core", "BotSharp.Core.A2A", "BotSharp.Core.SideCar", "BotSharp.Core.Crontab", + "BotSharp.Core.Rules", "BotSharp.Core.Realtime", "BotSharp.Logger", "BotSharp.Plugin.MongoStorage", @@ -1061,7 +1077,8 @@ "BotSharp.Plugin.PythonInterpreter", "BotSharp.Plugin.FuzzySharp", "BotSharp.Plugin.MMPEmbedding", - "BotSharp.Plugin.MultiTenancy" + "BotSharp.Plugin.MultiTenancy", + "BotSharp.Plugin.RabbitMQ" ] },