Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
using Microsoft.DotNet.Interactive.Events;
using Microsoft.DotNet.Interactive.CSharp;
using Microsoft.DotNet.Interactive.App;
using Microsoft.DotNet.Interactive.Commands;
using FluentAssertions;
using Xunit;
using System.Collections.Generic;

namespace Microsoft.DotNet.Interactive.PostgreSql.Tests;

Expand Down Expand Up @@ -82,8 +84,148 @@ public async Task It_returns_error_if_query_is_not_valid()
.Contain("column \"not_known_column\" does not exist");
}

[PostgreSqlFact]
public async Task When_variable_does_not_exist_then_an_error_is_returned()
{
var connectionString = PostgreSqlFactAttribute.GetConnectionStringForTests();
using var kernel = CreateKernel();
var result = await kernel.SubmitCodeAsync(
$"#!connect postgres --kernel-name adventureworks \"{connectionString}\"");

result.Events
.Should()
.NotContainErrors();

var sqlKernel = kernel.FindKernelByName("sql-adventureworks");

result = await sqlKernel.SendAsync(new RequestValue("my_data_result"));

result.Events.Should()
.ContainSingle<CommandFailed>()
.Which
.Message
.Should()
.Contain("Value 'my_data_result' not found in kernel sql-adventureworks");
}

[PostgreSqlFact]
public async Task It_can_store_result_set_with_a_name()
{
var connectionString = PostgreSqlFactAttribute.GetConnectionStringForTests();
using var kernel = CreateKernel();
await kernel.SubmitCodeAsync(
$"#!connect postgres --kernel-name adventureworks \"{connectionString}\"");

await kernel.SubmitCodeAsync("""
#!sql-adventureworks --name my_data_result
SELECT * FROM customers LIMIT 10;
""");

var result = await kernel.SubmitCodeAsync("""
#!csharp
#!share --from sql-adventureworks my_data_result
my_data_result
""");

result.Events
.Should()
.ContainSingle<ReturnValueProduced>()
.Which
.Value
.Should()
.BeAssignableTo<IEnumerable<TabularDataResource>>()
.Which.Count()
.Should()
.Be(1);
}

[PostgreSqlFact]
public async Task Stored_query_results_are_listed_in_ValueInfos()
{
var connectionString = PostgreSqlFactAttribute.GetConnectionStringForTests();
using var kernel = CreateKernel();
await kernel.SubmitCodeAsync(
$"#!connect postgres --kernel-name adventureworks \"{connectionString}\"");

await kernel.SubmitCodeAsync("""
#!sql-adventureworks --name my_data_result
SELECT * FROM customers LIMIT 10;
""");

var sqlKernel = kernel.FindKernelByName("sql-adventureworks");

var result = await sqlKernel.SendAsync(new RequestValueInfos());

var valueInfos = result.Events.Should().ContainSingle<ValueInfosProduced>()
.Which.ValueInfos;

valueInfos.Should().Contain(v => v.Name == "my_data_result");
}

[PostgreSqlFact]
public async Task Storing_results_does_interfere_with_subsequent_executions()
{
var connectionString = PostgreSqlFactAttribute.GetConnectionStringForTests();
using var kernel = CreateKernel();
await kernel.SubmitCodeAsync(
$"#!connect postgres --kernel-name adventureworks \"{connectionString}\"");

await kernel.SubmitCodeAsync("""
#!sql-adventureworks --name my_data_result
SELECT * FROM customers LIMIT 10;
""");

var sqlKernel = kernel.FindKernelByName("sql-adventureworks");

var result = await sqlKernel.SendAsync(new RequestValueInfos());

var valueInfos = result.Events.Should().ContainSingle<ValueInfosProduced>()
.Which.ValueInfos;

valueInfos.Should().Contain(v => v.Name == "my_data_result");

result = await kernel.SubmitCodeAsync("""
#!sql-adventureworks --name my_data_result
SELECT * FROM customers LIMIT 10;
""");

result.Events.Should().NotContainErrors();
}

[PostgreSqlFact]
public async Task It_can_store_multiple_result_set_with_a_name()
{
var connectionString = PostgreSqlFactAttribute.GetConnectionStringForTests();
using var kernel = CreateKernel();
await kernel.SubmitCodeAsync(
$"#!connect postgres --kernel-name adventureworks \"{connectionString}\"");

await kernel.SubmitCodeAsync("""
#!sql-adventureworks --name my_data_result
SELECT * FROM customers LIMIT 5;
SELECT * FROM customers LIMIT 5;
""");

var result = await kernel.SubmitCodeAsync("""
#!csharp
#!share --from sql-adventureworks my_data_result
my_data_result
""");

result.Events
.Should()
.ContainSingle<ReturnValueProduced>()
.Which
.Value
.Should()
.BeAssignableTo<IEnumerable<TabularDataResource>>()
.Which.Count()
.Should()
.Be(2);
}

public void Dispose()
{
DataExplorer.ResetToDefault();
}
}
}
113 changes: 100 additions & 13 deletions src/Microsoft.DotNet.Interactive.PostgreSql/PostgreSqlKernel.cs
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
// Copyright (c) .NET Foundation and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Html;
using Microsoft.DotNet.Interactive.Commands;
using Microsoft.DotNet.Interactive.Directives;
using Microsoft.DotNet.Interactive.Events;
using Microsoft.DotNet.Interactive.Formatting;
using Microsoft.DotNet.Interactive.Formatting.TabularData;
using Microsoft.DotNet.Interactive.ValueSharing;
using Npgsql;
using Enumerable = System.Linq.Enumerable;

namespace Microsoft.DotNet.Interactive.PostgreSql;

public class PostgreSqlKernel :
Kernel,
IKernelCommandHandler<SubmitCode>
IKernelCommandHandler<SubmitCode>,
IKernelCommandHandler<RequestValue>,
IKernelCommandHandler<RequestValueInfos>
{
private readonly string _connectionString;
private IEnumerable<IEnumerable<IEnumerable<(string name, object value)>>> _tables;
private readonly Dictionary<string, object> _resultSets = new(StringComparer.Ordinal);

public PostgreSqlKernel(string name, string connectionString) : base(name)
{
Expand All @@ -31,6 +39,16 @@ Query a PostgreSQL database
_connectionString = connectionString;
}

public override KernelSpecifierDirective KernelSpecifierDirective
{
get
{
var directive = base.KernelSpecifierDirective;
directive.Parameters.Add(new("--name"));
return directive;
}
}

private DbConnection OpenConnection()
{
return new NpgsqlConnection(_connectionString);
Expand All @@ -40,24 +58,36 @@ async Task IKernelCommandHandler<SubmitCode>.HandleAsync(
SubmitCode submitCode,
KernelInvocationContext context)
{
await using var connection = OpenConnection();
if (connection.State is not ConnectionState.Open)
var results = new List<TabularDataResource>();
try
{
await connection.OpenAsync();
}
await using var connection = OpenConnection();
if (connection.State is not ConnectionState.Open)
{
await connection.OpenAsync();
}

await using var dbCommand = connection.CreateCommand();
await using var dbCommand = connection.CreateCommand();

dbCommand.CommandText = submitCode.Code;
dbCommand.CommandText = submitCode.Code;

_tables = Execute(dbCommand);
_tables = Execute(dbCommand);

foreach (var table in _tables)
{
var tabularDataResource = table.ToTabularDataResource();
foreach (var table in _tables)
{
var tabularDataResource = table.ToTabularDataResource();

var explorer = DataExplorer.CreateDefault(tabularDataResource);
context.Display(explorer);
var explorer = DataExplorer.CreateDefault(tabularDataResource);
context.Display(explorer);

results.Add(tabularDataResource);
}
}
finally
{
submitCode.Parameters.TryGetValue("--name", out var queryName);
string name = queryName ?? "";
_resultSets[name] = results;
}
}

Expand Down Expand Up @@ -125,4 +155,61 @@ public static void AddPostgreSqlKernelConnectorToCurrentRoot()
"text/html");
}
}

private bool TryGetValue<T>(string name, out T value)
{
if (_resultSets.TryGetValue(name, out var resultSet) &&
resultSet is T resultSetT)
{
value = resultSetT;
return true;
}

value = default;
return false;
}

Task IKernelCommandHandler<RequestValue>.HandleAsync(RequestValue command, KernelInvocationContext context)
{
if (TryGetValue<object>(command.Name, out var value))
{
context.Publish(new ValueProduced(
value,
command.Name,
new FormattedValue(
command.MimeType,
value.ToDisplayString(command.MimeType)),
command));
}
else
{
context.Fail(command, message: $"Value '{command.Name}' not found in kernel {Name}");
}

return Task.CompletedTask;
}

Task IKernelCommandHandler<RequestValueInfos>.HandleAsync(RequestValueInfos command, KernelInvocationContext context)
{
var valueInfos = CreateKernelValueInfos(_resultSets, command.MimeType).ToArray();

context.Publish(new ValueInfosProduced(valueInfos, command));

return Task.CompletedTask;

static IEnumerable<KernelValueInfo> CreateKernelValueInfos(IReadOnlyDictionary<string, object> source, string mimeType)
{
return source.Keys.Select(key =>
{
var formattedValues = FormattedValue.CreateSingleFromObject(
source[key],
mimeType);

return new KernelValueInfo(
key,
formattedValues,
type: typeof(IEnumerable<TabularDataResource>));
});
}
}
}
Loading