Skip to content

Conversation

@willh-db
Copy link
Collaborator

@willh-db willh-db commented Oct 30, 2025

Changes

  • Read from tenant_prefixes list in bucket config (passed by the init container)
  • If it is empty or does not exist, compactor proceeds as normal
  • If it exists, create one bucket per tenant with the appropriate prefixing (v1/raw/tenant) and proceed as normal per tenant, using the existing goroutine system

Verification

  • Added unit test for multi-tenant compactor

Note

  • Diff looks messy with many line changes because of whitespace changes in for loop -- please use split view to see changes

Comment on lines 169 to 172
type TenantBucketConfig struct {
TenantPrefixes []string `yaml:"tenant_prefixes"`
// Other config fields can be added here if needed
}
Copy link

@hczhu-db hczhu-db Oct 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just realized the client.BucketConfig struct is in another repo as a vendor repo to Thanos repo. We can't easily extend that struct. We have to copy that struct to here.

  1. Copy and extend the struct.
type MultiTenancyBucketConfig struct {
	Type   objstore.ObjProvider `yaml:"type"`
	Config interface{}          `yaml:"config"`
	Prefix string               `yaml:"prefix" default:""`
    // Example value: "v1/raw/tenant_a,v1/raw/tenant_b,v1/raw/tenant_c"
    TenantPrefixes []string `yaml:"tenant_prefixes"`
}
  1. Generate N client.BucketConfig instances from the MultileTenancyBucketConfig instance. Each of the instances has a different client.BucketConfig.Prefix which is join(BucketConfig.Prefix, MultileTenancyBucketConfig.TenantPrefixes[i])
  2. Later on (not on this PR), compactor's init container will generate an MultiTenancyBucketConfig instance for its compactor container.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just realized the client.BucketConfig struct is in another repo as a vendor repo to Thanos repo. We can't easily extend that struct. We have to copy that struct to here.

Sounds good. I wrote it this way thinking the init container could pass the list of tenants via the client.BucketConfig.Config, but this is cleaner and more type safe.

Comment on lines 169 to 172
type TenantBucketConfig struct {
TenantPrefixes []string `yaml:"tenant_prefixes"`
// Other config fields can be added here if needed
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@willh-db willh-db requested a review from hczhu-db October 31, 2025 06:31
// Determine tenant prefixes to use (if provided)
var multiTenancyBucketConfig MultiTenancyBucketConfig
if err := yaml.Unmarshal(confContentYaml, &multiTenancyBucketConfig); err != nil {
return errors.Wrap(err, "parse bucket config")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"failed to parse MultiTenancyBucketConfig"

Config: multiTenancyBucketConfig.Config,
Prefix: multiTenancyBucketConfig.Prefix + tenantPrefix,
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log something like "starting compaction loop for prefix: " + "bucketConf.prefix"

Comment on lines 248 to 251
// Create bucket for this tenant
if tenantPrefix != "" {
level.Info(logger).Log("msg", "creating compactor bucket with tenant prefix", "prefix", tenantPrefix)
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this point, the tenant prefix is abstracted out and just becomes prefix. We don't need anything special after this point.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove it.

Comment on lines 255 to 260
if tenantPrefix != "" {
level.Info(logger).Log("msg", "azdatalake sdk wrapper enabled for tenant", "prefix", tenantPrefix, "name", bkt.Name())
} else {
level.Info(logger).Log("msg", "azdatalake sdk wrapper enabled", "name", bkt.Name())
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the if and just log something agnostic to the tenant prefix.

Comment on lines 267 to 272
// Create tenant-specific logger
tenantLogger := logger
if tenantPrefix != "" {
tenantLogger = log.With(logger, "tenant_prefix", tenantPrefix)
}
sy, err = compact.NewMetaSyncer(
logger,
reg,
insBkt,
cf,
duplicateBlocksFilter,
ignoreDeletionMarkFilter,
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""),
compactMetrics.garbageCollectedBlocks,
syncMetasTimeout,
)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great!

@willh-db willh-db marked this pull request as ready for review November 10, 2025 23:33
@willh-db willh-db requested a review from jnyi November 17, 2025 18:58
Comment on lines 210 to 216
bkt, err := client.NewBucket(logger, confContentYaml, component.String(), nil)
if conf.enableFolderDeletion {
bkt, err = block.WrapWithAzDataLakeSdk(logger, confContentYaml, bkt)
level.Info(logger).Log("msg", "azdatalake sdk wrapper enabled", "name", bkt.Name())
}
if err != nil {
return err
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a lot of code got shuffled rather than deleted which makes reviewing this pr hard. Wonder if it is possible to keep some of the unchanged code path the same? Might need to pay some efforts to find the right place to insert as well as introduce helper functions but may worth it (admit this function is ready long)

Copy link
Collaborator

@jnyi jnyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really cool, comments most about styling, also thanks for adding unit tests!

insBkt,
conf.acceptMalformedIndex,
enableVerticalCompaction,
tenantReg,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same for this variable, is it possible to override instead

planner,
comp,
compact.DefaultBlockDeletableChecker{},
overlappingCallback,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we don't change reg variable name, this line won't be changed.

if err := os.MkdirAll(compactDir, os.ModePerm); err != nil {
return errors.Wrap(err, "create working compact directory")
}
overlappingCallback := compact.NewOverlappingCompactionLifecycleCallback(reg, logger, conf.enableOverlappingRemoval)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to create temp variable which is only used once later in the old place

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is run once per tenant as it is within the loop but it shouldn't be an issue

Comment on lines 318 to 331
if tenantPrefix != "" {
// For multi-tenant mode, we pass a nil registerer to avoid metric collisions
// TODO (willh-db): revisit metrics structure for multi-tenant mode
tenantReg = nil
insBkt = objstoretracing.WrapWithTraces(bkt)
} else {
tenantReg = reg
insBkt = objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", tenantReg), bkt.Name()))
}

progress.Set(compact.CleanBlocks)
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt, compactMetrics.partialUploadDeleteAttempts, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "cleaning marked blocks")
// Create tenant-specific logger
if tenantPrefix != "" {
logger = log.With(logger, "tenant_prefix", tenantPrefix)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: both use the same if condition, could merge it to simply

Comment on lines 771 to 772
if tenantPrefix != "" {
f = baseMetaFetcher.NewMetaFetcher(nil, nil, "component", "globalBucketUI") // TODO (willh-db): revisit metrics here
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some of the metrics might not be useful, so I don't think we need to alter all of them via this if-condition tenantPrefix != ""

bucketConf := &client.BucketConfig{
Type: initialBucketConf.Type,
Config: initialBucketConf.Config,
Prefix: path.Join(initialBucketConf.Prefix, tenantPrefix),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious if tenantPrefix == "", it won't generate an empty folder in the middle?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty elements should be ignored: https://pkg.go.dev/path#Join

if tenantPrefix != "" {
// For multi-tenant mode, we pass a nil registerer to avoid metric collisions
// TODO (willh-db): revisit metrics structure for multi-tenant mode
tenantReg = nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest to use this function to augment tenantReg instead.

        prometheus.Labels{"tenant": tenantPrefix},
        reg,
    )

progress.Set(compact.SyncMeta)
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "syncing metas")
if tenantPrefix != "" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a lot of if conditions based on this, I'd suggest to create a bool variable instead:

isMultiTenantMode = tenantPrefix != ""

if isMultiTenantMode {
  ... // do sth

Copy link
Collaborator

@jnyi jnyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @willh-db , I actually had a better idea, WDYT?

The original runCompact function can be broken into 3 parts:

  1. initial and parse config
  2. setup bucket and use it in middle
  3. post bucket setup after conf.wait

instead of injecting a for loop per tenant and change a umber of code inline, I am wondering if we could break the original function into 3 functions so most code lines retain its original position

func runCompact(args) {
   // part1

+ for tenants {
+ set up bucket per tenant
+  perBucketSetup(bucket_per_tenant, reg_per_tenant, logger_per_tenant)
+ }

+   postProcess()
+ }
+
+ func perBucketSetup() {. <-- add new func in the middle and call it from orignal with variable name unchanged.
   // part2 which we want to for-loop
   // most code don't need to change since we wrap it in a helper function
+ }
+ 
+ func postProcess() {
+ // part3
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants