diff --git a/BotSharp.sln b/BotSharp.sln
index ad95f29e8..6abc5b47b 100644
--- a/BotSharp.sln
+++ b/BotSharp.sln
@@ -157,6 +157,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Core.A2A", "src\In
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Plugin.MultiTenancy", "src\Plugins\BotSharp.Plugin.MultiTenancy\BotSharp.Plugin.MultiTenancy.csproj", "{562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Plugin.RabbitMQ", "src\Plugins\BotSharp.Plugin.RabbitMQ\BotSharp.Plugin.RabbitMQ.csproj", "{8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -669,6 +671,14 @@ Global
{562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}.Release|Any CPU.Build.0 = Release|Any CPU
{562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}.Release|x64.ActiveCfg = Release|Any CPU
{562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}.Release|x64.Build.0 = Release|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|x64.Build.0 = Debug|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|Any CPU.Build.0 = Release|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|x64.ActiveCfg = Release|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -745,6 +755,7 @@ Global
{13223C71-9EAC-9835-28ED-5A4833E6F915} = {53E7CD86-0D19-40D9-A0FA-AB4613837E89}
{E8D01281-D52A-BFF4-33DB-E35D91754272} = {E29DC6C4-5E57-48C5-BCB0-6B8F84782749}
{562DD0C6-DAC8-02CC-C1DD-D43DF186CE76} = {51AFE054-AE99-497D-A593-69BAEFB5106F}
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5} = {64264688-0F5C-4AB0-8F2B-B59B717CCE00}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A9969D89-C98B-40A5-A12B-FC87E55B3A19}
diff --git a/Directory.Packages.props b/Directory.Packages.props
index 1c198a828..96897fb92 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -10,6 +10,7 @@
+
diff --git a/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs b/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs
index 75c0985a8..e0411801e 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs
@@ -1,3 +1,5 @@
+using System.Text.Json;
+
namespace BotSharp.Abstraction.Agents.Models;
public class AgentRule
@@ -8,6 +10,58 @@ public class AgentRule
[JsonPropertyName("disabled")]
public bool Disabled { get; set; }
- [JsonPropertyName("criteria")]
- public string Criteria { get; set; } = string.Empty;
+ [JsonPropertyName("rule_criteria")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public AgentRuleCriteria? RuleCriteria { get; set; }
+
+ [JsonPropertyName("rule_action")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public AgentRuleAction? RuleAction { get; set; }
+}
+
+public class AgentRuleCriteria : AgentRuleConfigBase
+{
+ ///
+ /// Criteria
+ ///
+ [JsonPropertyName("criteria_text")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public string CriteriaText { get; set; } = string.Empty;
+
+ ///
+ /// Adaptive configuration for rule criteria.
+ /// This flexible JSON document can store any criteria-specific configuration.
+ /// The structure depends on the criteria executor
+ ///
+ [JsonPropertyName("config")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public override JsonDocument? Config { get; set; }
+}
+
+public class AgentRuleAction : AgentRuleConfigBase
+{
+ ///
+ /// Adaptive configuration for rule actions.
+ /// This flexible JSON document can store any action-specific configuration.
+ /// The structure depends on the action type:
+ /// - For "Http" action: contains http_context with base_url, relative_url, method, etc.
+ /// - For "MessageQueue" action: contains mq_config with topic_name, routing_key, etc.
+ /// - For custom actions: can contain any custom configuration structure
+ ///
+ [JsonPropertyName("config")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public override JsonDocument? Config { get; set; }
}
+
+public class AgentRuleConfigBase
+{
+ [JsonPropertyName("name")]
+ public virtual string Name { get; set; }
+
+ [JsonPropertyName("disabled")]
+ public virtual bool Disabled { get; set; }
+
+ [JsonPropertyName("config")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public virtual JsonDocument? Config { get; set; }
+}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs
new file mode 100644
index 000000000..4df43dd0e
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs
@@ -0,0 +1,21 @@
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues;
+
+///
+/// Abstract interface for message queue consumers.
+/// Implement this interface to create consumers that are independent of MQ products (e.g., RabbitMQ, Kafka, Azure Service Bus).
+///
+public interface IMQConsumer : IDisposable
+{
+ ///
+ /// Gets the consumer config
+ ///
+ object Config { get; }
+
+ ///
+ /// Handles the received message from the queue.
+ ///
+ /// The consumer channel identifier
+ /// The message data as string
+ /// True if the message was handled successfully, false otherwise
+ Task HandleMessageAsync(string channel, string data);
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs
new file mode 100644
index 000000000..672e539c1
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs
@@ -0,0 +1,31 @@
+using BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues;
+
+public interface IMQService : IDisposable
+{
+ ///
+ /// Subscribe a consumer to the message queue.
+ /// The consumer will be initialized with the appropriate MQ-specific infrastructure.
+ ///
+ /// Unique identifier for the consumer
+ /// The consumer implementing IMQConsumer interface
+ /// Task representing the async subscription operation
+ Task SubscribeAsync(string key, IMQConsumer consumer);
+
+ ///
+ /// Unsubscribe a consumer from the message queue.
+ ///
+ /// Unique identifier for the consumer
+ /// Task representing the async unsubscription operation
+ Task UnsubscribeAsync(string key);
+
+ ///
+ /// Publish payload to message queue
+ ///
+ ///
+ ///
+ ///
+ ///
+ Task PublishAsync(T payload, MQPublishOptions options);
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs
new file mode 100644
index 000000000..cd66be1fd
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs
@@ -0,0 +1,51 @@
+using Microsoft.Extensions.Logging;
+
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues;
+
+///
+/// Abstract base class for RabbitMQ consumers.
+/// Implements IMQConsumer to allow other projects to define consumers independently of RabbitMQ.
+/// The RabbitMQ-specific infrastructure is handled by RabbitMQService.
+///
+public abstract class MQConsumerBase : IMQConsumer
+{
+ protected readonly IServiceProvider _services;
+ protected readonly ILogger _logger;
+ private bool _disposed = false;
+
+ ///
+ /// Gets the consumer config for this consumer.
+ /// Override this property to customize exchange, queue and routing configuration.
+ ///
+ public abstract object Config { get; }
+
+ protected MQConsumerBase(
+ IServiceProvider services,
+ ILogger logger)
+ {
+ _services = services;
+ _logger = logger;
+ }
+
+ ///
+ /// Handles the received message from the queue.
+ ///
+ /// The consumer channel identifier
+ /// The message data as string
+ /// True if the message was handled successfully, false otherwise
+ public abstract Task HandleMessageAsync(string channel, string data);
+
+ public void Dispose()
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ var consumerName = GetType().Name;
+ _logger.LogWarning($"Disposing consumer: {consumerName}");
+ _disposed = true;
+ GC.SuppressFinalize(this);
+ }
+}
+
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs
new file mode 100644
index 000000000..b08a5a054
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs
@@ -0,0 +1,7 @@
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues;
+
+public class MessageQueueSettings
+{
+ public bool Enabled { get; set; }
+ public string Provider { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs
new file mode 100644
index 000000000..e940aff01
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs
@@ -0,0 +1,14 @@
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+
+public class MQMessage
+{
+ public MQMessage(T payload, string messageId)
+ {
+ Payload = payload;
+ MessageId = messageId;
+ }
+
+ public T Payload { get; set; }
+ public string MessageId { get; set; }
+ public DateTime CreateDate { get; set; } = DateTime.UtcNow;
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs
new file mode 100644
index 000000000..dead523be
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs
@@ -0,0 +1,40 @@
+using System.Text.Json;
+
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+
+///
+/// Configuration options for publishing messages to a message queue.
+/// These options are MQ-product agnostic and can be adapted by different implementations.
+///
+public class MQPublishOptions
+{
+ ///
+ /// The topic name (exchange in RabbitMQ, topic in Kafka/Azure Service Bus).
+ ///
+ public string TopicName { get; set; } = string.Empty;
+
+ ///
+ /// The routing key (partition key in some MQ systems, used for message routing).
+ ///
+ public string RoutingKey { get; set; } = string.Empty;
+
+ ///
+ /// Delay in milliseconds before the message is delivered.
+ ///
+ public long DelayMilliseconds { get; set; }
+
+ ///
+ /// Optional unique identifier for the message.
+ ///
+ public string? MessageId { get; set; }
+
+ ///
+ /// Additional arguments for the publish configuration (MQ-specific).
+ ///
+ public Dictionary Arguments { get; set; } = [];
+
+ ///
+ /// Json serializer options
+ ///
+ public JsonSerializerOptions? JsonOptions { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs
new file mode 100644
index 000000000..9e9817890
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs
@@ -0,0 +1,13 @@
+using BotSharp.Abstraction.Hooks;
+using BotSharp.Abstraction.Rules.Models;
+
+namespace BotSharp.Abstraction.Rules.Hooks;
+
+public interface IRuleTriggerHook : IHookBase
+{
+ Task BeforeRuleCriteriaExecuted(Agent agent, IRuleTrigger trigger, RuleCriteriaContext context) => Task.CompletedTask;
+ Task AfterRuleCriteriaExecuted(Agent agent, IRuleTrigger trigger, RuleCriteriaResult result) => Task.CompletedTask;
+
+ Task BeforeRuleActionExecuted(Agent agent, IRuleTrigger trigger, RuleActionContext context) => Task.CompletedTask;
+ Task AfterRuleActionExecuted(Agent agent, IRuleTrigger trigger, RuleActionResult result) => Task.CompletedTask;
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs
index bebac5d6f..9c2bf03d9 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs
@@ -1,5 +1,29 @@
+using BotSharp.Abstraction.Agents.Models;
+using BotSharp.Abstraction.Conversations.Models;
+using BotSharp.Abstraction.Rules.Models;
+using BotSharp.Abstraction.Rules.Options;
+
namespace BotSharp.Abstraction.Rules;
+///
+/// Base interface for rule actions that can be executed by the RuleEngine
+///
public interface IRuleAction
{
-}
+ ///
+ /// The unique name of the rule action provider
+ ///
+ string Name { get; }
+
+ ///
+ /// Execute the rule action
+ ///
+ /// The agent that triggered the rule
+ /// The rule trigger
+ /// The action context
+ /// The action execution result
+ Task ExecuteAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleActionContext context);
+}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs
index bc5022911..f94e0b36f 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs
@@ -1,5 +1,11 @@
+using BotSharp.Abstraction.Rules.Models;
+
namespace BotSharp.Abstraction.Rules;
public interface IRuleCriteria
{
+ string Provider { get; }
+
+ Task ValidateAsync(Agent agent, IRuleTrigger trigger, RuleCriteriaContext context)
+ => Task.FromResult(new RuleCriteriaResult());
}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionContext.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionContext.cs
new file mode 100644
index 000000000..534130c63
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionContext.cs
@@ -0,0 +1,10 @@
+using System.Text.Json;
+
+namespace BotSharp.Abstraction.Rules.Models;
+
+public class RuleActionContext
+{
+ public string Text { get; set; } = string.Empty;
+ public Dictionary Parameters { get; set; } = [];
+ public JsonSerializerOptions? JsonOptions { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionResult.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionResult.cs
new file mode 100644
index 000000000..ffdaeab2e
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionResult.cs
@@ -0,0 +1,46 @@
+namespace BotSharp.Abstraction.Rules.Models;
+
+///
+/// Result of a rule action execution
+///
+public class RuleActionResult
+{
+ ///
+ /// Whether the action executed successfully
+ ///
+ public bool Success { get; set; }
+
+ ///
+ /// The conversation ID if a new conversation was created
+ ///
+ public string? ConversationId { get; set; }
+
+ ///
+ /// Response content from the action
+ ///
+ public string? Response { get; set; }
+
+ ///
+ /// Error message if the action failed
+ ///
+ public string? ErrorMessage { get; set; }
+
+ public static RuleActionResult Succeeded(string? response = null)
+ {
+ return new RuleActionResult
+ {
+ Success = true,
+ Response = response
+ };
+ }
+
+ public static RuleActionResult Failed(string errorMessage)
+ {
+ return new RuleActionResult
+ {
+ Success = false,
+ ErrorMessage = errorMessage
+ };
+ }
+}
+
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaContext.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaContext.cs
new file mode 100644
index 000000000..709e7be3e
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaContext.cs
@@ -0,0 +1,10 @@
+using System.Text.Json;
+
+namespace BotSharp.Abstraction.Rules.Models;
+
+public class RuleCriteriaContext
+{
+ public string Text { get; set; } = string.Empty;
+ public Dictionary Parameters { get; set; } = [];
+ public JsonSerializerOptions? JsonOptions { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaResult.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaResult.cs
new file mode 100644
index 000000000..fc1df6ceb
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaResult.cs
@@ -0,0 +1,19 @@
+namespace BotSharp.Abstraction.Rules.Models;
+
+public class RuleCriteriaResult
+{
+ ///
+ /// Whether the criteria executed successfully
+ ///
+ public bool Success { get; set; }
+
+ ///
+ /// Response content from the action
+ ///
+ public bool IsValid { get; set; }
+
+ ///
+ /// Error message if the criteria failed
+ ///
+ public string? ErrorMessage { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleCriteriaOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleCriteriaOptions.cs
new file mode 100644
index 000000000..27b5167f4
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleCriteriaOptions.cs
@@ -0,0 +1,39 @@
+using System.Text.Json;
+
+namespace BotSharp.Abstraction.Rules.Options;
+
+public class RuleCriteriaOptions : CriteriaExecuteOptions
+{
+ ///
+ /// Criteria execution provider
+ ///
+ public string Provider { get; set; } = "botsharp-rule";
+}
+
+public class CriteriaExecuteOptions
+{
+ ///
+ /// Code processor provider
+ ///
+ public string? CodeProcessor { get; set; }
+
+ ///
+ /// Code script name
+ ///
+ public string? CodeScriptName { get; set; }
+
+ ///
+ /// Argument name as an input key to the code script
+ ///
+ public string? ArgumentName { get; set; }
+
+ ///
+ /// Json arguments as an input value to the code script
+ ///
+ public JsonDocument? ArgumentContent { get; set; }
+
+ ///
+ /// Custom parameters
+ ///
+ public Dictionary Parameters { get; set; } = [];
+}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs
index 068052b0b..abba98115 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs
@@ -1,3 +1,4 @@
+using BotSharp.Abstraction.Repositories.Filters;
using System.Text.Json;
namespace BotSharp.Abstraction.Rules.Options;
@@ -5,22 +6,12 @@ namespace BotSharp.Abstraction.Rules.Options;
public class RuleTriggerOptions
{
///
- /// Code processor provider
+ /// Filter agents
///
- public string? CodeProcessor { get; set; }
+ public AgentFilter? AgentFilter { get; set; }
///
- /// Code script name
+ /// Json serializer options
///
- public string? CodeScriptName { get; set; }
-
- ///
- /// Argument name as an input key to the code script
- ///
- public string? ArgumentName { get; set; }
-
- ///
- /// Json arguments as an input value to the code script
- ///
- public JsonDocument? ArgumentContent { get; set; }
+ public JsonSerializerOptions? JsonOptions { get; set; }
}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs b/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs
index a36516c8a..9efdf6f42 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs
@@ -65,4 +65,42 @@ public static class ObjectExtensions
return null;
}
}
+
+ public static T? TryGetValueOrDefault(this IDictionary dict, string key, T? defaultValue = default, JsonSerializerOptions? jsonOptions = null)
+ {
+ return dict.TryGetValue(key, out var value, jsonOptions)
+ ? value!
+ : defaultValue;
+ }
+
+ public static bool TryGetValue(this IDictionary dict, string key, out T? result, JsonSerializerOptions? jsonOptions = null)
+ {
+ result = default;
+
+ if (!dict.TryGetValue(key, out var value) || value is null)
+ {
+ return false;
+ }
+
+ if (value is T t)
+ {
+ result = t;
+ return true;
+ }
+
+ if (value is JsonElement je)
+ {
+ try
+ {
+ result = je.Deserialize(jsonOptions);
+ return true;
+ }
+ catch
+ {
+ return false;
+ }
+ }
+
+ return false;
+ }
}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs b/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs
new file mode 100644
index 000000000..e9c180120
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs
@@ -0,0 +1,7 @@
+namespace BotSharp.Core.Rules.Constants;
+
+public static class RuleConstant
+{
+ public const string DEFAULT_CRITERIA_PROVIDER = "BotSharp-code-script";
+ public const string DEFAULT_ACTION_NAME = "BotSharp-chat";
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs b/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs
new file mode 100644
index 000000000..feb314ef7
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs
@@ -0,0 +1,42 @@
+using BotSharp.Core.Rules.Models;
+using Microsoft.AspNetCore.Authorization;
+using Microsoft.AspNetCore.Mvc;
+
+namespace BotSharp.Core.Rules.Controllers;
+
+[Authorize]
+[ApiController]
+public class RuleController : ControllerBase
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+ private readonly IRuleEngine _ruleEngine;
+
+ public RuleController(
+ IServiceProvider services,
+ ILogger logger,
+ IRuleEngine ruleEngine)
+ {
+ _services = services;
+ _logger = logger;
+ _ruleEngine = ruleEngine;
+ }
+
+ [HttpPost("/rule/trigger/action")]
+ public async Task RunAction([FromBody] RuleTriggerActionRequest request)
+ {
+ if (request == null)
+ {
+ return BadRequest(new { Success = false, Error = "Request cannnot be empty." });
+ }
+
+ var trigger = _services.GetServices().FirstOrDefault(x => x.Name.IsEqualTo(request.TriggerName));
+ if (trigger == null)
+ {
+ return BadRequest(new { Success = false, Error = "Unable to find rule trigger." });
+ }
+
+ var result = await _ruleEngine.Triggered(trigger, request.Text, request.States, request.Options);
+ return Ok(new { Success = true });
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs b/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs
index 5f68b722d..53b748ea5 100644
--- a/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs
+++ b/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs
@@ -1,17 +1,3 @@
-using BotSharp.Abstraction.Agents.Models;
-using BotSharp.Abstraction.Coding;
-using BotSharp.Abstraction.Coding.Contexts;
-using BotSharp.Abstraction.Coding.Enums;
-using BotSharp.Abstraction.Coding.Models;
-using BotSharp.Abstraction.Coding.Settings;
-using BotSharp.Abstraction.Coding.Utils;
-using BotSharp.Abstraction.Conversations;
-using BotSharp.Abstraction.Hooks;
-using BotSharp.Abstraction.Models;
-using BotSharp.Abstraction.Repositories.Filters;
-using BotSharp.Abstraction.Rules.Options;
-using BotSharp.Abstraction.Utilities;
-using Microsoft.Extensions.Logging;
using System.Data;
using System.Text.Json;
@@ -21,16 +7,13 @@ public class RuleEngine : IRuleEngine
{
private readonly IServiceProvider _services;
private readonly ILogger _logger;
- private readonly CodingSettings _codingSettings;
public RuleEngine(
IServiceProvider services,
- ILogger logger,
- CodingSettings codingSettings)
+ ILogger logger)
{
_services = services;
_logger = logger;
- _codingSettings = codingSettings;
}
public async Task> Triggered(IRuleTrigger trigger, string text, IEnumerable? states = null, RuleTriggerOptions? options = null)
@@ -39,7 +22,7 @@ public async Task> Triggered(IRuleTrigger trigger, string te
// Pull all user defined rules
var agentService = _services.GetRequiredService();
- var agents = await agentService.GetAgents(new AgentFilter
+ var agents = await agentService.GetAgents(options?.AgentFilter ?? new AgentFilter
{
Pager = new Pagination
{
@@ -51,154 +34,210 @@ public async Task> Triggered(IRuleTrigger trigger, string te
var filteredAgents = agents.Items.Where(x => x.Rules.Exists(r => r.TriggerName.IsEqualTo(trigger.Name) && !x.Disabled)).ToList();
foreach (var agent in filteredAgents)
{
- // Code trigger
- if (options != null)
+ var rule = agent.Rules.FirstOrDefault(x => x.TriggerName.IsEqualTo(trigger.Name) && !x.Disabled);
+ if (rule == null)
{
- var isTriggered = await TriggerCodeScript(agent, trigger.Name, options);
- if (!isTriggered)
+ continue;
+ }
+
+ // Criteria validation
+ if (!string.IsNullOrEmpty(rule.RuleCriteria?.Name) && !rule.RuleCriteria.Disabled)
+ {
+ var criteriaResult = await ExecuteCriteriaAsync(agent, rule, trigger, rule.RuleCriteria?.Name, text, states, options);
+ if (criteriaResult?.IsValid == false)
{
+ _logger.LogWarning("Criteria validation failed for agent {AgentId} with trigger {TriggerName}", agent.Id, trigger.Name);
continue;
}
}
-
- var convService = _services.GetRequiredService();
- var conv = await convService.NewConversation(new Conversation
- {
- Channel = trigger.Channel,
- Title = text,
- AgentId = agent.Id
- });
-
- var message = new RoleDialogModel(AgentRole.User, text);
-
- var allStates = new List
+
+ // Execute action
+ if (rule.RuleAction?.Disabled == true)
{
- new("channel", trigger.Channel)
- };
+ continue;
+ }
- if (states != null)
+ var actionName = !string.IsNullOrEmpty(rule?.RuleAction?.Name) ? rule.RuleAction.Name : RuleConstant.DEFAULT_ACTION_NAME;
+ var actionResult = await ExecuteActionAsync(agent, rule, trigger, actionName, text, states, options);
+ if (actionResult?.Success == true && !string.IsNullOrEmpty(actionResult.ConversationId))
{
- allStates.AddRange(states);
+ newConversationIds.Add(actionResult.ConversationId);
}
-
- await convService.SetConversationId(conv.Id, allStates);
-
- await convService.SendMessage(agent.Id,
- message,
- null,
- msg => Task.CompletedTask);
-
- await convService.SaveStates();
- newConversationIds.Add(conv.Id);
}
return newConversationIds;
}
- #region Private methods
- private async Task TriggerCodeScript(Agent agent, string triggerName, RuleTriggerOptions options)
+
+ #region Criteria
+ private async Task ExecuteCriteriaAsync(
+ Agent agent,
+ AgentRule rule,
+ IRuleTrigger trigger,
+ string? criteriaProvider,
+ string text,
+ IEnumerable? states,
+ RuleTriggerOptions? triggerOptions)
{
- if (string.IsNullOrWhiteSpace(agent?.Id))
- {
- return false;
- }
+ var result = new RuleCriteriaResult();
- var provider = options.CodeProcessor ?? BuiltInCodeProcessor.PyInterpreter;
- var processor = _services.GetServices().FirstOrDefault(x => x.Provider.IsEqualTo(provider));
- if (processor == null)
+ try
{
- _logger.LogWarning($"Unable to find code processor: {provider}.");
- return false;
- }
+ var criteria = _services.GetServices()
+ .FirstOrDefault(x => x.Provider == criteriaProvider);
- var agentService = _services.GetRequiredService();
- var scriptName = options.CodeScriptName ?? $"{triggerName}_rule.py";
- var codeScript = await agentService.GetAgentCodeScript(agent.Id, scriptName, scriptType: AgentCodeScriptType.Src);
+ if (criteria == null)
+ {
+ return result;
+ }
- var msg = $"rule trigger ({triggerName}) code script ({scriptName}) in agent ({agent.Name}) => args: {options.ArgumentContent?.RootElement.GetRawText()}.";
- if (codeScript == null || string.IsNullOrWhiteSpace(codeScript.Content))
- {
- _logger.LogWarning($"Unable to find {msg}.");
- return false;
- }
+ var context = new RuleCriteriaContext
+ {
+ Text = text,
+ Parameters = BuildContextParameters(rule.RuleCriteria?.Config, states),
+ JsonOptions = triggerOptions?.JsonOptions
+ };
- try
- {
- var hooks = _services.GetHooks(agent.Id);
+ _logger.LogInformation("Start execution rule criteria {CriteriaProvider} for agent {AgentId} with trigger {TriggerName}",
+ criteria.Provider, agent.Id, trigger.Name);
- var arguments = BuildArguments(options.ArgumentName, options.ArgumentContent);
- var context = new CodeExecutionContext
+ var hooks = _services.GetHooks(agent.Id);
+ foreach (var hook in hooks)
{
- CodeScript = codeScript,
- Arguments = arguments
- };
+ await hook.BeforeRuleCriteriaExecuted(agent, trigger, context);
+ }
+
+ // Execute criteria
+ context.Parameters ??= [];
+ result = await criteria.ValidateAsync(agent, trigger, context);
foreach (var hook in hooks)
{
- await hook.BeforeCodeExecution(agent, context);
+ await hook.AfterRuleCriteriaExecuted(agent, trigger, result);
}
- var (useLock, useProcess, timeoutSeconds) = CodingUtil.GetCodeExecutionConfig(_codingSettings);
- using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds));
- var response = processor.Run(codeScript.Content, options: new()
+ return result;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error executing rule criteria {CriteriaProvider} for agent {AgentId}", criteriaProvider ?? string.Empty, agent.Id);
+ return result;
+ }
+ }
+ #endregion
+
+
+ #region Action
+ private async Task ExecuteActionAsync(
+ Agent agent,
+ AgentRule rule,
+ IRuleTrigger trigger,
+ string? actionName,
+ string text,
+ IEnumerable? states,
+ RuleTriggerOptions? triggerOptions)
+ {
+ try
+ {
+ // Get all registered rule actions
+ var actions = _services.GetServices();
+
+ // Find the matching action
+ var action = actions.FirstOrDefault(x => x.Name.IsEqualTo(actionName));
+
+ if (action == null)
{
- ScriptName = scriptName,
- Arguments = arguments,
- UseLock = useLock,
- UseProcess = useProcess
- }, cancellationToken: cts.Token);
+ var errorMsg = $"No rule action {actionName} is found";
+ _logger.LogWarning(errorMsg);
+ return RuleActionResult.Failed(errorMsg);
+ }
- var codeResponse = new CodeExecutionResponseModel
+ var context = new RuleActionContext
{
- CodeProcessor = processor.Provider,
- CodeScript = codeScript,
- Arguments = arguments.DistinctBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value ?? string.Empty),
- ExecutionResult = response
+ Text = text,
+ Parameters = BuildContextParameters(rule.RuleAction?.Config, states),
+ JsonOptions = triggerOptions?.JsonOptions
};
+ _logger.LogInformation("Start execution rule action {ActionName} for agent {AgentId} with trigger {TriggerName}",
+ action.Name, agent.Id, trigger.Name);
+
+ var hooks = _services.GetHooks(agent.Id);
foreach (var hook in hooks)
{
- await hook.AfterCodeExecution(agent, codeResponse);
+ await hook.BeforeRuleActionExecuted(agent, trigger, context);
}
- if (response == null || !response.Success)
- {
- _logger.LogWarning($"Failed to handle {msg}");
- return false;
- }
+ // Execute action
+ context.Parameters ??= [];
+ var result = await action.ExecuteAsync(agent, trigger, context);
- bool result;
- LogLevel logLevel;
- if (response.Result.IsEqualTo("true"))
- {
- logLevel = LogLevel.Information;
- result = true;
- }
- else
+ foreach (var hook in hooks)
{
- logLevel = LogLevel.Warning;
- result = false;
+ await hook.AfterRuleActionExecuted(agent, trigger, result);
}
- _logger.Log(logLevel, $"Code script execution result ({response}) from {msg}");
return result;
}
catch (Exception ex)
{
- _logger.LogError(ex, $"Error when handling {msg}");
- return false;
+ _logger.LogError(ex, "Error executing rule action {ActionName} for agent {AgentId}", actionName, agent.Id);
+ return RuleActionResult.Failed(ex.Message);
+ }
+ }
+ #endregion
+
+
+ #region Private methods
+ private Dictionary BuildContextParameters(JsonDocument? config, IEnumerable? states)
+ {
+ var dict = new Dictionary();
+
+ if (config != null)
+ {
+ dict = ConvertToDictionary(config);
+ }
+
+ if (!states.IsNullOrEmpty())
+ {
+ foreach (var state in states!)
+ {
+ dict[state.Key] = state.Value;
+ }
}
+
+ return dict;
}
- private List BuildArguments(string? name, JsonDocument? args)
+ private static Dictionary ConvertToDictionary(JsonDocument doc)
{
- var keyValues = new List();
- if (args != null)
+ var dict = new Dictionary();
+
+ foreach (var prop in doc.RootElement.EnumerateObject())
{
- keyValues.Add(new KeyValue(name ?? "trigger_args", args.RootElement.GetRawText()));
+ dict[prop.Name] = prop.Value.ValueKind switch
+ {
+ JsonValueKind.String => prop.Value.GetString(),
+ JsonValueKind.Number when prop.Value.TryGetDecimal(out decimal decimalValue) => decimalValue,
+ JsonValueKind.Number when prop.Value.TryGetDouble(out double doubleValue) => doubleValue,
+ JsonValueKind.Number when prop.Value.TryGetInt32(out int intValue) => intValue,
+ JsonValueKind.Number when prop.Value.TryGetInt64(out long longValue) => longValue,
+ JsonValueKind.Number when prop.Value.TryGetDateTime(out DateTime dateTimeValue) => dateTimeValue,
+ JsonValueKind.Number when prop.Value.TryGetDateTimeOffset(out DateTimeOffset dateTimeOffsetValue) => dateTimeOffsetValue,
+ JsonValueKind.Number when prop.Value.TryGetGuid(out Guid guidValue) => guidValue,
+ JsonValueKind.Number => prop.Value.GetDouble(),
+ JsonValueKind.True => true,
+ JsonValueKind.False => false,
+ JsonValueKind.Null => null,
+ JsonValueKind.Undefined => null,
+ JsonValueKind.Array => prop.Value,
+ JsonValueKind.Object => prop.Value,
+ _ => prop.Value
+ };
}
- return keyValues;
+
+ return dict;
+ #endregion
}
- #endregion
}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs
new file mode 100644
index 000000000..1c5e81d71
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs
@@ -0,0 +1,11 @@
+namespace BotSharp.Core.Rules.Models;
+
+public class RuleMessagePayload
+{
+ public string AgentId { get; set; }
+ public string TriggerName { get; set; }
+ public string Channel { get; set; }
+ public string Text { get; set; }
+ public Dictionary States { get; set; }
+ public DateTime Timestamp { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs
new file mode 100644
index 000000000..0abea08b0
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs
@@ -0,0 +1,18 @@
+using System.Text.Json.Serialization;
+
+namespace BotSharp.Core.Rules.Models;
+
+public class RuleTriggerActionRequest
+{
+ [JsonPropertyName("trigger_name")]
+ public string TriggerName { get; set; }
+
+ [JsonPropertyName("text")]
+ public string Text { get; set; } = string.Empty;
+
+ [JsonPropertyName("states")]
+ public IEnumerable? States { get; set; }
+
+ [JsonPropertyName("options")]
+ public RuleTriggerOptions? Options { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs b/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs
index 56e1fb8ae..5b8ade6b0 100644
--- a/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs
+++ b/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs
@@ -1,4 +1,6 @@
using BotSharp.Core.Rules.Engines;
+using BotSharp.Core.Rules.Services.Actions;
+using BotSharp.Core.Rules.Services.Criteria;
namespace BotSharp.Core.Rules;
@@ -17,5 +19,12 @@ public class RulesPlugin : IBotSharpPlugin
public void RegisterDI(IServiceCollection services, IConfiguration config)
{
services.AddScoped();
+ services.AddScoped();
+
+ // Register rule actions
+ services.AddScoped();
+ services.AddScoped();
+ services.AddScoped();
+ services.AddScoped();
}
}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/ChatRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/ChatRuleAction.cs
new file mode 100644
index 000000000..4c599a3e9
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/ChatRuleAction.cs
@@ -0,0 +1,72 @@
+namespace BotSharp.Core.Rules.Services.Actions;
+
+public sealed class ChatRuleAction : IRuleAction
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+
+ public ChatRuleAction(
+ IServiceProvider services,
+ ILogger logger)
+ {
+ _services = services;
+ _logger = logger;
+ }
+
+ public string Name => RuleConstant.DEFAULT_ACTION_NAME;
+
+ public async Task ExecuteAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleActionContext context)
+ {
+ using var scope = _services.CreateScope();
+ var sp = scope.ServiceProvider;
+
+ try
+ {
+ var channel = trigger.Channel;
+ var convService = sp.GetRequiredService();
+ var conv = await convService.NewConversation(new Conversation
+ {
+ Channel = channel,
+ Title = context.Text,
+ AgentId = agent.Id
+ });
+
+ var message = new RoleDialogModel(AgentRole.User, context.Text);
+
+ var allStates = new List
+ {
+ new("channel", channel)
+ };
+
+ if (!context.Parameters.IsNullOrEmpty())
+ {
+ var states = context.Parameters.Select(x => new MessageState(x.Key, x.Value));
+ allStates.AddRange(states);
+ }
+
+ await convService.SetConversationId(conv.Id, allStates);
+ await convService.SendMessage(agent.Id,
+ message,
+ null,
+ msg => Task.CompletedTask);
+
+ await convService.SaveStates();
+
+ _logger.LogInformation("Chat rule action executed successfully for agent {AgentId}, conversation {ConversationId}", agent.Id, conv.Id);
+
+ return new RuleActionResult
+ {
+ Success = true,
+ ConversationId = conv.Id
+ };
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error when sending chat via rule action for agent {AgentId} and trigger {TriggerName}", agent.Id, trigger.Name);
+ return RuleActionResult.Failed(ex.Message);
+ }
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/FunctionCallRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/FunctionCallRuleAction.cs
new file mode 100644
index 000000000..811db8124
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/FunctionCallRuleAction.cs
@@ -0,0 +1,44 @@
+using BotSharp.Abstraction.Functions;
+
+namespace BotSharp.Core.Rules.Services.Actions;
+
+public sealed class FunctionCallRuleAction : IRuleAction
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+
+ public FunctionCallRuleAction(
+ IServiceProvider services,
+ ILogger logger)
+ {
+ _services = services;
+ _logger = logger;
+ }
+
+ public string Name => "BotSharp-function-call";
+
+ public async Task ExecuteAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleActionContext context)
+ {
+ var funcName = context.Parameters.TryGetValueOrDefault("function_name", string.Empty);
+ var func = _services.GetServices().FirstOrDefault(x => x.Name.IsEqualTo(funcName));
+
+ if (func == null)
+ {
+ var errorMsg = $"Unable to find function '{funcName}' when running action {agent.Name}-{trigger.Name}";
+ _logger.LogWarning(errorMsg);
+ return RuleActionResult.Failed(errorMsg);
+ }
+
+ var funcArg = context.Parameters.TryGetValueOrDefault("function_argument") ?? new();
+ await func.Execute(funcArg);
+
+ return new RuleActionResult
+ {
+ Success = true,
+ Response = funcArg?.RichContent?.Message?.Text ?? funcArg?.Content
+ };
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/HttpRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/HttpRuleAction.cs
new file mode 100644
index 000000000..5e60e6c3f
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/HttpRuleAction.cs
@@ -0,0 +1,194 @@
+using System.Net.Mime;
+using System.Text.Json;
+using System.Text.Json.Serialization;
+using System.Web;
+
+namespace BotSharp.Core.Rules.Services.Actions;
+
+public sealed class HttpRuleAction : IRuleAction
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+ private readonly IHttpClientFactory _httpClientFactory;
+
+ private readonly JsonSerializerOptions _defaultJsonOptions = new()
+ {
+ PropertyNameCaseInsensitive = true,
+ PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
+ AllowTrailingCommas = true,
+ ReferenceHandler = ReferenceHandler.IgnoreCycles
+ };
+
+ public HttpRuleAction(
+ IServiceProvider services,
+ ILogger logger,
+ IHttpClientFactory httpClientFactory)
+ {
+ _services = services;
+ _logger = logger;
+ _httpClientFactory = httpClientFactory;
+ }
+
+ public string Name => "BotSharp-http";
+
+ public async Task ExecuteAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleActionContext context)
+ {
+ try
+ {
+ var httpMethod = GetHttpMethod(context);
+ if (httpMethod == null)
+ {
+ var errorMsg = $"HTTP method is not supported in agent rule {agent.Name}-{trigger.Name}";
+ _logger.LogWarning(errorMsg);
+ return RuleActionResult.Failed(errorMsg);
+ }
+
+ // Build the full URL
+ var fullUrl = BuildUrl(context);
+
+ using var client = _httpClientFactory.CreateClient();
+
+ // Add headers
+ AddHttpHeaders(client, context);
+
+ // Create request
+ var request = new HttpRequestMessage(httpMethod, fullUrl);
+
+ // Add request body if provided
+ var requestBodyStr = GetHttpRequestBody(context);
+ if (!string.IsNullOrEmpty(requestBodyStr))
+ {
+ request.Content = new StringContent(requestBodyStr, Encoding.UTF8, MediaTypeNames.Application.Json);
+ }
+
+ _logger.LogInformation("Executing HTTP rule action for agent {AgentId}, URL: {Url}, Method: {Method}",
+ agent.Id, fullUrl, httpMethod);
+
+ // Send request
+ var response = await client.SendAsync(request);
+ var responseContent = await response.Content.ReadAsStringAsync();
+
+ if (response.IsSuccessStatusCode)
+ {
+ _logger.LogInformation("HTTP rule action executed successfully for agent {AgentId}, Status: {StatusCode}",
+ agent.Id, response.StatusCode);
+
+ return new RuleActionResult
+ {
+ Success = true,
+ Response = responseContent
+ };
+ }
+ else
+ {
+ var errorMsg = $"HTTP request failed with status code {response.StatusCode}: {responseContent}";
+ _logger.LogWarning(errorMsg);
+ return RuleActionResult.Failed(errorMsg);
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error executing HTTP rule action for agent {AgentId} and trigger {TriggerName}",
+ agent.Id, trigger.Name);
+ return RuleActionResult.Failed(ex.Message);
+ }
+ }
+
+ private string BuildUrl(RuleActionContext context)
+ {
+ var url = context.Parameters.TryGetValueOrDefault("http_url");
+ if (string.IsNullOrEmpty(url))
+ {
+ throw new ArgumentNullException("Unable to find http_url in context");
+ }
+
+ // Fill in placeholders in url
+ foreach (var param in context.Parameters)
+ {
+ var value = param.Value?.ToString();
+ if (string.IsNullOrEmpty(value))
+ {
+ continue;
+ }
+ url = url.Replace($"{{{param.Key}}}", value);
+ }
+
+ // Add query parameters
+ var queryParams = context.Parameters.TryGetValueOrDefault>("http_query_params");
+ if (!queryParams.IsNullOrEmpty())
+ {
+ var builder = new UriBuilder(url);
+ var query = HttpUtility.ParseQueryString(builder.Query);
+
+ // Add new query params
+ foreach (var kv in queryParams!.Where(x => x.Value != null))
+ {
+ query[kv.Key] = kv.Value!;
+ }
+
+ // Assign merged query back
+ builder.Query = query.ToString();
+ url = builder.ToString();
+ }
+
+ return url;
+ }
+
+ private HttpMethod? GetHttpMethod(RuleActionContext context)
+ {
+ var method = context.Parameters.TryGetValueOrDefault("http_method", string.Empty);
+ var innerMethod = method?.Trim()?.ToUpper();
+ HttpMethod? matchMethod = null;
+
+ switch (innerMethod)
+ {
+ case "GET":
+ matchMethod = HttpMethod.Get;
+ break;
+ case "POST":
+ matchMethod = HttpMethod.Post;
+ break;
+ case "DELETE":
+ matchMethod = HttpMethod.Delete;
+ break;
+ case "PUT":
+ matchMethod = HttpMethod.Put;
+ break;
+ case "PATCH":
+ matchMethod = HttpMethod.Patch;
+ break;
+ default:
+ break;
+
+ }
+
+ return matchMethod;
+ }
+
+ private void AddHttpHeaders(HttpClient client, RuleActionContext context)
+ {
+ var headerParams = context.Parameters.TryGetValueOrDefault>("http_headers");
+ if (!headerParams.IsNullOrEmpty())
+ {
+ foreach (var header in headerParams!)
+ {
+ client.DefaultRequestHeaders.TryAddWithoutValidation(header.Key, header.Value);
+ }
+ }
+ }
+
+ private string? GetHttpRequestBody(RuleActionContext context)
+ {
+ var body = context.Parameters.GetValueOrDefault("http_request_body");
+ if (body == null)
+ {
+ return null;
+ }
+
+ return JsonSerializer.Serialize(body, context.JsonOptions ?? _defaultJsonOptions);
+ }
+}
+
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/MessageQueueRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/MessageQueueRuleAction.cs
new file mode 100644
index 000000000..2f819c6f5
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Services/Actions/MessageQueueRuleAction.cs
@@ -0,0 +1,129 @@
+using BotSharp.Core.Rules.Models;
+
+namespace BotSharp.Core.Rules.Services.Actions;
+
+public sealed class MessageQueueRuleAction : IRuleAction
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+
+ public MessageQueueRuleAction(
+ IServiceProvider services,
+ ILogger logger)
+ {
+ _services = services;
+ _logger = logger;
+ }
+
+ public string Name => "BotSharp-message-queue";
+
+ public async Task ExecuteAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleActionContext context)
+ {
+ try
+ {
+ // Get message queue service
+ var mqService = _services.GetService();
+ if (mqService == null)
+ {
+ var errorMsg = "Message queue service is not configured. Please ensure a message queue provider (e.g., RabbitMQ) is registered.";
+ _logger.LogWarning(errorMsg);
+ return RuleActionResult.Failed(errorMsg);
+ }
+
+ // Create message payload
+ var payload = new RuleMessagePayload
+ {
+ AgentId = agent.Id,
+ TriggerName = trigger.Name,
+ Channel = trigger.Channel,
+ Text = context.Text,
+ Timestamp = DateTime.UtcNow,
+ States = context.Parameters
+ };
+
+ // Publish message to queue
+ var mqOptions = BuildMQPublishOptions(context);
+ var success = await mqService.PublishAsync(payload, mqOptions);
+
+ if (success)
+ {
+ _logger.LogInformation("MessageQueue rule action executed successfully for agent {AgentId}", agent.Id);
+ return new RuleActionResult
+ {
+ Success = true,
+ Response = $"Message published to queue: {mqOptions.TopicName}-{mqOptions.RoutingKey}"
+ };
+ }
+ else
+ {
+ var errorMsg = $"Failed to publish message to queue {mqOptions.TopicName}-{mqOptions.RoutingKey}";
+ _logger.LogWarning(errorMsg);
+ return RuleActionResult.Failed(errorMsg);
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error executing MessageQueue rule action for agent {AgentId} and trigger {TriggerName}",
+ agent.Id, trigger.Name);
+ return RuleActionResult.Failed(ex.Message);
+ }
+ }
+
+ private MQPublishOptions BuildMQPublishOptions(RuleActionContext context)
+ {
+ var topicName = context.Parameters.TryGetValueOrDefault("mq_topic_name", string.Empty);
+ var routingKey = context.Parameters.TryGetValueOrDefault("mq_routing_key", string.Empty);
+ var delayMilliseconds = ParseDelay(context);
+
+ return new MQPublishOptions
+ {
+ TopicName = topicName,
+ RoutingKey = routingKey,
+ DelayMilliseconds = delayMilliseconds,
+ JsonOptions = context.JsonOptions
+ };
+ }
+
+ private long ParseDelay(RuleActionContext context)
+ {
+ var qty = (double)context.Parameters.TryGetValueOrDefault("mq_delay_qty", 0);
+ if (qty == 0)
+ {
+ qty = context.Parameters.TryGetValueOrDefault("mq_delay_qty", 0.0);
+ }
+
+ if (qty <= 0)
+ {
+ return 0L;
+ }
+
+ var unit = context.Parameters.TryGetValueOrDefault("mq_delay_unit", string.Empty) ?? string.Empty;
+ unit = unit.ToLower();
+
+ var milliseconds = 0L;
+ switch (unit)
+ {
+ case "second":
+ case "seconds":
+ milliseconds = (long)TimeSpan.FromSeconds(qty).TotalMilliseconds;
+ break;
+ case "minute":
+ case "minutes":
+ milliseconds = (long)TimeSpan.FromMinutes(qty).TotalMilliseconds;
+ break;
+ case "hour":
+ case "hours":
+ milliseconds = (long)TimeSpan.FromHours(qty).TotalMilliseconds;
+ break;
+ case "day":
+ case "days":
+ milliseconds = (long)TimeSpan.FromDays(qty).TotalMilliseconds;
+ break;
+ }
+
+ return milliseconds;
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Services/Criteria/CodeScriptRuleCriteria.cs b/src/Infrastructure/BotSharp.Core.Rules/Services/Criteria/CodeScriptRuleCriteria.cs
new file mode 100644
index 000000000..ad3489bc6
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Services/Criteria/CodeScriptRuleCriteria.cs
@@ -0,0 +1,130 @@
+using System.Text.Json;
+
+namespace BotSharp.Core.Rules.Services.Criteria;
+
+public class CodeScriptRuleCriteria : IRuleCriteria
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+ private readonly CodingSettings _codingSettings;
+
+ public CodeScriptRuleCriteria(
+ IServiceProvider services,
+ ILogger logger,
+ CodingSettings codingSettings)
+ {
+ _services = services;
+ _logger = logger;
+ _codingSettings = codingSettings;
+ }
+
+ public string Provider => RuleConstant.DEFAULT_CRITERIA_PROVIDER;
+
+ public async Task ValidateAsync(Agent agent, IRuleTrigger trigger, RuleCriteriaContext context)
+ {
+ var result = new RuleCriteriaResult();
+
+ if (string.IsNullOrWhiteSpace(agent?.Id))
+ {
+ return result;
+ }
+
+ var provider = context.Parameters.TryGetValueOrDefault("code_processor") ?? BuiltInCodeProcessor.PyInterpreter;
+ var processor = _services.GetServices().FirstOrDefault(x => x.Provider.IsEqualTo(provider));
+ if (processor == null)
+ {
+ _logger.LogWarning($"Unable to find code processor: {provider}.");
+ return result;
+ }
+
+ var agentService = _services.GetRequiredService();
+ var scriptName = context.Parameters.TryGetValueOrDefault("code_script_name") ?? $"{trigger.Name}_rule.py";
+ var codeScript = await agentService.GetAgentCodeScript(agent.Id, scriptName, scriptType: AgentCodeScriptType.Src);
+
+ var msg = $"rule trigger ({trigger.Name}) code script ({scriptName}) in agent ({agent.Name}).";
+
+ if (codeScript == null || string.IsNullOrWhiteSpace(codeScript.Content))
+ {
+ _logger.LogWarning($"Unable to find {msg}.");
+ return result;
+ }
+
+ try
+ {
+ var hooks = _services.GetHooks(agent.Id);
+
+ var argName = context.Parameters.TryGetValueOrDefault("argument_name");
+ var argValue = context.Parameters.TryGetValueOrDefault("argument_value");
+ var arguments = BuildArguments(argName, argValue);
+ var codeExeContext = new CodeExecutionContext
+ {
+ CodeScript = codeScript,
+ Arguments = arguments
+ };
+
+ foreach (var hook in hooks)
+ {
+ await hook.BeforeCodeExecution(agent, codeExeContext);
+ }
+
+ var (useLock, useProcess, timeoutSeconds) = CodingUtil.GetCodeExecutionConfig(_codingSettings);
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds));
+ var response = processor.Run(codeScript.Content, options: new()
+ {
+ ScriptName = scriptName,
+ Arguments = arguments,
+ UseLock = useLock,
+ UseProcess = useProcess
+ }, cancellationToken: cts.Token);
+
+ var codeResponse = new CodeExecutionResponseModel
+ {
+ CodeProcessor = processor.Provider,
+ CodeScript = codeScript,
+ Arguments = arguments.DistinctBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value ?? string.Empty),
+ ExecutionResult = response
+ };
+
+ foreach (var hook in hooks)
+ {
+ await hook.AfterCodeExecution(agent, codeResponse);
+ }
+
+ if (response == null || !response.Success)
+ {
+ _logger.LogWarning($"Failed to handle {msg}");
+ return result;
+ }
+
+ LogLevel logLevel;
+ if (response.Result.IsEqualTo("true"))
+ {
+ logLevel = LogLevel.Information;
+ result.Success = true;
+ result.IsValid = true;
+ }
+ else
+ {
+ logLevel = LogLevel.Warning;
+ }
+
+ _logger.Log(logLevel, $"Code script execution result ({response}) from {msg}");
+ return result;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error when handling {msg}");
+ return result;
+ }
+ }
+
+ private List BuildArguments(string? name, JsonElement? args)
+ {
+ var keyValues = new List();
+ if (args != null)
+ {
+ keyValues.Add(new KeyValue(name ?? "trigger_args", args.Value.GetRawText()));
+ }
+ return keyValues;
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Using.cs b/src/Infrastructure/BotSharp.Core.Rules/Using.cs
index a4353c960..2d1dc6844 100644
--- a/src/Infrastructure/BotSharp.Core.Rules/Using.cs
+++ b/src/Infrastructure/BotSharp.Core.Rules/Using.cs
@@ -1,5 +1,7 @@
global using Microsoft.Extensions.Configuration;
global using Microsoft.Extensions.DependencyInjection;
+global using Microsoft.Extensions.Logging;
+global using System.Text;
global using BotSharp.Abstraction.Agents.Enums;
global using BotSharp.Abstraction.Plugins;
@@ -8,4 +10,24 @@
global using BotSharp.Abstraction.Instructs;
global using BotSharp.Abstraction.Instructs.Models;
-global using BotSharp.Abstraction.Rules;
\ No newline at end of file
+global using BotSharp.Abstraction.Agents.Models;
+global using BotSharp.Abstraction.Conversations;
+
+global using BotSharp.Abstraction.Infrastructures.MessageQueues;
+global using BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+global using BotSharp.Abstraction.Models;
+global using BotSharp.Abstraction.Repositories.Filters;
+global using BotSharp.Abstraction.Rules;
+global using BotSharp.Abstraction.Rules.Options;
+global using BotSharp.Abstraction.Rules.Models;
+global using BotSharp.Abstraction.Rules.Hooks;
+global using BotSharp.Abstraction.Utilities;
+global using BotSharp.Abstraction.Coding;
+global using BotSharp.Abstraction.Coding.Contexts;
+global using BotSharp.Abstraction.Coding.Enums;
+global using BotSharp.Abstraction.Coding.Models;
+global using BotSharp.Abstraction.Coding.Utils;
+global using BotSharp.Abstraction.Coding.Settings;
+global using BotSharp.Abstraction.Hooks;
+
+global using BotSharp.Core.Rules.Constants;
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs b/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs
new file mode 100644
index 000000000..5c84fcb63
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs
@@ -0,0 +1,18 @@
+using BotSharp.Abstraction.Infrastructures.MessageQueues;
+using Microsoft.Extensions.Configuration;
+
+namespace BotSharp.Core.Messaging;
+
+public class MessagingPlugin : IBotSharpPlugin
+{
+ public string Id => "52a0aa30-4820-42a9-9cae-df0be81bad2b";
+ public string Name => "Messaging";
+ public string Description => "Provides message queue services.";
+
+ public void RegisterDI(IServiceCollection services, IConfiguration config)
+ {
+ var mqSettings = new MessageQueueSettings();
+ config.Bind("MessageQueue", mqSettings);
+ services.AddSingleton(mqSettings);
+ }
+}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs
index 52fd719fd..366157418 100644
--- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs
+++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs
@@ -1,4 +1,3 @@
-using BotSharp.Abstraction.Agents.Models;
using BotSharp.Abstraction.Rules;
namespace BotSharp.OpenAPI.Controllers;
@@ -18,9 +17,15 @@ public IEnumerable GetRuleTriggers()
}).OrderBy(x => x.TriggerName);
}
- [HttpGet("/rule/formalization")]
- public async Task GetFormalizedRuleDefinition([FromBody] AgentRule rule)
+ [HttpGet("/rule/criteria-providers")]
+ public async Task> GetRuleCriteriaProviders()
{
- return "{}";
+ return _services.GetServices().Select(x => x.Provider).OrderBy(x => x);
+ }
+
+ [HttpGet("/rule/actions")]
+ public async Task> GetRuleActions()
+ {
+ return _services.GetServices().Select(x => x.Name).OrderBy(x => x);
}
}
diff --git a/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs b/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs
index be189898e..8e29bb1bb 100644
--- a/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs
+++ b/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs
@@ -84,7 +84,7 @@ private async Task SendRequest(string url, GraphQueryRequest r
}
catch (Exception ex)
{
- _logger.LogError(ex, $"Error when fetching Lessen GLM response (Endpoint: {url}).");
+ _logger.LogError(ex, $"Error when fetching {Provider} Graph db response (Endpoint: {url}).");
return result;
}
}
diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs
index 4205fdc46..d031f4a96 100644
--- a/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs
+++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs
@@ -1,4 +1,5 @@
using BotSharp.Abstraction.Agents.Models;
+using System.Text.Json;
namespace BotSharp.Plugin.MongoStorage.Models;
@@ -7,7 +8,8 @@ public class AgentRuleMongoElement
{
public string TriggerName { get; set; } = default!;
public bool Disabled { get; set; }
- public string Criteria { get; set; } = default!;
+ public AgentRuleCriteriaMongoModel? RuleCriteria { get; set; }
+ public AgentRuleActionMongoModel? RuleAction { get; set; }
public static AgentRuleMongoElement ToMongoElement(AgentRule rule)
{
@@ -15,7 +17,8 @@ public static AgentRuleMongoElement ToMongoElement(AgentRule rule)
{
TriggerName = rule.TriggerName,
Disabled = rule.Disabled,
- Criteria = rule.Criteria
+ RuleCriteria = AgentRuleCriteriaMongoModel.ToMongoModel(rule.RuleCriteria),
+ RuleAction = AgentRuleActionMongoModel.ToMongoModel(rule.RuleAction)
};
}
@@ -25,7 +28,88 @@ public static AgentRule ToDomainElement(AgentRuleMongoElement rule)
{
TriggerName = rule.TriggerName,
Disabled = rule.Disabled,
- Criteria = rule.Criteria
+ RuleCriteria = AgentRuleCriteriaMongoModel.ToDomainModel(rule.RuleCriteria),
+ RuleAction = AgentRuleActionMongoModel.ToDomainModel(rule.RuleAction)
};
}
}
+
+[BsonIgnoreExtraElements(Inherited = true)]
+public class AgentRuleCriteriaMongoModel : AgentRuleConfigMongoModel
+{
+ public string CriteriaText { get; set; }
+
+ public static AgentRuleCriteriaMongoModel? ToMongoModel(AgentRuleCriteria? criteria)
+ {
+ if (criteria == null)
+ {
+ return null;
+ }
+
+ return new AgentRuleCriteriaMongoModel
+ {
+ Name = criteria.Name,
+ CriteriaText = criteria.CriteriaText,
+ Disabled = criteria.Disabled,
+ Config = criteria.Config != null ? BsonDocument.Parse(criteria.Config.RootElement.GetRawText()) : null
+ };
+ }
+
+ public static AgentRuleCriteria? ToDomainModel(AgentRuleCriteriaMongoModel? criteria)
+ {
+ if (criteria == null)
+ {
+ return null;
+ }
+
+ return new AgentRuleCriteria
+ {
+ Name = criteria.Name,
+ CriteriaText = criteria.CriteriaText,
+ Disabled = criteria.Disabled,
+ Config = criteria.Config != null ? JsonDocument.Parse(criteria.Config.ToJson()) : null
+ };
+ }
+}
+
+[BsonIgnoreExtraElements(Inherited = true)]
+public class AgentRuleActionMongoModel : AgentRuleConfigMongoModel
+{
+ public static AgentRuleActionMongoModel? ToMongoModel(AgentRuleAction? action)
+ {
+ if (action == null)
+ {
+ return null;
+ }
+
+ return new AgentRuleActionMongoModel
+ {
+ Name = action.Name,
+ Disabled = action.Disabled,
+ Config = action.Config != null ? BsonDocument.Parse(action.Config.RootElement.GetRawText()) : null
+ };
+ }
+
+ public static AgentRuleAction? ToDomainModel(AgentRuleActionMongoModel? action)
+ {
+ if (action == null)
+ {
+ return null;
+ }
+
+ return new AgentRuleAction
+ {
+ Name = action.Name,
+ Disabled = action.Disabled,
+ Config = action.Config != null ? JsonDocument.Parse(action.Config.ToJson()) : null
+ };
+ }
+}
+
+[BsonIgnoreExtraElements(Inherited = true)]
+public class AgentRuleConfigMongoModel
+{
+ public string Name { get; set; }
+ public bool Disabled { get; set; }
+ public BsonDocument? Config { get; set; }
+}
\ No newline at end of file
diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs
index 300700a64..83b2f1658 100644
--- a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs
+++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs
@@ -634,8 +634,7 @@ public async Task> TruncateConversation(string conversationId, stri
continue;
}
- var values = state.Values.Where(x => x.MessageId != messageId)
- .Where(x => x.UpdateTime < refTime)
+ var values = state.Values.Where(x => x.MessageId != messageId && x.UpdateTime < refTime)
.ToList();
if (values.Count == 0) continue;
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj b/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj
new file mode 100644
index 000000000..4a8f3ff20
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj
@@ -0,0 +1,22 @@
+
+
+
+ $(TargetFramework)
+ enable
+ $(LangVersion)
+ $(BotSharpVersion)
+ $(GeneratePackageOnBuild)
+ $(GenerateDocumentationFile)
+ $(SolutionDir)packages
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs
new file mode 100644
index 000000000..81c7de270
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs
@@ -0,0 +1,73 @@
+using Microsoft.Extensions.ObjectPool;
+using RabbitMQ.Client;
+
+namespace BotSharp.Plugin.RabbitMQ.Connections;
+
+public class RabbitMQChannelPool
+{
+ private readonly ObjectPool _pool;
+ private readonly ILogger _logger;
+ private readonly int _tryLimit = 3;
+
+ public RabbitMQChannelPool(
+ IServiceProvider services,
+ IRabbitMQConnection mqConnection)
+ {
+ _logger = services.GetRequiredService().CreateLogger();
+ var poolProvider = new DefaultObjectPoolProvider();
+ var policy = new ChannelPoolPolicy(mqConnection.Connection);
+ _pool = poolProvider.Create(policy);
+ }
+
+ public IChannel Get()
+ {
+ var count = 0;
+ var channel = _pool.Get();
+
+ while (count < _tryLimit && channel.IsClosed)
+ {
+ channel.Dispose();
+ channel = _pool.Get();
+ count++;
+ }
+
+ if (channel.IsClosed)
+ {
+ _logger.LogWarning($"No open channel from the pool after {_tryLimit} retries.");
+ }
+
+ return channel;
+ }
+
+ public void Return(IChannel channel)
+ {
+ if (channel.IsOpen)
+ {
+ _pool.Return(channel);
+ }
+ else
+ {
+ channel.Dispose();
+ }
+ }
+}
+
+internal class ChannelPoolPolicy : IPooledObjectPolicy
+{
+ private readonly IConnection _connection;
+
+ public ChannelPoolPolicy(IConnection connection)
+ {
+ _connection = connection;
+ }
+
+ public IChannel Create()
+ {
+ return _connection.CreateChannelAsync().ConfigureAwait(false).GetAwaiter().GetResult();
+ }
+
+ public bool Return(IChannel obj)
+ {
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs
new file mode 100644
index 000000000..989c0a7b7
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs
@@ -0,0 +1,13 @@
+using System.Collections.Concurrent;
+
+namespace BotSharp.Plugin.RabbitMQ.Connections;
+
+public static class RabbitMQChannelPoolFactory
+{
+ private static readonly ConcurrentDictionary _poolDict = new();
+
+ public static RabbitMQChannelPool GetChannelPool(IServiceProvider services, IRabbitMQConnection rabbitMQConnection)
+ {
+ return _poolDict.GetOrAdd(rabbitMQConnection.Connection.ToString()!, key => new RabbitMQChannelPool(services, rabbitMQConnection));
+ }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs
new file mode 100644
index 000000000..dac9e8c07
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs
@@ -0,0 +1,154 @@
+using Polly;
+using Polly.Retry;
+using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
+using System.Threading;
+
+namespace BotSharp.Plugin.RabbitMQ.Connections;
+
+public class RabbitMQConnection : IRabbitMQConnection
+{
+ private readonly RabbitMQSettings _settings;
+ private readonly IConnectionFactory _connectionFactory;
+ private readonly SemaphoreSlim _lock = new(initialCount: 1, maxCount: 1);
+ private readonly ILogger _logger;
+ private readonly int _retryCount = 5;
+
+ private IConnection _connection;
+ private bool _disposed = false;
+
+ public RabbitMQConnection(
+ RabbitMQSettings settings,
+ ILogger logger)
+ {
+ _settings = settings;
+ _logger = logger;
+ _connectionFactory = new ConnectionFactory
+ {
+ HostName = settings.HostName,
+ Port = settings.Port,
+ UserName = settings.UserName,
+ Password = settings.Password,
+ VirtualHost = settings.VirtualHost,
+ ConsumerDispatchConcurrency = 1,
+ AutomaticRecoveryEnabled = true,
+ HandshakeContinuationTimeout = TimeSpan.FromSeconds(20)
+ };
+ }
+
+ public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed;
+
+ public IConnection Connection => _connection;
+
+ public async Task CreateChannelAsync()
+ {
+ if (!IsConnected)
+ {
+ throw new InvalidOperationException("Rabbit MQ is not connectioned.");
+ }
+ return await _connection.CreateChannelAsync();
+ }
+
+ public async Task ConnectAsync()
+ {
+ await _lock.WaitAsync();
+
+ try
+ {
+ if (IsConnected)
+ {
+ return true;
+ }
+
+ var policy = BuildRetryPolicy();
+ await policy.Execute(async () =>
+ {
+ _connection = await _connectionFactory.CreateConnectionAsync();
+ });
+
+ if (IsConnected)
+ {
+ _connection.ConnectionShutdownAsync += OnConnectionShutdownAsync;
+ _connection.CallbackExceptionAsync += OnCallbackExceptionAsync;
+ _connection.ConnectionBlockedAsync += OnConnectionBlockedAsync;
+ _logger.LogInformation($"Rabbit MQ client connection success. host: {_connection.Endpoint.HostName}, port: {_connection.Endpoint.Port}, localPort:{_connection.LocalPort}");
+ return true;
+ }
+ _logger.LogError("Rabbit MQ client connection error.");
+ return false;
+ }
+ finally
+ {
+ _lock.Release();
+ }
+
+ }
+
+ private RetryPolicy BuildRetryPolicy()
+ {
+ return Policy.Handle().WaitAndRetry(
+ _retryCount,
+ retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
+ (ex, time) =>
+ {
+ _logger.LogError(ex, $"RabbitMQ cannot build connection: after {time.TotalSeconds:n1}s");
+ });
+ }
+
+ private Task OnConnectionShutdownAsync(object sender, ShutdownEventArgs e)
+ {
+ if (_disposed)
+ {
+ return Task.CompletedTask;
+ }
+
+ _logger.LogError($"Rabbit MQ connection is shutdown. {e}.");
+ return Task.CompletedTask;
+ }
+
+ private Task OnCallbackExceptionAsync(object sender, CallbackExceptionEventArgs e)
+ {
+ if (_disposed)
+ {
+ return Task.CompletedTask;
+ }
+
+ _logger.LogError($"Rabbit MQ connection throw exception. Trying to reconnect, {e.Exception}.");
+ return Task.CompletedTask;
+ }
+
+ private Task OnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs e)
+ {
+ if (_disposed)
+ {
+ return Task.CompletedTask;
+ }
+
+ _logger.LogError($"Rabbit MQ connection is shutdown. Trying to reconnect, {e.Reason}.");
+ return Task.CompletedTask;
+ }
+
+
+ public void Dispose()
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ _logger.LogWarning("Start disposing Rabbit MQ connection.");
+
+ try
+ {
+ _connection.Dispose();
+ _disposed = true;
+ _logger.LogWarning("Disposed Rabbit MQ connection.");
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error when disposing Rabbit MQ connection");
+ }
+
+ GC.SuppressFinalize(this);
+ }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs
new file mode 100644
index 000000000..36af0df90
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs
@@ -0,0 +1,24 @@
+namespace BotSharp.Plugin.RabbitMQ.Consumers;
+
+public class DummyMessageConsumer : MQConsumerBase
+{
+ public override object Config => new RabbitMQConsumerConfig
+ {
+ ExchangeName = "my.exchange",
+ QueueName = "dummy.queue",
+ RoutingKey = "my.routing"
+ };
+
+ public DummyMessageConsumer(
+ IServiceProvider services,
+ ILogger logger)
+ : base(services, logger)
+ {
+ }
+
+ public override async Task HandleMessageAsync(string channel, string data)
+ {
+ _logger.LogCritical($"Received delayed dummy message data: {data}");
+ return await Task.FromResult(true);
+ }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs
new file mode 100644
index 000000000..f6040dcd7
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs
@@ -0,0 +1,25 @@
+namespace BotSharp.Plugin.RabbitMQ.Consumers;
+
+public class ScheduledMessageConsumer : MQConsumerBase
+{
+ public override object Config => new RabbitMQConsumerConfig
+ {
+ ExchangeName = "my.exchange",
+ QueueName = "scheduled.queue",
+ RoutingKey = "my.routing"
+ };
+
+ public ScheduledMessageConsumer(
+ IServiceProvider services,
+ ILogger logger)
+ : base(services, logger)
+ {
+ }
+
+ public override async Task HandleMessageAsync(string channel, string data)
+ {
+ _logger.LogCritical($"Received delayed scheduled message data: {data}");
+ return await Task.FromResult(true);
+ }
+}
+
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs
new file mode 100644
index 000000000..802e4fa1b
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs
@@ -0,0 +1,89 @@
+using Microsoft.AspNetCore.Authorization;
+using Microsoft.AspNetCore.Http;
+using Microsoft.AspNetCore.Mvc;
+
+namespace BotSharp.Plugin.RabbitMQ.Controllers;
+
+[Authorize]
+[ApiController]
+public class RabbitMQController : ControllerBase
+{
+ private readonly IServiceProvider _services;
+ private readonly IMQService _mqService;
+ private readonly ILogger _logger;
+
+ public RabbitMQController(
+ IServiceProvider services,
+ IMQService mqService,
+ ILogger logger)
+ {
+ _services = services;
+ _mqService = mqService;
+ _logger = logger;
+ }
+
+ ///
+ /// Publish a scheduled message to be delivered after a delay
+ ///
+ /// The scheduled message request
+ [HttpPost("/message-queue/publish")]
+ public async Task PublishScheduledMessage([FromBody] PublishScheduledMessageRequest request)
+ {
+ if (request == null)
+ {
+ return BadRequest(new PublishMessageResponse { Success = false, Error = "Request body is required." });
+ }
+
+ try
+ {
+ var payload = new ScheduledMessagePayload
+ {
+ Name = request.Name ?? "Hello"
+ };
+
+ var success = await _mqService.PublishAsync(
+ payload,
+ options: new()
+ {
+ TopicName = "my.exchange",
+ RoutingKey = "my.routing",
+ DelayMilliseconds = request.DelayMilliseconds ?? 10000,
+ MessageId = request.MessageId
+ });
+ return Ok(new { Success = success });
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Failed to publish scheduled message");
+ return StatusCode(StatusCodes.Status500InternalServerError,
+ new PublishMessageResponse { Success = false, Error = ex.Message });
+ }
+ }
+
+ ///
+ /// Unsubscribe a consumer
+ ///
+ ///
+ ///
+ [HttpPost("/message-queue/unsubscribe/consumer")]
+ public async Task UnSubscribeConsuer([FromBody] UnsubscribeConsumerRequest request)
+ {
+ if (request == null)
+ {
+ return BadRequest(new { Success = false, Error = "Request body is required." });
+ }
+
+ try
+ {
+ var success = await _mqService.UnsubscribeAsync(request.Name);
+ return Ok(new { Success = success });
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Failed to unsubscribe consumer {request.Name}");
+ return StatusCode(StatusCodes.Status500InternalServerError,
+ new { Success = false, Error = ex.Message });
+ }
+ }
+}
+
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs
new file mode 100644
index 000000000..cb89c2976
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs
@@ -0,0 +1,11 @@
+using RabbitMQ.Client;
+
+namespace BotSharp.Plugin.RabbitMQ.Interfaces;
+
+public interface IRabbitMQConnection : IDisposable
+{
+ bool IsConnected { get; }
+ IConnection Connection { get; }
+ Task CreateChannelAsync();
+ Task ConnectAsync();
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs
new file mode 100644
index 000000000..ad655b795
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs
@@ -0,0 +1,31 @@
+namespace BotSharp.Plugin.RabbitMQ.Models;
+
+///
+/// Request model for publishing a scheduled message
+///
+public class PublishScheduledMessageRequest
+{
+ public string? Name { get; set; }
+
+ public long? DelayMilliseconds { get; set; }
+
+ public string? MessageId { get; set; }
+}
+
+
+///
+/// Response model for publish operations
+///
+public class PublishMessageResponse
+{
+ ///
+ /// Whether the message was successfully published
+ ///
+ public bool Success { get; set; }
+
+ ///
+ /// Error message if publish failed
+ ///
+ public string? Error { get; set; }
+}
+
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs
new file mode 100644
index 000000000..93754d455
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs
@@ -0,0 +1,24 @@
+namespace BotSharp.Plugin.RabbitMQ.Models;
+
+internal class RabbitMQConsumerConfig
+{
+ ///
+ /// The exchange name (topic in some MQ systems).
+ ///
+ internal string ExchangeName { get; set; } = "rabbitmq.exchange";
+
+ ///
+ /// The queue name (subscription in some MQ systems).
+ ///
+ internal string QueueName { get; set; } = "rabbitmq.queue";
+
+ ///
+ /// The routing key (filter in some MQ systems).
+ ///
+ internal string RoutingKey { get; set; } = "rabbitmq.routing";
+
+ ///
+ /// Additional arguments for the consumer configuration.
+ ///
+ internal Dictionary Arguments { get; set; } = new();
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs
new file mode 100644
index 000000000..2180fb2d7
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs
@@ -0,0 +1,9 @@
+namespace BotSharp.Plugin.RabbitMQ.Models;
+
+///
+/// Payload for scheduled/delayed messages
+///
+public class ScheduledMessagePayload
+{
+ public string Name { get; set; }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs
new file mode 100644
index 000000000..509d432b2
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs
@@ -0,0 +1,6 @@
+namespace BotSharp.Plugin.RabbitMQ.Models;
+
+public class UnsubscribeConsumerRequest
+{
+ public string Name { get; set; }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs
new file mode 100644
index 000000000..ff45dfe48
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs
@@ -0,0 +1,56 @@
+using BotSharp.Plugin.RabbitMQ.Services;
+using Microsoft.AspNetCore.Builder;
+using Microsoft.Extensions.Configuration;
+
+namespace BotSharp.Plugin.RabbitMQ;
+
+public class RabbitMQPlugin : IBotSharpAppPlugin
+{
+ public string Id => "3f93407f-3c37-4e25-be28-142a2da9b514";
+ public string Name => "RabbitMQ";
+ public string Description => "Handle AI messages in RabbitMQ.";
+ public string IconUrl => "https://icon-library.com/images/message-queue-icon/message-queue-icon-13.jpg";
+
+ public void RegisterDI(IServiceCollection services, IConfiguration config)
+ {
+ var settings = new RabbitMQSettings();
+ config.Bind("RabbitMQ", settings);
+ services.AddSingleton(settings);
+
+ var mqSettings = new MessageQueueSettings();
+ config.Bind("MessageQueue", mqSettings);
+
+ if (mqSettings.Enabled && mqSettings.Provider.IsEqualTo("RabbitMQ"))
+ {
+ services.AddSingleton();
+ services.AddSingleton();
+ }
+ }
+
+ public void Configure(IApplicationBuilder app)
+ {
+#if DEBUG
+ var sp = app.ApplicationServices;
+ var mqSettings = sp.GetRequiredService();
+
+ if (mqSettings.Enabled && mqSettings.Provider.IsEqualTo("RabbitMQ"))
+ {
+ var mqService = sp.GetRequiredService();
+ var loggerFactory = sp.GetRequiredService();
+
+ // Create and subscribe the consumer using the abstract interface
+ var scheduledConsumer = new ScheduledMessageConsumer(sp, loggerFactory.CreateLogger());
+ mqService.SubscribeAsync(nameof(ScheduledMessageConsumer), scheduledConsumer)
+ .ConfigureAwait(false)
+ .GetAwaiter()
+ .GetResult();
+
+ var dummyConsumer = new DummyMessageConsumer(sp, loggerFactory.CreateLogger());
+ mqService.SubscribeAsync(nameof(DummyMessageConsumer), dummyConsumer)
+ .ConfigureAwait(false)
+ .GetAwaiter()
+ .GetResult();
+ }
+#endif
+ }
+}
\ No newline at end of file
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs
new file mode 100644
index 000000000..0117bad14
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs
@@ -0,0 +1,318 @@
+using Polly;
+using Polly.Retry;
+using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
+using System.Collections.Concurrent;
+
+namespace BotSharp.Plugin.RabbitMQ.Services;
+
+public class RabbitMQService : IMQService
+{
+ private readonly IRabbitMQConnection _mqConnection;
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+
+ private readonly int _retryCount = 5;
+ private bool _disposed = false;
+ private static readonly ConcurrentDictionary _consumers = [];
+
+ public RabbitMQService(
+ IRabbitMQConnection mqConnection,
+ IServiceProvider services,
+ ILogger logger)
+ {
+ _mqConnection = mqConnection;
+ _services = services;
+ _logger = logger;
+ }
+
+ public async Task SubscribeAsync(string key, IMQConsumer consumer)
+ {
+ if (_consumers.ContainsKey(key))
+ {
+ _logger.LogWarning($"Consumer with key '{key}' is already subscribed.");
+ return false;
+ }
+
+ var registration = await CreateConsumerRegistrationAsync(consumer);
+ if (registration != null && _consumers.TryAdd(key, registration))
+ {
+ var config = consumer.Config as RabbitMQConsumerConfig ?? new();
+ _logger.LogInformation($"Consumer '{key}' subscribed to queue '{config.QueueName}'.");
+ return true;
+ }
+
+ return false;
+ }
+
+ public async Task UnsubscribeAsync(string key)
+ {
+ if (!_consumers.TryRemove(key, out var registration))
+ {
+ return false;
+ }
+
+ try
+ {
+ if (registration.Channel != null)
+ {
+ registration.Channel.Dispose();
+ }
+ registration.Consumer.Dispose();
+ _logger.LogInformation($"Consumer '{key}' unsubscribed.");
+ return true;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error unsubscribing consumer '{key}'.");
+ return false;
+ }
+ }
+
+ private async Task CreateConsumerRegistrationAsync(IMQConsumer consumer)
+ {
+ try
+ {
+ var channel = await CreateChannelAsync(consumer);
+
+ var config = consumer.Config as RabbitMQConsumerConfig ?? new();
+ var registration = new ConsumerRegistration(consumer, channel);
+
+ var asyncConsumer = new AsyncEventingBasicConsumer(channel);
+ asyncConsumer.ReceivedAsync += async (sender, eventArgs) =>
+ {
+ await ConsumeEventAsync(registration, eventArgs);
+ };
+
+ await channel.BasicConsumeAsync(
+ queue: config.QueueName,
+ autoAck: false,
+ consumer: asyncConsumer);
+
+ _logger.LogWarning($"RabbitMQ consuming queue '{config.QueueName}'.");
+ return registration;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error when register consumer in RabbitMQ.");
+ return null;
+ }
+ }
+
+ private async Task CreateChannelAsync(IMQConsumer consumer)
+ {
+ if (!_mqConnection.IsConnected)
+ {
+ await _mqConnection.ConnectAsync();
+ }
+
+ var config = consumer.Config as RabbitMQConsumerConfig ?? new();
+ var channel = await _mqConnection.CreateChannelAsync();
+ _logger.LogWarning($"Created RabbitMQ channel {channel.ChannelNumber} for queue '{config.QueueName}'");
+
+ var args = new Dictionary
+ {
+ ["x-delayed-type"] = "direct"
+ };
+
+ if (config.Arguments != null)
+ {
+ foreach (var kvp in config.Arguments)
+ {
+ args[kvp.Key] = kvp.Value;
+ }
+ }
+
+ await channel.ExchangeDeclareAsync(
+ exchange: config.ExchangeName,
+ type: "x-delayed-message",
+ durable: true,
+ autoDelete: false,
+ arguments: args);
+
+ await channel.QueueDeclareAsync(
+ queue: config.QueueName,
+ durable: true,
+ exclusive: false,
+ autoDelete: false);
+
+ await channel.QueueBindAsync(
+ queue: config.QueueName,
+ exchange: config.ExchangeName,
+ routingKey: config.RoutingKey);
+
+ return channel;
+ }
+
+ private async Task ConsumeEventAsync(ConsumerRegistration registration, BasicDeliverEventArgs eventArgs)
+ {
+ var data = string.Empty;
+ var config = registration.Consumer.Config as RabbitMQConsumerConfig ?? new();
+
+ try
+ {
+ data = Encoding.UTF8.GetString(eventArgs.Body.Span);
+ _logger.LogInformation($"Message received on '{config.QueueName}', id: {eventArgs.BasicProperties?.MessageId}, data: {data}");
+
+ var isHandled = await registration.Consumer.HandleMessageAsync(config.QueueName, data);
+ if (registration.Channel?.IsOpen == true)
+ {
+ if (isHandled)
+ {
+ await registration.Channel.BasicAckAsync(eventArgs.DeliveryTag, multiple: false);
+ }
+ else
+ {
+ await registration.Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: false);
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error consuming message on queue '{config.QueueName}': {data}");
+ if (registration.Channel?.IsOpen == true)
+ {
+ await registration.Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: false);
+ }
+ }
+ }
+
+ public async Task PublishAsync(T payload, MQPublishOptions options)
+ {
+ try
+ {
+ if (options == null)
+ {
+ return false;
+ }
+
+ if (!_mqConnection.IsConnected)
+ {
+ await _mqConnection.ConnectAsync();
+ }
+
+ var isPublished = false;
+ var policy = BuildRetryPolicy();
+ await policy.Execute(async () =>
+ {
+ var channelPool = RabbitMQChannelPoolFactory.GetChannelPool(_services, _mqConnection);
+ var channel = channelPool.Get();
+
+ try
+ {
+ var args = new Dictionary
+ {
+ ["x-delayed-type"] = "direct"
+ };
+
+ if (!options.Arguments.IsNullOrEmpty())
+ {
+ foreach (var kvp in options.Arguments)
+ {
+ args[kvp.Key] = kvp.Value;
+ }
+ }
+
+ await channel.ExchangeDeclareAsync(
+ exchange: options.TopicName,
+ type: "x-delayed-message",
+ durable: true,
+ autoDelete: false,
+ arguments: args);
+
+ var messageId = options.MessageId ?? Guid.NewGuid().ToString();
+ var message = new MQMessage(payload, messageId);
+ var body = ConvertToBinary(message, options.JsonOptions);
+ var properties = new BasicProperties
+ {
+ MessageId = messageId,
+ DeliveryMode = DeliveryModes.Persistent,
+ Headers = new Dictionary
+ {
+ ["x-delay"] = options.DelayMilliseconds
+ }
+ };
+
+ await channel.BasicPublishAsync(
+ exchange: options.TopicName,
+ routingKey: options.RoutingKey,
+ mandatory: true,
+ basicProperties: properties,
+ body: body);
+
+ isPublished = true;
+ }
+ catch (Exception)
+ {
+ throw;
+ }
+ finally
+ {
+ channelPool.Return(channel);
+ }
+ });
+
+ return isPublished;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error when RabbitMQ publish message.");
+ return false;
+ }
+ }
+
+ private RetryPolicy BuildRetryPolicy()
+ {
+ return Policy.Handle().WaitAndRetry(
+ _retryCount,
+ retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
+ (ex, time) =>
+ {
+ _logger.LogError(ex, $"RabbitMQ publish error: after {time.TotalSeconds:n1}s");
+ });
+ }
+
+ private static byte[] ConvertToBinary(T data, JsonSerializerOptions? jsonOptions = null)
+ {
+ var jsonStr = JsonSerializer.Serialize(data, jsonOptions);
+ var body = Encoding.UTF8.GetBytes(jsonStr);
+ return body;
+ }
+
+ public void Dispose()
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ _logger.LogWarning($"Disposing {nameof(RabbitMQService)}");
+
+ foreach (var item in _consumers)
+ {
+ if (item.Value.Channel != null)
+ {
+ item.Value.Channel.Dispose();
+ }
+ item.Value.Consumer.Dispose();
+ }
+
+ _disposed = true;
+ GC.SuppressFinalize(this);
+ }
+
+ ///
+ /// Internal class to track consumer registrations with their RabbitMQ channels.
+ ///
+ private class ConsumerRegistration
+ {
+ public IMQConsumer Consumer { get; }
+ public IChannel? Channel { get; }
+
+ public ConsumerRegistration(IMQConsumer consumer, IChannel? channel)
+ {
+ Consumer = consumer;
+ Channel = channel;
+ }
+ }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs
new file mode 100644
index 000000000..0e61b5c71
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs
@@ -0,0 +1,10 @@
+namespace BotSharp.Plugin.RabbitMQ.Settings;
+
+public class RabbitMQSettings
+{
+ public string HostName { get; set; } = "localhost";
+ public int Port { get; set; } = 5672;
+ public string UserName { get; set; } = "guest";
+ public string Password { get; set; } = "guest";
+ public string VirtualHost { get; set; } = "/";
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs
new file mode 100644
index 000000000..0a8a8c3a5
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs
@@ -0,0 +1,38 @@
+global using System;
+global using System.Collections.Generic;
+global using System.Text;
+global using System.Linq;
+global using System.Text.Json;
+global using System.Net.Mime;
+global using System.Threading.Tasks;
+global using Microsoft.Extensions.DependencyInjection;
+global using Microsoft.Extensions.Logging;
+global using BotSharp.Abstraction.Agents;
+global using BotSharp.Abstraction.Conversations;
+global using BotSharp.Abstraction.Plugins;
+global using BotSharp.Abstraction.Conversations.Models;
+global using BotSharp.Abstraction.Functions;
+global using BotSharp.Abstraction.Agents.Models;
+global using BotSharp.Abstraction.Agents.Enums;
+global using BotSharp.Abstraction.Files.Enums;
+global using BotSharp.Abstraction.Files.Models;
+global using BotSharp.Abstraction.Files.Converters;
+global using BotSharp.Abstraction.Files;
+global using BotSharp.Abstraction.MLTasks;
+global using BotSharp.Abstraction.Utilities;
+global using BotSharp.Abstraction.Agents.Settings;
+global using BotSharp.Abstraction.Functions.Models;
+global using BotSharp.Abstraction.Repositories;
+global using BotSharp.Abstraction.Settings;
+global using BotSharp.Abstraction.Messaging;
+global using BotSharp.Abstraction.Messaging.Models.RichContent;
+global using BotSharp.Abstraction.Options;
+global using BotSharp.Abstraction.Models;
+global using BotSharp.Abstraction.Infrastructures.MessageQueues;
+global using BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+
+global using BotSharp.Plugin.RabbitMQ.Settings;
+global using BotSharp.Plugin.RabbitMQ.Models;
+global using BotSharp.Plugin.RabbitMQ.Interfaces;
+global using BotSharp.Plugin.RabbitMQ.Consumers;
+global using BotSharp.Plugin.RabbitMQ.Connections;
\ No newline at end of file
diff --git a/src/WebStarter/WebStarter.csproj b/src/WebStarter/WebStarter.csproj
index 9374d95fd..be332a38e 100644
--- a/src/WebStarter/WebStarter.csproj
+++ b/src/WebStarter/WebStarter.csproj
@@ -42,6 +42,7 @@
+
diff --git a/src/WebStarter/appsettings.json b/src/WebStarter/appsettings.json
index 39587b64e..b3c2b35e9 100644
--- a/src/WebStarter/appsettings.json
+++ b/src/WebStarter/appsettings.json
@@ -1006,6 +1006,7 @@
"Language": "en"
}
},
+
"A2AIntegration": {
"Enabled": true,
"DefaultTimeoutSeconds": 30,
@@ -1018,12 +1019,27 @@
}
]
},
+
+ "MessageQueue": {
+ "Enabled": false,
+ "Provider": "RabbitMQ"
+ },
+
+ "RabbitMQ": {
+ "HostName": "localhost",
+ "Port": 5672,
+ "UserName": "guest",
+ "Password": "guest",
+ "VirtualHost": "/"
+ },
+
"PluginLoader": {
"Assemblies": [
"BotSharp.Core",
"BotSharp.Core.A2A",
"BotSharp.Core.SideCar",
"BotSharp.Core.Crontab",
+ "BotSharp.Core.Rules",
"BotSharp.Core.Realtime",
"BotSharp.Logger",
"BotSharp.Plugin.MongoStorage",
@@ -1061,7 +1077,8 @@
"BotSharp.Plugin.PythonInterpreter",
"BotSharp.Plugin.FuzzySharp",
"BotSharp.Plugin.MMPEmbedding",
- "BotSharp.Plugin.MultiTenancy"
+ "BotSharp.Plugin.MultiTenancy",
+ "BotSharp.Plugin.RabbitMQ"
]
},