Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion event_reporter/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ func (c *eventReporterController) Run(ctx context.Context) {
}
trackingMethod := argoutil.GetTrackingMethod(c.settingsMgr)

err = c.applicationEventReporter.StreamApplicationEvents(ctx, &a, eventProcessingStartedAt, ignoreResourceCache, appInstanceLabelKey, trackingMethod)
err = c.applicationEventReporter.StreamApplicationEvents(ctx, &a, eventProcessingStartedAt, ignoreResourceCache, &reporter.ArgoTrackingMetadata{
AppInstanceLabelKey: &appInstanceLabelKey,
TrackingMethod: &trackingMethod,
})
if err != nil {
return err
}
Expand Down
87 changes: 55 additions & 32 deletions event_reporter/reporter/application_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ type ApplicationEventReporter interface {
a *appv1.Application,
eventProcessingStartedAt string,
ignoreResourceCache bool,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
argoTrackingMetadata *ArgoTrackingMetadata,
) error
ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool)
}
Expand Down Expand Up @@ -113,8 +112,7 @@ func (s *applicationEventReporter) StreamApplicationEvents(
a *appv1.Application,
eventProcessingStartedAt string,
ignoreResourceCache bool,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
argoTrackingMetadata *ArgoTrackingMetadata,
) error {
metricTimer := metricsUtils.NewMetricTimer()

Expand All @@ -141,18 +139,11 @@ func (s *applicationEventReporter) StreamApplicationEvents(

desiredManifests, manifestGenErr := s.getDesiredManifests(ctx, a, nil, logCtx)

syncRevision := utils.GetOperationStateRevision(a)
var applicationVersions *apiclient.ApplicationVersions
if syncRevision != nil {
syncManifests, _ := s.getDesiredManifests(ctx, a, syncRevision, logCtx)
applicationVersions = syncManifests.GetApplicationVersions()
} else {
applicationVersions = nil
}
applicationVersions := s.resolveApplicationVersions(ctx, a, logCtx)

logCtx.Info("getting parent application name")

parentAppIdentity := utils.GetParentAppIdentity(a, appInstanceLabelKey, trackingMethod)
parentAppIdentity := utils.GetParentAppIdentity(a, *argoTrackingMetadata.AppInstanceLabelKey, *argoTrackingMetadata.TrackingMethod)

if utils.IsChildApp(parentAppIdentity) {
logCtx.Info("processing as child application")
Expand All @@ -165,27 +156,29 @@ func (s *applicationEventReporter) StreamApplicationEvents(
}

rs := utils.GetAppAsResource(a)
utils.SetHealthStatusIfMissing(rs)

parentDesiredManifests, manifestGenErr := s.getDesiredManifests(ctx, parentApplicationEntity, nil, logCtx)

// helm app hasnt revision
// TODO: add check if it helm application
parentAppSyncRevisionsMetadata, err := s.getApplicationRevisionsMetadata(ctx, logCtx, parentApplicationEntity)
if err != nil {
logCtx.WithError(err).Warn("failed to get parent application's revision metadata, resuming")
}

utils.SetHealthStatusIfMissing(rs)
err = s.processResource(ctx, *rs, parentApplicationEntity, logCtx, eventProcessingStartedAt, parentDesiredManifests, appTree, manifestGenErr, a, parentAppSyncRevisionsMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
err = s.processResource(ctx, *rs, logCtx, eventProcessingStartedAt, parentDesiredManifests, manifestGenErr, a, applicationVersions, &ReportedEntityParentApp{
app: parentApplicationEntity,
appTree: appTree,
revisionsMetadata: parentAppSyncRevisionsMetadata,
}, argoTrackingMetadata)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricChildAppEventType, metrics.MetricEventUnknownErrorType, a.Name)
return err
}
s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricChildAppEventType, metricTimer.Duration())
} else {
logCtx.Info("processing as root application")
// will get here only for root applications (not managed as a resource by another application)
appEvent, err := s.getApplicationEventPayload(ctx, a, appTree, eventProcessingStartedAt, appInstanceLabelKey, trackingMethod, applicationVersions)
logCtx.Info("processing as root application")
appEvent, err := s.getApplicationEventPayload(ctx, a, appTree, eventProcessingStartedAt, applicationVersions, argoTrackingMetadata)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventGetPayloadErrorType, a.Name)
return fmt.Errorf("failed to get application event: %w", err)
Expand Down Expand Up @@ -216,7 +209,11 @@ func (s *applicationEventReporter) StreamApplicationEvents(
s.metricsServer.IncCachedIgnoredEventsCounter(metrics.MetricResourceEventType, a.Name)
continue
}
err := s.processResource(ctx, rs, a, logCtx, eventProcessingStartedAt, desiredManifests, appTree, manifestGenErr, nil, revisionsMetadata, appInstanceLabelKey, trackingMethod, nil)
err := s.processResource(ctx, rs, logCtx, eventProcessingStartedAt, desiredManifests, manifestGenErr, nil, nil, &ReportedEntityParentApp{
app: a,
appTree: appTree,
revisionsMetadata: revisionsMetadata,
}, argoTrackingMetadata)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricResourceEventType, metrics.MetricEventUnknownErrorType, a.Name)
return err
Expand All @@ -225,6 +222,16 @@ func (s *applicationEventReporter) StreamApplicationEvents(
return nil
}

func (s *applicationEventReporter) resolveApplicationVersions(ctx context.Context, a *appv1.Application, logCtx *log.Entry) *apiclient.ApplicationVersions {
syncRevision := utils.GetOperationStateRevision(a)
if syncRevision == nil {
return nil
}

syncManifests, _ := s.getDesiredManifests(ctx, a, syncRevision, logCtx)
return syncManifests.GetApplicationVersions()
}

func (s *applicationEventReporter) getAppForResourceReporting(
rs appv1.ResourceStatus,
ctx context.Context,
Expand Down Expand Up @@ -252,17 +259,14 @@ func (s *applicationEventReporter) getAppForResourceReporting(
func (s *applicationEventReporter) processResource(
ctx context.Context,
rs appv1.ResourceStatus,
parentApplication *appv1.Application,
logCtx *log.Entry,
appEventProcessingStartedAt string,
desiredManifests *apiclient.ManifestResponse,
appTree *appv1.ApplicationTree,
manifestGenErr bool,
originalApplication *appv1.Application,
revisionsMetadata *utils.AppSyncRevisionsMetadata,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
applicationVersions *apiclient.ApplicationVersions,
originalApplication *appv1.Application, // passed onlu if resource is app
applicationVersions *apiclient.ApplicationVersions, // passed onlu if resource is app
reportedEntityParentApp *ReportedEntityParentApp,
argoTrackingMetadata *ArgoTrackingMetadata,
) error {
metricsEventType := metrics.MetricResourceEventType
if utils.IsApp(rs) {
Expand All @@ -277,25 +281,44 @@ func (s *applicationEventReporter) processResource(
// get resource desired state
desiredState := getResourceDesiredState(&rs, desiredManifests, logCtx)

actualState, err := s.getResourceActualState(ctx, logCtx, metricsEventType, rs, parentApplication, originalApplication)
actualState, err := s.getResourceActualState(ctx, logCtx, metricsEventType, rs, reportedEntityParentApp.app, originalApplication)
if err != nil {
return err
}
if actualState == nil {
return nil
}

parentApplicationToReport, revisionMetadataToReport := s.getAppForResourceReporting(rs, ctx, logCtx, parentApplication, revisionsMetadata)
parentApplicationToReport, revisionMetadataToReport := s.getAppForResourceReporting(rs, ctx, logCtx, reportedEntityParentApp.app, reportedEntityParentApp.revisionsMetadata)

var originalAppRevisionMetadata *utils.AppSyncRevisionsMetadata = nil

if originalApplication != nil {
originalAppRevisionMetadata, _ = s.getApplicationRevisionsMetadata(ctx, logCtx, originalApplication)
}

ev, err := getResourceEventPayload(parentApplicationToReport, &rs, actualState, desiredState, appTree, manifestGenErr, appEventProcessingStartedAt, originalApplication, revisionMetadataToReport, originalAppRevisionMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
ev, err := getResourceEventPayload(
appEventProcessingStartedAt,
&ReportedResource{
rs: &rs,
actualState: actualState,
desiredState: desiredState,
manifestGenErr: manifestGenErr,
rsAsAppInfo: &ReportedResourceAsApp{
app: originalApplication,
revisionsMetadata: originalAppRevisionMetadata,
applicationVersions: applicationVersions,
},
},
&ReportedEntityParentApp{
app: parentApplicationToReport,
appTree: reportedEntityParentApp.appTree,
revisionsMetadata: revisionMetadataToReport,
},
argoTrackingMetadata,
)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventGetPayloadErrorType, parentApplication.Name)
s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventGetPayloadErrorType, reportedEntityParentApp.app.Name)
logCtx.WithError(err).Warn("failed to get event payload, resuming")
return nil
}
Expand All @@ -307,7 +330,7 @@ func (s *applicationEventReporter) processResource(
appName = appRes.Name
} else {
utils.LogWithResourceStatus(logCtx, rs).Info("streaming resource event")
appName = parentApplication.Name
appName = reportedEntityParentApp.app.Name
}

if err := s.codefreshClient.SendEvent(ctx, appName, ev); err != nil {
Expand Down
4 changes: 1 addition & 3 deletions event_reporter/reporter/application_event_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ import (
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/events"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/pkg/codefresh"
"github.com/argoproj/argo-cd/v2/util/argo"
)

const (
Expand Down Expand Up @@ -226,7 +224,7 @@ func TestStreamApplicationEvent(t *testing.T) {
assert.Equal(t, *app, actualApp)
return nil
}
_ = eventReporter.StreamApplicationEvents(context.Background(), app, "", false, common.LabelKeyAppInstance, argo.TrackingMethodLabel)
_ = eventReporter.StreamApplicationEvents(context.Background(), app, "", false, getMockedArgoTrackingMetadata())
})
}

Expand Down
Loading
Loading