Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit 3e3a3c6

Browse files
authored
feat(event client): add WithEventConsumerTimeout option (#186)
Signed-off-by: Alex Klakovsky <klakovskiy@gmail.com>
1 parent f4911ac commit 3e3a3c6

File tree

3 files changed

+28
-6
lines changed

3 files changed

+28
-6
lines changed

pkg/client/event/event.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,25 @@ SPDX-License-Identifier: Apache-2.0
1515
package event
1616

1717
import (
18+
"time"
19+
1820
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
1921
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
2022
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
2123
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client"
2224
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient"
2325
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
26+
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher"
2427
"github.com/pkg/errors"
2528
)
2629

2730
// Client enables access to a channel events on a Fabric network.
2831
type Client struct {
29-
eventService fab.EventService
30-
permitBlockEvents bool
31-
fromBlock uint64
32-
seekType seek.Type
32+
eventService fab.EventService
33+
permitBlockEvents bool
34+
fromBlock uint64
35+
seekType seek.Type
36+
eventConsumerTimeout *time.Duration
3337
}
3438

3539
// New returns a Client instance. Client receives events such as block, filtered block,
@@ -64,6 +68,9 @@ func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client
6468
opts = append(opts, deliverclient.WithBlockNum(eventClient.fromBlock))
6569
}
6670
}
71+
if eventClient.eventConsumerTimeout != nil {
72+
opts = append(opts, dispatcher.WithEventConsumerTimeout(*eventClient.eventConsumerTimeout))
73+
}
6774
es, err = channelContext.ChannelService().EventService(opts...)
6875
} else {
6976
es, err = channelContext.ChannelService().EventService()

pkg/client/event/event_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestNewEventClient(t *testing.T) {
4343
t.Fatalf("Failed to create new event client: %s", err)
4444
}
4545

46-
_, err = New(ctx, WithBlockEvents(), WithSeekType(seek.Newest), WithBlockNum(math.MaxUint64))
46+
_, err = New(ctx, WithBlockEvents(), WithSeekType(seek.Newest), WithBlockNum(math.MaxUint64), WithEventConsumerTimeout(500 * time.Millisecond))
4747
if err != nil {
4848
t.Fatalf("Failed to create new event client: %s", err)
4949
}

pkg/client/event/opts.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ SPDX-License-Identifier: Apache-2.0
66

77
package event
88

9-
import "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
9+
import (
10+
"time"
11+
12+
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
13+
)
1014

1115
// ClientOption describes a functional parameter for the New constructor
1216
type ClientOption func(*Client) error
@@ -37,3 +41,14 @@ func WithSeekType(seek seek.Type) ClientOption {
3741
return nil
3842
}
3943
}
44+
45+
// WithEventConsumerTimeout is the timeout when sending events to a registered consumer.
46+
// If < 0, if buffer full, unblocks immediately and does not send.
47+
// If 0, if buffer full, will block and guarantee the event will be sent out.
48+
// If > 0, if buffer full, blocks util timeout.
49+
func WithEventConsumerTimeout(value time.Duration) ClientOption {
50+
return func(c *Client) error {
51+
c.eventConsumerTimeout = &value
52+
return nil
53+
}
54+
}

0 commit comments

Comments
 (0)