diff --git a/blocking.go b/blocking.go index b65f81a..ee230bb 100644 --- a/blocking.go +++ b/blocking.go @@ -18,13 +18,9 @@ var _ Queue[any] = (*Blocking[any])(nil) // If there are no elements available the retrieve operations wait until // elements are added to the queue. type Blocking[T comparable] struct { - // elements queue - elements []T - elementsIndex int - - initialLen int - - capacity *int + initialElems []T + elems []T + capacity *int // synchronization lock sync.RWMutex @@ -45,20 +41,23 @@ func NewBlocking[T comparable]( o.apply(&options) } + // Store initial elements + initialElems := make([]T, len(elems)) + copy(initialElems, elems) + queue := &Blocking[T]{ - elements: elems, - elementsIndex: 0, - initialLen: len(elems), - capacity: options.capacity, - lock: sync.RWMutex{}, + elems: elems, + initialElems: initialElems, + capacity: options.capacity, + lock: sync.RWMutex{}, } queue.notEmptyCond = sync.NewCond(&queue.lock) queue.notFullCond = sync.NewCond(&queue.lock) if queue.capacity != nil { - if len(queue.elements) > *queue.capacity { - queue.elements = queue.elements[:*queue.capacity] + if len(queue.elems) > *queue.capacity { + queue.elems = queue.elems[:*queue.capacity] } } @@ -77,7 +76,7 @@ func (bq *Blocking[T]) OfferWait(elem T) { bq.notFullCond.Wait() } - bq.elements = append(bq.elements, elem) + bq.elems = append(bq.elems, elem) bq.notEmptyCond.Signal() } @@ -92,22 +91,21 @@ func (bq *Blocking[T]) Offer(elem T) error { return ErrQueueIsFull } - bq.elements = append(bq.elements, elem) + bq.elems = append(bq.elems, elem) bq.notEmptyCond.Signal() return nil } -// Reset sets the queue elements index to 0. The queue will be in its initial -// state. +// Reset sets the queue to its initial state with the original elements. func (bq *Blocking[T]) Reset() { bq.lock.Lock() defer bq.lock.Unlock() - bq.elementsIndex = 0 - - bq.elements = bq.elements[:bq.initialLen] + // Restore initial elements + bq.elems = make([]T, len(bq.initialElems)) + copy(bq.elems, bq.initialElems) bq.notEmptyCond.Broadcast() } @@ -117,29 +115,24 @@ func (bq *Blocking[T]) Reset() { // GetWait removes and returns the head of the elements queue. // If no element is available it waits until the queue // has an element available. -// -// It does not actually remove elements from the elements slice, but -// it's incrementing the underlying index. func (bq *Blocking[T]) GetWait() (v T) { bq.lock.Lock() defer bq.lock.Unlock() - defer bq.notFullCond.Signal() - - idx := bq.getNextIndexOrWait() + for bq.isEmpty() { + bq.notEmptyCond.Wait() + } - elem := bq.elements[idx] + elem := bq.elems[0] + bq.elems = bq.elems[1:] - bq.elementsIndex++ + bq.notFullCond.Signal() return elem } // Get removes and returns the head of the elements queue. // If no element is available it returns an ErrNoElementsAvailable error. -// -// It does not actually remove elements from the elements slice, but -// it's incrementing the underlying index. func (bq *Blocking[T]) Get() (v T, _ error) { bq.lock.Lock() defer bq.lock.Unlock() @@ -154,9 +147,9 @@ func (bq *Blocking[T]) Clear() []T { defer bq.notFullCond.Broadcast() - removed := bq.elements[bq.elementsIndex:] - - bq.elementsIndex += len(removed) + removed := make([]T, len(bq.elems)) + copy(removed, bq.elems) + bq.elems = bq.elems[:0] return removed } @@ -199,9 +192,7 @@ func (bq *Blocking[T]) Peek() (v T, _ error) { return v, ErrNoElementsAvailable } - elem := bq.elements[bq.elementsIndex] - - return elem, nil + return bq.elems[0], nil } // PeekWait retrieves but does not return the head of the queue. @@ -215,7 +206,7 @@ func (bq *Blocking[T]) PeekWait() T { bq.notEmptyCond.Wait() } - elem := bq.elements[bq.elementsIndex] + elem := bq.elems[0] // send the not empty signal again in case any remove method waits. bq.notEmptyCond.Signal() @@ -228,7 +219,7 @@ func (bq *Blocking[T]) Size() int { bq.lock.RLock() defer bq.lock.RUnlock() - return bq.size() + return len(bq.elems) } // Contains returns true if the queue contains the given element. @@ -236,8 +227,8 @@ func (bq *Blocking[T]) Contains(elem T) bool { bq.lock.RLock() defer bq.lock.RUnlock() - for i := range bq.elements[bq.elementsIndex:] { - if bq.elements[i] == elem { + for _, e := range bq.elems { + if e == elem { return true } } @@ -255,20 +246,9 @@ func (bq *Blocking[T]) IsEmpty() bool { // ===================================Helpers================================== -// getNextIndexOrWait returns the next available index of the elements slice. -func (bq *Blocking[T]) getNextIndexOrWait() int { - if !bq.isEmpty() { - return bq.elementsIndex - } - - bq.notEmptyCond.Wait() - - return bq.getNextIndexOrWait() -} - // isEmpty returns true if the queue is empty. func (bq *Blocking[T]) isEmpty() bool { - return bq.elementsIndex >= len(bq.elements) + return len(bq.elems) == 0 } // isFull returns true if the queue is full. @@ -277,23 +257,22 @@ func (bq *Blocking[T]) isFull() bool { return false } - return len(bq.elements)-bq.elementsIndex >= *bq.capacity + return len(bq.elems) >= *bq.capacity } func (bq *Blocking[T]) size() int { - return len(bq.elements) - bq.elementsIndex + return len(bq.elems) } func (bq *Blocking[T]) get() (v T, _ error) { - defer bq.notFullCond.Signal() - if bq.isEmpty() { return v, ErrNoElementsAvailable } - elem := bq.elements[bq.elementsIndex] + elem := bq.elems[0] + bq.elems = bq.elems[1:] - bq.elementsIndex++ + bq.notFullCond.Signal() return elem, nil } @@ -301,19 +280,14 @@ func (bq *Blocking[T]) get() (v T, _ error) { // MarshalJSON serializes the Blocking queue to JSON. func (bq *Blocking[T]) MarshalJSON() ([]byte, error) { bq.lock.RLock() + defer bq.lock.RUnlock() - if bq.IsEmpty() { - bq.lock.RUnlock() + if bq.isEmpty() { return []byte("[]"), nil } - // Extract elements from `elements` starting at `elementsIndex`. - elements := bq.elements[bq.elementsIndex:] - - bq.lock.RUnlock() - // Marshal the slice of elements into JSON. - data, err := json.Marshal(elements) + data, err := json.Marshal(bq.elems) if err != nil { return nil, fmt.Errorf("failed to marshal blocking queue: %w", err) }