Skip to content

Commit a26fd08

Browse files
feat(event-reporter): multi-sourced apps - resource git details reporting (#352)
* event-reporter: created dedicated func resolveApplicationVersions * event-reporter: added new types for variables grouping * event-reporter: removed redundant code comment * event-reporter(refactoring): params grouping, added new type ArgoTrackingMetadata to pass data between methods * event-reporter(refactoring): params grouping, added new type ReportedEntityParentApp to pass data between methods * event-reporter(refactoring): params grouping, added new type ReportedResource to pass data between methods, created new methods to group logic inside getResourceEventPayload * event-reporter(refactoring): fixed nil pointer issues after refactoring * event-reporter(refactoring): fixed lint issue * event-reporter(refactoring): fixed lint issue * event-reporter: added agroDb to ApplicationEventReporter * event-reporter: ObjectSource message proto extended to report operationSyncRevisions, destServer, destName, appMultiSourced * event-reporter / utils: added func GetOperationRevisions * event-reporter(non-grpc app client): added support of new query params SourcePositions, Revisions for GetManifests request * event-reporter(getDesiredManifests): updated logic to retrieve appVersion for multi-sourced applications based on sync result revisions * event-reporter(resource source payload): added new field OperationSyncRevisions * event-reporter(resource source payload): added new fields DestName, DestServer * event-reporter(resource source payload): added new field AppMultiSourced * event-reporter(resource source payload): added AppSourceIdx with source index to which this resource belongs. Also updated logic of retrieving correct git commit info based on AppSourceIdx * event-reporter: updated changelog * event-reporter: fixed lint issues * event-reporter: changes after pr review * fix lint issue * reporter: fix lint issue * reporter: fix lint issue * updated changelog record * event-reporter: refactored fields inside ObjectSource * event-reporter: added runtime version reporting with each event
1 parent bfe424f commit a26fd08

File tree

21 files changed

+1295
-358
lines changed

21 files changed

+1295
-358
lines changed

assets/swagger.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5294,6 +5294,14 @@
52945294
"sourceType": {
52955295
"type": "string"
52965296
},
5297+
"sourcesManifestsStartingIdx": {
5298+
"type": "array",
5299+
"title": "for multisourced apps will be [0,12,20], so this means that 0-11 - from first app source, 12-19 from second one, 20-x - third one",
5300+
"items": {
5301+
"type": "integer",
5302+
"format": "int32"
5303+
}
5304+
},
52975305
"verifyResult": {
52985306
"type": "string",
52995307
"title": "Raw response of git verify-commit operation (always the empty string for Helm)"
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
### Changes
2-
- fix(event-reporter): fixed issue when if managed resource in degraded state and all of it child nodes in non-degraded state no error reported
2+
- fix(event-reporter): fixed issue when if managed resource in degraded state and all of it child nodes in non-degraded state no error reported
3+
- fix(repo-server): fixed repeated resources generation for ApplicationSourceTypeDirectory

changelog/CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
### Changes
2-
- fix(event-reporter): fixed issue when if managed resource in degraded state and all of it child nodes in non-degraded state no error reported
1+
### Features
2+
- feat(event-reporter): multisourced apps support improvements: reporting syncOperationRevisions, detecting correct resource sourceIdx, reporting correct git commit info

cmd/event-reporter-server/commands/event_reporter_server.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ func NewCommand() *cobra.Command {
9696
codefreshUrl string
9797
codefreshToken string
9898
shardingAlgorithm string
99+
runtimeVersion string
99100
rootpath string
100101
useGrpc bool
101102

@@ -179,10 +180,11 @@ func NewCommand() *cobra.Command {
179180
ApplicationNamespaces: applicationNamespaces,
180181
ApplicationServiceClient: getApplicationClient(useGrpc, applicationServerAddress, argocdToken, rootpath),
181182
CodefreshConfig: &codefresh.CodefreshConfig{
182-
BaseURL: codefreshUrl,
183-
AuthToken: codefreshToken,
184-
TlsInsecure: codefreshTlsInsecure,
185-
CaCertPath: codefreshTlsCertPath,
183+
BaseURL: codefreshUrl,
184+
AuthToken: codefreshToken,
185+
TlsInsecure: codefreshTlsInsecure,
186+
CaCertPath: codefreshTlsCertPath,
187+
RuntimeVersion: runtimeVersion,
186188
},
187189
RateLimiterOpts: &reporter.RateLimiterOpts{
188190
Enabled: rateLimiterEnabled,
@@ -236,6 +238,7 @@ func NewCommand() *cobra.Command {
236238
command.Flags().StringVar(&codefreshUrl, "codefresh-url", env.StringFromEnv("CODEFRESH_URL", "https://g.codefresh.io"), "Codefresh API url")
237239
command.Flags().StringVar(&codefreshToken, "codefresh-token", env.StringFromEnv("CODEFRESH_TOKEN", ""), "Codefresh token")
238240
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvEventReporterShardingAlgorithm, common.DefaultEventReporterShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy] ")
241+
command.Flags().StringVar(&runtimeVersion, "codefresh-runtime-version", env.StringFromEnv("CODEFRESH_RUNTIME_VERSION", ""), "Codefresh runtime version to be reported with each event to platform")
239242
command.Flags().StringSliceVar(&applicationNamespaces, "application-namespaces", env.StringsFromEnv("ARGOCD_APPLICATION_NAMESPACES", []string{}, ","), "List of additional namespaces where application resources can be managed in")
240243
command.Flags().BoolVar(&useGrpc, "grpc", env.ParseBoolFromEnv("USE_GRPC", false), "Use grpc for interact with argocd server")
241244
command.Flags().BoolVar(&rateLimiterEnabled, "rate-limiter-enabled", env.ParseBoolFromEnv("RATE_LIMITER_ENABLED", false), "Use rate limiter for prevent queue to be overflowed")

event_reporter/application/client.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,16 @@ func (c *httpApplicationClient) GetManifests(ctx context.Context, in *appclient.
126126
if in.Revision != nil {
127127
params = fmt.Sprintf("%s&revision=%s", params, *in.Revision)
128128
}
129+
if in.SourcePositions != nil && len(in.SourcePositions) > 0 {
130+
for _, sourcePosition := range in.SourcePositions {
131+
params = fmt.Sprintf("%s&sourcePositions=%d", params, sourcePosition)
132+
}
133+
}
134+
if in.Revisions != nil && len(in.Revisions) > 0 {
135+
for _, revision := range in.Revisions {
136+
params = fmt.Sprintf("%s&revisions=%s", params, revision)
137+
}
138+
}
129139
url := fmt.Sprintf("%s/api/v1/applications/%s/manifests%s", c.baseUrl, *in.Name, params)
130140

131141
manifest := &repoapiclient.ManifestResponse{}

event_reporter/controller/controller.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"strings"
77
"time"
88

9+
"github.com/argoproj/argo-cd/v2/util/db"
10+
911
appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
1012

1113
log "github.com/sirupsen/logrus"
@@ -43,15 +45,15 @@ type eventReporterController struct {
4345
metricsServer *metrics.MetricsServer
4446
}
4547

46-
func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager, rateLimiterOpts *reporter.RateLimiterOpts) EventReporterController {
48+
func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager, rateLimiterOpts *reporter.RateLimiterOpts, db db.ArgoDB) EventReporterController {
4749
appBroadcaster := reporter.NewBroadcaster(featureManager, metricsServer, rateLimiterOpts)
4850
_, err := appInformer.AddEventHandler(appBroadcaster)
4951
if err != nil {
5052
log.Error(err)
5153
}
5254
return &eventReporterController{
5355
appBroadcaster: appBroadcaster,
54-
applicationEventReporter: reporter.NewApplicationEventReporter(cache, applicationServiceClient, appLister, codefreshConfig, metricsServer),
56+
applicationEventReporter: reporter.NewApplicationEventReporter(cache, applicationServiceClient, appLister, codefreshConfig, metricsServer, db),
5557
cache: cache,
5658
settingsMgr: settingsMgr,
5759
applicationServiceClient: applicationServiceClient,

event_reporter/reporter/app_revision_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func TestGetRevisionsDetails(t *testing.T) {
5454
newAppLister(),
5555
appServiceClient,
5656
&metrics.MetricsServer{},
57+
fakeArgoDb(),
5758
}
5859

5960
result, _ := reporter.getRevisionsDetails(context.Background(), &app, []string{expectedRevision})
@@ -118,6 +119,7 @@ func TestGetRevisionsDetails(t *testing.T) {
118119
newAppLister(),
119120
appServiceClient,
120121
&metrics.MetricsServer{},
122+
fakeArgoDb(),
121123
}
122124

123125
result, _ := reporter.getRevisionsDetails(context.Background(), &app, []string{expectedRevision1, expectedRevision2})
@@ -159,6 +161,7 @@ func TestGetRevisionsDetails(t *testing.T) {
159161
newAppLister(),
160162
appServiceClient,
161163
&metrics.MetricsServer{},
164+
fakeArgoDb(),
162165
}
163166

164167
result, _ := reporter.getRevisionsDetails(context.Background(), &app, []string{expectedRevision})

event_reporter/reporter/application_event_reporter.go

Lines changed: 91 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,12 @@ import (
99
"strings"
1010
"time"
1111

12+
"github.com/argoproj/argo-cd/v2/util/db"
13+
1214
"github.com/argoproj/argo-cd/v2/event_reporter/utils"
1315

16+
argoutils "github.com/argoproj/argo-cd/v2/util/argo"
17+
1418
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
1519

1620
argocommon "github.com/argoproj/argo-cd/v2/common"
@@ -40,6 +44,8 @@ type applicationEventReporter struct {
4044
appLister applisters.ApplicationLister
4145
applicationServiceClient appclient.ApplicationClient
4246
metricsServer *metrics.MetricsServer
47+
db db.ArgoDB
48+
runtimeVersion string
4349
}
4450

4551
type ApplicationEventReporter interface {
@@ -53,13 +59,15 @@ type ApplicationEventReporter interface {
5359
ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool)
5460
}
5561

56-
func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer) ApplicationEventReporter {
62+
func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, db db.ArgoDB) ApplicationEventReporter {
5763
return &applicationEventReporter{
5864
cache: cache,
5965
applicationServiceClient: applicationServiceClient,
6066
codefreshClient: codefresh.NewCodefreshClient(codefreshConfig),
6167
appLister: appLister,
6268
metricsServer: metricsServer,
69+
db: db,
70+
runtimeVersion: codefreshConfig.RuntimeVersion,
6371
}
6472
}
6573

@@ -87,15 +95,28 @@ func (s *applicationEventReporter) shouldSendResourceEvent(a *appv1.Application,
8795
return true
8896
}
8997

90-
func (r *applicationEventReporter) getDesiredManifests(ctx context.Context, a *appv1.Application, revision *string, logCtx *log.Entry) (*apiclient.ManifestResponse, bool) {
98+
func (r *applicationEventReporter) getDesiredManifests(
99+
ctx context.Context,
100+
logCtx *log.Entry,
101+
a *appv1.Application,
102+
revision *string,
103+
sourcePositions *[]int64,
104+
revisions *[]string,
105+
) (*apiclient.ManifestResponse, bool) {
91106
// get the desired state manifests of the application
92107
project := a.Spec.GetProject()
93-
desiredManifests, err := r.applicationServiceClient.GetManifests(ctx, &application.ApplicationManifestQuery{
108+
query := application.ApplicationManifestQuery{
94109
Name: &a.Name,
95110
AppNamespace: &a.Namespace,
96111
Revision: revision,
97112
Project: &project,
98-
})
113+
}
114+
if sourcePositions != nil && query.Revisions != nil {
115+
query.SourcePositions = *sourcePositions
116+
query.Revisions = *revisions
117+
}
118+
119+
desiredManifests, err := r.applicationServiceClient.GetManifests(ctx, &query)
99120
if err != nil {
100121
// if it's manifest generation error we need to still report the actual state
101122
// of the resources, but since we can't get the desired state, we will report
@@ -137,7 +158,7 @@ func (s *applicationEventReporter) StreamApplicationEvents(
137158

138159
logCtx.Info("getting desired manifests")
139160

140-
desiredManifests, manifestGenErr := s.getDesiredManifests(ctx, a, nil, logCtx)
161+
desiredManifests, manifestGenErr := s.getDesiredManifests(ctx, logCtx, a, nil, nil, nil)
141162

142163
applicationVersions := s.resolveApplicationVersions(ctx, a, logCtx)
143164

@@ -158,17 +179,21 @@ func (s *applicationEventReporter) StreamApplicationEvents(
158179
rs := utils.GetAppAsResource(a)
159180
utils.SetHealthStatusIfMissing(rs)
160181

161-
parentDesiredManifests, manifestGenErr := s.getDesiredManifests(ctx, parentApplicationEntity, nil, logCtx)
182+
parentDesiredManifests, manifestGenErr := s.getDesiredManifests(ctx, logCtx, parentApplicationEntity, nil, nil, nil)
162183

163184
parentAppSyncRevisionsMetadata, err := s.getApplicationRevisionsMetadata(ctx, logCtx, parentApplicationEntity)
164185
if err != nil {
165186
logCtx.WithError(err).Warn("failed to get parent application's revision metadata, resuming")
166187
}
167188

189+
validatedDestination := parentApplicationEntity.Spec.Destination.DeepCopy()
190+
_ = argoutils.ValidateDestination(ctx, validatedDestination, s.db) // resolves server field if missing
191+
168192
err = s.processResource(ctx, *rs, logCtx, eventProcessingStartedAt, parentDesiredManifests, manifestGenErr, a, applicationVersions, &ReportedEntityParentApp{
169-
app: parentApplicationEntity,
170-
appTree: appTree,
171-
revisionsMetadata: parentAppSyncRevisionsMetadata,
193+
app: parentApplicationEntity,
194+
appTree: appTree,
195+
revisionsMetadata: parentAppSyncRevisionsMetadata,
196+
validatedDestination: validatedDestination,
172197
}, argoTrackingMetadata)
173198
if err != nil {
174199
s.metricsServer.IncErroredEventsCounter(metrics.MetricChildAppEventType, metrics.MetricEventUnknownErrorType, a.Name)
@@ -178,7 +203,7 @@ func (s *applicationEventReporter) StreamApplicationEvents(
178203
} else {
179204
// will get here only for root applications (not managed as a resource by another application)
180205
logCtx.Info("processing as root application")
181-
appEvent, err := s.getApplicationEventPayload(ctx, a, appTree, eventProcessingStartedAt, applicationVersions, argoTrackingMetadata)
206+
appEvent, err := s.getApplicationEventPayload(ctx, a, appTree, eventProcessingStartedAt, applicationVersions, argoTrackingMetadata, s.runtimeVersion)
182207
if err != nil {
183208
s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventGetPayloadErrorType, a.Name)
184209
return fmt.Errorf("failed to get application event: %w", err)
@@ -197,6 +222,9 @@ func (s *applicationEventReporter) StreamApplicationEvents(
197222
s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricParentAppEventType, metricTimer.Duration())
198223
}
199224

225+
validatedDestination := a.Spec.Destination.DeepCopy()
226+
_ = argoutils.ValidateDestination(ctx, validatedDestination, s.db) // resolves server field if missing
227+
200228
revisionsMetadata, _ := s.getApplicationRevisionsMetadata(ctx, logCtx, a)
201229
// for each resource in the application get desired and actual state,
202230
// then stream the event
@@ -210,9 +238,10 @@ func (s *applicationEventReporter) StreamApplicationEvents(
210238
continue
211239
}
212240
err := s.processResource(ctx, rs, logCtx, eventProcessingStartedAt, desiredManifests, manifestGenErr, nil, nil, &ReportedEntityParentApp{
213-
app: a,
214-
appTree: appTree,
215-
revisionsMetadata: revisionsMetadata,
241+
app: a,
242+
appTree: appTree,
243+
revisionsMetadata: revisionsMetadata,
244+
validatedDestination: validatedDestination,
216245
}, argoTrackingMetadata)
217246
if err != nil {
218247
s.metricsServer.IncErroredEventsCounter(metrics.MetricResourceEventType, metrics.MetricEventUnknownErrorType, a.Name)
@@ -222,13 +251,30 @@ func (s *applicationEventReporter) StreamApplicationEvents(
222251
return nil
223252
}
224253

254+
// returns appVersion from first non-ref source for multisourced apps
225255
func (s *applicationEventReporter) resolveApplicationVersions(ctx context.Context, a *appv1.Application, logCtx *log.Entry) *apiclient.ApplicationVersions {
226-
syncRevision := utils.GetOperationStateRevision(a)
227-
if syncRevision == nil {
256+
if a.Spec.HasMultipleSources() {
257+
syncResultRevisions := utils.GetOperationSyncResultRevisions(a)
258+
if syncResultRevisions == nil {
259+
return nil
260+
}
261+
262+
var sourcePositions []int64
263+
for i := 0; i < len(*syncResultRevisions); i++ {
264+
sourcePositions = append(sourcePositions, int64(i+1))
265+
}
266+
267+
syncManifests, _ := s.getDesiredManifests(ctx, logCtx, a, nil, &sourcePositions, syncResultRevisions)
268+
return syncManifests.GetApplicationVersions()
269+
}
270+
271+
syncResultRevision := utils.GetOperationSyncResultRevision(a)
272+
273+
if syncResultRevision == nil {
228274
return nil
229275
}
230276

231-
syncManifests, _ := s.getDesiredManifests(ctx, a, syncRevision, logCtx)
277+
syncManifests, _ := s.getDesiredManifests(ctx, logCtx, a, syncResultRevision, nil, nil)
232278
return syncManifests.GetApplicationVersions()
233279
}
234280

@@ -279,7 +325,7 @@ func (s *applicationEventReporter) processResource(
279325
})
280326

281327
// get resource desired state
282-
desiredState := getResourceDesiredState(&rs, desiredManifests, logCtx)
328+
desiredState, appSourceIdx := getResourceDesiredState(&rs, desiredManifests, logCtx)
283329

284330
actualState, err := s.getResourceActualState(ctx, logCtx, metricsEventType, rs, reportedEntityParentApp.app, originalApplication)
285331
if err != nil {
@@ -304,18 +350,22 @@ func (s *applicationEventReporter) processResource(
304350
actualState: actualState,
305351
desiredState: desiredState,
306352
manifestGenErr: manifestGenErr,
353+
appSourceIdx: appSourceIdx,
307354
rsAsAppInfo: &ReportedResourceAsApp{
308355
app: originalApplication,
309356
revisionsMetadata: originalAppRevisionMetadata,
310357
applicationVersions: applicationVersions,
311358
},
312359
},
313360
&ReportedEntityParentApp{
314-
app: parentApplicationToReport,
315-
appTree: reportedEntityParentApp.appTree,
316-
revisionsMetadata: revisionMetadataToReport,
361+
app: parentApplicationToReport,
362+
appTree: reportedEntityParentApp.appTree,
363+
revisionsMetadata: revisionMetadataToReport,
364+
validatedDestination: reportedEntityParentApp.validatedDestination,
365+
desiredManifests: reportedEntityParentApp.desiredManifests,
317366
},
318367
argoTrackingMetadata,
368+
s.runtimeVersion,
319369
)
320370
if err != nil {
321371
s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventGetPayloadErrorType, reportedEntityParentApp.app.Name)
@@ -474,11 +524,11 @@ func applicationMetadataChanged(ae *appv1.ApplicationWatchEvent, cachedApp *appv
474524
return !reflect.DeepEqual(newEventAppMeta, cachedAppMeta)
475525
}
476526

477-
func getResourceDesiredState(rs *appv1.ResourceStatus, ds *apiclient.ManifestResponse, logger *log.Entry) *apiclient.Manifest {
527+
func getResourceDesiredState(rs *appv1.ResourceStatus, ds *apiclient.ManifestResponse, logger *log.Entry) (manifest *apiclient.Manifest, sourceIdx int32) {
478528
if ds == nil {
479-
return &apiclient.Manifest{}
529+
return &apiclient.Manifest{}, 0
480530
}
481-
for _, m := range ds.Manifests {
531+
for idx, m := range ds.Manifests {
482532
u, err := appv1.UnmarshalToUnstructured(m.CompiledManifest)
483533
if err != nil {
484534
logger.WithError(err).Warnf("failed to unmarshal compiled manifest")
@@ -498,11 +548,27 @@ func getResourceDesiredState(rs *appv1.ResourceStatus, ds *apiclient.ManifestRes
498548
m.RawManifest = m.CompiledManifest
499549
}
500550

501-
return m
551+
return m, getResourceSourceIdxFromManifestResponse(idx, ds)
502552
}
503553
}
504554

505555
// no desired state for resource
506556
// it's probably deleted from git
507-
return &apiclient.Manifest{}
557+
return &apiclient.Manifest{}, 0
558+
}
559+
560+
func getResourceSourceIdxFromManifestResponse(rsIdx int, ds *apiclient.ManifestResponse) int32 {
561+
if ds.SourcesManifestsStartingIdx == nil {
562+
return -1
563+
}
564+
565+
sourceIdx := int32(-1)
566+
567+
for currentSourceIdx, sourceStartingIdx := range ds.SourcesManifestsStartingIdx {
568+
if int32(rsIdx) >= sourceStartingIdx {
569+
sourceIdx = int32(currentSourceIdx)
570+
}
571+
}
572+
573+
return sourceIdx
508574
}

0 commit comments

Comments
 (0)