|
| 1 | +package store |
| 2 | + |
| 3 | +import ( |
| 4 | + "fmt" |
| 5 | + "github.com/portworx/kvdb" |
| 6 | + "time" |
| 7 | +) |
| 8 | + |
| 9 | +// PX specific scheduler constants |
| 10 | +const ( |
| 11 | + // Kubernetes identifies kubernetes as the scheduler |
| 12 | + Kubernetes = "kubernetes" |
| 13 | +) |
| 14 | + |
| 15 | +// Params is the parameters to use for the Store object |
| 16 | +type Params struct { |
| 17 | + // Kv is the bootstrap kvdb instance |
| 18 | + Kv kvdb.Kvdb |
| 19 | + // InternalKvdb indicates if PX is using internal kvdb or not |
| 20 | + InternalKvdb bool |
| 21 | + // SchedulerType indicates the platform pods are running on. e.g Kubernetes |
| 22 | + SchedulerType string |
| 23 | +} |
| 24 | + |
| 25 | +// Lock identifies a lock taken over CloudDrive store |
| 26 | +type Lock struct { |
| 27 | + // Key is the name on which the lock is acquired. |
| 28 | + // This is used by the callers for logging purpose. Hence public |
| 29 | + Key string |
| 30 | + // Name of the owner who acquired the lock |
| 31 | + owner string |
| 32 | + // true if this lock was acquired using LockWithKey() interface |
| 33 | + lockedWithKey bool |
| 34 | + // lock structure as returned from the KVDB interface |
| 35 | + internalLock interface{} |
| 36 | +} |
| 37 | + |
| 38 | +// KeyDoesNotExist is error type when the key does not exist |
| 39 | +type KeyDoesNotExist struct { |
| 40 | + Key string |
| 41 | +} |
| 42 | + |
| 43 | +func (e *KeyDoesNotExist) Error() string { |
| 44 | + return fmt.Sprintf("key %s does not exist", e.Key) |
| 45 | +} |
| 46 | + |
| 47 | +// KeyExists is error type when the key already exist in store |
| 48 | +type KeyExists struct { |
| 49 | + // Key that exists |
| 50 | + Key string |
| 51 | + // Message is an optional message to the user |
| 52 | + Message string |
| 53 | +} |
| 54 | + |
| 55 | +func (e *KeyExists) Error() string { |
| 56 | + errMsg := fmt.Sprintf("key %s already exists in store", e.Key) |
| 57 | + if len(e.Message) > 0 { |
| 58 | + errMsg += " " + e.Message |
| 59 | + } |
| 60 | + return errMsg |
| 61 | +} |
| 62 | + |
| 63 | +// Store provides a set of APIs to CloudDrive to store its metadata |
| 64 | +// in a persistent store |
| 65 | +type Store interface { |
| 66 | + // Lock locks the cloud drive store for a node to perform operations |
| 67 | + Lock(owner string) (*Lock, error) |
| 68 | + // Unlock unlocks the cloud drive store |
| 69 | + Unlock(storeLock *Lock) error |
| 70 | + // LockWithKey locks the cloud drive store with an arbitrary key |
| 71 | + LockWithKey(owner, key string) (*Lock, error) |
| 72 | + // IsKeyLocked checks if the specified key is currently locked |
| 73 | + IsKeyLocked(key string) (bool, string, error) |
| 74 | + // CreateKey creates the given key with the value |
| 75 | + CreateKey(key string, value []byte) error |
| 76 | + // PutKey updates the given key with the value |
| 77 | + PutKey(key string, value []byte) error |
| 78 | + // GetKey returns the value for the given key |
| 79 | + GetKey(key string) ([]byte, error) |
| 80 | + // DeleteKey deletes the given key |
| 81 | + DeleteKey(key string) error |
| 82 | + // EnumerateWithKeyPrefix enumerates all keys in the store that begin with the given key |
| 83 | + EnumerateWithKeyPrefix(key string) ([]string, error) |
| 84 | +} |
| 85 | + |
| 86 | +// GetStoreWithParams returns instance for Store |
| 87 | +// kv: bootstrap kvdb |
| 88 | +// schedulerType: node scheduler type e.g Kubernetes |
| 89 | +// internalKvdb: If the cluster is configured to have internal kvdb |
| 90 | +// name: Name for the store |
| 91 | +// lockTryDuration: Total time to try acquiring the lock for |
| 92 | +// lockHoldTimeout: Once a lock is acquired, if it's held beyond this time, there will be panic |
| 93 | +func GetStoreWithParams( |
| 94 | + kv kvdb.Kvdb, |
| 95 | + schedulerType string, |
| 96 | + internalKvdb bool, |
| 97 | + name string, |
| 98 | + lockTryDuration time.Duration, |
| 99 | + lockHoldTimeout time.Duration, |
| 100 | +) (Store, error) { |
| 101 | + if len(name) == 0 { |
| 102 | + return nil, fmt.Errorf("name required to create Store") |
| 103 | + } |
| 104 | + var ( |
| 105 | + s Store |
| 106 | + err error |
| 107 | + ) |
| 108 | + |
| 109 | + if internalKvdb && schedulerType == Kubernetes { |
| 110 | + s, _, err = newK8sStoreWithParams(name, lockTryDuration, lockHoldTimeout) |
| 111 | + } else if internalKvdb && kv == nil { |
| 112 | + return nil, fmt.Errorf("bootstrap kvdb cannot be empty") |
| 113 | + } else { |
| 114 | + // Two cases: |
| 115 | + // internal kvdb && kv is not nil |
| 116 | + // external kvdb |
| 117 | + if !internalKvdb { |
| 118 | + if kvdb.Instance() == nil { |
| 119 | + return nil, fmt.Errorf("kvdb is not initialized") |
| 120 | + } |
| 121 | + kv = kvdb.Instance() |
| 122 | + } |
| 123 | + s, err = newKVStoreWithParams(kv, name, lockTryDuration, lockHoldTimeout) |
| 124 | + } |
| 125 | + return s, err |
| 126 | +} |
0 commit comments