Support for tenant bucket prefixing in compactor #239
base: db_main
Conversation
cmd/thanos/compact.go
Outdated
| type TenantBucketConfig struct { | ||
| TenantPrefixes []string `yaml:"tenant_prefixes"` | ||
| // Other config fields can be added here if needed | ||
| } |
I've just realized the client.BucketConfig struct is in another repo as a vendor repo to Thanos repo. We can't easily extend that struct. We have to copy that struct to here.
- Copy and extend the struct.
type MultiTenancyBucketConfig struct {
	Type   objstore.ObjProvider `yaml:"type"`
	Config interface{}          `yaml:"config"`
	Prefix string               `yaml:"prefix" default:""`
	// Example value: "v1/raw/tenant_a,v1/raw/tenant_b,v1/raw/tenant_c"
	TenantPrefixes []string `yaml:"tenant_prefixes"`
}
- Generate N client.BucketConfig instances from the MultiTenancyBucketConfig instance. Each instance has a different client.BucketConfig.Prefix, which is join(BucketConfig.Prefix, MultiTenancyBucketConfig.TenantPrefixes[i]).
- Later on (not in this PR), the compactor's init container will generate a MultiTenancyBucketConfig instance for its compactor container.
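The expansion step described in the list above could look roughly like the sketch below. It is only an illustration: the local BucketConfig type stands in for client.BucketConfig, Type is simplified to a string, and expandTenants is a hypothetical helper name that does not appear in the PR. The fallback to a single config when no tenant prefixes are given is also an assumption, chosen to keep single-tenant behaviour unchanged.

```go
package main

import (
	"fmt"
	"path"
)

// BucketConfig is a local stand-in for client.BucketConfig (assumption:
// only the fields shown in the review comment are modelled here).
type BucketConfig struct {
	Type   string
	Config interface{}
	Prefix string
}

// MultiTenancyBucketConfig follows the struct proposed in the review comment,
// with Type simplified to a string for this sketch.
type MultiTenancyBucketConfig struct {
	Type           string
	Config         interface{}
	Prefix         string
	TenantPrefixes []string
}

// expandTenants is a hypothetical helper that returns one BucketConfig per
// tenant prefix. With no tenant prefixes it returns a single config that
// carries only the base prefix (assumed single-tenant fallback).
func expandTenants(mt MultiTenancyBucketConfig) []BucketConfig {
	prefixes := mt.TenantPrefixes
	if len(prefixes) == 0 {
		prefixes = []string{""}
	}
	out := make([]BucketConfig, 0, len(prefixes))
	for _, tp := range prefixes {
		out = append(out, BucketConfig{
			Type:   mt.Type,
			Config: mt.Config,
			// path.Join skips empty elements and cleans the result.
			Prefix: path.Join(mt.Prefix, tp),
		})
	}
	return out
}

func main() {
	mt := MultiTenancyBucketConfig{
		Type:           "AZURE",
		TenantPrefixes: []string{"v1/raw/tenant_a", "v1/raw/tenant_b"},
	}
	for _, bc := range expandTenants(mt) {
		fmt.Println(bc.Prefix)
	}
}
```

Each returned config can then be handed to the existing bucket setup code, which only sees an ordinary prefix.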
I've just realized the client.BucketConfig struct is in another repo as a vendor repo to Thanos repo. We can't easily extend that struct. We have to copy that struct to here.
Sounds good. I wrote it this way thinking the init container could pass the list of tenants via the client.BucketConfig.Config, but this is cleaner and more type safe.
cmd/thanos/compact.go
Outdated
| // Determine tenant prefixes to use (if provided) | ||
| var multiTenancyBucketConfig MultiTenancyBucketConfig | ||
| if err := yaml.Unmarshal(confContentYaml, &multiTenancyBucketConfig); err != nil { | ||
| return errors.Wrap(err, "parse bucket config") |
"failed to parse MultiTenancyBucketConfig"
| Config: multiTenancyBucketConfig.Config, | ||
| Prefix: multiTenancyBucketConfig.Prefix + tenantPrefix, | ||
| } | ||
|
|
Log something like "starting compaction loop for prefix: " + "bucketConf.prefix"
cmd/thanos/compact.go
Outdated
| // Create bucket for this tenant | ||
| if tenantPrefix != "" { | ||
| level.Info(logger).Log("msg", "creating compactor bucket with tenant prefix", "prefix", tenantPrefix) | ||
| } |
After this point, the tenant prefix is abstracted out and just becomes prefix. We don't need anything special after this point.
We can remove it.
cmd/thanos/compact.go
Outdated
| if tenantPrefix != "" { | ||
| level.Info(logger).Log("msg", "azdatalake sdk wrapper enabled for tenant", "prefix", tenantPrefix, "name", bkt.Name()) | ||
| } else { | ||
| level.Info(logger).Log("msg", "azdatalake sdk wrapper enabled", "name", bkt.Name()) | ||
| } | ||
| } |
Remove the if and just log something agnostic to the tenant prefix.
cmd/thanos/compact.go
Outdated
| // Create tenant-specific logger | ||
| tenantLogger := logger | ||
| if tenantPrefix != "" { | ||
| tenantLogger = log.With(logger, "tenant_prefix", tenantPrefix) | ||
| } | ||
| sy, err = compact.NewMetaSyncer( | ||
| logger, | ||
| reg, | ||
| insBkt, | ||
| cf, | ||
| duplicateBlocksFilter, | ||
| ignoreDeletionMarkFilter, | ||
| compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""), | ||
| compactMetrics.garbageCollectedBlocks, | ||
| syncMetasTimeout, | ||
| ) | ||
|
|
This is great!
This reverts commit fd8b9ee.
| bkt, err := client.NewBucket(logger, confContentYaml, component.String(), nil) | ||
| if conf.enableFolderDeletion { | ||
| bkt, err = block.WrapWithAzDataLakeSdk(logger, confContentYaml, bkt) | ||
| level.Info(logger).Log("msg", "azdatalake sdk wrapper enabled", "name", bkt.Name()) | ||
| } | ||
| if err != nil { | ||
| return err |
I see a lot of code got shuffled rather than deleted, which makes reviewing this PR hard. I wonder if it is possible to keep some of the unchanged code paths the same? It might take some effort to find the right place to insert the changes and to introduce helper functions, but it may be worth it (admittedly this function is already long).
jnyi
left a comment
really cool, comments are mostly about styling, also thanks for adding unit tests!
cmd/thanos/compact.go
Outdated
| insBkt, | ||
| conf.acceptMalformedIndex, | ||
| enableVerticalCompaction, | ||
| tenantReg, |
same for this variable, is it possible to override it instead?
cmd/thanos/compact.go
Outdated
| planner, | ||
| comp, | ||
| compact.DefaultBlockDeletableChecker{}, | ||
| overlappingCallback, |
if we don't change reg variable name, this line won't be changed.
cmd/thanos/compact.go
Outdated
| if err := os.MkdirAll(compactDir, os.ModePerm); err != nil { | ||
| return errors.Wrap(err, "create working compact directory") | ||
| } | ||
| overlappingCallback := compact.NewOverlappingCompactionLifecycleCallback(reg, logger, conf.enableOverlappingRemoval) |
no need to create a temp variable that is only used once, later, in the old place
It is run once per tenant as it is within the loop but it shouldn't be an issue
cmd/thanos/compact.go
Outdated
| if tenantPrefix != "" { | ||
| // For multi-tenant mode, we pass a nil registerer to avoid metric collisions | ||
| // TODO (willh-db): revisit metrics structure for multi-tenant mode | ||
| tenantReg = nil | ||
| insBkt = objstoretracing.WrapWithTraces(bkt) | ||
| } else { | ||
| tenantReg = reg | ||
| insBkt = objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", tenantReg), bkt.Name())) | ||
| } | ||
|
|
||
| progress.Set(compact.CleanBlocks) | ||
| compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt, compactMetrics.partialUploadDeleteAttempts, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures) | ||
| if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil { | ||
| return errors.Wrap(err, "cleaning marked blocks") | ||
| // Create tenant-specific logger | ||
| if tenantPrefix != "" { | ||
| logger = log.With(logger, "tenant_prefix", tenantPrefix) | ||
| } |
nit: both use the same if condition, could merge them to simplify
cmd/thanos/compact.go
Outdated
| if tenantPrefix != "" { | ||
| f = baseMetaFetcher.NewMetaFetcher(nil, nil, "component", "globalBucketUI") // TODO (willh-db): revisit metrics here |
some of the metrics might not be useful, so I don't think we need to alter all of them via this if-condition tenantPrefix != ""
| bucketConf := &client.BucketConfig{ | ||
| Type: initialBucketConf.Type, | ||
| Config: initialBucketConf.Config, | ||
| Prefix: path.Join(initialBucketConf.Prefix, tenantPrefix), |
just curious if tenantPrefix == "", it won't generate an empty folder in the middle?
Empty elements should be ignored: https://pkg.go.dev/path#Join
cmd/thanos/compact.go
Outdated
| if tenantPrefix != "" { | ||
| // For multi-tenant mode, we pass a nil registerer to avoid metric collisions | ||
| // TODO (willh-db): revisit metrics structure for multi-tenant mode | ||
| tenantReg = nil |
I'd suggest using this function to augment tenantReg instead:
tenantReg = prometheus.WrapRegistererWith(
	prometheus.Labels{"tenant": tenantPrefix},
	reg,
)
cmd/thanos/compact.go
Outdated
| progress.Set(compact.SyncMeta) | ||
| if err := sy.SyncMetas(ctx); err != nil { | ||
| return errors.Wrap(err, "syncing metas") | ||
| if tenantPrefix != "" { |
I see a lot of if conditions based on this. I'd suggest creating a bool variable instead:
isMultiTenantMode := tenantPrefix != ""
if isMultiTenantMode {
	... // do sth
}
hi @willh-db, I actually had a better idea, WDYT?
The original runCompact function can be broken into 3 parts:
- initialize and parse config
- set up the bucket and use it in the middle
- post-bucket setup after conf.wait
Instead of injecting a per-tenant for loop and changing a number of lines inline, I am wondering if we could break the original function into 3 functions so that most lines keep their original position:
func runCompact(args) {
// part1
+ for tenants {
+ set up bucket per tenant
+ perBucketSetup(bucket_per_tenant, reg_per_tenant, logger_per_tenant)
+ }
+ postProcess()
+ }
+
+ func perBucketSetup() { // <-- add new func in the middle and call it from the original, with variable names unchanged
// part2 which we want to for-loop
// most code don't need to change since we wrap it in a helper function
+ }
+
+ func postProcess() {
+ // part3
}
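A compilable version of that outline might look like the sketch below. Everything in it is a placeholder: tenantEnv stands in for the per-tenant bucket, registerer and logger, and the bodies of perBucketSetup and postProcess are stubs for "part 2" and "part 3" of the existing function. The function names follow the pseudocode above; the signatures are assumptions.

```go
package main

import (
	"fmt"
	"path"
)

// tenantEnv is a hypothetical bundle of per-tenant state (in the real code
// this would be the bucket, registerer and logger for one tenant).
type tenantEnv struct {
	prefix string
}

// perBucketSetup stands in for "part 2": the existing bucket-dependent code,
// moved into a helper so its lines stay largely unchanged.
func perBucketSetup(env tenantEnv) error {
	fmt.Println("starting compaction setup for prefix:", env.prefix)
	return nil
}

// postProcess stands in for "part 3": the code that runs after all buckets
// have been set up.
func postProcess() error {
	fmt.Println("post-processing after all tenants are set up")
	return nil
}

// runCompact keeps "part 1" (config parsing, elided here), then loops over
// tenants and finally runs the post-processing step. It returns the prefixes
// that were set up, purely so the sketch is easy to inspect.
func runCompact(basePrefix string, tenantPrefixes []string) ([]string, error) {
	if len(tenantPrefixes) == 0 {
		// Assumed single-tenant fallback: one pass with the base prefix only.
		tenantPrefixes = []string{""}
	}
	var done []string
	for _, tp := range tenantPrefixes {
		env := tenantEnv{prefix: path.Join(basePrefix, tp)}
		if err := perBucketSetup(env); err != nil {
			return done, fmt.Errorf("setup for prefix %q: %w", env.prefix, err)
		}
		done = append(done, env.prefix)
	}
	if err := postProcess(); err != nil {
		return done, fmt.Errorf("post process: %w", err)
	}
	return done, nil
}

func main() {
	prefixes, err := runCompact("", []string{"v1/raw/tenant_a", "v1/raw/tenant_b"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("configured prefixes:", prefixes)
}
```

The point of the split is that the loop lives only in runCompact, so the body of perBucketSetup can keep its original variable names and the diff stays small.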

Changes
- tenant_prefixes list in bucket config (passed by the init container)
- v1/raw/tenant) and proceed as normal per tenant, using the existing goroutine system
Verification
Note