@@ -64,8 +64,7 @@ var defaultAPIOpts = &apiOpts{
6464 Max : 10 * time .Second ,
6565 MaxRetries : 10 ,
6666 },
67- client : http .DefaultClient ,
68- // Hardcoded for now.
67+ client : http .DefaultClient ,
6968 retryOnRateLimit : true ,
7069 compression : SnappyBlockCompression ,
7170 path : "api/v1/write" ,
@@ -96,7 +95,7 @@ func WithAPIPath(path string) APIOption {
9695 }
9796}
9897
99- // WithAPIRetryOnRateLimit returns APIOption that disables retrying on rate limit status code.
98+ // WithAPINoRetryOnRateLimit returns APIOption that disables retrying on rate limit status code.
10099func WithAPINoRetryOnRateLimit () APIOption {
101100 return func (o * apiOpts ) error {
102101 o .retryOnRateLimit = false
@@ -373,45 +372,46 @@ type writeStorage interface {
373372 // Other headers might be trimmed, depending on the configured middlewares
374373 // e.g. a default SnappyMiddleware trims "Content-Encoding" and ensures that
375374 // encoded body bytes are already decompressed.
376- Store (ctx context. Context , msgType WriteMessageType , req * http. Request ) (_ * WriteResponse , _ error )
375+ Store (req * http. Request , msgType WriteMessageType ) (_ * WriteResponse , _ error )
377376}
378377
379- type handler struct {
378+ type writeHandler struct {
380379 store writeStorage
381380 acceptedMessageTypes MessageTypes
382- opts handlerOpts
381+ opts writeHandlerOpts
383382}
384383
385- type handlerOpts struct {
384+ type writeHandlerOpts struct {
386385 logger * slog.Logger
387386 middlewares []func (http.Handler ) http.Handler
388387}
389388
390- // HandlerOption represents an option for the handler.
391- type HandlerOption func (o * handlerOpts )
389+ // WriteHandlerOption represents an option for the write handler.
390+ type WriteHandlerOption func (o * writeHandlerOpts )
392391
393- // WithHandlerLogger returns HandlerOption that allows providing slog logger.
392+ // WithWriteHandlerLogger returns WriteHandlerOption that allows providing slog logger.
394393// By default, nothing is logged.
395- func WithHandlerLogger (logger * slog.Logger ) HandlerOption {
396- return func (o * handlerOpts ) {
394+ func WithWriteHandlerLogger (logger * slog.Logger ) WriteHandlerOption {
395+ return func (o * writeHandlerOpts ) {
397396 o .logger = logger
398397 }
399398}
400399
401- // WithHandlerMiddleware returns HandlerOption that allows providing middlewares.
400+ // WithWriteHandlerMiddlewares returns WriteHandlerOption that allows providing middlewares.
402401// Multiple middlewares can be provided and will be applied in the order they are passed.
403- // When using this option, SnappyDecompressorMiddleware is not applied by default so
404- // it (or any other decompression middleware) needs to be added explicitly.
405- func WithHandlerMiddlewares (middlewares ... func (http.Handler ) http.Handler ) HandlerOption {
406- return func (o * handlerOpts ) {
402+ // This option replaces the default middlewares (SnappyDecompressorMiddleware), so if
403+ // you want to have handler that works with the default Remote Write 2.0 protocol,
404+ // SnappyDecompressorMiddleware (or any other decompression middleware) needs to be added explicitly.
405+ func WithWriteHandlerMiddlewares (middlewares ... func (http.Handler ) http.Handler ) WriteHandlerOption {
406+ return func (o * writeHandlerOpts ) {
407407 o .middlewares = middlewares
408408 }
409409}
410410
411- // SnappyDecompressorMiddleware returns a middleware that checks if the request body is snappy-encoded and decompresses it.
411+ // SnappyDecodeMiddleware returns a middleware that checks if the request body is snappy-encoded and decompresses it.
412412// If the request body is not snappy-encoded, it returns an error.
413- // Used by default in NewRemoteWriteHandler .
414- func SnappyDecompressorMiddleware (logger * slog.Logger ) func (http.Handler ) http.Handler {
413+ // Used by default in NewHandler .
414+ func SnappyDecodeMiddleware (logger * slog.Logger ) func (http.Handler ) http.Handler {
415415 bufPool := sync.Pool {
416416 New : func () any {
417417 return bytes .NewBuffer (nil )
@@ -455,18 +455,18 @@ func SnappyDecompressorMiddleware(logger *slog.Logger) func(http.Handler) http.H
455455 }
456456}
457457
458- // NewHandler returns HTTP handler that receives Remote Write 2.0
458+ // NewWriteHandler returns HTTP handler that receives Remote Write 2.0
459459// protocol https://prometheus.io/docs/specs/remote_write_spec_2_0/.
460- func NewHandler (store writeStorage , acceptedMessageTypes MessageTypes , opts ... HandlerOption ) http.Handler {
461- o := handlerOpts {
460+ func NewWriteHandler (store writeStorage , acceptedMessageTypes MessageTypes , opts ... WriteHandlerOption ) http.Handler {
461+ o := writeHandlerOpts {
462462 logger : slog .New (nopSlogHandler {}),
463- middlewares : []func (http.Handler ) http.Handler {SnappyDecompressorMiddleware (slog .New (nopSlogHandler {}))},
463+ middlewares : []func (http.Handler ) http.Handler {SnappyDecodeMiddleware (slog .New (nopSlogHandler {}))},
464464 }
465465 for _ , opt := range opts {
466466 opt (& o )
467467 }
468468
469- h := & handler {
469+ h := & writeHandler {
470470 opts : o ,
471471 store : store ,
472472 acceptedMessageTypes : acceptedMessageTypes ,
@@ -513,7 +513,7 @@ func ParseProtoMsg(contentType string) (WriteMessageType, error) {
513513 return WriteV1MessageType , nil
514514}
515515
516- func (h * handler ) ServeHTTP (w http.ResponseWriter , r * http.Request ) {
516+ func (h * writeHandler ) ServeHTTP (w http.ResponseWriter , r * http.Request ) {
517517 if r .Method != http .MethodPost {
518518 http .Error (w , "Method not allowed" , http .StatusMethodNotAllowed )
519519 return
@@ -538,20 +538,25 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
538538 return
539539 }
540540
541- writeResponse , storeErr := h .store .Store (r .Context (), msgType , r )
541+ writeResponse , storeErr := h .store .Store (r , msgType )
542+ if writeResponse == nil {
543+ // User could forget to return write response; in this case we assume 0 samples
544+ // were written.
545+ writeResponse = NewWriteResponse ()
546+ }
542547
543- // Set required X-Prometheus-Remote-Write-Written-* response headers, in all cases, alongwith any user-defined headers.
544- writeResponse .SetHeaders (w )
548+ // Set required X-Prometheus-Remote-Write-Written-* response headers, in all cases, along with any user-defined headers.
549+ writeResponse .writeHeaders (w )
545550
546551 if storeErr != nil {
547- if writeResponse .StatusCode () == 0 {
552+ if writeResponse .statusCode == 0 {
548553 writeResponse .SetStatusCode (http .StatusInternalServerError )
549554 }
550- if writeResponse .StatusCode () / 100 == 5 { // 5xx
551- h .opts .logger .Error ("Error while storing the remote write request" , "err" , storeErr . Error () )
555+ if writeResponse .statusCode / 100 == 5 { // 5xx
556+ h .opts .logger .Error ("Error while storing the remote write request" , "err" , storeErr )
552557 }
553- http .Error (w , storeErr .Error (), writeResponse .StatusCode () )
558+ http .Error (w , storeErr .Error (), writeResponse .statusCode )
554559 return
555560 }
556- w .WriteHeader (writeResponse .StatusCode () )
561+ w .WriteHeader (writeResponse .statusCode )
557562}
0 commit comments