diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2a8ff8b36..31017d667 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -144,3 +144,133 @@ jobs: - name: Report coverage run: coverage report working-directory: ./backend + unit-test-titan: + name: Titan (Go) unit tests + runs-on: ubuntu-latest + permissions: + contents: read + steps: + - name: Checkout branch + uses: actions/checkout@v3.1.0 + with: + fetch-depth: 0 # Fetch all history for comparing changes + - name: Check for Titan changes + id: check_changes + run: | + if [ "${{ github.event_name }}" == "pull_request" ]; then + # For PRs, check if titan/ directory has changes + if git diff --name-only origin/${{ github.base_ref }}...HEAD | grep -q '^titan/'; then + echo "changed=true" >> $GITHUB_OUTPUT + echo "Titan package has changes" + else + echo "changed=false" >> $GITHUB_OUTPUT + echo "No changes in Titan package, skipping tests" + fi + else + # For pushes to main or manual triggers, always run + echo "changed=true" >> $GITHUB_OUTPUT + fi + - name: Set up Go + if: steps.check_changes.outputs.changed == 'true' + uses: actions/setup-go@v4 + with: + go-version: '1.18' + cache-dependency-path: titan/go.sum + + - name: Install ClamAV + if: steps.check_changes.outputs.changed == 'true' + run: | + sudo apt-get update + sudo apt-get install -y clamav clamav-daemon netcat-openbsd + + # Stop the auto-update service + sudo systemctl stop clamav-freshclam || true + sudo systemctl disable clamav-freshclam || true + + # Create a minimal test database instead of downloading full databases + # This approach is based on ClamAV's own unit test setup + echo "Creating minimal ClamAV test database..." + sudo mkdir -p /var/lib/clamav + + # Create custom signature files (text-based, no CVD needed) + # EICAR test signature in hexadecimal format (full 68-byte EICAR string) + echo 'EICAR-Test-Signature:0:*:58354f2150254041505b345c505a58353428505e2937434329377d2445494341522d5354414e444152442d414e544956495255532d544553542d46494c452124482b482a' | sudo tee /var/lib/clamav/test.ndb + + # Create a minimal valid HSB signature file (for ClamAV to recognize database directory) + echo '# Minimal test database' | sudo tee /var/lib/clamav/test.hsb + + # Disable freshclam (automatic updates) to avoid interfering with our test database + sudo systemctl stop clamav-freshclam.service 2>/dev/null || true + sudo systemctl disable clamav-freshclam.service 2>/dev/null || true + + sudo chown -R clamav:clamav /var/lib/clamav + + echo "Test database files created:" + ls -lh /var/lib/clamav/ + + # Configure clamd for TCP and to NOT require standard databases + sudo tee -a /etc/clamav/clamd.conf << EOF + TCPSocket 3310 + TCPAddr 127.0.0.1 + EOF + + # Override systemd conditions that require daily/main CVD files + # Create a drop-in override to remove those conditions + sudo mkdir -p /etc/systemd/system/clamav-daemon.service.d + sudo tee /etc/systemd/system/clamav-daemon.service.d/override.conf << EOF + [Unit] + # Remove conditions that require standard CVD databases + ConditionPathExistsGlob= + EOF + + sudo systemctl daemon-reload + + # Start clamd with our minimal test database + echo "Starting clamd with minimal test database..." + sudo systemctl start clamav-daemon.service # Wait for clamd to be ready on TCP (should be fast with minimal database) + echo "Waiting for clamd to start with minimal test database..." + for i in {1..30}; do + if echo "PING" | nc -w 1 127.0.0.1 3310 2>/dev/null | grep -q "PONG"; then + echo "✓ clamd is ready and responding on TCP port 3310" + exit 0 + fi + if [ $((i % 5)) -eq 0 ]; then + echo "Still waiting for clamd... ($i/30 attempts)" + echo "=== Recent service logs ===" + sudo journalctl -u clamav-daemon.service --no-pager -n 10 + fi + sleep 2 + done + + # If we got here, clamd didn't start in time - fail the build + echo "ERROR: clamd failed to start within 60 seconds" + echo "=== Service Status ===" + sudo systemctl status clamav-daemon.service --no-pager + echo "=== Recent Logs ===" + sudo journalctl -u clamav-daemon.service -n 100 --no-pager + echo "=== Config File ===" + sudo cat /etc/clamav/clamd.conf | grep -E "TCPSocket|TCPAddr|DatabaseDirectory" + echo "=== Database Files ===" + ls -lh /var/lib/clamav/ + exit 1 + - name: Run unit tests + if: steps.check_changes.outputs.changed == 'true' + run: go test -v -race -short -coverprofile=coverage-unit.out ./pkg/... + working-directory: ./titan + - name: Run integration tests + if: steps.check_changes.outputs.changed == 'true' + run: go test -v -race -coverprofile=coverage-integration.out ./pkg/... + working-directory: ./titan + env: + REQUIRE_CLAMD: "true" + CLAMD_TCP: "true" + - name: Report unit test coverage + run: | + echo "=== Unit Test Coverage ===" + go tool cover -func=coverage-unit.out + working-directory: ./titan + - name: Report integration test coverage + run: | + echo "=== Integration Test Coverage (All Tests) ===" + go tool cover -func=coverage-integration.out + working-directory: ./titan diff --git a/backend/siarnaq/gcloud/titan.py b/backend/siarnaq/gcloud/titan.py index 290619d79..4ff4d23da 100644 --- a/backend/siarnaq/gcloud/titan.py +++ b/backend/siarnaq/gcloud/titan.py @@ -1,5 +1,6 @@ import datetime import io +import json import google.cloud.storage as storage import structlog @@ -7,17 +8,61 @@ from google.auth import impersonated_credentials from PIL import Image +from siarnaq.gcloud import saturn + logger = structlog.get_logger(__name__) def request_scan(blob: storage.Blob) -> None: """Request that Titan scan a blob for malware.""" - # Titan responds to google.cloud.storage.object.v1.metadataUpdated events via - # Eventarc, so it suffices to set the Titan metadata field. logger.info("titan_request", message="Requesting scan on file.", file=blob.name) + + # Set metadata to Unverified before publishing to Pub/Sub blob.metadata = {"Titan-Status": "Unverified"} blob.patch() + # Publish scan request to Pub/Sub topic + if not settings.GCLOUD_ENABLE_ACTIONS: + logger.warn("titan_disabled", message="Titan scan queue is disabled.") + return + + publish_client = saturn.get_publish_client() + topic = publish_client.topic_path( + settings.GCLOUD_PROJECT, settings.GCLOUD_TOPIC_SCAN + ) + + payload = { + "bucket": blob.bucket.name, + "name": blob.name, + } + + try: + future = publish_client.publish( + topic=topic, + data=json.dumps(payload).encode(), + ordering_key=settings.GCLOUD_ORDER_SCAN, + ) + message_id = future.result() + logger.info( + "titan_enqueued", + message="Scan request has been queued.", + message_id=message_id, + bucket=blob.bucket.name, + file=blob.name, + ) + except Exception: + logger.error( + "titan_publish_error", + message="Scan request could not be queued.", + exc_info=True, + bucket=blob.bucket.name, + file=blob.name, + ) + publish_client.resume_publish( + topic=topic, + ordering_key=settings.GCLOUD_ORDER_SCAN, + ) + def get_object( bucket: str, name: str, check_safety: bool, get_raw: bool = False diff --git a/backend/siarnaq/settings.py b/backend/siarnaq/settings.py index 8ff79c661..0b468dfde 100644 --- a/backend/siarnaq/settings.py +++ b/backend/siarnaq/settings.py @@ -270,8 +270,10 @@ class Local(Base): GCLOUD_BUCKET_EPHEMERAL = "nowhere-ephemeral" GCLOUD_TOPIC_COMPILE = "nowhere-siarnaq-compile" GCLOUD_TOPIC_EXECUTE = "nowhere-siarnaq-execute" + GCLOUD_TOPIC_SCAN = "nowhere-siarnaq-scan" GCLOUD_ORDER_COMPILE = "compile-order" GCLOUD_ORDER_EXECUTE = "execute-order" + GCLOUD_ORDER_SCAN = "scan-order" GCLOUD_SCHEDULER_PREFIX = "nothing" GCLOUD_BRACKET_QUEUE = "nowhere-siarnaq-bracket" GCLOUD_RATING_QUEUE = "nowhere-siarnaq-rating" @@ -336,8 +338,10 @@ class Staging(Base): GCLOUD_BUCKET_EPHEMERAL = "mitbattlecode-staging-ephemeral" GCLOUD_TOPIC_COMPILE = "staging-siarnaq-compile" GCLOUD_TOPIC_EXECUTE = "staging-siarnaq-execute" + GCLOUD_TOPIC_SCAN = "staging-siarnaq-scan" GCLOUD_ORDER_COMPILE = "compile-order" GCLOUD_ORDER_EXECUTE = "execute-order" + GCLOUD_ORDER_SCAN = "scan-order" GCLOUD_SCHEDULER_PREFIX = "staging" GCLOUD_BRACKET_QUEUE = "staging-siarnaq-bracket" GCLOUD_RATING_QUEUE = "staging-siarnaq-rating" @@ -429,8 +433,10 @@ class Production(Base): GCLOUD_BUCKET_EPHEMERAL = "mitbattlecode-production-ephemeral" GCLOUD_TOPIC_COMPILE = "production-siarnaq-compile" GCLOUD_TOPIC_EXECUTE = "production-siarnaq-execute" + GCLOUD_TOPIC_SCAN = "production-siarnaq-scan" GCLOUD_ORDER_COMPILE = "compile-order" GCLOUD_ORDER_EXECUTE = "execute-order" + GCLOUD_ORDER_SCAN = "scan-order" GCLOUD_SCHEDULER_PREFIX = "production" GCLOUD_BRACKET_QUEUE = "production-siarnaq-bracket" GCLOUD_RATING_QUEUE = "production-siarnaq-rating" diff --git a/deploy/.terraform.lock.hcl b/deploy/.terraform.lock.hcl index adc6db3f4..ba3d6b82a 100644 --- a/deploy/.terraform.lock.hcl +++ b/deploy/.terraform.lock.hcl @@ -6,6 +6,7 @@ provider "registry.terraform.io/hashicorp/google" { constraints = ">= 3.53.0, < 5.0.0" hashes = [ "h1:ZVDZuhYSIWhCkSuDkwFeSIJjn0/DcCxak2W/cHW4OQQ=", + "h1:aSRZcEKF2wOi/v24IA+k9J2Y7aKVV1cHi/R0V3EhxXQ=", "zh:17d60a6a6c1741cf1e09ac6731433a30950285eac88236e623ab4cbf23832ca3", "zh:1c70254c016439dbb75cab646b4beace6ceeff117c75d81f2cc27d41c312f752", "zh:35e2aa2cc7ac84ce55e05bb4de7b461b169d3582e56d3262e249ff09d64fe008", @@ -26,6 +27,7 @@ provider "registry.terraform.io/hashicorp/google-beta" { constraints = ">= 3.53.0, < 5.0.0" hashes = [ "h1:CSE8cBR85sVefrE2Kg6KT5vOb7855t158oMo+iDfL0M=", + "h1:YkCDGkP0AUZoNobLoxRnM52Pi4alYE9EFXalEu8p8E8=", "zh:40e9c7ec46955b4d79065a14185043a4ad6af8d0246715853fc5c99208b66980", "zh:5950a9ba2f96420ea5335b543e315b1a47a705f9a9abfc53c6fec52d084eddcb", "zh:5dfa98d32246a5d97e018f2b91b0e921cc6f061bc8591884f3b144f0d62f1c20", @@ -44,6 +46,7 @@ provider "registry.terraform.io/hashicorp/google-beta" { provider "registry.terraform.io/hashicorp/null" { version = "3.2.3" hashes = [ + "h1:+AnORRgFbRO6qqcfaQyeX80W0eX3VmjadjnUFUJTiXo=", "h1:I0Um8UkrMUb81Fxq/dxbr3HLP2cecTH2WMJiwKSrwQY=", "zh:22d062e5278d872fe7aed834f5577ba0a5afe34a3bdac2b81f828d8d3e6706d2", "zh:23dead00493ad863729495dc212fd6c29b8293e707b055ce5ba21ee453ce552d", @@ -63,6 +66,7 @@ provider "registry.terraform.io/hashicorp/null" { provider "registry.terraform.io/hashicorp/random" { version = "3.7.1" hashes = [ + "h1:/qtweZW2sk0kBNiQM02RvBXmlVdI9oYqRMCyBZ8XA98=", "h1:t152MY0tQH4a8fLzTtEWx70ITd3azVOrFDn/pQblbto=", "zh:3193b89b43bf5805493e290374cdda5132578de6535f8009547c8b5d7a351585", "zh:3218320de4be943e5812ed3de995946056db86eb8d03aa3f074e0c7316599bef", @@ -82,6 +86,7 @@ provider "registry.terraform.io/hashicorp/random" { provider "registry.terraform.io/hashicorp/tls" { version = "4.0.6" hashes = [ + "h1:dYSb3V94K5dDMtrBRLPzBpkMTPn+3cXZ/kIJdtFL+2M=", "h1:n3M50qfWfRSpQV9Pwcvuse03pEizqrmYEryxKky4so4=", "zh:10de0d8af02f2e578101688fd334da3849f56ea91b0d9bd5b1f7a243417fdda8", "zh:37fc01f8b2bc9d5b055dc3e78bfd1beb7c42cfb776a4c81106e19c8911366297", diff --git a/deploy/galaxy/main.tf b/deploy/galaxy/main.tf index f873a8671..440b3d769 100644 --- a/deploy/galaxy/main.tf +++ b/deploy/galaxy/main.tf @@ -167,8 +167,9 @@ module "titan" { gcp_zone = var.gcp_zone labels = merge(var.labels, {component="titan"}) - image = var.titan_image - storage_names = [google_storage_bucket.public.name, google_storage_bucket.secure.name] + image = var.titan_image + storage_names = [google_storage_bucket.public.name, google_storage_bucket.secure.name] + pubsub_topic_scan_name = module.siarnaq.pubsub_topic_scan_name } resource "google_secret_manager_secret" "saturn" { diff --git a/deploy/siarnaq/main.tf b/deploy/siarnaq/main.tf index 88317643c..ed806bd7a 100644 --- a/deploy/siarnaq/main.tf +++ b/deploy/siarnaq/main.tf @@ -1,5 +1,5 @@ locals { - pubsub_topics = toset(["compile", "execute"]) + pubsub_topics = toset(["compile", "execute", "scan"]) } resource "google_service_account" "this" { diff --git a/deploy/siarnaq/outputs.tf b/deploy/siarnaq/outputs.tf index 07a3d09da..d1b64a2c6 100644 --- a/deploy/siarnaq/outputs.tf +++ b/deploy/siarnaq/outputs.tf @@ -13,3 +13,7 @@ output "topic_compile_name" { output "topic_execute_name" { value = google_pubsub_topic.this["execute"].name } + +output "pubsub_topic_scan_name" { + value = google_pubsub_topic.this["scan"].name +} diff --git a/deploy/titan/main.tf b/deploy/titan/main.tf index 7c95ca09d..7b1552b9f 100644 --- a/deploy/titan/main.tf +++ b/deploy/titan/main.tf @@ -4,12 +4,6 @@ resource "google_service_account" "this" { description = "Service account for perform Titan actions" } -resource "google_project_iam_member" "eventarc" { - project = var.gcp_project - role = "roles/eventarc.eventReceiver" - member = "serviceAccount:${google_service_account.this.email}" -} - resource "google_storage_bucket_iam_member" "this" { for_each = toset(var.storage_names) @@ -18,42 +12,28 @@ resource "google_storage_bucket_iam_member" "this" { member = "serviceAccount:${google_service_account.this.email}" } -data "google_storage_project_service_account" "gcs_account" { -} - -resource "google_project_iam_member" "storage_pubsub" { - project = var.gcp_project - role = "roles/pubsub.publisher" - member = "serviceAccount:${data.google_storage_project_service_account.gcs_account.email_address}" -} - -resource "google_eventarc_trigger" "this" { - for_each = toset(var.storage_names) +resource "google_pubsub_subscription" "scan" { + name = "${var.name}-scan" + topic = var.pubsub_topic_scan_name + labels = var.labels - name = "${var.name}-${each.value}" - location = var.gcp_region - service_account = google_service_account.this.email - labels = var.labels + push_config { + push_endpoint = google_cloud_run_service.this.status[0].url - matching_criteria { - attribute = "type" - value = "google.cloud.storage.object.v1.metadataUpdated" + oidc_token { + service_account_email = google_service_account.this.email + } } - matching_criteria { - attribute = "bucket" - value = each.value - } + ack_deadline_seconds = 600 + message_retention_duration = "604800s" # 7 days - destination { - cloud_run_service { - service = google_cloud_run_service.this.name - region = var.gcp_region - } + retry_policy { + minimum_backoff = "10s" + maximum_backoff = "600s" } depends_on = [ - google_project_iam_member.eventarc, google_cloud_run_service_iam_member.this, ] } diff --git a/deploy/titan/variables.tf b/deploy/titan/variables.tf index 6d27ba19a..34b6c15ee 100644 --- a/deploy/titan/variables.tf +++ b/deploy/titan/variables.tf @@ -32,3 +32,8 @@ variable "storage_names" { description = "Name of Google Cloud Storage buckets to be scanned" type = list(string) } + +variable "pubsub_topic_scan_name" { + description = "Name of the Pub/Sub topic for scan requests" + type = string +} diff --git a/titan/Dockerfile b/titan/Dockerfile index 2ac5683b3..8cc96975c 100644 --- a/titan/Dockerfile +++ b/titan/Dockerfile @@ -10,7 +10,7 @@ COPY . . RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o /titan -ldflags="-s -w" ./cmd/titan/main.go -FROM clamav/clamav:0.105.1_base +FROM clamav/clamav:1.4_base ENV APP_HOME /app WORKDIR $APP_HOME diff --git a/titan/README.md b/titan/README.md index 9076a6c61..98e94c0ad 100644 --- a/titan/README.md +++ b/titan/README.md @@ -7,20 +7,98 @@ scan these files for malware. Titan is our antivirus module that performs asynchronous malware scanning on-demand. -## How to request a scan +## Development + +### Running Tests + +#### Unit Tests (Fast, No Dependencies) -Titan is configured (by Terraform) to respond to scan requests for the `public` and -`secure` storage buckets. To request a scan, tag the object with the following metadata -after uploading it: +Run the fast unit tests with mocked dependencies: +```bash +cd titan +go test -v -short ./pkg/... ``` -Titan-Status: Unverified + +#### Integration Tests (Requires ClamAV) + +The integration tests require ClamAV to be installed and running. These tests verify the actual scanning functionality with a real clamd daemon. + +**Install ClamAV:** + +```bash +# Ubuntu/Debian +sudo apt-get install clamav clamav-daemon + +# macOS +brew install clamav ``` -Titan will detect this and scan the file. Once the file is scanned, the status will be -replaced with either of `Verified` or `Malicious`. +**Start ClamAV daemon:** + +```bash +# Ubuntu/Debian +sudo systemctl start clamav-daemon + +# macOS +# Edit /opt/homebrew/etc/clamav/clamd.conf if needed +clamd +``` + +**Run integration tests:** + +```bash +cd titan +go test -v ./pkg/... +``` + +**Run all tests with coverage:** + +```bash +go test -v -coverprofile=coverage.out ./pkg/... +go tool cover -html=coverage.out # View coverage in browser +``` + +**Run tests with race detection:** + +```bash +go test -v -race ./pkg/... +``` + +The tests are automatically run as part of CI on every pull request. The CI pipeline uses a minimal custom test database (only EICAR signature) rather than downloading the full ~8GB ClamAV database set. This keeps CI execution fast while still validating real scanning functionality. + +## How to request a scan + +To request a scan from Siarnaq, use the `titan.request_scan()` function with a blob: + +```python +from siarnaq.gcloud import titan + +# After uploading a file to GCS +titan.request_scan(blob) +``` + +This function: +1. Sets the blob's metadata to `Titan-Status: Unverified` +2. Publishes a scan request message to the Pub/Sub topic + +Once the file is scanned, Titan updates the metadata status to either `Verified` or +`Malicious`. The status can be checked using `titan.get_object()`. ## System architecture -Titan receives file metadata update events via Google EventArc. These events are pushed -to the server and the file is scanned. +Titan receives scan requests via Google Cloud Pub/Sub: + +1. **Siarnaq** publishes scan request messages to the `{environment}-siarnaq-scan` topic +2. **Pub/Sub** pushes these messages to Titan's Cloud Run service via OIDC-authenticated HTTP POST +3. **Titan** processes the message, retrieves the file from GCS, scans it with ClamAV, and updates the file's metadata + +The message payload is a simple JSON object: +```json +{ + "bucket": "bucket-name", + "name": "path/to/file" +} +``` + +Titan runs on Cloud Run and scales automatically based on incoming Pub/Sub push requests. diff --git a/titan/go.mod b/titan/go.mod index 1bc4b9e7c..e29f72358 100644 --- a/titan/go.mod +++ b/titan/go.mod @@ -5,7 +5,6 @@ go 1.18 require ( cloud.google.com/go/storage v1.28.1 github.com/baruwa-enterprise/clamd v1.0.1 - github.com/cloudevents/sdk-go/v2 v2.15.2 github.com/rs/zerolog v1.28.0 ) diff --git a/titan/pkg/titan/file.go b/titan/pkg/titan/file.go index ef7904e61..090a47861 100644 --- a/titan/pkg/titan/file.go +++ b/titan/pkg/titan/file.go @@ -19,8 +19,8 @@ const ( // StorageClient is a client to a file storage service. type StorageClient interface { - // GetFile retrieves the file specified by an event. - GetFile(context.Context, *EventPayload) (File, error) + // GetFile retrieves the file specified by a scan payload. + GetFile(context.Context, *ScanPayload) (File, error) } // File is a handle to manage a file stored in a file storage service. diff --git a/titan/pkg/titan/gcs.go b/titan/pkg/titan/gcs.go index a35ad4e6d..49c2fcd5d 100644 --- a/titan/pkg/titan/gcs.go +++ b/titan/pkg/titan/gcs.go @@ -26,7 +26,7 @@ func NewGCSClient(ctx context.Context) (*GCSClient, error) { return &GCSClient{client}, nil } -func (c *GCSClient) GetFile(ctx context.Context, data *EventPayload) (File, error) { +func (c *GCSClient) GetFile(ctx context.Context, data *ScanPayload) (File, error) { handle := c.client.Bucket(data.Bucket).Object(data.Name) attrs, err := handle.Attrs(ctx) if err != nil { diff --git a/titan/pkg/titan/scan.go b/titan/pkg/titan/scan.go index dae12bc93..7431b03f6 100644 --- a/titan/pkg/titan/scan.go +++ b/titan/pkg/titan/scan.go @@ -3,6 +3,7 @@ package titan import ( "context" "fmt" + "os" "github.com/baruwa-enterprise/clamd" ) @@ -19,8 +20,19 @@ type ClamdClient struct { } // NewClamdClient creates a new client to clamd. +// It uses TCP connection if CLAMD_TCP is set, otherwise Unix socket (default). func NewClamdClient(ctx context.Context) (*ClamdClient, error) { - client, err := clamd.NewClient("", "") + var client *clamd.Client + var err error + + // In CI or if CLAMD_TCP is set, use TCP connection + if os.Getenv("CLAMD_TCP") != "" { + client, err = clamd.NewClient("tcp", "127.0.0.1:3310") + } else { + // Default: use Unix socket + client, err = clamd.NewClient("", "") + } + if err != nil { return nil, fmt.Errorf("clamd.NewClient: %v", err) } diff --git a/titan/pkg/titan/titan.go b/titan/pkg/titan/titan.go index 1317477d9..4c6da2a5d 100644 --- a/titan/pkg/titan/titan.go +++ b/titan/pkg/titan/titan.go @@ -2,31 +2,40 @@ package titan import ( "context" + "encoding/base64" + "encoding/json" "errors" "fmt" + "io" "net/http" - cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/rs/zerolog/log" ) -// EventArcTriggerType is the EventArc trigger type that this server should respond to. -const EventArcTriggerType = "google.cloud.storage.object.v1.metadataUpdated" +// PubsubMessage represents the structure of a Pub/Sub push message. +type PubsubMessage struct { + Message struct { + Data string `json:"data"` + MessageID string `json:"messageId"` + PublishTime string `json:"publishTime"` + } `json:"message"` + Subscription string `json:"subscription"` +} -// EventPayload contains the details of the event delivered by EventArc. -type EventPayload struct { +// ScanPayload contains the details of a scan request published by Siarnaq. +type ScanPayload struct { Bucket string `json:"bucket"` Name string `json:"name"` } -// WithLogger updates the logger context to include event payload details. -func (e *EventPayload) WithLogger(ctx context.Context) context.Context { - log := log.Ctx(ctx).With().Str("bucket", e.Bucket).Str("name", e.Name).Logger() +// WithLogger updates the logger context to include scan payload details. +func (s *ScanPayload) WithLogger(ctx context.Context) context.Context { + log := log.Ctx(ctx).With().Str("bucket", s.Bucket).Str("name", s.Name).Logger() return log.WithContext(ctx) } -// Titan is the main application that receives EventArc events from Google Cloud -// Storage, and sends them to clamd for scanning. +// Titan is the main application that receives Pub/Sub push messages from Siarnaq +// and sends files to clamd for scanning. type Titan struct { storage StorageClient scanner Scanner @@ -49,40 +58,61 @@ func New(ctx context.Context) (*Titan, error) { return &Titan{storage, scanner}, nil } -// Start begins the HTTP server to receive EventArc events. +// Start begins the HTTP server to receive Pub/Sub push messages. func (t *Titan) Start(ctx context.Context, addr string) error { - p, err := cloudevents.NewHTTP() - if err != nil { - return fmt.Errorf("cloudevents.NewHTTP: %v", err) + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + if err := t.HandleHTTP(r.Context(), w, r); err != nil { + log.Ctx(r.Context()).Error().Err(err).Msg("Failed to handle request.") + // Always return 200 to prevent Pub/Sub from retrying (we don't want retries) + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(http.StatusOK) + }) + + err := http.ListenAndServe(addr, nil) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + return fmt.Errorf("http.ListenAndServe: %v", err) + } + return nil +} + +// HandleHTTP processes an HTTP request from Pub/Sub push subscription. +func (t *Titan) HandleHTTP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { + if r.Method != http.MethodPost { + log.Ctx(ctx).Warn().Str("method", r.Method).Msg("Unexpected HTTP method.") + return fmt.Errorf("unexpected method: %s", r.Method) } - h, err := cloudevents.NewHTTPReceiveHandler(ctx, p, t.Handle) + body, err := io.ReadAll(r.Body) if err != nil { - return fmt.Errorf("cloudevents.NewHTTPReceiveHandler: %v", err) + log.Ctx(ctx).Warn().Err(err).Msg("Failed to read request body.") + return fmt.Errorf("io.ReadAll: %v", err) } - err = http.ListenAndServe(addr, h) - if err != nil && !errors.Is(err, http.ErrServerClosed) { - return fmt.Errorf("http.ListenAndServe: %v", err) + var pubsubMsg PubsubMessage + if err := json.Unmarshal(body, &pubsubMsg); err != nil { + log.Ctx(ctx).Warn().Err(err).Msg("Failed to parse Pub/Sub message.") + return fmt.Errorf("json.Unmarshal: %v", err) } - return nil -} -// Handle responds to a single EventArc event. -func (t *Titan) Handle(ctx context.Context, event cloudevents.Event) error { - if t := event.Type(); t != EventArcTriggerType { - log.Ctx(ctx).Warn().Str("eventType", t).Msgf("Unexpected event type: %v.", t) - return fmt.Errorf("unexpected event type: %v", t) + // Decode base64-encoded message data + data, err := base64.StdEncoding.DecodeString(pubsubMsg.Message.Data) + if err != nil { + log.Ctx(ctx).Warn().Err(err).Msg("Failed to decode message data.") + return fmt.Errorf("base64.DecodeString: %v", err) } - data := &EventPayload{} - if err := event.DataAs(data); err != nil { - log.Ctx(ctx).Warn().Err(err).Msg("Failed to parse event payload.") - return fmt.Errorf("event.DataAs: %v", err) + var payload ScanPayload + if err := json.Unmarshal(data, &payload); err != nil { + log.Ctx(ctx).Warn().Err(err).Msg("Failed to parse scan payload.") + return fmt.Errorf("json.Unmarshal payload: %v", err) } - ctx = data.WithLogger(ctx) - file, err := t.storage.GetFile(ctx, data) + ctx = payload.WithLogger(ctx) + log.Ctx(ctx).Debug().Str("messageId", pubsubMsg.Message.MessageID).Msg("Received scan request.") + + file, err := t.storage.GetFile(ctx, &payload) if err != nil { log.Ctx(ctx).Warn().Err(err).Msg("Failed to retrieve file.") return fmt.Errorf("storage.GetFile: %v", err) diff --git a/titan/pkg/titan/titan_integration_test.go b/titan/pkg/titan/titan_integration_test.go new file mode 100644 index 000000000..d5ec24c02 --- /dev/null +++ b/titan/pkg/titan/titan_integration_test.go @@ -0,0 +1,351 @@ +package titan + +import ( + "bytes" + "context" + "io" + "os" + "os/exec" + "strings" + "testing" + "time" +) + +// Integration tests that use actual ClamAV daemon. +// Run with: go test -v -tags=integration ./pkg/... +// Or skip with regular: go test ./pkg/... + +// testFile implements the File interface for testing +type testFile struct { + content []byte + status FileStatus +} + +func (f *testFile) NewReader(ctx context.Context) (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(f.content)), nil +} + +func (f *testFile) Status() FileStatus { + return f.status +} + +func (f *testFile) SetStatus(ctx context.Context, status FileStatus) error { + f.status = status + return nil +} + +// setupClamD starts a clamd daemon for testing and returns a cleanup function. +// It waits for clamd to be ready before returning. +func setupClamD(t *testing.T) func() { + t.Helper() + + requireClamd := os.Getenv("REQUIRE_CLAMD") == "true" + + // Check if clamd is already running + if isClamdRunning() { + t.Log("Using existing clamd instance") + return func() {} // No cleanup needed + } + + // In CI, clamd should already be running - if not, fail immediately + if requireClamd { + t.Fatal("clamd is not running (required in CI - check CI setup step)") + } + + t.Log("clamd not found running, attempting to start locally...") + + // Try to start clamd locally (for local development) + cmd := exec.Command("clamd") + if err := cmd.Start(); err != nil { + t.Skipf("Cannot start clamd (install clamav-daemon): %v", err) + } + + // Wait for clamd to be ready (up to 30 seconds for local environments) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + ready := make(chan bool) + go func() { + for { + if isClamdRunning() { + ready <- true + return + } + select { + case <-ctx.Done(): + return + case <-time.After(500 * time.Millisecond): + // Continue waiting + } + } + }() + + select { + case <-ready: + t.Log("clamd started successfully") + case <-ctx.Done(): + if cmd.Process != nil { + cmd.Process.Kill() + } + t.Skip("clamd did not start in time") + } + + return func() { + if cmd.Process != nil { + cmd.Process.Kill() + } + } +} + +// isClamdRunning checks if clamd is responding +func isClamdRunning() bool { + client, err := NewClamdClient(context.Background()) + if err != nil { + return false + } + // Try to ping clamd to test connectivity + _, err = client.client.Ping(context.Background()) + return err == nil +} + +// TestClamdIntegration_CleanFile tests scanning a clean file with actual clamd +func TestClamdIntegration_CleanFile(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + cleanup := setupClamD(t) + defer cleanup() + + ctx := context.Background() + + // Create a clamd client + scanner, err := NewClamdClient(ctx) + if err != nil { + t.Fatalf("Failed to create clamd client: %v", err) + } + + // Create a clean test file + cleanContent := []byte("This is a clean test file with no malware.") + file := &testFile{ + content: cleanContent, + status: TitanStatusUnverified, + } + + // Scan the file + signatures, err := scanner.Scan(ctx, file) + if err != nil { + t.Fatalf("Scan failed: %v", err) + } + + // Should find no threats + if len(signatures) != 0 { + t.Errorf("Expected no threats, but found: %v", signatures) + } +} + +// TestClamdIntegration_EICARTestFile tests scanning the EICAR test file +// EICAR is a standard test file that all antivirus software should detect +func TestClamdIntegration_EICARTestFile(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + cleanup := setupClamD(t) + defer cleanup() + + ctx := context.Background() + + // Create a clamd client + scanner, err := NewClamdClient(ctx) + if err != nil { + t.Fatalf("Failed to create clamd client: %v", err) + } + + // EICAR test string - standard test file for antivirus software + // This is NOT actual malware, just a test pattern + eicarString := "X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*" + file := &testFile{ + content: []byte(eicarString), + status: TitanStatusUnverified, + } + + // Scan the file + signatures, err := scanner.Scan(ctx, file) + if err != nil { + t.Fatalf("Scan failed: %v", err) + } + + // Should detect the EICAR test file + if len(signatures) == 0 { + t.Error("Expected EICAR test file to be detected, but no threats found") + } else { + t.Logf("Detected signatures: %v", signatures) + // Check that it found something related to EICAR + foundEICAR := false + for _, sig := range signatures { + if strings.Contains(strings.ToUpper(sig), "EICAR") { + foundEICAR = true + break + } + } + if !foundEICAR { + t.Errorf("Expected EICAR signature, but got: %v", signatures) + } + } +} + +// TestTitanIntegration_HandleFileWithClamD tests the full HandleFile workflow +func TestTitanIntegration_HandleFileWithClamD(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + cleanup := setupClamD(t) + defer cleanup() + + ctx := context.Background() + + // Create a real scanner + scanner, err := NewClamdClient(ctx) + if err != nil { + t.Fatalf("Failed to create clamd client: %v", err) + } + + // Create Titan with mock storage but real scanner + titan := &Titan{ + storage: &mockStorageClient{ + files: make(map[string]*mockFile), + }, + scanner: scanner, + } + + tests := []struct { + name string + fileContent []byte + initialStatus FileStatus + expectedStatus FileStatus + expectThreats bool + }{ + { + name: "clean file gets verified", + fileContent: []byte("This is a safe test file."), + initialStatus: TitanStatusUnverified, + expectedStatus: TitanStatusVerified, + expectThreats: false, + }, + { + name: "EICAR test file marked malicious", + fileContent: []byte("X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"), + initialStatus: TitanStatusUnverified, + expectedStatus: TitanStatusMalicious, + expectThreats: true, + }, + { + name: "untracked file not scanned", + fileContent: []byte("X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"), + initialStatus: TitanStatusUntracked, + expectedStatus: TitanStatusUntracked, + expectThreats: false, // Should not scan + }, + { + name: "already verified file not rescanned", + fileContent: []byte("X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"), + initialStatus: TitanStatusVerified, + expectedStatus: TitanStatusVerified, + expectThreats: false, // Should not scan + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + file := &testFile{ + content: tt.fileContent, + status: tt.initialStatus, + } + + err := titan.HandleFile(ctx, file) + if err != nil { + t.Logf("HandleFile returned error: %v", err) + } + + if file.Status() != tt.expectedStatus { + t.Errorf("Status = %v, want %v", file.Status(), tt.expectedStatus) + } + }) + } +} + +// TestClamdIntegration_LargeFile tests scanning a larger file +func TestClamdIntegration_LargeFile(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + cleanup := setupClamD(t) + defer cleanup() + + ctx := context.Background() + + scanner, err := NewClamdClient(ctx) + if err != nil { + t.Fatalf("Failed to create clamd client: %v", err) + } + + // Create a larger clean file (1MB of repeated text) + content := bytes.Repeat([]byte("This is a clean test file. "), 40000) + file := &testFile{ + content: content, + status: TitanStatusUnverified, + } + + signatures, err := scanner.Scan(ctx, file) + if err != nil { + t.Fatalf("Scan failed: %v", err) + } + + if len(signatures) != 0 { + t.Errorf("Expected no threats in large clean file, but found: %v", signatures) + } +} + +// TestClamdIntegration_MultipleScans tests scanning multiple files in sequence +func TestClamdIntegration_MultipleScans(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + cleanup := setupClamD(t) + defer cleanup() + + ctx := context.Background() + + scanner, err := NewClamdClient(ctx) + if err != nil { + t.Fatalf("Failed to create clamd client: %v", err) + } + + // Scan multiple files to ensure the scanner handles multiple requests + for i := 0; i < 5; i++ { + file := &testFile{ + content: []byte("Clean file number " + string(rune(i))), + status: TitanStatusUnverified, + } + + signatures, err := scanner.Scan(ctx, file) + if err != nil { + t.Fatalf("Scan %d failed: %v", i, err) + } + + if len(signatures) != 0 { + t.Errorf("Scan %d: expected no threats, but found: %v", i, signatures) + } + } +} + +// TestMain can be used to set up test environment +func TestMain(m *testing.M) { + // m.Run() must be called first to parse flags before testing.Short() can be used + code := m.Run() + os.Exit(code) +} diff --git a/titan/pkg/titan/titan_test.go b/titan/pkg/titan/titan_test.go new file mode 100644 index 000000000..c47ff3077 --- /dev/null +++ b/titan/pkg/titan/titan_test.go @@ -0,0 +1,350 @@ +package titan + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "testing" +) + +// Mock implementations for testing + +type mockFile struct { + status FileStatus + reader io.ReadCloser +} + +func (m *mockFile) NewReader(ctx context.Context) (io.ReadCloser, error) { + return m.reader, nil +} + +func (m *mockFile) Status() FileStatus { + return m.status +} + +func (m *mockFile) SetStatus(ctx context.Context, status FileStatus) error { + m.status = status + return nil +} + +type mockStorageClient struct { + files map[string]*mockFile +} + +func (m *mockStorageClient) GetFile(ctx context.Context, payload *ScanPayload) (File, error) { + key := payload.Bucket + "/" + payload.Name + if file, ok := m.files[key]; ok { + return file, nil + } + return &mockFile{status: TitanStatusUnverified}, nil +} + +type mockScanner struct { + scanResults map[string][]string + nextResult []string +} + +func (m *mockScanner) Scan(ctx context.Context, file File) ([]string, error) { + // If nextResult is set, return it + if m.nextResult != nil { + return m.nextResult, nil + } + // Return empty slice (no threats) for testing + return []string{}, nil +} + +// Partitions: +// - message format: valid Pub/Sub structure, invalid JSON +// - payload encoding: correctly base64-encoded, malformed base64 +// - file path: simple filename, nested path structure +// - HTTP method: POST (valid), GET/PUT/DELETE (invalid) +func TestHandleHTTP_PubSubPush(t *testing.T) { + tests := []struct { + name string + payload ScanPayload + wantStatusCode int + }{ + { + name: "valid pubsub push message", + payload: ScanPayload{ + Bucket: "test-bucket", + Name: "file.pdf", + }, + wantStatusCode: http.StatusOK, + }, + { + name: "pubsub message with nested path", + payload: ScanPayload{ + Bucket: "secure-bucket", + Name: "episode/bc24/team/42/resume.pdf", + }, + wantStatusCode: http.StatusOK, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create Titan instance with mocks + titan := &Titan{ + storage: &mockStorageClient{ + files: make(map[string]*mockFile), + }, + scanner: &mockScanner{ + scanResults: make(map[string][]string), + }, + } + + // Create Pub/Sub push message format + payloadJSON, err := json.Marshal(tt.payload) + if err != nil { + t.Fatalf("failed to marshal payload: %v", err) + } + + pubsubMessage := PubsubMessage{ + Subscription: "projects/test/subscriptions/titan-scan", + } + pubsubMessage.Message.Data = base64.StdEncoding.EncodeToString(payloadJSON) + pubsubMessage.Message.MessageID = "test-message-123" + pubsubMessage.Message.PublishTime = "2025-11-30T12:00:00Z" + + body, err := json.Marshal(pubsubMessage) + if err != nil { + t.Fatalf("failed to marshal pubsub message: %v", err) + } + + // Create HTTP request + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + + // Create response recorder + w := httptest.NewRecorder() + + // Call handler + err = titan.HandleHTTP(context.Background(), w, req) + if err != nil { + t.Logf("HandleHTTP returned error (expected for test): %v", err) + } + + // Verify the message can be decoded correctly + var decoded PubsubMessage + if err := json.Unmarshal(body, &decoded); err != nil { + t.Errorf("failed to unmarshal pubsub message: %v", err) + } + + // Decode the base64 data + data, err := base64.StdEncoding.DecodeString(decoded.Message.Data) + if err != nil { + t.Errorf("failed to decode base64 data: %v", err) + } + + // Verify payload + var parsed ScanPayload + if err := json.Unmarshal(data, &parsed); err != nil { + t.Errorf("failed to unmarshal payload: %v", err) + } + + if parsed.Bucket != tt.payload.Bucket { + t.Errorf("bucket mismatch: got %v, want %v", parsed.Bucket, tt.payload.Bucket) + } + if parsed.Name != tt.payload.Name { + t.Errorf("name mismatch: got %v, want %v", parsed.Name, tt.payload.Name) + } + }) + } +} + +// Partitions: +// - JSON structure: valid, malformed syntax +// - required fields: both present, bucket missing, name missing +// - file path format: simple filename, nested directory structure, special characters +// - bucket name: valid GCS bucket name, empty string +func TestScanPayload(t *testing.T) { + tests := []struct { + name string + json string + want ScanPayload + wantErr bool + }{ + { + name: "valid payload", + json: `{"bucket":"test-bucket","name":"file.pdf"}`, + want: ScanPayload{ + Bucket: "test-bucket", + Name: "file.pdf", + }, + wantErr: false, + }, + { + name: "payload with nested path", + json: `{"bucket":"secure","name":"episode/bc24/submission/123/source.zip"}`, + want: ScanPayload{ + Bucket: "secure", + Name: "episode/bc24/submission/123/source.zip", + }, + wantErr: false, + }, + { + name: "invalid json", + json: `{invalid}`, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var got ScanPayload + err := json.Unmarshal([]byte(tt.json), &got) + + if (err != nil) != tt.wantErr { + t.Errorf("Unmarshal() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if !tt.wantErr { + if got.Bucket != tt.want.Bucket { + t.Errorf("Bucket = %v, want %v", got.Bucket, tt.want.Bucket) + } + if got.Name != tt.want.Name { + t.Errorf("Name = %v, want %v", got.Name, tt.want.Name) + } + } + }) + } +} + +// Partitions: +// - initial file status: Untracked, Unverified, Verified, Malicious +// - scan result: no threats (empty), single threat, multiple threats +// - scanner behavior: success, error/failure +func TestHandleFile(t *testing.T) { + tests := []struct { + name string + initialStatus FileStatus + scanSignatures []string + wantStatus FileStatus + }{ + { + name: "unverified file with no threats", + initialStatus: TitanStatusUnverified, + scanSignatures: []string{}, + wantStatus: TitanStatusVerified, + }, + { + name: "unverified file with malware", + initialStatus: TitanStatusUnverified, + scanSignatures: []string{"Win.Test.EICAR"}, + wantStatus: TitanStatusMalicious, + }, + { + name: "untracked file should not be scanned", + initialStatus: TitanStatusUntracked, + wantStatus: TitanStatusUntracked, + }, + { + name: "already verified file should not be rescanned", + initialStatus: TitanStatusVerified, + wantStatus: TitanStatusVerified, + }, + { + name: "already malicious file should not be rescanned", + initialStatus: TitanStatusMalicious, + wantStatus: TitanStatusMalicious, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create mock file + file := &mockFile{ + status: tt.initialStatus, + } + + // Create mock scanner that returns configured signatures + scanner := &mockScanner{ + scanResults: make(map[string][]string), + nextResult: tt.scanSignatures, + } + + // Create Titan instance + titan := &Titan{ + storage: &mockStorageClient{ + files: make(map[string]*mockFile), + }, + scanner: scanner, + } + + // Handle the file + err := titan.HandleFile(context.Background(), file) + if err != nil { + t.Logf("HandleFile returned error: %v", err) + } + + // For unverified files, status should change + // For other files, status should remain unchanged + if tt.initialStatus == TitanStatusUnverified { + if file.Status() != tt.wantStatus { + t.Errorf("Status = %v, want %v", file.Status(), tt.wantStatus) + } + } else { + // File should not be scanned if not unverified + if file.Status() != tt.initialStatus { + t.Errorf("Status changed from %v to %v, should have remained %v", + tt.initialStatus, file.Status(), tt.initialStatus) + } + } + }) + } +} + +// Partitions: +// - message structure: valid Google format, missing fields, extra fields +// - data encoding: valid base64, invalid base64, empty data +// - nested payload: valid JSON, invalid JSON, empty payload +// - field types: correct types, type mismatches +func TestPubsubMessageFormat(t *testing.T) { + // Example message from Google Cloud Pub/Sub documentation + messageJSON := `{ + "message": { + "data": "eyJidWNrZXQiOiJ0ZXN0IiwibmFtZSI6ImZpbGUucGRmIn0=", + "messageId": "136969346945", + "publishTime": "2025-11-30T12:00:00.000Z" + }, + "subscription": "projects/myproject/subscriptions/mysubscription" + }` + + var msg PubsubMessage + if err := json.Unmarshal([]byte(messageJSON), &msg); err != nil { + t.Fatalf("failed to unmarshal message: %v", err) + } + + // Verify structure + if msg.Message.MessageID != "136969346945" { + t.Errorf("MessageID = %v, want 136969346945", msg.Message.MessageID) + } + + if msg.Subscription != "projects/myproject/subscriptions/mysubscription" { + t.Errorf("Subscription = %v, want projects/myproject/subscriptions/mysubscription", msg.Subscription) + } + + // Decode the data + data, err := base64.StdEncoding.DecodeString(msg.Message.Data) + if err != nil { + t.Fatalf("failed to decode data: %v", err) + } + + var payload ScanPayload + if err := json.Unmarshal(data, &payload); err != nil { + t.Fatalf("failed to unmarshal payload: %v", err) + } + + if payload.Bucket != "test" { + t.Errorf("Bucket = %v, want test", payload.Bucket) + } + if payload.Name != "file.pdf" { + t.Errorf("Name = %v, want file.pdf", payload.Name) + } +}