diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a1a4e22f..8ca3f118 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -330,7 +330,10 @@ jobs: # Install ucm mkdir ucm - curl -L https://github.com/unisonweb/unison/releases/download/release%2F1.0.0/ucm-linux-x64.tar.gz | tar -xz -C ucm + + # Use latest trunk build to get comment upload/download support for now. + # Old: https://github.com/unisonweb/unison/releases/download/release%2F1.0.0/ucm-linux-x64.tar.gz + curl -L https://github.com/unisonweb/unison/releases/download/trunk-build/ucm-linux-x64.tar.gz | tar -xz -C ucm export PATH=$PWD/ucm:$PATH # Start share and it's dependencies in the background diff --git a/share-api/package.yaml b/share-api/package.yaml index f1b5caf2..7fdc83c2 100644 --- a/share-api/package.yaml +++ b/share-api/package.yaml @@ -154,6 +154,7 @@ dependencies: - wai-extra - wai-middleware-prometheus - warp +- websockets - witch - witherable - x509 diff --git a/share-api/share-api.cabal b/share-api/share-api.cabal index 07c04783..95bd0074 100644 --- a/share-api/share-api.cabal +++ b/share-api/share-api.cabal @@ -130,6 +130,7 @@ library Share.Utils.Servant.Client Share.Utils.Servant.PathInfo Share.Utils.Servant.RawRequest + Share.Utils.Servant.Streaming Share.Utils.Tags Share.Utils.Unison Share.Web.Admin.API @@ -194,6 +195,9 @@ library Share.Web.Support.Impl Share.Web.Support.Types Share.Web.Types + Share.Web.UCM.HistoryComments.API + Share.Web.UCM.HistoryComments.Impl + Share.Web.UCM.HistoryComments.Queries Share.Web.UCM.Projects.Impl Share.Web.UCM.Sync.HashJWT Share.Web.UCM.Sync.Impl @@ -356,6 +360,7 @@ library , wai-extra , wai-middleware-prometheus , warp + , websockets , witch , witherable , x509 @@ -513,6 +518,7 @@ executable share-api , wai-extra , wai-middleware-prometheus , warp + , websockets , witch , witherable , x509 diff --git a/share-api/src/Share/Postgres.hs b/share-api/src/Share/Postgres.hs index b215028a..fcc36abb 100644 --- 
a/share-api/src/Share/Postgres.hs +++ b/share-api/src/Share/Postgres.hs @@ -685,7 +685,7 @@ cachedFor = cachedForOf traversed -- ) SELECT * FROM something JOIN users on something.user_id = users.id -- |] -- @@ -whenNonEmpty :: forall m f a x. (Monad m, Foldable f, Monoid a) => f x -> m a -> m a +whenNonEmpty :: forall m f a x. (Foldable f, Monoid a, Applicative m) => f x -> m a -> m a whenNonEmpty f m = if null f then pure mempty else m timeTransaction :: (QueryM m) => String -> m a -> m a diff --git a/share-api/src/Share/Postgres/IDs.hs b/share-api/src/Share/Postgres/IDs.hs index 6f2d698b..c16da11f 100644 --- a/share-api/src/Share/Postgres/IDs.hs +++ b/share-api/src/Share/Postgres/IDs.hs @@ -23,6 +23,7 @@ module Share.Postgres.IDs NamespaceTermMappingId (..), NamespaceTypeMappingId (..), ComponentSummaryDigest (..), + PersonalKeyId (..), -- * Conversions hash32AsComponentHash_, @@ -104,6 +105,10 @@ newtype ComponentSummaryDigest = ComponentSummaryDigest {unComponentSummaryDiges deriving stock (Show, Eq, Ord) deriving (PG.EncodeValue, PG.DecodeValue) via ByteString +newtype PersonalKeyId = PersonalKeyId {unPersonalKeyId :: Int32} + deriving stock (Eq, Ord, Show) + deriving (PG.DecodeValue, PG.EncodeValue) via Int32 + toHash32 :: (Coercible h Hash) => h -> Hash32 toHash32 = Hash32.fromHash . 
coerce diff --git a/share-api/src/Share/Postgres/Orphans.hs b/share-api/src/Share/Postgres/Orphans.hs index d9e38bd0..a549d79d 100644 --- a/share-api/src/Share/Postgres/Orphans.hs +++ b/share-api/src/Share/Postgres/Orphans.hs @@ -33,10 +33,11 @@ import U.Codebase.TermEdit qualified as TermEdit import U.Util.Base32Hex qualified as Base32Hex import Unison.Hash (Hash) import Unison.Hash qualified as Hash -import Unison.Hash32 (Hash32) +import Unison.Hash32 (Hash32 (..)) import Unison.Hash32 qualified as Hash32 import Unison.Name (Name) import Unison.NameSegment.Internal (NameSegment (..)) +import Unison.Server.HistoryComments.Types import Unison.SyncV2.Types (CBORBytes (..)) import Unison.Syntax.Name qualified as Name import UnliftIO (MonadUnliftIO (..)) @@ -103,6 +104,14 @@ deriving via Hash instance FromHttpApiData ComponentHash deriving via Hash instance ToHttpApiData ComponentHash +deriving via Hash32 instance Hasql.DecodeValue HistoryCommentHash32 + +deriving via Hash32 instance Hasql.EncodeValue HistoryCommentHash32 + +deriving via Hash32 instance Hasql.DecodeValue HistoryCommentRevisionHash32 + +deriving via Hash32 instance Hasql.EncodeValue HistoryCommentRevisionHash32 + deriving via Text instance Hasql.DecodeValue NameSegment deriving via Text instance Hasql.EncodeValue NameSegment diff --git a/share-api/src/Share/Prelude/Orphans.hs b/share-api/src/Share/Prelude/Orphans.hs index 055834ca..fa8ba593 100644 --- a/share-api/src/Share/Prelude/Orphans.hs +++ b/share-api/src/Share/Prelude/Orphans.hs @@ -6,6 +6,7 @@ module Share.Prelude.Orphans () where import Control.Comonad.Cofree (Cofree (..)) +import Control.Monad.Except import Control.Monad.Trans (lift) import Control.Monad.Trans.Maybe (MaybeT) import Data.Align (Semialign (..)) @@ -47,3 +48,6 @@ instance From ShortHash Text where instance (MonadTracer m) => MonadTracer (MaybeT m) where getTracer = lift getTracer + +instance (MonadTracer m) => MonadTracer (ExceptT e m) where + getTracer = lift getTracer diff 
--git a/share-api/src/Share/Utils/Logging.hs b/share-api/src/Share/Utils/Logging.hs index 0bcabc2d..0d98cf39 100644 --- a/share-api/src/Share/Utils/Logging.hs +++ b/share-api/src/Share/Utils/Logging.hs @@ -45,6 +45,7 @@ import Data.Text qualified as Text import Data.Text.Encoding qualified as Text import Data.Text.IO qualified as Text import GHC.Stack (CallStack, callStack, prettyCallStack) +import Network.WebSockets qualified as WS import Servant.Client qualified as Servant import Share.Env.Types qualified as Env import Share.OAuth.Errors (OAuth2Error) @@ -56,6 +57,8 @@ import Share.Utils.Logging.Types as X import Share.Utils.Tags (MonadTags) import System.Log.FastLogger qualified as FL import Unison.Server.Backend qualified as Backend +import Unison.Server.HistoryComments.Types (DownloadCommentsResponse (..), UploadCommentsResponse (..)) +import Unison.Server.Types (BranchRef (..)) import Unison.Sync.Types qualified as Sync import Unison.Util.Monoid (intercalateMap) import Unison.Util.Monoid qualified as Monoid @@ -267,3 +270,30 @@ instance Loggable Sync.UploadEntitiesError where Sync.UploadEntitiesError'UserNotFound userHandle -> textLog ("User not found: " <> userHandle) & withSeverity UserFault + +instance Loggable UploadCommentsResponse where + toLog = \case + UploadCommentsProjectBranchNotFound (BranchRef branchRef) -> + textLog ("Project branch not found: " <> branchRef) + & withSeverity UserFault + UploadCommentsNotAuthorized (BranchRef branchRef) -> + textLog ("Not authorized to upload comments to branch: " <> branchRef) + & withSeverity UserFault + UploadCommentsGenericFailure errMsg -> + textLog ("Upload comments generic failure: " <> errMsg) + & withSeverity Error + +instance Loggable WS.ConnectionException where + toLog = withSeverity Error . 
showLog + +instance Loggable DownloadCommentsResponse where + toLog = \case + DownloadCommentsProjectBranchNotFound (BranchRef branchRef) -> + textLog ("Project branch not found: " <> branchRef) + & withSeverity UserFault + DownloadCommentsNotAuthorized (BranchRef branchRef) -> + textLog ("Not authorized to download comments from branch: " <> branchRef) + & withSeverity UserFault + DownloadCommentsGenericFailure errMsg -> + textLog ("Download comments generic failure: " <> errMsg) + & withSeverity Error diff --git a/share-api/src/Share/Utils/Servant/Streaming.hs b/share-api/src/Share/Utils/Servant/Streaming.hs new file mode 100644 index 00000000..e03e5071 --- /dev/null +++ b/share-api/src/Share/Utils/Servant/Streaming.hs @@ -0,0 +1,63 @@ +module Share.Utils.Servant.Streaming + ( toConduit, + cborStreamToConduit, + fromConduit, + sourceIOWithAsync, + queueToCBORStream, + queueToSourceIO, + ) +where + +-- Orphan instances for SourceIO + +import Codec.Serialise qualified as CBOR +import Conduit +import Control.Concurrent.STM.TBMQueue qualified as STM +import Control.Monad.Except +import Data.ByteString.Builder qualified as Builder +import Ki.Unlifted qualified as Ki +import Servant +import Servant.Conduit (conduitToSourceIO) +import Servant.Types.SourceT +import Share.Prelude +import Unison.Util.Servant.CBOR +import UnliftIO.STM qualified as STM + +-- | Run the provided IO action in the background while streaming results. +-- +-- Servant doesn't provide any easier way to do bracketing like this, all the IO must be +-- inside the SourceIO somehow. +sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r +sourceIOWithAsync action (SourceT k) = + SourceT \k' -> + Ki.scoped \scope -> do + _ <- Ki.fork scope action + k k' + +toConduit :: (MonadIO m, MonadIO n) => SourceIO o -> m (ConduitT void o n ()) +toConduit sourceIO = fmap (transPipe liftIO) . 
liftIO $ fromSourceIO $ sourceIO + +cborStreamToConduit :: (MonadIO m, MonadIO n, CBOR.Serialise o) => SourceIO (CBORStream o) -> m (ConduitT void o (ExceptT CBORStreamError n) ()) +cborStreamToConduit sourceIO = toConduit sourceIO <&> \stream -> (stream .| unpackCBORBytesStream) + +fromConduit :: ConduitT void o IO () -> SourceIO o +fromConduit = conduitToSourceIO + +queueToCBORStream :: forall a f. (CBOR.Serialise a, Foldable f) => STM.TBMQueue (f a) -> ConduitT () (CBORStream a) IO () +queueToCBORStream q = do + let loop :: ConduitT () (CBORStream a) IO () + loop = do + liftIO (STM.atomically (STM.readTBMQueue q)) >>= \case + -- The queue is closed. + Nothing -> do + pure () + Just batches -> do + batches + & foldMap (CBOR.serialiseIncremental) + & (CBORStream . Builder.toLazyByteString) + & Conduit.yield + loop + loop + +queueToSourceIO :: forall a f. (CBOR.Serialise a, Foldable f) => STM.TBMQueue (f a) -> SourceIO (CBORStream a) +queueToSourceIO q = fromConduit (queueToCBORStream q) diff --git a/share-api/src/Share/Web/API.hs b/share-api/src/Share/Web/API.hs index 2c7816b3..ab374607 100644 --- a/share-api/src/Share/Web/API.hs +++ b/share-api/src/Share/Web/API.hs @@ -18,6 +18,7 @@ import Share.Web.Share.Webhooks.API qualified as Webhooks import Share.Web.Support.API qualified as Support import Share.Web.Types import Share.Web.UCM.SyncV2.API qualified as SyncV2 +import Unison.Server.HistoryComments.API qualified as Unison.HistoryComments import Unison.Share.API.Projects qualified as UCMProjects import Unison.Sync.API qualified as Unison.Sync @@ -53,6 +54,7 @@ type API = -- This path is deprecated, but is still in use by existing clients. 
:<|> ("sync" :> MaybeAuthenticatedSession :> Unison.Sync.API) :<|> ("ucm" :> "v1" :> "sync" :> MaybeAuthenticatedSession :> Unison.Sync.API) + :<|> ("ucm" :> "v1" :> "history-comments" :> MaybeAuthenticatedUserId :> Unison.HistoryComments.API) :<|> ("ucm" :> "v1" :> "projects" :> MaybeAuthenticatedSession :> UCMProjects.ProjectsAPI) :<|> ("ucm" :> "v2" :> "sync" :> MaybeAuthenticatedUserId :> SyncV2.API) :<|> ("admin" :> Admin.API) diff --git a/share-api/src/Share/Web/Authentication.hs b/share-api/src/Share/Web/Authentication.hs index dde86c50..f3d6373d 100644 --- a/share-api/src/Share/Web/Authentication.hs +++ b/share-api/src/Share/Web/Authentication.hs @@ -6,6 +6,7 @@ module Share.Web.Authentication ( cookieSessionTTL, requireAuthenticatedUser, + requireAuthenticatedUser', UnauthenticatedError (..), pattern MaybeAuthedUserID, pattern AuthenticatedUser, @@ -39,3 +40,7 @@ instance ToServerError UnauthenticatedError where requireAuthenticatedUser :: Maybe Session -> WebApp UserId requireAuthenticatedUser (AuthenticatedUser uid) = pure uid requireAuthenticatedUser _ = Errors.respondError UnauthenticatedError + +requireAuthenticatedUser' :: Maybe UserId -> WebApp UserId +requireAuthenticatedUser' (Just uid) = pure uid +requireAuthenticatedUser' _ = Errors.respondError UnauthenticatedError diff --git a/share-api/src/Share/Web/Authorization.hs b/share-api/src/Share/Web/Authorization.hs index fd489671..7ec7c426 100644 --- a/share-api/src/Share/Web/Authorization.hs +++ b/share-api/src/Share/Web/Authorization.hs @@ -43,7 +43,7 @@ module Share.Web.Authorization checkUploadToUserCodebase, checkUploadToProjectBranchCodebase, checkUserUpdate, - checkDownloadFromUserCodebase, + hashJWTAuthOverride, checkDownloadFromProjectBranchCodebase, checkCreateOrg, checkReadOrgRolesList, @@ -389,17 +389,20 @@ checkUploadToUserCodebase reqUserId codebaseOwnerUserId = maybePermissionFailure assertUsersEqual reqUserId codebaseOwnerUserId pure $ AuthZ.UnsafeAuthZReceipt Nothing --- | The 
download endpoint currently does all of its own auth using HashJWTs, +-- | The download endpoints currently do all of their own auth using HashJWTs, -- So we don't add any other authz checks here, the HashJWT check is sufficient. -checkDownloadFromUserCodebase :: WebApp (Either AuthZFailure AuthZ.AuthZReceipt) -checkDownloadFromUserCodebase = +hashJWTAuthOverride :: WebApp (Either AuthZFailure AuthZ.AuthZReceipt) +hashJWTAuthOverride = pure . Right $ AuthZ.UnsafeAuthZReceipt Nothing -- | The download endpoint currently does all of its own auth using HashJWTs, -- So we don't add any other authz checks here, the HashJWT check is sufficient. -checkDownloadFromProjectBranchCodebase :: WebApp (Either AuthZFailure AuthZ.AuthZReceipt) -checkDownloadFromProjectBranchCodebase = - pure . Right $ AuthZ.UnsafeAuthZReceipt Nothing +checkDownloadFromProjectBranchCodebase :: Maybe UserId -> ProjectId -> WebApp (Either AuthZFailure AuthZ.AuthZReceipt) +checkDownloadFromProjectBranchCodebase reqUserId projectId = + mapLeft (const authzError) <$> do + checkProjectGet reqUserId projectId + where + authzError = AuthZFailure $ (ProjectPermission (ProjectBranchBrowse projectId)) checkProjectCreate :: UserId -> UserId -> WebApp (Either AuthZFailure AuthZ.AuthZReceipt) checkProjectCreate reqUserId targetUserId = maybePermissionFailure (ProjectPermission (ProjectCreate targetUserId)) $ do diff --git a/share-api/src/Share/Web/Errors.hs b/share-api/src/Share/Web/Errors.hs index 55591eda..7024a64b 100644 --- a/share-api/src/Share/Web/Errors.hs +++ b/share-api/src/Share/Web/Errors.hs @@ -53,6 +53,7 @@ import Data.Text qualified as Text import Data.Text.Encoding qualified as Text import GHC.Stack qualified as GHC import GHC.TypeLits qualified as TL +import Network.WebSockets qualified as WS import Servant import Servant.Client import Share.Env.Types qualified as Env @@ -67,6 +68,8 @@ import Share.Utils.URI (URIParam (..), addQueryParam) import Share.Web.App import Unison.Server.Backend qualified 
as Backend import Unison.Server.Errors qualified as Backend +import Unison.Server.HistoryComments.Types (DownloadCommentsResponse (..), UploadCommentsResponse (..)) +import Unison.Server.Types (BranchRef (..)) import Unison.Sync.Types qualified as Sync import UnliftIO qualified @@ -423,3 +426,32 @@ instance ToServerError Sync.UploadEntitiesError where Sync.UploadEntitiesError'NoWritePermission _ -> ("no-write-permission", err403 {errBody = "No Write Permission"}) Sync.UploadEntitiesError'ProjectNotFound _ -> ("project-not-found", err404 {errBody = "Project Not Found"}) Sync.UploadEntitiesError'UserNotFound _ -> ("user-not-found", err404 {errBody = "User Not Found"}) + +instance ToServerError UploadCommentsResponse where + toServerError = \case + UploadCommentsProjectBranchNotFound (BranchRef branchRef) -> + (ErrorID "upload-comments:project-branch-not-found", err404 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Project branch not found: " <> branchRef}) + UploadCommentsNotAuthorized (BranchRef branchRef) -> + (ErrorID "upload-comments:not-authorized", err403 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Not authorized to upload comments to branch: " <> branchRef}) + UploadCommentsGenericFailure errMsg -> + (ErrorID "upload-comments:generic-failure", err500 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Upload comments failure: " <> errMsg}) + +instance ToServerError WS.ConnectionException where + toServerError = \case + WS.CloseRequest _ _ -> + (ErrorID "websocket:close-request", err400 {errBody = "WebSocket closed by client"}) + WS.ParseException msg -> + (ErrorID "websocket:parse-exception", err400 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Invalid message: parse exception: " <> Text.pack msg}) + WS.UnicodeException msg -> + (ErrorID "websocket:unicode-exception", err400 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Unicode decoding exception: " <> Text.pack msg}) + WS.ConnectionClosed -> + (ErrorID "websocket:connection-closed", err400 {errBody = "WebSocket 
connection closed"}) + +instance ToServerError DownloadCommentsResponse where + toServerError = \case + DownloadCommentsProjectBranchNotFound (BranchRef branchRef) -> + (ErrorID "download-comments:project-branch-not-found", err404 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Project branch not found: " <> branchRef}) + DownloadCommentsNotAuthorized (BranchRef branchRef) -> + (ErrorID "download-comments:not-authorized", err403 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Not authorized to download comments from branch: " <> branchRef}) + DownloadCommentsGenericFailure errMsg -> + (ErrorID "download-comments:generic-failure", err500 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Download comments failure: " <> errMsg}) diff --git a/share-api/src/Share/Web/Impl.hs b/share-api/src/Share/Web/Impl.hs index 13d5cad4..4c789722 100644 --- a/share-api/src/Share/Web/Impl.hs +++ b/share-api/src/Share/Web/Impl.hs @@ -27,6 +27,7 @@ import Share.Web.Share.Projects.Impl qualified as Projects import Share.Web.Share.Webhooks.Impl qualified as Webhooks import Share.Web.Support.Impl qualified as Support import Share.Web.Types +import Share.Web.UCM.HistoryComments.Impl qualified as HistoryComments import Share.Web.UCM.Projects.Impl qualified as UCMProjects import Share.Web.UCM.Sync.Impl qualified as Sync import Share.Web.UCM.SyncV2.Impl qualified as SyncV2 @@ -89,6 +90,7 @@ server = :<|> healthEndpoint :<|> Sync.server -- Deprecated path :<|> Sync.server + :<|> HistoryComments.server :<|> UCMProjects.server :<|> SyncV2.server :<|> Admin.server diff --git a/share-api/src/Share/Web/UCM/HistoryComments/API.hs b/share-api/src/Share/Web/UCM/HistoryComments/API.hs new file mode 100644 index 00000000..2875d432 --- /dev/null +++ b/share-api/src/Share/Web/UCM/HistoryComments/API.hs @@ -0,0 +1,9 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE TypeOperators #-} + +module Share.Web.UCM.HistoryComments.API (API) where + +import Servant +import Unison.Server.HistoryComments.API qualified as 
HistoryComments + +type API = NamedRoutes HistoryComments.Routes diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs new file mode 100644 index 00000000..af234168 --- /dev/null +++ b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs @@ -0,0 +1,319 @@ +module Share.Web.UCM.HistoryComments.Impl (server) where + +import Control.Concurrent.STM.TBMQueue (TBMQueue, closeTBMQueue, newTBMQueueIO, readTBMQueue, writeTBMQueue) +import Control.Lens +import Control.Monad.Except +import Control.Monad.Trans.Maybe +import Data.List.NonEmpty qualified as NEL +import Data.Monoid (All (..)) +import Data.Set qualified as Set +import Data.Set.NonEmpty qualified as NESet +import Ki.Unlifted qualified as Ki +import Network.WebSockets.Connection +import Share.Branch +import Share.IDs +import Share.IDs qualified as IDs +import Share.Postgres qualified as PG +import Share.Postgres.Cursors qualified as Cursor +import Share.Postgres.Queries qualified as PGQ +import Share.Postgres.Users.Queries qualified as UserQ +import Share.Prelude +import Share.Project +import Share.User +import Share.Web.App (WebApp, WebAppServer) +import Share.Web.Authentication qualified as AuthN +import Share.Web.Authorization qualified as AuthZ +import Share.Web.Errors (reportError) +import Share.Web.UCM.HistoryComments.Queries qualified as Q +import Unison.Debug qualified as Debug +import Unison.Server.HistoryComments.API qualified as HistoryComments +import Unison.Server.HistoryComments.Types (HistoryCommentDownloaderChunk (..), HistoryCommentUploaderChunk (..), UploadCommentsResponse (..)) +import Unison.Server.HistoryComments.Types qualified as Sync +import Unison.Server.Types +import Unison.Util.Websockets +import UnliftIO + +server :: Maybe UserId -> HistoryComments.Routes WebAppServer +server mayCaller = + HistoryComments.Routes + { downloadHistoryComments = downloadHistoryCommentsStreamImpl mayCaller, + uploadHistoryComments = 
uploadHistoryCommentsStreamImpl mayCaller + } + +wsMessageBufferSize :: Int +wsMessageBufferSize = 100 + +downloadHistoryCommentsStreamImpl :: Maybe UserId -> BranchRef -> Connection -> WebApp () +downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = do + result <- withQueues @(MsgOrError Sync.DownloadCommentsResponse HistoryCommentUploaderChunk) @(MsgOrError Void HistoryCommentDownloaderChunk) wsMessageBufferSize wsMessageBufferSize conn \q@(Queues {receive, send}) -> Ki.scoped \scope -> runExceptT $ do + projectBranchSH@ProjectBranchShortHand {userHandle, projectSlug, contributorHandle} <- case IDs.fromText @ProjectBranchShortHand branchRef of + Left err -> handleErrInQueue q (Sync.DownloadCommentsGenericFailure $ IDs.toText err) + Right pbsh -> pure pbsh + let projectSH = ProjectShortHand {userHandle, projectSlug} + mayInfo <- lift . runMaybeT $ mapMaybeT PG.runTransaction $ do + project <- MaybeT $ PGQ.projectByShortHand projectSH + branch <- MaybeT $ PGQ.branchByProjectBranchShortHand projectBranchSH + contributorUser <- for contributorHandle (MaybeT . 
UserQ.userByHandle) + pure (project, branch, contributorUser) + (project, branch, _contributorUser) <- maybe (handleErrInQueue q $ Sync.DownloadCommentsProjectBranchNotFound br) pure $ mayInfo + !authZ <- + lift (AuthZ.checkDownloadFromProjectBranchCodebase mayCallerUserId project.projectId) >>= \case + Left _authErr -> handleErrInQueue q (Sync.DownloadCommentsNotAuthorized br) + Right authZ -> pure authZ + + downloadableCommentsVar <- + liftIO $ newTVarIO @_ @(Set (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32)) Set.empty + commentHashesToSendQ <- liftIO $ newTBMQueueIO @(Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) 100 + commentsToSendQ <- liftIO $ newTBMQueueIO @(Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32) 100 + errMVar <- newEmptyTMVarIO + _ <- lift $ Ki.fork scope (hashNotifyWorker send commentHashesToSendQ) + senderThread <- lift $ Ki.fork scope (senderWorker send commentsToSendQ) + _ <- lift $ Ki.fork scope (receiverWorker receive commentsToSendQ errMVar downloadableCommentsVar) + lift $ PG.runTransaction $ do + cursor <- Q.projectBranchCommentsCursor authZ branch.causal + Cursor.foldBatched cursor 100 \hashes -> do + let (newHashes, chunks) = + hashes + & foldMap + ( \(commentHash, revisionHash) -> + ([Left commentHash, Right revisionHash], [(commentHash, [revisionHash])]) + ) + & first Set.fromList + PG.transactionUnsafeIO $ atomically $ do + modifyTVar downloadableCommentsVar (Set.union newHashes) + for chunks \chunk -> writeTBMQueue commentHashesToSendQ (chunk) + -- Close the hashes queue to signal we don't have any more, then wait for the notifier to finish + atomically $ closeTBMQueue commentHashesToSendQ + -- Now we just have to wait for the sender to finish sending all the comments we have queued up. + -- Once we've uploaded everything we can safely exit and the connection will be closed. 
+ atomically $ Ki.await senderThread + case result of + Left err -> do + reportError err + Right (Left err, _leftovers {- Messages sent by server after we finished. -}) -> do + reportError err + Right (Right (), _leftovers {- Messages sent by server after we finished. -}) -> do + pure () + where + senderWorker :: + ( MsgOrError err HistoryCommentUploaderChunk -> + STM Bool + ) -> + TBMQueue (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32) -> + WebApp () + senderWorker send commentsToSendQ = do + let loop = do + (hashesToSend, isClosed) <- atomically $ flushTBMQueue commentsToSendQ + withCommentsAndRevisions <- lift . PG.runTransaction $ do + -- Send comments first, then revisions + withComments <- Q.historyCommentsByHashOf (traversed . _Left) hashesToSend + Q.historyCommentRevisionsByHashOf (traversed . _Right) withComments + for_ withCommentsAndRevisions \commentOrRevision -> atomically . send . Msg $ intoChunk commentOrRevision + guard (not isClosed) + loop + void . runMaybeT $ loop + + receiverWorker :: + STM (Maybe (MsgOrError Void HistoryCommentDownloaderChunk)) -> + TBMQueue + ( Either + Sync.HistoryCommentHash32 + Sync.HistoryCommentRevisionHash32 + ) -> + TMVar Text -> + (TVar (Set (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32))) -> + WebApp () + receiverWorker receive commentsToSendQ errMVar downloadableCommentsVar = do + let loop = do + msgOrError <- atomically receive + case msgOrError of + -- Channel closed, shut down + Nothing -> pure () + Just (Msg msg) -> case msg of + DoneCheckingHashesChunk -> do + -- The downloader is done checking hashes, and has issued all requests for + -- comments. + -- We can close the relevant queues now, we won't get any more requests. 
+ atomically $ closeTBMQueue commentsToSendQ + loop + RequestCommentsChunk comments -> do + validComments <- atomically $ do + downloadableComments <- readTVar downloadableCommentsVar + pure $ Set.intersection (NESet.toSet comments) downloadableComments -- Only hashes we previously advertised to this client may be requested. + for_ validComments $ atomically . writeTBMQueue commentsToSendQ -- One write per transaction: a single transaction could never commit more writes than the bounded queue has room for. + loop + Just (DeserialiseFailure msg) -> do + atomically . void $ tryPutTMVar errMVar $ "downloadHistoryComments: deserialisation failure: " <> msg -- tryPutTMVar keeps the first error; a blocking put would stall this loop on a second failure. + loop + + hashNotifyWorker :: + (MsgOrError Sync.DownloadCommentsResponse HistoryCommentUploaderChunk -> STM Bool) -> + TBMQueue (Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) -> + WebApp () + hashNotifyWorker send q = do + let loop = do + isClosed <- atomically $ do + (hashesToCheck, isClosed) <- flushTBMQueue q + All sendSuccess <- + NEL.nonEmpty hashesToCheck & foldMapM \possiblyNewHashes -> do + All <$> (send $ Msg $ PossiblyNewHashesChunk possiblyNewHashes) + pure (isClosed || not sendSuccess) + if isClosed + then do + -- The hash queue is closed (or a send failed): send DoneSendingHashesChunk to tell the downloader no more hashes are coming. + void . atomically $ send (Msg DoneSendingHashesChunk) + else loop + loop + intoChunk = either HistoryCommentChunk HistoryCommentRevisionChunk + +-- Re-run the given STM action at most n times, collecting the results into a list. +-- If the action returns Nothing, stop and return what has been collected so far, along with a Bool indicating whether the action was exhausted. 
+fetchChunk :: Int -> STM (Maybe a) -> STM ([a], Bool) +fetchChunk size action = do + let go 0 = pure ([], False) + go n = do + optional action >>= \case + Nothing -> do + -- No more values available at the moment + empty + Just Nothing -> do + -- Queue is closed + pure ([], True) + Just (Just val) -> do + (rest, exhausted) <- go (n - 1) <|> pure ([], False) + pure (val : rest, exhausted) + go size + +uploadHistoryCommentsStreamImpl :: Maybe UserId -> BranchRef -> Connection -> WebApp () +uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = do + callerUserId <- AuthN.requireAuthenticatedUser' mayCallerUserId + result <- withQueues @(MsgOrError UploadCommentsResponse HistoryCommentDownloaderChunk) @(MsgOrError Void HistoryCommentUploaderChunk) wsMessageBufferSize wsMessageBufferSize conn \q@(Queues {receive, send}) -> Ki.scoped \scope -> runExceptT $ do + projectBranchSH@ProjectBranchShortHand {userHandle, projectSlug, contributorHandle} <- case IDs.fromText @ProjectBranchShortHand branchRef of + Left err -> handleErrInQueue q (UploadCommentsGenericFailure $ IDs.toText err) + Right pbsh -> pure pbsh + let projectSH = ProjectShortHand {userHandle, projectSlug} + mayInfo <- lift . runMaybeT $ mapMaybeT PG.runTransaction $ do + project <- MaybeT $ PGQ.projectByShortHand projectSH + branch <- MaybeT $ PGQ.branchByProjectBranchShortHand projectBranchSH + contributorUser <- for contributorHandle (MaybeT . 
UserQ.userByHandle) + pure (project, branch, contributorUser) + (project, _branch, contributorUser) <- maybe (handleErrInQueue q $ UploadCommentsProjectBranchNotFound br) pure $ mayInfo + !authZ <- + lift (AuthZ.checkUploadToProjectBranchCodebase callerUserId project.projectId (user_id <$> contributorUser)) >>= \case + Left _authErr -> handleErrInQueue q (UploadCommentsNotAuthorized br) + Right authZ -> pure authZ + hashesToCheckQ <- liftIO $ newTBMQueueIO @(Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) 100 + commentsQ <- liftIO $ newTBMQueueIO 100 + errMVar <- liftIO newEmptyTMVarIO + _receiverThread <- lift $ Ki.fork scope $ receiverWorker receive errMVar hashesToCheckQ commentsQ + inserterThread <- lift $ Ki.fork scope $ inserterWorker authZ commentsQ project.projectId + _hashCheckingThread <- lift $ Ki.fork scope $ hashCheckingWorker project.projectId send hashesToCheckQ + Debug.debugLogM Debug.HistoryComments "Upload history comments: waiting for inserter thread to finish" + -- The inserter thread will finish when the client closes the connection. + atomically $ Ki.await inserterThread + Debug.debugLogM Debug.HistoryComments "Done. Closing connection." 
+ case result of + Left err -> reportError err + Right (Left err, _leftovers) -> reportError err + Right (Right (), _leftovers) -> pure () + where + inserterWorker :: + AuthZ.AuthZReceipt -> + TBMQueue (Either Sync.HistoryComment Sync.HistoryCommentRevision) -> + ProjectId -> + WebApp () + inserterWorker authZ commentsQ projectId = do + let loop = do + (chunk, closed) <- atomically (fetchChunk insertCommentBatchSize (readTBMQueue commentsQ)) + PG.whenNonEmpty chunk do + Debug.debugM Debug.HistoryComments "Inserting comments chunk of size" (length chunk) + PG.runTransaction $ Q.insertHistoryComments authZ projectId chunk + when closed $ Debug.debugLogM Debug.HistoryComments "Inserter worker: comments queue closed" + when (not closed) loop + loop + Debug.debugLogM Debug.HistoryComments "Inserter worker finished" + + hashCheckingWorker :: + ProjectId -> + (MsgOrError err HistoryCommentDownloaderChunk -> STM Bool) -> + TBMQueue (Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) -> + WebApp () + hashCheckingWorker projectId send hashesToCheckQ = do + let loop = do + (hashes, closed) <- atomically (fetchChunk insertCommentBatchSize (readTBMQueue hashesToCheckQ)) + Debug.debugM Debug.HistoryComments "Checking hashes chunk of size" (length hashes) + PG.whenNonEmpty hashes $ do + unknownCommentHashes <- fmap Set.fromList $ PG.runTransaction $ do + Q.filterForUnknownHistoryCommentHashes (Sync.unHistoryCommentHash32 . fst <$> hashes) + let (revisionHashesWeDefinitelyNeed, revisionHashesToCheck) = + hashes + -- Only check revisions for comments that are unknown + & foldMap \case + (commentHash, revisionHashes) + -- If the comment hash is unknown, we need _all_ its revisions, we + -- don't need to check them. + -- Otherwise, we need to check which revisions are unknown. 
+ | Set.member (Sync.unHistoryCommentHash32 commentHash) unknownCommentHashes -> (revisionHashes, []) + | otherwise -> ([], revisionHashes) + unknownRevsFiltered <- PG.runTransaction $ Q.filterForUnknownHistoryCommentRevisionHashes projectId (Sync.unHistoryCommentRevisionHash32 <$> revisionHashesToCheck) + let allNeededHashes = + (Set.map (Left . Sync.HistoryCommentHash32) unknownCommentHashes) + <> (Set.fromList . fmap Right $ revisionHashesWeDefinitelyNeed) + <> (Set.fromList (Right . Sync.HistoryCommentRevisionHash32 <$> unknownRevsFiltered)) + case NESet.nonEmptySet allNeededHashes of + Nothing -> pure () + Just unknownHashesSet -> do + Debug.debugM Debug.HistoryComments "Requesting unknown hashes" unknownHashesSet + void . atomically $ send $ Msg $ RequestCommentsChunk unknownHashesSet + when closed $ Debug.debugLogM Debug.HistoryComments "Hash checking worker: hashes queue closed" + when (not closed) loop + loop + void . atomically $ send $ Msg $ DoneCheckingHashesChunk + Debug.debugLogM Debug.HistoryComments "Hash checking worker finished" + receiverWorker :: STM (Maybe (MsgOrError Void HistoryCommentUploaderChunk)) -> TMVar Text -> TBMQueue (Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) -> TBMQueue (Either Sync.HistoryComment Sync.HistoryCommentRevision) -> WebApp () + receiverWorker recv errMVar hashesToCheckQ commentsQ = do + let loop = do + next <- atomically do + recv >>= \case + Nothing -> do + Debug.debugLogM Debug.HistoryComments "Receiver worker: connection closed" + closeTBMQueue hashesToCheckQ + closeTBMQueue commentsQ + pure (pure ()) + Just (DeserialiseFailure err) -> do + Debug.debugM Debug.HistoryComments "Receiver worker: deserialisation failure" err + putTMVar errMVar err *> closeTBMQueue hashesToCheckQ *> closeTBMQueue commentsQ -- The receiver stops after this branch, so close both queues; otherwise the workers reading them would wait forever. + pure (pure ()) + Just (Msg msg) -> do + case msg of + PossiblyNewHashesChunk hashesToCheck -> do + for_ hashesToCheck $ \h -> writeTBMQueue hashesToCheckQ h + DoneSendingHashesChunk -> do + closeTBMQueue hashesToCheckQ + HistoryCommentChunk comment -> do 
+ writeTBMQueue commentsQ (Left comment) + HistoryCommentRevisionChunk revision -> do + writeTBMQueue commentsQ (Right revision) + pure loop + next + loop + Debug.debugLogM Debug.HistoryComments "Receiver worker finished" + insertCommentBatchSize = 100 + +handleErrInQueue :: forall o x e a. Queues (MsgOrError e a) o -> e -> ExceptT e WebApp x +handleErrInQueue Queues {send} e = do + _ <- atomically $ send $ UserErr e + throwError e + +-- Read all available values from a TBMQueue, returning them and whether the queue is closed. +flushTBMQueue :: TBMQueue a -> STM ([a], Bool) +flushTBMQueue q = do + optional (readTBMQueue q) >>= \case + -- No values available + Nothing -> empty + Just Nothing -> do + -- Queue closed + pure ([], True) + Just (Just v) -> do + (vs, closed) <- flushTBMQueue q <|> pure ([], False) + pure (v : vs, closed) diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs new file mode 100644 index 00000000..5e04dff9 --- /dev/null +++ b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs @@ -0,0 +1,282 @@ +{-# LANGUAGE ApplicativeDo #-} +{-# LANGUAGE RecordWildCards #-} + +module Share.Web.UCM.HistoryComments.Queries + ( projectBranchCommentsCursor, + insertHistoryComments, + historyCommentsByHashOf, + historyCommentRevisionsByHashOf, + filterForUnknownHistoryCommentHashes, + filterForUnknownHistoryCommentRevisionHashes, + ) +where + +import Control.Lens +import Data.Foldable qualified as Foldable +import Data.Set qualified as Set +import Data.Set.NonEmpty (NESet) +import Data.Set.NonEmpty qualified as NESet +import Data.Time (UTCTime) +import Data.Time.Clock.POSIX qualified as POSIX +import Share.IDs +import Share.Postgres (whenNonEmpty) +import Share.Postgres qualified as PG +import Share.Postgres.Cursors (PGCursor) +import Share.Postgres.Cursors qualified as PG +import Share.Postgres.IDs +import Share.Prelude +import Share.Utils.Postgres (ordered) +import 
Share.Web.Authorization (AuthZReceipt)
+import Unison.Hash32 (Hash32)
+import Unison.Server.HistoryComments.Types
+
+-- | Open a row cursor over the comments attached to any causal in the history
+-- (spine) of the given causal, as produced by the @causal_history@ SQL function.
+--
+-- Each row pairs a comment hash with the hash of that comment's newest revision,
+-- i.e. the revision with the greatest @created_at_ms@.
+--
+-- The 'AuthZReceipt' is forced but otherwise unused.
+projectBranchCommentsCursor :: AuthZReceipt -> CausalId -> PG.Transaction e (PGCursor (HistoryCommentHash32, HistoryCommentRevisionHash32))
+projectBranchCommentsCursor !_authZ causalId = do
+  PG.newRowCursor @(HistoryCommentHash32, HistoryCommentRevisionHash32)
+    "projectBranchCommentsCursor"
+    -- No joins against causals/personal_keys here: nothing is selected from them and
+    -- both are NOT NULL foreign keys of history_comments, so they cannot filter rows.
+    [PG.sql|
+      WITH history(causal_id) AS (
+        SELECT ch.causal_id FROM causal_history(#{causalId}) AS ch(causal_id)
+      ) SELECT hc.comment_hash, hcr.revision_hash
+        FROM history
+          JOIN history_comments hc
+            ON hc.causal_id = history.causal_id
+          JOIN LATERAL (
+            SELECT rev.revision_hash
+            FROM history_comment_revisions rev
+            WHERE rev.comment_id = hc.id
+            ORDER BY rev.created_at_ms DESC
+            LIMIT 1
+          ) hcr
+            ON TRUE
+    |]
+
+-- | Replace every comment hash focused by the traversal with the stored 'HistoryComment'.
+--
+-- Rows come back in the order of the input hashes (@ORDER BY hashes.ord@).
+--
+-- NOTE(review): a hash with no matching row produces fewer rows than inputs; how
+-- 'asListOf' reacts to that is not visible in this module, so callers should only
+-- pass hashes that are known to exist.
+historyCommentsByHashOf :: (PG.QueryA m) => Traversal s t HistoryCommentHash32 HistoryComment -> s -> m t
+historyCommentsByHashOf trav s = do
+  s
+    & asListOf trav %%~ \hashes ->
+      PG.queryListRows
+        [PG.sql|
+        WITH hashes (ord, hash) AS (
+          SELECT * FROM ^{PG.toTable $ ordered hashes}
+        ) SELECT hc.author, hc.created_at_ms, key.thumbprint, causal.hash AS causal_hash, hc.comment_hash
+          FROM hashes
+            JOIN history_comments hc
+              ON hc.comment_hash = hashes.hash
+            JOIN causals causal
+              ON hc.causal_id = causal.id
+            JOIN personal_keys key
+              ON hc.author_key_id = key.id
+          ORDER BY hashes.ord ASC
+      |]
+        <&> fmap
+          \( author,
+             createdAtMs,
+             authorThumbprint,
+             causalHash,
+             commentHash
+             ) ->
+              HistoryComment
+                { author,
+                  -- created_at_ms is a BIGINT holding epoch milliseconds, so it is decoded
+                  -- as an Int64 and converted exactly instead of being read as a timestamp.
+                  createdAt = _millisToUTCTime createdAtMs,
+                  authorThumbprint,
+                  causalHash,
+                  commentHash
+                }
+
+-- | Replace every revision hash focused by the traversal with the stored
+-- 'HistoryCommentRevision', including the hash of the comment it belongs to.
+--
+-- Rows come back in the order of the input hashes (@ORDER BY hashes.ord@).
+--
+-- NOTE(review): as with 'historyCommentsByHashOf', unknown hashes yield fewer rows
+-- than inputs; confirm callers only pass known hashes.
+historyCommentRevisionsByHashOf :: (PG.QueryA m) => Traversal s t HistoryCommentRevisionHash32 HistoryCommentRevision -> s -> m t
+historyCommentRevisionsByHashOf trav s = do
+  s
+    & asListOf trav %%~ \hashes ->
+      PG.queryListRows
+        [PG.sql|
+        WITH hashes (ord, hash) AS (
+          SELECT * FROM ^{PG.toTable $ ordered hashes}
+        ) SELECT hcr.subject, hcr.contents, hcr.created_at_ms, hcr.hidden, hcr.author_signature, hcr.revision_hash, hc.comment_hash
+          FROM hashes
+            JOIN history_comment_revisions hcr
+              ON hcr.revision_hash = hashes.hash
+            JOIN history_comments hc
+              ON hcr.comment_id = hc.id
+          ORDER BY hashes.ord ASC
+      |]
+        <&> fmap
+          \( subject,
+             content,
+             createdAtMs,
+             isHidden,
+             authorSignature,
+             revisionHash,
+             commentHash
+             ) ->
+              HistoryCommentRevision
+                { subject,
+                  content,
+                  -- Same epoch-millisecond BIGINT as on comments; convert exactly.
+                  createdAt = _millisToUTCTime createdAtMs,
+                  isHidden,
+                  authorSignature,
+                  revisionHash,
+                  commentHash
+                }
+
+-- | Ensure a @personal_keys@ row exists for each thumbprint.
+--
+-- Only the thumbprint column is written; thumbprints that are already present are
+-- left untouched (@ON CONFLICT DO NOTHING@).
+insertThumbprints :: (PG.QueryA m) => NESet Text -> m ()
+insertThumbprints thumbprints = do
+  PG.execute_
+    [PG.sql|
+      WITH thumbprints(thumbprint) AS (
+        SELECT * FROM ^{PG.singleColumnTable $ Foldable.toList thumbprints}
+      )
+      INSERT INTO personal_keys (thumbprint)
+        SELECT thumbprint FROM thumbprints
+        ON CONFLICT DO NOTHING
+    |]
+
+-- Convert milliseconds since epoch to UTCTime _exactly_.
+-- UTCTime has picosecond precision so this is lossless.
+_millisToUTCTime :: Int64 -> UTCTime
+_millisToUTCTime ms =
+  toRational ms
+    & (/ (1_000 :: Rational))
+    & fromRational
+    & POSIX.posixSecondsToUTCTime
+
+-- | Convert a 'UTCTime' to milliseconds since the Unix epoch.
+--
+-- The arithmetic is done on 'Rational' so no floating point error is introduced;
+-- any sub-millisecond remainder is rounded to the nearest millisecond by 'round'.
+utcTimeToMillis :: UTCTime -> Int64
+utcTimeToMillis utcTime =
+  POSIX.utcTimeToPOSIXSeconds utcTime
+    & toRational
+    & (* (1_000 :: Rational))
+    & round
+
+-- | Persist a batch of uploaded comments ('Left') and comment revisions ('Right')
+-- on behalf of a project.
+--
+-- In order, within the surrounding transaction:
+--
+--   1. a @personal_keys@ row is ensured for every comment author thumbprint;
+--   2. the comments are inserted, skipping comment hashes that already exist;
+--   3. the revisions are inserted and recorded as discovered by @projectId@;
+--   4. discovery rows are recorded per comment (see the note on 'insertDiscoveryInfo').
+--
+-- Each step is skipped when it has nothing to insert. The 'AuthZReceipt' is forced
+-- but otherwise unused.
+insertHistoryComments :: AuthZReceipt -> ProjectId -> [Either HistoryComment HistoryCommentRevision] -> PG.Transaction e ()
+insertHistoryComments !_authZ projectId chunks = do
+  let thumbprints = NESet.nonEmptySet $ Set.fromList (comments <&> \HistoryComment {authorThumbprint} -> authorThumbprint)
+  -- 'nonEmptySet' yields Nothing when the batch has no comments; run only for effect.
+  Foldable.for_ thumbprints insertThumbprints
+  whenNonEmpty comments $ insertComments comments
+  whenNonEmpty revisions $ insertRevisions revisions
+  whenNonEmpty revisions $ insertDiscoveryInfo revisions
+  pure ()
+  where
+    -- Split the mixed upload stream into comments and revisions, preserving order.
+    (comments, revisions) =
+      chunks & foldMap \case
+        Left comment -> ([comment], [])
+        Right revision -> ([], [revision])
+
+    -- Insert comments whose causal is known to the server. A comment whose causal
+    -- hash has no row in @causals@ is dropped by the join, and existing comment
+    -- hashes are skipped.
+    insertComments :: (PG.QueryA m) => [HistoryComment] -> m ()
+    insertComments newComments = do
+      PG.execute_
+        [PG.sql|
+        WITH new_comments(author, created_at_ms, author_thumbprint, causal_hash, comment_hash) AS (
+          SELECT * FROM ^{PG.toTable commentsTable}
+        )
+        INSERT INTO history_comments(causal_id, author, created_at_ms, comment_hash, author_key_id)
+          SELECT causal.id, nc.author, nc.created_at_ms, nc.comment_hash, pk.id
+          FROM new_comments nc
+          JOIN causals causal
+            ON causal.hash = nc.causal_hash
+          JOIN personal_keys pk
+            ON pk.thumbprint = nc.author_thumbprint
+          ON CONFLICT (comment_hash) DO NOTHING
+        |]
+      where
+        commentsTable :: [(Text, Int64, Text, Hash32, Hash32)]
+        commentsTable =
+          newComments <&> \HistoryComment {..} ->
+            ( author,
+              utcTimeToMillis createdAt,
+              authorThumbprint,
+              causalHash,
+              commentHash
+            )
+
+    -- Insert revisions whose parent comment exists, then mark exactly those
+    -- revisions as discovered by this project.
+    insertRevisions :: (PG.QueryA m) => [HistoryCommentRevision] -> m ()
+    insertRevisions revs = do
+      let doRevs =
+            PG.execute_
+              [PG.sql|
+        WITH new_revisions(subject, contents, created_at_ms, hidden, author_signature, revision_hash, comment_hash) AS (
+          SELECT * FROM ^{PG.toTable revsTable}
+        )
+        INSERT INTO history_comment_revisions(comment_id, subject, contents, created_at_ms, hidden, author_signature, revision_hash)
+          SELECT hc.id, nr.subject, nr.contents, nr.created_at_ms, nr.hidden, nr.author_signature, nr.revision_hash
+          FROM new_revisions nr
+          JOIN history_comments hc
+            ON hc.comment_hash = nr.comment_hash
+          ON CONFLICT DO NOTHING
+        |]
+          doDiscovery =
+            PG.execute_
+              [PG.sql|
+        WITH new_discoveries(revision_hash) AS (
+          SELECT * FROM ^{PG.singleColumnTable revHashTable}
+        )
+        INSERT INTO history_comment_revisions_project_discovery(project_id, comment_revision_id)
+          SELECT #{projectId}, hcr.id
+          FROM new_discoveries nd
+          JOIN history_comment_revisions hcr
+            ON hcr.revision_hash = nd.revision_hash
+          ON CONFLICT DO NOTHING
+        |]
+      doRevs *> doDiscovery
+      where
+        revsTable =
+          revs <&> \HistoryCommentRevision {..} ->
+            ( subject,
+              content,
+              utcTimeToMillis createdAt,
+              isHidden,
+              authorSignature,
+              revisionHash,
+              commentHash
+            )
+        revHashTable = revs <&> \HistoryCommentRevision {revisionHash} -> revisionHash
+
+    -- Mark every stored revision of each uploaded revision's comment as discovered
+    -- by this project.
+    --
+    -- NOTE(review): this overlaps with the discovery insert already performed by
+    -- 'insertRevisions', and is broader: it also links revisions of the same comment
+    -- that were not part of this upload. The @project_id@ column of the CTE is unused;
+    -- the interpolated @projectId@ is what gets inserted. Confirm whether the broader
+    -- linking is intended or whether this step is a leftover.
+    insertDiscoveryInfo :: (PG.QueryA m) => [HistoryCommentRevision] -> m ()
+    insertDiscoveryInfo revs = do
+      PG.execute_
+        [PG.sql|
+        WITH new_discoveries(project_id, history_comment_hash) AS (
+          SELECT * FROM ^{PG.toTable discoveryTable}
+        )
+        INSERT INTO history_comment_revisions_project_discovery(project_id, comment_revision_id)
+          SELECT #{projectId}, hcr.id
+          FROM new_discoveries nd
+          JOIN history_comments hc
+            ON hc.comment_hash = nd.history_comment_hash
+          JOIN history_comment_revisions hcr
+            ON hcr.comment_id = hc.id
+          ON CONFLICT DO NOTHING
+        |]
+      where
+        discoveryTable :: [(ProjectId, Hash32)]
+        discoveryTable =
+          revs <&> \HistoryCommentRevision {..} ->
+            ( projectId,
+              commentHash
+            )
+
+filterForUnknownHistoryCommentHashes :: (PG.QueryA m) => [Hash32] -> m [Hash32]
+filterForUnknownHistoryCommentHashes commentHashes = do
+  PG.queryListCol
+
[PG.sql| + SELECT hash FROM ^{PG.singleColumnTable commentHashes} AS t(hash) + WHERE NOT EXISTS ( + SELECT FROM history_comments hc + WHERE hc.comment_hash = t.hash + ) + |] + +filterForUnknownHistoryCommentRevisionHashes :: (PG.QueryA m) => ProjectId -> [Hash32] -> m [Hash32] +filterForUnknownHistoryCommentRevisionHashes projectId revisionHashes = do + PG.queryListCol + [PG.sql| + SELECT hash FROM ^{PG.singleColumnTable revisionHashes} AS t(hash) + WHERE NOT EXISTS ( + SELECT FROM history_comment_revisions_project_discovery hcrpd + JOIN history_comment_revisions hcr + ON hcrpd.comment_revision_id = hcr.id + WHERE hcrpd.project_id = #{projectId} + AND hcr.revision_hash = t.hash + ) + |] diff --git a/share-api/src/Share/Web/UCM/Sync/Impl.hs b/share-api/src/Share/Web/UCM/Sync/Impl.hs index e214134c..2bbedf1a 100644 --- a/share-api/src/Share/Web/UCM/Sync/Impl.hs +++ b/share-api/src/Share/Web/UCM/Sync/Impl.hs @@ -149,7 +149,7 @@ downloadEntitiesEndpoint mayUserId DownloadEntitiesRequest {repoInfo, hashes = h Left err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesInvalidRepoInfo err repoInfo) Right (RepoInfoUser userHandle) -> do User {user_id = repoOwnerUserId} <- lift (PG.runTransaction (UserQ.userByHandle userHandle)) `whenNothingM` throwError (DownloadEntitiesFailure . 
DownloadEntitiesUserNotFound $ IDs.toText @UserHandle userHandle) - authZToken <- lift AuthZ.checkDownloadFromUserCodebase `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) let codebaseLoc = Codebase.codebaseLocationForUserCodebase repoOwnerUserId pure $ Codebase.codebaseEnv authZToken codebaseLoc Right (RepoInfoProjectBranch ProjectBranchShortHand {userHandle, projectSlug, contributorHandle}) -> do @@ -158,7 +158,7 @@ downloadEntitiesEndpoint mayUserId DownloadEntitiesRequest {repoInfo, hashes = h project <- (PGQ.projectByShortHand projectShortHand) `whenNothingM` throwError (DownloadEntitiesFailure . DownloadEntitiesProjectNotFound $ IDs.toText @ProjectShortHand projectShortHand) mayContributorUserId <- for contributorHandle \ch -> fmap user_id $ (UserQ.userByHandle ch) `whenNothingM` throwError (DownloadEntitiesFailure . DownloadEntitiesUserNotFound $ IDs.toText @UserHandle ch) pure (project, mayContributorUserId) - authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc Right (RepoInfoProjectRelease ProjectReleaseShortHand {userHandle, projectSlug}) -> do @@ -166,7 +166,7 @@ downloadEntitiesEndpoint mayUserId DownloadEntitiesRequest {repoInfo, hashes = h (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG.tryRunTransaction $ do project <- PGQ.projectByShortHand projectShortHand `whenNothingM` throwError (DownloadEntitiesFailure . 
DownloadEntitiesProjectNotFound $ IDs.toText @ProjectShortHand projectShortHand) pure (project, Nothing) - authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc Env.Env {maxParallelismPerDownloadRequest} <- ask diff --git a/share-api/src/Share/Web/UCM/SyncV2/Impl.hs b/share-api/src/Share/Web/UCM/SyncV2/Impl.hs index c73bdbb1..1fba40ce 100644 --- a/share-api/src/Share/Web/UCM/SyncV2/Impl.hs +++ b/share-api/src/Share/Web/UCM/SyncV2/Impl.hs @@ -4,19 +4,15 @@ module Share.Web.UCM.SyncV2.Impl (server) where import Codec.Serialise qualified as CBOR -import Conduit qualified as C import Control.Concurrent.STM qualified as STM import Control.Concurrent.STM.TBMQueue qualified as STM import Control.Monad.Except (ExceptT (ExceptT), withExceptT) import Control.Monad.Trans.Except (runExceptT) -import Data.Binary.Builder qualified as Builder import Data.Set qualified as Set import Data.Text.Encoding qualified as Text import Data.Vector qualified as Vector -import Ki.Unlifted qualified as Ki import Servant import Servant.Conduit (ConduitToSourceIO (..)) -import Servant.Types.SourceT (SourceT (..)) import Servant.Types.SourceT qualified as SourceT import Share.Codebase qualified as Codebase import Share.IDs (ProjectBranchShortHand (..), ProjectReleaseShortHand (..), ProjectShortHand (..), UserHandle, UserId) @@ -30,6 +26,7 @@ import Share.Prelude import Share.Project (Project (..)) import Share.User (User (..)) import Share.Utils.Logging qualified as Logging +import Share.Utils.Servant.Streaming import Share.Utils.Unison (hash32ToCausalHash) import Share.Web.App import 
Share.Web.Authorization qualified as AuthZ @@ -95,7 +92,7 @@ downloadEntitiesStreamImpl mayCallerUserId (SyncV2.DownloadEntitiesRequest {caus PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBMQueue q entityChunkBatch PG.transactionUnsafeIO $ STM.atomically $ STM.closeTBMQueue q pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do - queueToStream q + queueToCBORStream q where emitErr :: SyncV2.DownloadEntitiesError -> SourceIO (SyncV2.CBORStream SyncV2.DownloadEntitiesChunk) emitErr err = SourceT.source [SyncV2.CBORStream . CBOR.serialise $ ErrorC (ErrorChunk err)] @@ -123,24 +120,7 @@ causalDependenciesStreamImpl mayCallerUserId (SyncV2.CausalDependenciesRequest { in CausalHashDepC {causalHash, dependencyType} PG.transactionUnsafeIO $ STM.atomically $ STM.writeTBMQueue q depBatch PG.transactionUnsafeIO $ STM.atomically $ STM.closeTBMQueue q - pure $ sourceIOWithAsync streamResults $ conduitToSourceIO do - queueToStream q - -queueToStream :: forall a f. (CBOR.Serialise a, Foldable f) => STM.TBMQueue (f a) -> C.ConduitT () (SyncV2.CBORStream a) IO () -queueToStream q = do - let loop :: C.ConduitT () (SyncV2.CBORStream a) IO () - loop = do - liftIO (STM.atomically (STM.readTBMQueue q)) >>= \case - -- The queue is closed. - Nothing -> do - pure () - Just batches -> do - batches - & foldMap (CBOR.serialiseIncremental) - & (SyncV2.CBORStream . Builder.toLazyByteString) - & C.yield - loop - loop + pure $ sourceIOWithAsync streamResults $ queueToSourceIO q data CodebaseLoadingError = CodebaseLoadingErrorProjectNotFound ProjectShortHand @@ -166,7 +146,7 @@ codebaseForBranchRef branchRef = do (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . 
PG.tryRunTransaction $ do project <- PGQ.projectByShortHand projectShortHand `whenNothingM` throwError (CodebaseLoadingErrorProjectNotFound $ projectShortHand) pure (project, Nothing) - authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc Right (Right (ProjectBranchShortHand {userHandle, projectSlug, contributorHandle})) -> do @@ -175,17 +155,6 @@ codebaseForBranchRef branchRef = do project <- (PGQ.projectByShortHand projectShortHand) `whenNothingM` throwError (CodebaseLoadingErrorProjectNotFound projectShortHand) mayContributorUserId <- for contributorHandle \ch -> fmap user_id $ (UserQ.userByHandle ch) `whenNothingM` throwError (CodebaseLoadingErrorUserNotFound ch) pure (project, mayContributorUserId) - authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc - --- | Run an IO action in the background while streaming the results. --- --- Servant doesn't provide any easier way to do bracketing like this, all the IO must be --- inside the SourceIO somehow. 
-sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r -sourceIOWithAsync action (SourceT k) = - SourceT \k' -> - Ki.scoped \scope -> do - _ <- Ki.fork scope action - k k' diff --git a/sql/2024-04-18-00-00_causal_history_function.sql b/sql/2024-04-18-00-00_causal_history_function.sql index 95df5ae6..adeb5eb8 100644 --- a/sql/2024-04-18-00-00_causal_history_function.sql +++ b/sql/2024-04-18-00-00_causal_history_function.sql @@ -1,5 +1,6 @@ --- Return all other causals in the history of a causal, including itself. +-- Return all other causals in the history (a.k.a. spine) of a causal, including itself. +-- This does not include any namespace child causals. CREATE FUNCTION causal_history(causal_id INTEGER) RETURNS TABLE (causal_id INTEGER) AS $$ diff --git a/sql/2025-11-20_history-comments.sql b/sql/2025-11-20_history-comments.sql new file mode 100644 index 00000000..12e76d08 --- /dev/null +++ b/sql/2025-11-20_history-comments.sql @@ -0,0 +1,77 @@ +CREATE TABLE personal_keys ( + id SERIAL PRIMARY KEY, + -- The public JWK for this key. + -- This may be null if the key is not yet registered. + public_jwk jsonb NULL, + thumbprint TEXT UNIQUE NOT NULL, + -- The user registered to this key, which is proven by providing a signed + -- assertion using the key. + -- It may be null if the key is not yet registered to a user. + user_id UUID NULL REFERENCES users(id) ON DELETE SET NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + -- Ensure that public_jwk and user_id are either both null or both non-null. 
+ CHECK ((public_jwk IS NULL) = (user_id IS NULL)) +); + +CREATE INDEX idx_personal_keys_user_id ON personal_keys(user_id, created_at) + WHERE user_id IS NOT NULL; +CREATE INDEX idx_personal_keys_thumbprint ON personal_keys(thumbprint) INCLUDE (user_id); + +CREATE TABLE history_comments ( + id SERIAL PRIMARY KEY, + causal_id INTEGER NOT NULL REFERENCES causals(id), + author TEXT NOT NULL, + + -- Milliseconds since epoch, + -- This is stored as an exact integer rather than a TIMESTAMPTZ because we want + -- to avoid floating point slop since it's used in hashing. + created_at_ms BIGINT NOT NULL, + + -- This is the time we inserted the comment into our database, + -- NOT the time the comment was created by the author. + discovered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + -- The Hash of the comment, SHA-512 over (hashing_version, causal_id, author, created_at_ms) + comment_hash TEXT UNIQUE NOT NULL, + author_key_id INTEGER NOT NULL REFERENCES personal_keys(id) +); + +CREATE INDEX idx_history_comments_causal_id ON history_comments(causal_id); + +CREATE TABLE history_comment_revisions ( + id SERIAL PRIMARY KEY, + comment_id INTEGER NOT NULL REFERENCES history_comments(id), + subject TEXT NOT NULL, + contents TEXT NOT NULL, + + created_at_ms BIGINT NOT NULL, + + -- This is the time we inserted the comment revision into our database, + -- NOT the time the comment revision was created by the author. + discovered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + -- In a distributed system you really can’t ever truly delete comments, + -- but you can ask to hide them. + hidden BOOL NOT NULL DEFAULT FALSE, + + -- The Hash of the comment revision, SHA-512 over the canonical CBOR encoding of (hashing_version, comment-hash, subject, content, hidden, created_at_ms) + revision_hash TEXT UNIQUE NOT NULL, + + -- The signature of the comment's author on the revision hash. 
+ author_signature BYTEA NOT NULL +); + +CREATE INDEX idx_history_comment_revisions_comment_id ON history_comment_revisions(comment_id, created_at_ms DESC); + +-- Tracks when each comment was discovered by each project, which allows us to +-- use a simple timestamp-based approach to filter for new comments since last sync +-- on a given project. +CREATE TABLE history_comment_revisions_project_discovery ( + comment_revision_id INTEGER NOT NULL REFERENCES history_comment_revisions(id), + project_id UUID NOT NULL REFERENCES projects(id), + discovered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (project_id, comment_revision_id) +); + +CREATE INDEX idx_history_comment_revisions_project_discovery_project_id ON history_comment_revisions_project_discovery(project_id, discovered_at); diff --git a/transcripts/run-transcripts.zsh b/transcripts/run-transcripts.zsh index 2a5f0c2d..5dd27bb0 100755 --- a/transcripts/run-transcripts.zsh +++ b/transcripts/run-transcripts.zsh @@ -21,7 +21,7 @@ transcripts_location="transcripts/share-apis" for dir in "$transcripts_location"/*(/); do # Extract the directory name (transcript name) transcript="${dir:t}" - + # If the first argument is missing, run all transcripts, otherwise run only transcripts which match a prefix of the argument if [ -z "${1:-}" ] || [[ "$transcript" == "$1"* ]]; then pg_reset_fixtures diff --git a/transcripts/share-apis/history-comments/comment-pull.md b/transcripts/share-apis/history-comments/comment-pull.md new file mode 100644 index 00000000..76a4421b --- /dev/null +++ b/transcripts/share-apis/history-comments/comment-pull.md @@ -0,0 +1,4 @@ +```ucm +scratch/main> pull @transcripts/history-comments/main +scratch/main> history +``` diff --git a/transcripts/share-apis/history-comments/comment-pull.output.md b/transcripts/share-apis/history-comments/comment-pull.output.md new file mode 100644 index 00000000..6c966691 --- /dev/null +++ b/transcripts/share-apis/history-comments/comment-pull.output.md @@ -0,0 
+1,34 @@ +``` ucm +scratch/main> pull @transcripts/history-comments/main + + Updating branch from #sg60bvjo91 to #tjd6qqlhod + + ✅ + + Successfully pulled into scratch/main, which was empty. + +scratch/main> history + + Note: The most recent namespace hash is immediately below this + message. + + ⊙ Unison + ┃ Renamed x to y + + ⊙ 1. #tjd6qqlhod + + + Adds / updates: + + y + + = Copies: + + Original name New name(s) + x y + + + ⊙ Unison + ┃ Initial commit with variable x set to 1 + + □ 2. #i52j9fd57b (start of history) +``` diff --git a/transcripts/share-apis/history-comments/comment-push.md b/transcripts/share-apis/history-comments/comment-push.md new file mode 100644 index 00000000..225b9f69 --- /dev/null +++ b/transcripts/share-apis/history-comments/comment-push.md @@ -0,0 +1,19 @@ +```ucm:hide +history-comments/main> builtins.mergeio lib.builtins +``` + +Add some history, then set comments on it. + +```unison:hide +x = 1 +``` + +```ucm +scratch/main> update +scratch/main> config.set author.name Unison +scratch/main> history.comment /main: "Initial commit with variable x set to 1" +scratch/main> alias.term x y +scratch/main> history.comment /main: "Renamed x to y" +scratch/main> history +scratch/main> push @transcripts/history-comments/main +``` diff --git a/transcripts/share-apis/history-comments/comment-push.output.md b/transcripts/share-apis/history-comments/comment-push.output.md new file mode 100644 index 00000000..5bdf4a56 --- /dev/null +++ b/transcripts/share-apis/history-comments/comment-push.output.md @@ -0,0 +1,63 @@ +``` ucm :hide +history-comments/main> builtins.mergeio lib.builtins +``` + +Add some history, then set comments on it. + +``` unison :hide +x = 1 +``` + +``` ucm +scratch/main> update + + Done. + +scratch/main> config.set author.name Unison + +scratch/main> history.comment /main: "Initial commit with variable x set to 1" + + Done. + +scratch/main> alias.term x y + + Done. + +scratch/main> history.comment /main: "Renamed x to y" + + Done. 
+ +scratch/main> history + + Note: The most recent namespace hash is immediately below this + message. + + ⊙ Unison + ┃ Renamed x to y + + ⊙ 1. #tjd6qqlhod + + + Adds / updates: + + y + + = Copies: + + Original name New name(s) + x y + + + ⊙ Unison + ┃ Initial commit with variable x set to 1 + + □ 2. #i52j9fd57b (start of history) + +scratch/main> push @transcripts/history-comments/main + + Uploaded 5 entities. + + I just created @transcripts/history-comments on + http://localhost:5424 + + View it here: @transcripts/history-comments/main on http://localhost:5424 +``` diff --git a/transcripts/share-apis/history-comments/run.zsh b/transcripts/share-apis/history-comments/run.zsh new file mode 100755 index 00000000..bd19981b --- /dev/null +++ b/transcripts/share-apis/history-comments/run.zsh @@ -0,0 +1,14 @@ +#!/usr/bin/env zsh + +set -e + +source "../../transcript_helpers.sh" + +# Currently this must be manually enabled +export UNISON_SYNC_HISTORY_COMMENTS=true + +# Create some history +transcript_ucm transcript comment-push.md + +# Pull the history +transcript_ucm transcript comment-pull.md