-
Notifications
You must be signed in to change notification settings - Fork 174
Implement TestMetadata_Subscriptions E2E test #3324
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement TestMetadata_Subscriptions E2E test #3324
Conversation
This test verifies the full Knative Eventing flow using func subscribe: - Creates a producer function that sends CloudEvents to the broker - Creates a subscriber function that receives events via a Trigger - Validates the complete event delivery pipeline Fixes knative#3202
|
Hi @Kunal1522. Thanks for your PR. I'm waiting for a github.com member to verify that this patch is reasonable to test. If it is, they should reply with Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #3324 +/- ##
==========================================
+ Coverage 55.09% 55.24% +0.14%
==========================================
Files 170 170
Lines 19850 19963 +113
==========================================
+ Hits 10937 11029 +92
- Misses 7845 7855 +10
- Partials 1068 1079 +11
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
The subscribe command doesn't support the -v (verbose) flag that newCmd() automatically adds when FUNC_E2E_VERBOSE=true. This causes test failures. Changed to use exec.Command directly for the subscribe call to bypass the automatic verbose flag addition while keeping it for other commands.
Check for broker-ingress service before running the test to gracefully skip if Knative Eventing is not installed or not ready yet.
The test was failing in CI because it tried to send events before the broker-ingress service was registered in DNS. This adds a retry loop to wait for the service to be available before proceeding. Fixes timing issues in fresh cluster environments.
- Remove unused sendEventToBroker* functions - Use callback pattern for reliable event verification - Clean up comments and reduce code size
|
i spent quite a bit of time trying different approaches to get the producer, subscriber, and broker pods to talk back to the test, basically saying "hey, i got the event." turns out a lot of things like reading pod logs, writing files inside containers, or hitting the broker directly don't really work because of container isolation. so instead of the test trying to pull info from pods, i just flipped it. now the subscriber pushes a callback to the test. the test spins up a small http server on a dynamic port, passes the url via CALLBACK_URL env var, and when the subscriber gets an event it posts the event id back. a go channel catches this and the test knows everything worked. hopefully this solves the todo at line 558-563 e2e/e2e_metadata_test.go |
|
/ok-to-test |
|
@Kunal1522: Cannot trigger testing until a trusted user reviews the PR and leaves an DetailsIn response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
Yeah It seems good now , we should use defer for cleanup i think, not sure :) |
|
/ok-to-test |
|
PTAL @jrangelramos |
e2e/e2e_metadata_test.go
Outdated
| if !createBrokerWithCheck(t, Namespace, brokerName) { | ||
| t.Fatal("Failed to create broker") | ||
| } | ||
| defer deleteBroker(t, Namespace, brokerName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can do the t.Fatal() from within createBrokerWithCheck(). Also you can install clean up in createBrokerWithCheck() using t.Cleanup() in there too. So these 4 lines turn into just one.
e2e/e2e_metadata_test.go
Outdated
| callbackURL, cleanup := startCallbackServer(t, eventReceived) | ||
| defer cleanup() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again install the cleanup routine in startCallbackServer() using t.Cleanup().
|
I am fine with calling back host, however I am curious why just tapping to the consumer logs would not be sufficient? |
e2e/e2e_metadata_test.go
Outdated
| } | ||
|
|
||
| // HTTP handler that sends CloudEvents to broker | ||
| func producerCode(namespace, broker string) string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this service? Cannot we directly push an event to the broker from the host?
like so:
// THIS RUNS DIRECTLY IN TEST ON HOST
transport := fnhttp.NewRoundTripper() // our special transport that can dial services in cluster
defer transport.Close()
client := http.Client{
Transport: transport,
Timeout: 30 * time.Second,
}
eventID := "xyz"
url := "http://broker-ingress.knative-eventing.svc/NAMESPACE/BROKER"
req, _ := http.NewRequest("POST", url, strings.NewReader(`{}`))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("ce-specversion", "1.0")
req.Header.Set("ce-type", "test.event")
req.Header.Set("ce-source", "producer")
req.Header.Set("ce-id", eventID)
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
alternatively you can temporarily expose the broker via ingress:
func exposeBroker(t *testing.T) {
ingress := `apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: broker-ingress
namespace: knative-eventing
spec:
ingressClassName: contour-external
rules:
- host: broker.localtest.me
http:
paths:
- backend:
service:
name: broker-ingress
port:
number: 80
pathType: Prefix
path: /`
cmd := exec.Command("kubectl", "apply", "-f", "-")
cmd.Stdin = strings.NewReader(ingress)
cmd.Stdout = os.Stderr
cmd.Stderr = os.Stderr
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
err := cmd.Run()
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
cmd := exec.Command("kubectl", "delete", "ingress", "broker-ingress", "-n", "knative-eventing", "--ignore-not-found")
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
_ = cmd.Run()
})
}Then you can call broker from the host as http://broker.localtest.me/<namespace>/<broker>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also we may consider exposing the broker permanently in hack/cluster.sh if it is useful for testing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After some consideration it would be probably best to just use the tunneling transport (fnhttp.NewRoundTripper()). It should work easily also on clusters other than our testing KinD cluster.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i am trying this out
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this service? Cannot we directly push an event to the broker from the host? like so:
// THIS RUNS DIRECTLY IN TEST ON HOST transport := fnhttp.NewRoundTripper() // our special transport that can dial services in cluster defer transport.Close() client := http.Client{ Transport: transport, Timeout: 30 * time.Second, } eventID := "xyz" url := "http://broker-ingress.knative-eventing.svc/NAMESPACE/BROKER" req, _ := http.NewRequest("POST", url, strings.NewReader(`{}`)) req.Header.Set("Content-Type", "application/json") req.Header.Set("ce-specversion", "1.0") req.Header.Set("ce-type", "test.event") req.Header.Set("ce-source", "producer") req.Header.Set("ce-id", eventID) resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close()
Thanks for pointing this out . I thought the broker URL (broker-ingress.knative-eventing.svc.cluster.local) was unreachable from the host since it uses Kubernetes internal DNS .so I deployed a producer function t to send event
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's not really accessible. However we have custom dialer/transport that does tunneling for us.
transport := fnhttp.NewRoundTripper() // our special transport that can dial services in cluster
defer transport.Close()
client := http.Client{
Transport: transport,
Timeout: 30 * time.Second,
}above code basically create Go HTTP client that internally may spawn helper pod in cluster and do TCP dial using that pod.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// NewRoundTripper returns new closable RoundTripper that first tries to dial connection in standard way,
// if the dial operation fails due to hostname resolution the RoundTripper tries to dial from in cluster pod.
//
// This is useful for accessing cluster internal services (pushing a CloudEvent into Knative broker).
func NewRoundTripper(opts ...Option) RoundTripCloser {|
Few comments, otherwise great job. |
The thing is: for me it appears the callback is suppressed FW. |
|
possible improvements: diff --git a/e2e/e2e_metadata_test.go b/e2e/e2e_metadata_test.go
index 6e018482b..d4d2c3791 100644
--- a/e2e/e2e_metadata_test.go
+++ b/e2e/e2e_metadata_test.go
@@ -4,21 +4,23 @@
package e2e
import (
+ "bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
- "net"
"net/http"
"os"
"os/exec"
"path/filepath"
+ "regexp"
"strings"
"testing"
"time"
fn "knative.dev/func/pkg/functions"
+ fnhttp "knative.dev/func/pkg/http"
)
// ---------------------------------------------------------------------------
@@ -562,18 +564,15 @@ func Handle(w http.ResponseWriter, _ *http.Request) {
// Tests the complete event flow using func subscribe
func TestMetadata_Subscriptions(t *testing.T) {
brokerName := "default"
- if !createBrokerWithCheck(t, Namespace, brokerName) {
- t.Fatal("Failed to create broker")
- }
- defer deleteBroker(t, Namespace, brokerName)
+
+ createBrokerWithCheck(t, Namespace, brokerName)
uniqueEventID := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano())
- eventReceived := make(chan string, 10)
- callbackURL, cleanup := startCallbackServer(t, eventReceived)
- defer cleanup()
+ eventReceived := waitForEvent(t, uniqueEventID)
- subscriberName := "func-e2e-test-subscriber"
+ subscriber := "func-e2e-test-subscriber"
+ subscriberName := subscriber
subscriberRoot := fromCleanEnv(t, subscriberName)
if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil {
t.Fatal(err)
@@ -588,10 +587,6 @@ func TestMetadata_Subscriptions(t *testing.T) {
if err := subscribeCmd.Run(); err != nil {
t.Fatal(err)
}
- if err := newCmd(t, "config", "envs", "add",
- "--name=CALLBACK_URL", "--value="+callbackURL).Run(); err != nil {
- t.Fatal(err)
- }
f, err := fn.NewFunction(subscriberRoot)
if err != nil {
@@ -612,34 +607,28 @@ func TestMetadata_Subscriptions(t *testing.T) {
}
waitForTrigger(t, Namespace, subscriberName)
- producerName := "func-e2e-test-producer"
- producerRoot := fromCleanEnv(t, producerName)
- if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil {
- t.Fatal(err)
- }
- if err := os.WriteFile(filepath.Join(producerRoot, "handle.go"),
- []byte(producerCode(Namespace, brokerName)), 0644); err != nil {
- t.Fatal(err)
- }
- if err := newCmd(t, "deploy").Run(); err != nil {
- t.Fatal(err)
- }
- defer clean(t, producerName, Namespace)
-
- producerURL := fmt.Sprintf("http://%s.%s.%s", producerName, Namespace, Domain)
- if !waitFor(t, producerURL, withContentMatch("Producer is ready")) {
- t.Fatal("producer not ready")
+ transport := fnhttp.NewRoundTripper()
+ defer transport.Close()
+ client := http.Client{
+ Transport: transport,
+ Timeout: 30 * time.Second,
}
+ url := fmt.Sprintf("http://broker-ingress.knative-eventing.svc/%s/%s", Namespace, brokerName)
+ req, _ := http.NewRequestWithContext(t.Context(), "POST", url, strings.NewReader(`{}`))
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Set("ce-specversion", "1.0")
+ req.Header.Set("ce-type", "test.event")
+ req.Header.Set("ce-source", "producer")
+ req.Header.Set("ce-id", uniqueEventID)
- client := http.Client{Timeout: 30 * time.Second}
- resp, err := client.Post(producerURL+"?event_id="+uniqueEventID, "application/json", strings.NewReader("{}"))
+ resp, err := client.Do(req)
if err != nil {
t.Fatalf("Failed to invoke producer: %v", err)
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
- if resp.StatusCode != 200 {
- t.Fatalf("Broker rejected event: %s", body)
+ if resp.StatusCode != 202 {
+ t.Fatalf("Broker rejected event: code: %d, body: %q", resp.StatusCode, body)
}
t.Logf("Broker accepted event %s", uniqueEventID)
@@ -651,29 +640,38 @@ func TestMetadata_Subscriptions(t *testing.T) {
}
}
-// Starts HTTP server to receive callbacks from subscriber pod
-func startCallbackServer(t *testing.T, ch chan<- string) (string, func()) {
+func waitForEvent(t *testing.T, eventId string) <-chan string {
t.Helper()
- hostIP := getHostIPForCluster(t)
- listener, err := net.Listen("tcp", ":0")
+
+ eventReceived := make(chan string, 10)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ t.Cleanup(cancel)
+
+ pr, pw := io.Pipe()
+ cmd := exec.CommandContext(ctx, "stern", "func-e2e-test-subscriber-.*")
+ cmd.Stderr = io.Discard
+ cmd.Stdout = pw
+ cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
+ err := cmd.Start()
if err != nil {
- t.Fatalf("Failed to listen: %v", err)
+ t.Fatal(err)
}
- port := listener.Addr().(*net.TCPAddr).Port
-
- mux := http.NewServeMux()
- mux.HandleFunc("/callback", func(w http.ResponseWriter, r *http.Request) {
- body, _ := io.ReadAll(r.Body)
- select {
- case ch <- string(body):
- default:
+ go func() {
+ r := bufio.NewReader(pr)
+ m, e := regexp.MatchReader(`EVENT_RECEIVED: id=`+eventId, r)
+ if e != nil {
+ panic(e)
}
- w.WriteHeader(200)
- })
+ if m {
+ eventReceived <- "OK"
+ close(eventReceived)
+ cancel()
+ }
+ _, _ = io.Copy(io.Discard, r)
+ }()
- srv := &http.Server{Handler: mux}
- go srv.Serve(listener)
- return fmt.Sprintf("http://%s:%d/callback", hostIP, port), func() { srv.Shutdown(context.Background()) }
+ return eventReceived
}
// CloudEvents handler that calls back to test server
@@ -681,26 +679,13 @@ func subscriberCode() string {
return `package function
import (
- "bytes"
"context"
"fmt"
- "net/http"
- "os"
- "time"
"github.com/cloudevents/sdk-go/v2/event"
)
func Handle(ctx context.Context, e event.Event) (*event.Event, error) {
fmt.Printf("EVENT_RECEIVED: id=%s type=%s source=%s\n", e.ID(), e.Type(), e.Source())
- if url := os.Getenv("CALLBACK_URL"); url != "" {
- c := &http.Client{Timeout: 5 * time.Second}
- if resp, err := c.Post(url, "text/plain", bytes.NewBufferString(e.ID())); err == nil {
- resp.Body.Close()
- fmt.Printf("Callback sent to %s\n", url)
- } else {
- fmt.Printf("Callback failed: %v\n", err)
- }
- }
r := event.New()
r.SetID("response-" + e.ID())
r.SetSource("subscriber")
@@ -711,84 +696,8 @@ func Handle(ctx context.Context, e event.Event) (*event.Event, error) {
`
}
-// HTTP handler that sends CloudEvents to broker
-func producerCode(namespace, broker string) string {
- return fmt.Sprintf(`package function
-
-import (
- "fmt"
- "io"
- "net/http"
- "strings"
- "time"
-)
-
-func Handle(w http.ResponseWriter, r *http.Request) {
- if r.Method == "GET" {
- fmt.Fprint(w, "Producer is ready")
- return
- }
- eventID := r.URL.Query().Get("event_id")
- if eventID == "" {
- eventID = fmt.Sprintf("evt-%%d", time.Now().UnixNano())
- }
- url := "http://broker-ingress.knative-eventing.svc.cluster.local/%s/%s"
- req, _ := http.NewRequest("POST", url, strings.NewReader(`+"`"+`{}`+"`"+`))
- req.Header.Set("Content-Type", "application/json")
- req.Header.Set("ce-specversion", "1.0")
- req.Header.Set("ce-type", "test.event")
- req.Header.Set("ce-source", "producer")
- req.Header.Set("ce-id", eventID)
- resp, err := (&http.Client{Timeout: 30 * time.Second}).Do(req)
- if err != nil {
- http.Error(w, err.Error(), 500)
- return
- }
- defer resp.Body.Close()
- body, _ := io.ReadAll(resp.Body)
- if resp.StatusCode >= 200 && resp.StatusCode < 300 {
- fmt.Fprintf(w, "Event %%s sent (%%d)", eventID, resp.StatusCode)
- } else {
- http.Error(w, string(body), 500)
- }
-}
-`, namespace, broker)
-}
-
-// Returns host IP accessible from kind cluster
-func getHostIPForCluster(t *testing.T) string {
- t.Helper()
- cmd := exec.Command("docker", "network", "inspect", "kind", "-f", "{{(index .IPAM.Config 0).Gateway}}")
- output, err := cmd.Output()
- if err == nil && len(output) > 0 {
- ip := strings.TrimSpace(string(output))
-
- if ip != "" && !strings.Contains(ip, ":") {
- t.Logf("Using kind network gateway: %s", ip)
- return ip
- }
- }
-
- cmd = exec.Command("ip", "route", "get", "1")
- output, err = cmd.Output()
- if err == nil {
- // Parse output like "1.0.0.0 via 192.168.1.1 dev eth0 src 192.168.1.100"
- fields := strings.Fields(string(output))
- for i, f := range fields {
- if f == "src" && i+1 < len(fields) {
- t.Logf("Using host IP from route: %s", fields[i+1])
- return fields[i+1]
- }
- }
- }
-
- // Last resort: use common Docker bridge IP
- t.Log("Warning: Could not determine host IP, using 172.17.0.1 (Docker default)")
- return "172.17.0.1"
-}
-
-// createBrokerWithCheck creates a Knative Broker and returns true if successful.
-func createBrokerWithCheck(t *testing.T, namespace, name string) bool {
+// createBrokerWithCheck creates a Knative Broker
+func createBrokerWithCheck(t *testing.T, namespace, name string) {
t.Helper()
brokerYAML := fmt.Sprintf(`apiVersion: eventing.knative.dev/v1
@@ -804,9 +713,11 @@ metadata:
output, err := cmd.CombinedOutput()
if err != nil {
- t.Logf("Failed to create broker: %v, output: %s", err, string(output))
- return false
+ t.Fatalf("Failed to create broker: %v, output: %s", err, string(output))
}
+ t.Cleanup(func() {
+ deleteBroker(t, namespace, name)
+ })
t.Logf("Created broker %s in namespace %s", name, namespace)
waitCmd := exec.Command("kubectl", "wait", "--for=condition=Ready",
@@ -815,7 +726,6 @@ metadata:
waitOutput, err := waitCmd.CombinedOutput()
if err != nil {
t.Logf("Broker not ready: %v, output: %s", err, string(waitOutput))
- return false
}
t.Logf("Broker %s is ready", name)
@@ -826,12 +736,11 @@ metadata:
checkCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
if err := checkCmd.Run(); err == nil {
t.Log("broker-ingress service is available")
- return true
+ return
}
time.Sleep(2 * time.Second)
}
- t.Log("broker-ingress service check timed out")
- return false
+ t.Fatal("broker-ingress service check timed out")
}
// deleteBroker removes a Knative Broker from the given namespace. |
cannot you wait for the event via logs like this: func waitForEvent(t *testing.T, eventId string) <-chan string {
t.Helper()
eventReceived := make(chan string, 10)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
pr, pw := io.Pipe()
cmd := exec.CommandContext(ctx, "stern", "func-e2e-test-subscriber-.*")
cmd.Stderr = io.Discard
cmd.Stdout = pw
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
err := cmd.Start()
if err != nil {
t.Fatal(err)
}
go func() {
r := bufio.NewReader(pr)
m, e := regexp.MatchReader(`EVENT_RECEIVED: id=`+eventId, r)
if e != nil {
panic(e)
}
if m {
eventReceived <- "OK"
close(eventReceived)
cancel()
}
_, _ = io.Copy(io.Discard, r)
}()
return eventReceived
} |
|
@Kunal1522 check out #3364 |
i originally tried to read logs but faced segmentation issues when spawning kubectl subprocesses to read files from the container. at last i could not resolve it so went with calling host.i think this will not be issue with stern approach u suggested .also maybe becoz of rootless podman on ur machine the subscriber pod had issues communicating back to http service. |
Yes, probably because of how networking works by default on rootless podman it does not work for me. So I think we should abandon that call back to host. There may be more devs using rootless podman. |
okay then i will go with design u suggested .am testing this on my end and will push it soon |
…ducer and callback server, and using stern to verify event reception.
|
/approve |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: Kunal1522, matejvasek The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
Fixes #3202
What's in the test
Main test function:
Helper functions:
kubectl apply. Waits for it to be ready before returning.func subscribe) to reach Ready state. Without this, events might get lost if we send them too early.Unused legacy helpers (kept for potential future use):