diff --git a/pkg/kevent/callstack.go b/pkg/callstack/callstack.go similarity index 71% rename from pkg/kevent/callstack.go rename to pkg/callstack/callstack.go index 611f23f9b..051db7c58 100644 --- a/pkg/kevent/callstack.go +++ b/pkg/callstack/callstack.go @@ -16,32 +16,18 @@ * limitations under the License. */ -package kevent +package callstack import ( - "expvar" - "github.com/rabbitstack/fibratus/pkg/kevent/kparams" - "github.com/rabbitstack/fibratus/pkg/util/multierror" "github.com/rabbitstack/fibratus/pkg/util/va" - log "github.com/sirupsen/logrus" "golang.org/x/arch/x86/x86asm" "golang.org/x/sys/windows" "os" "path/filepath" "strconv" "strings" - "sync" - "time" ) -// maxDequeFlushPeriod specifies the maximum period -// for the events to reside in the deque. -var maxDequeFlushPeriod = time.Second * 30 -var flusherInterval = time.Second * 5 - -// callstackFlushes computes overall callstack dequeue flushes -var callstackFlushes = expvar.NewInt("callstack.flushes") - // unbacked represents the identifier for unbacked regions in stack frames const unbacked = "unbacked" @@ -370,133 +356,3 @@ func (s Callstack) CallsiteInsns(pid uint32, leading bool) []string { } return opcodes } - -// CallstackDecorator maintains a FIFO queue where events -// eligible for stack enrichment are queued. Upon arrival -// of the respective stack walk event, the acting event is -// popped from the queue and enriched with return addresses -// which are later subject to symbolization. -type CallstackDecorator struct { - buckets map[uint64][]*Kevent - q *Queue - mux sync.Mutex - - flusher *time.Ticker - quit chan struct{} -} - -// NewCallstackDecorator creates a new callstack decorator -// which receives the event queue for long-standing event -// flushing. -func NewCallstackDecorator(q *Queue) *CallstackDecorator { - c := &CallstackDecorator{ - q: q, - buckets: make(map[uint64][]*Kevent), - flusher: time.NewTicker(flusherInterval), - quit: make(chan struct{}, 1), - } - - go c.doFlush() - - return c -} - -// Push pushes a new event to the queue. -func (cd *CallstackDecorator) Push(e *Kevent) { - cd.mux.Lock() - defer cd.mux.Unlock() - - // append the event to the bucket indexed by stack id - id := e.StackID() - q, ok := cd.buckets[id] - if !ok { - cd.buckets[id] = []*Kevent{e} - } else { - cd.buckets[id] = append(q, e) - } -} - -// Pop receives the stack walk event and pops the oldest -// originating event with the same pid,tid tuple formerly -// coined as stack identifier. The originating event is then -// decorated with callstack return addresses. -func (cd *CallstackDecorator) Pop(e *Kevent) *Kevent { - cd.mux.Lock() - defer cd.mux.Unlock() - - id := e.StackID() - q, ok := cd.buckets[id] - if !ok { - return e - } - - var evt *Kevent - if len(q) > 0 { - evt, cd.buckets[id] = q[0], q[1:] - } - - if evt == nil { - return e - } - - callstack := e.Kparams.MustGetSlice(kparams.Callstack) - evt.AppendParam(kparams.Callstack, kparams.Slice, callstack) - - return evt -} - -// Stop shutdowns the callstack decorator flusher. -func (cd *CallstackDecorator) Stop() { - cd.quit <- struct{}{} -} - -// RemoveBucket removes the bucket and all enqueued events. -func (cd *CallstackDecorator) RemoveBucket(id uint64) { - cd.mux.Lock() - defer cd.mux.Unlock() - delete(cd.buckets, id) -} - -func (cd *CallstackDecorator) doFlush() { - for { - select { - case <-cd.flusher.C: - errs := cd.flush() - if len(errs) > 0 { - log.Warnf("callstack: unable to flush queued events: %v", multierror.Wrap(errs...)) - } - case <-cd.quit: - return - } - } -} - -// flush pushes events to the event queue if they have -// been living in the deque more than the maximum allowed -// flush period. -func (cd *CallstackDecorator) flush() []error { - cd.mux.Lock() - defer cd.mux.Unlock() - - if len(cd.buckets) == 0 { - return nil - } - - errs := make([]error, 0) - - for id, q := range cd.buckets { - for i, evt := range q { - if time.Since(evt.Timestamp) < maxDequeFlushPeriod { - continue - } - callstackFlushes.Add(1) - err := cd.q.push(evt) - if err != nil { - errs = append(errs, err) - } - cd.buckets[id] = append(q[:i], q[i+1:]...) - } - } - - return errs -} diff --git a/pkg/callstack/callstack_test.go b/pkg/callstack/callstack_test.go new file mode 100644 index 000000000..664fc9909 --- /dev/null +++ b/pkg/callstack/callstack_test.go @@ -0,0 +1,59 @@ +/* + * Copyright 2021-present by Nedim Sabic Sabic + * https://www.fibratus.io + * All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package callstack + +import ( + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" +) + +func TestCallstack(t *testing.T) { + var callstack Callstack + callstack.Init(9) + + assert.Equal(t, 9, cap(callstack)) + + callstack.PushFrame(Frame{Addr: 0x2638e59e0a5, Offset: 0, Symbol: "?", Module: "unbacked"}) + callstack.PushFrame(Frame{Addr: 0x7ffb313853b2, Offset: 0x10a, Symbol: "Java_java_lang_ProcessImpl_create", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"}) + callstack.PushFrame(Frame{Addr: 0x7ffb3138592e, Offset: 0x3a2, Symbol: "Java_java_lang_ProcessImpl_waitForTimeoutInterruptibly", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"}) + callstack.PushFrame(Frame{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}) + callstack.PushFrame(Frame{Addr: 0x7ffb5d8e61f4, Offset: 0x54, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNEL32.DLL"}) + callstack.PushFrame(Frame{Addr: 0x7ffb5c1d0396, Offset: 0x66, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}) + callstack.PushFrame(Frame{Addr: 0xfffff8015662a605, Offset: 0x9125, Symbol: "setjmpex", Module: "C:\\WINDOWS\\system32\\ntoskrnl.exe"}) + callstack.PushFrame(Frame{Addr: 0xfffff801568e9c33, Offset: 0x2ef3, Symbol: "LpcRequestPort", Module: "C:\\WINDOWS\\system32\\ntoskrnl.exe"}) + callstack.PushFrame(Frame{Addr: 0xfffff8015690b644, Offset: 0x45b4, Symbol: "ObDeleteCapturedInsertInfo", Module: "C:\\WINDOWS\\system32\\ntoskrnl.exe"}) + + assert.True(t, callstack.ContainsUnbacked()) + assert.Equal(t, 9, callstack.Depth()) + assert.Equal(t, "0xfffff8015690b644 C:\\WINDOWS\\system32\\ntoskrnl.exe!ObDeleteCapturedInsertInfo+0x45b4|0xfffff801568e9c33 C:\\WINDOWS\\system32\\ntoskrnl.exe!LpcRequestPort+0x2ef3|0xfffff8015662a605 C:\\WINDOWS\\system32\\ntoskrnl.exe!setjmpex+0x9125|0x7ffb5c1d0396 C:\\WINDOWS\\System32\\KERNELBASE.dll!CreateProcessW+0x66|0x7ffb5d8e61f4 C:\\WINDOWS\\System32\\KERNEL32.DLL!CreateProcessW+0x54|0x7ffb5c1d0396 C:\\WINDOWS\\System32\\KERNELBASE.dll!CreateProcessW+0x61|0x7ffb3138592e C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll!Java_java_lang_ProcessImpl_waitForTimeoutInterruptibly+0x3a2|0x7ffb313853b2 C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll!Java_java_lang_ProcessImpl_create+0x10a|0x2638e59e0a5 unbacked!?", callstack.String()) + assert.Equal(t, "KERNELBASE.dll|KERNEL32.DLL|KERNELBASE.dll|java.dll|unbacked", callstack.Summary()) + + uframe := callstack.FinalUserFrame() + require.NotNil(t, uframe) + assert.Equal(t, "7ffb5c1d0396", uframe.Addr.String()) + assert.Equal(t, "CreateProcessW", uframe.Symbol) + assert.Equal(t, "C:\\WINDOWS\\System32\\KERNELBASE.dll", uframe.Module) + + kframe := callstack.FinalKernelFrame() + require.NotNil(t, kframe) + assert.Equal(t, "fffff8015690b644", kframe.Addr.String()) + assert.Equal(t, "ObDeleteCapturedInsertInfo", kframe.Symbol) + assert.Equal(t, "C:\\WINDOWS\\system32\\ntoskrnl.exe", kframe.Module) +} diff --git a/pkg/filter/accessor_windows_test.go b/pkg/filter/accessor_windows_test.go index a6bf4ae62..bf8bb7bba 100644 --- a/pkg/filter/accessor_windows_test.go +++ b/pkg/filter/accessor_windows_test.go @@ -19,6 +19,7 @@ package filter import ( + "github.com/rabbitstack/fibratus/pkg/callstack" "github.com/rabbitstack/fibratus/pkg/kevent" "github.com/rabbitstack/fibratus/pkg/kevent/ktypes" ptypes "github.com/rabbitstack/fibratus/pkg/ps/types" @@ -107,22 +108,22 @@ func TestIsFieldAccessible(t *testing.T) { }, { newThreadAccessor(), - &kevent.Kevent{Type: ktypes.CreateProcess, Category: ktypes.Process, Callstack: []kevent.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}}, + &kevent.Kevent{Type: ktypes.CreateProcess, Category: ktypes.Process, Callstack: []callstack.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}}, true, }, { newThreadAccessor(), - &kevent.Kevent{Type: ktypes.RegSetValue, Category: ktypes.Registry, Callstack: []kevent.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}}, + &kevent.Kevent{Type: ktypes.RegSetValue, Category: ktypes.Registry, Callstack: []callstack.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}}, true, }, { newRegistryAccessor(), - &kevent.Kevent{Type: ktypes.RegSetValue, Category: ktypes.Registry, Callstack: []kevent.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}}, + &kevent.Kevent{Type: ktypes.RegSetValue, Category: ktypes.Registry, Callstack: []callstack.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}}, true, }, { newNetworkAccessor(), - &kevent.Kevent{Type: ktypes.RegSetValue, Category: ktypes.Registry, Callstack: []kevent.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}}, + &kevent.Kevent{Type: ktypes.RegSetValue, Category: ktypes.Registry, Callstack: []callstack.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}}, false, }, { diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go index b69250fcb..6e2eae2da 100644 --- a/pkg/filter/filter_test.go +++ b/pkg/filter/filter_test.go @@ -20,6 +20,7 @@ package filter import ( "github.com/rabbitstack/fibratus/internal/etw/processors" + "github.com/rabbitstack/fibratus/pkg/callstack" "github.com/rabbitstack/fibratus/pkg/config" "github.com/rabbitstack/fibratus/pkg/filter/fields" "github.com/rabbitstack/fibratus/pkg/fs" @@ -377,14 +378,14 @@ func TestThreadFilter(t *testing.T) { require.NoError(t, windows.WriteProcessMemory(windows.CurrentProcess(), base, &insns[0], uintptr(len(insns)), nil)) kevt.Callstack.Init(8) - kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: 0x2638e59e0a5, Offset: 0, Symbol: "?", Module: "unbacked"}) - kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: va.Address(base), Offset: 0, Symbol: "?", Module: "unbacked"}) - kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: 0x7ffb313853b2, Offset: 0x10a, Symbol: "Java_java_lang_ProcessImpl_create", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"}) - kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: 0x7ffb3138592e, Offset: 0x3a2, Symbol: "Java_java_lang_ProcessImpl_waitForTimeoutInterruptibly", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"}) - kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: 0x7ffb5d8e61f4, Offset: 0x54, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNEL32.DLL"}) - kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: 0x7ffb5c1d0396, ModuleAddress: 0x7ffb5c1d0396, Offset: 0x66, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}) - kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: 0xfffff8072ebc1f6f, Offset: 0x4ef, Symbol: "FltRequestFileInfoOnCreateCompletion", Module: "C:\\WINDOWS\\System32\\drivers\\FLTMGR.SYS"}) - kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: 0xfffff8072eb8961b, Offset: 0x20cb, Symbol: "FltGetStreamContext", Module: "C:\\WINDOWS\\System32\\drivers\\FLTMGR.SYS"}) + kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: 0x2638e59e0a5, Offset: 0, Symbol: "?", Module: "unbacked"}) + kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: va.Address(base), Offset: 0, Symbol: "?", Module: "unbacked"}) + kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: 0x7ffb313853b2, Offset: 0x10a, Symbol: "Java_java_lang_ProcessImpl_create", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"}) + kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: 0x7ffb3138592e, Offset: 0x3a2, Symbol: "Java_java_lang_ProcessImpl_waitForTimeoutInterruptibly", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"}) + kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: 0x7ffb5d8e61f4, Offset: 0x54, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNEL32.DLL"}) + kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: 0x7ffb5c1d0396, ModuleAddress: 0x7ffb5c1d0396, Offset: 0x66, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}) + kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: 0xfffff8072ebc1f6f, Offset: 0x4ef, Symbol: "FltRequestFileInfoOnCreateCompletion", Module: "C:\\WINDOWS\\System32\\drivers\\FLTMGR.SYS"}) + kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: 0xfffff8072eb8961b, Offset: 0x20cb, Symbol: "FltGetStreamContext", Module: "C:\\WINDOWS\\System32\\drivers\\FLTMGR.SYS"}) var tests = []struct { filter string @@ -504,7 +505,7 @@ func TestThreadFilter(t *testing.T) { var n uintptr require.NoError(t, windows.WriteProcessMemory(pi.Process, ntdll, &insns[0], uintptr(len(insns)), &n)) - kevt.Callstack[0] = kevent.Frame{PID: kevt.PID, Addr: va.Address(ntdll), Offset: 0, Symbol: "?", Module: "C:\\Windows\\System32\\ntdll.dll"} + kevt.Callstack[0] = callstack.Frame{PID: kevt.PID, Addr: va.Address(ntdll), Offset: 0, Symbol: "?", Module: "C:\\Windows\\System32\\ntdll.dll"} var tests1 = []struct { filter string diff --git a/pkg/filter/ql/function.go b/pkg/filter/ql/function.go index 2890c1058..d6f54630f 100644 --- a/pkg/filter/ql/function.go +++ b/pkg/filter/ql/function.go @@ -20,8 +20,8 @@ package ql import ( "fmt" + "github.com/rabbitstack/fibratus/pkg/callstack" "github.com/rabbitstack/fibratus/pkg/filter/fields" - "github.com/rabbitstack/fibratus/pkg/kevent" "github.com/rabbitstack/fibratus/pkg/pe" pstypes "github.com/rabbitstack/fibratus/pkg/ps/types" "github.com/rabbitstack/fibratus/pkg/util/signature" @@ -243,7 +243,7 @@ func (f *Foreach) Call(args []interface{}) (interface{}, bool) { return true, true } } - case kevent.Callstack: + case callstack.Callstack: var pid uint32 var proc windows.Handle var err error @@ -536,7 +536,7 @@ func (f *Foreach) mmapMapValuer(segments []*BoundSegmentLiteral, mmap pstypes.Mm } // callstackMapValuer returns map valuer with thread stack frame data. -func (f *Foreach) callstackMapValuer(segments []*BoundSegmentLiteral, frame kevent.Frame, proc windows.Handle) MapValuer { +func (f *Foreach) callstackMapValuer(segments []*BoundSegmentLiteral, frame callstack.Frame, proc windows.Handle) MapValuer { var valuer = MapValuer{} for _, seg := range segments { key := seg.Value diff --git a/pkg/kevent/formatter.go b/pkg/kevent/formatter.go index 09dd6bcf2..0c4c8a8b0 100644 --- a/pkg/kevent/formatter.go +++ b/pkg/kevent/formatter.go @@ -55,7 +55,7 @@ const ( host = ".Host" pe = ".PE" kparsAccessor = ".Kparams." - callstack = ".Callstack" + cstack = ".Callstack" ) var ( @@ -90,7 +90,7 @@ var kfields = map[string]bool{ meta: true, host: true, pe: true, - callstack: true, + cstack: true, } func hintFields() string { diff --git a/pkg/kevent/formatter_windows.go b/pkg/kevent/formatter_windows.go index 673c60d71..2692fe7d5 100644 --- a/pkg/kevent/formatter_windows.go +++ b/pkg/kevent/formatter_windows.go @@ -62,7 +62,7 @@ func (f *Formatter) Format(kevt *Kevent) []byte { } // add callstack summary if !kevt.Callstack.IsEmpty() { - values[callstack] = kevt.Callstack.String() + values[cstack] = kevt.Callstack.String() } if f.expandKparamsDot { diff --git a/pkg/kevent/kevent.go b/pkg/kevent/kevent.go index 2f7efe7e4..7aaa7d9db 100644 --- a/pkg/kevent/kevent.go +++ b/pkg/kevent/kevent.go @@ -20,6 +20,7 @@ package kevent import ( "fmt" + "github.com/rabbitstack/fibratus/pkg/callstack" kcapver "github.com/rabbitstack/fibratus/pkg/kcap/version" "github.com/rabbitstack/fibratus/pkg/kevent/kparams" "github.com/rabbitstack/fibratus/pkg/kevent/ktypes" @@ -98,7 +99,7 @@ type Kevent struct { // PS represents process' metadata and its allocated resources such as handles, DLLs, etc. PS *pstypes.PS `json:"ps,omitempty"` // Callstack represents the call stack for the thread that generated the event. - Callstack Callstack `json:"callstack"` + Callstack callstack.Callstack `json:"callstack"` // WaitEnqueue indicates if this event should temporarily defer pushing to // the consumer output queue. This is usually required in event processors // to propagate certain events stored in processor's state when the related diff --git a/pkg/kevent/kevent_windows_test.go b/pkg/kevent/kevent_windows_test.go index 4347b3649..8c7329cfd 100644 --- a/pkg/kevent/kevent_windows_test.go +++ b/pkg/kevent/kevent_windows_test.go @@ -19,6 +19,7 @@ package kevent import ( + "github.com/rabbitstack/fibratus/pkg/callstack" "github.com/rabbitstack/fibratus/pkg/fs" "github.com/rabbitstack/fibratus/pkg/kevent/kparams" "github.com/rabbitstack/fibratus/pkg/kevent/ktypes" @@ -119,3 +120,46 @@ func TestPartialKey(t *testing.T) { }) } } + +func TestCallstack(t *testing.T) { + e := &Kevent{ + Type: ktypes.CreateProcess, + Tid: 2484, + PID: 859, + CPU: 1, + Seq: 2, + Name: "CreateProcess", + Timestamp: time.Now(), + Category: ktypes.Process, + } + + e.Callstack.Init(9) + assert.Equal(t, 9, cap(e.Callstack)) + + e.Callstack.PushFrame(callstack.Frame{Addr: 0x2638e59e0a5, Offset: 0, Symbol: "?", Module: "unbacked"}) + e.Callstack.PushFrame(callstack.Frame{Addr: 0x7ffb313853b2, Offset: 0x10a, Symbol: "Java_java_lang_ProcessImpl_create", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"}) + e.Callstack.PushFrame(callstack.Frame{Addr: 0x7ffb3138592e, Offset: 0x3a2, Symbol: "Java_java_lang_ProcessImpl_waitForTimeoutInterruptibly", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"}) + e.Callstack.PushFrame(callstack.Frame{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}) + e.Callstack.PushFrame(callstack.Frame{Addr: 0x7ffb5d8e61f4, Offset: 0x54, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNEL32.DLL"}) + e.Callstack.PushFrame(callstack.Frame{Addr: 0x7ffb5c1d0396, Offset: 0x66, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}) + e.Callstack.PushFrame(callstack.Frame{Addr: 0xfffff8015662a605, Offset: 0x9125, Symbol: "setjmpex", Module: "C:\\WINDOWS\\system32\\ntoskrnl.exe"}) + e.Callstack.PushFrame(callstack.Frame{Addr: 0xfffff801568e9c33, Offset: 0x2ef3, Symbol: "LpcRequestPort", Module: "C:\\WINDOWS\\system32\\ntoskrnl.exe"}) + e.Callstack.PushFrame(callstack.Frame{Addr: 0xfffff8015690b644, Offset: 0x45b4, Symbol: "ObDeleteCapturedInsertInfo", Module: "C:\\WINDOWS\\system32\\ntoskrnl.exe"}) + + assert.True(t, e.Callstack.ContainsUnbacked()) + assert.Equal(t, 9, e.Callstack.Depth()) + assert.Equal(t, "0xfffff8015690b644 C:\\WINDOWS\\system32\\ntoskrnl.exe!ObDeleteCapturedInsertInfo+0x45b4|0xfffff801568e9c33 C:\\WINDOWS\\system32\\ntoskrnl.exe!LpcRequestPort+0x2ef3|0xfffff8015662a605 C:\\WINDOWS\\system32\\ntoskrnl.exe!setjmpex+0x9125|0x7ffb5c1d0396 C:\\WINDOWS\\System32\\KERNELBASE.dll!CreateProcessW+0x66|0x7ffb5d8e61f4 C:\\WINDOWS\\System32\\KERNEL32.DLL!CreateProcessW+0x54|0x7ffb5c1d0396 C:\\WINDOWS\\System32\\KERNELBASE.dll!CreateProcessW+0x61|0x7ffb3138592e C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll!Java_java_lang_ProcessImpl_waitForTimeoutInterruptibly+0x3a2|0x7ffb313853b2 C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll!Java_java_lang_ProcessImpl_create+0x10a|0x2638e59e0a5 unbacked!?", e.Callstack.String()) + assert.Equal(t, "KERNELBASE.dll|KERNEL32.DLL|KERNELBASE.dll|java.dll|unbacked", e.Callstack.Summary()) + + uframe := e.Callstack.FinalUserFrame() + require.NotNil(t, uframe) + assert.Equal(t, "7ffb5c1d0396", uframe.Addr.String()) + assert.Equal(t, "CreateProcessW", uframe.Symbol) + assert.Equal(t, "C:\\WINDOWS\\System32\\KERNELBASE.dll", uframe.Module) + + kframe := e.Callstack.FinalKernelFrame() + require.NotNil(t, kframe) + assert.Equal(t, "fffff8015690b644", kframe.Addr.String()) + assert.Equal(t, "ObDeleteCapturedInsertInfo", kframe.Symbol) + assert.Equal(t, "C:\\WINDOWS\\system32\\ntoskrnl.exe", kframe.Module) +} diff --git a/pkg/kevent/queue.go b/pkg/kevent/queue.go index 974ab97f7..9a9a801e4 100644 --- a/pkg/kevent/queue.go +++ b/pkg/kevent/queue.go @@ -53,7 +53,7 @@ type Queue struct { q chan *Kevent listeners []Listener backlog *backlog - cd *CallstackDecorator + decorator *StackwalkDecorator stackEnrichment bool enqueueAlways bool } @@ -67,7 +67,7 @@ func NewQueue(size int, stackEnrichment bool, enqueueAlways bool) *Queue { stackEnrichment: stackEnrichment, enqueueAlways: enqueueAlways, } - q.cd = NewCallstackDecorator(q) + q.decorator = NewStackwalkDecorator(q) return q } @@ -80,7 +80,7 @@ func NewQueueWithChannel(ch chan *Kevent, stackEnrichment bool, enqueueAlways bo stackEnrichment: stackEnrichment, enqueueAlways: enqueueAlways, } - q.cd = NewCallstackDecorator(q) + q.decorator = NewStackwalkDecorator(q) return q } @@ -94,7 +94,7 @@ func (q *Queue) RegisterListener(listener Listener) { func (q *Queue) Events() <-chan *Kevent { return q.q } // Close closes the queue disposing allocated resources. -func (q *Queue) Close() { q.cd.Stop() } +func (q *Queue) Close() { q.decorator.Stop() } // Push pushes a new event to the channel. Prior to // sending the event to the channel, all registered @@ -120,12 +120,12 @@ func (q *Queue) Push(e *Kevent) error { if q.stackEnrichment { // store pending event for callstack enrichment if e.Type.CanEnrichStack() { - q.cd.Push(e) + q.decorator.Push(e) return nil } // decorate events with callstack return addresses if e.IsStackWalk() { - e = q.cd.Pop(e) + e = q.decorator.Pop(e) } } if isEventDelayed(e) { @@ -159,7 +159,7 @@ func (q *Queue) push(e *Kevent) error { } if q.stackEnrichment && e.IsTerminateThread() { id := uint64(e.Kparams.MustGetPid() + e.Kparams.MustGetTid()) - q.cd.RemoveBucket(id) + q.decorator.RemoveBucket(id) } if enqueue || len(q.listeners) == 0 { q.q <- e diff --git a/pkg/kevent/stackwalk.go b/pkg/kevent/stackwalk.go new file mode 100644 index 000000000..82d5d462c --- /dev/null +++ b/pkg/kevent/stackwalk.go @@ -0,0 +1,168 @@ +/* + * Copyright 2021-present by Nedim Sabic Sabic + * https://www.fibratus.io + * All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kevent + +import ( + "expvar" + "github.com/rabbitstack/fibratus/pkg/kevent/kparams" + "github.com/rabbitstack/fibratus/pkg/util/multierror" + log "github.com/sirupsen/logrus" + "sync" + "time" +) + +// maxDequeFlushPeriod specifies the maximum period +// for the events to reside in the queue. +var maxDequeFlushPeriod = time.Second * 30 + +// flusherInterval specifies the interval for the queue flushing. +var flusherInterval = time.Second * 5 + +// callstackFlushes computes overall callstack dequeue flushes +var callstackFlushes = expvar.NewInt("callstack.flushes") + +// StackwalkDecorator maintains a FIFO queue where events +// eligible for stack enrichment are queued. Upon arrival +// of the respective stack walk event, the acting event is +// popped from the queue and enriched with return addresses +// which are later subject to symbolization. +type StackwalkDecorator struct { + buckets map[uint64][]*Kevent + q *Queue + mux sync.Mutex + + flusher *time.Ticker + quit chan struct{} +} + +// NewStackwalkDecorator creates a new callstack return +// addresses decorator which receives the event queue +// for long-standing event flushing. +func NewStackwalkDecorator(q *Queue) *StackwalkDecorator { + s := &StackwalkDecorator{ + q: q, + buckets: make(map[uint64][]*Kevent), + flusher: time.NewTicker(flusherInterval), + quit: make(chan struct{}, 1), + } + + go s.doFlush() + + return s +} + +// Push pushes a new event to the queue. +func (s *StackwalkDecorator) Push(e *Kevent) { + s.mux.Lock() + defer s.mux.Unlock() + + // append the event to the bucket indexed by stack id + id := e.StackID() + q, ok := s.buckets[id] + if !ok { + s.buckets[id] = []*Kevent{e} + } else { + s.buckets[id] = append(q, e) + } +} + +// Pop receives the stack walk event and pops the oldest +// originating event with the same pid,tid tuple formerly +// coined as stack identifier. The originating event is then +// decorated with callstack return addresses. +func (s *StackwalkDecorator) Pop(e *Kevent) *Kevent { + s.mux.Lock() + defer s.mux.Unlock() + + id := e.StackID() + q, ok := s.buckets[id] + if !ok { + return e + } + + var evt *Kevent + if len(q) > 0 { + evt, s.buckets[id] = q[0], q[1:] + } + + if evt == nil { + return e + } + + callstack := e.Kparams.MustGetSlice(kparams.Callstack) + evt.AppendParam(kparams.Callstack, kparams.Slice, callstack) + + return evt +} + +// Stop shutdowns the stack walk decorator flusher. +func (s *StackwalkDecorator) Stop() { + s.quit <- struct{}{} +} + +// RemoveBucket removes the bucket and all enqueued events. +func (s *StackwalkDecorator) RemoveBucket(id uint64) { + s.mux.Lock() + defer s.mux.Unlock() + delete(s.buckets, id) +} + +func (s *StackwalkDecorator) doFlush() { + for { + select { + case <-s.flusher.C: + errs := s.flush() + if len(errs) > 0 { + log.Warnf("callstack: unable to flush queued events: %v", multierror.Wrap(errs...)) + } + case <-s.quit: + return + } + } +} + +// flush pushes events to the event queue if they have +// been living in the deque more than the maximum allowed +// flush period. +func (s *StackwalkDecorator) flush() []error { + s.mux.Lock() + defer s.mux.Unlock() + + if len(s.buckets) == 0 { + return nil + } + + errs := make([]error, 0) + + for id, q := range s.buckets { + for i, evt := range q { + if time.Since(evt.Timestamp) < maxDequeFlushPeriod { + continue + } + callstackFlushes.Add(1) + err := s.q.push(evt) + if err != nil { + errs = append(errs, err) + } + s.buckets[id] = append(q[:i], q[i+1:]...) + } + } + + return errs +} diff --git a/pkg/kevent/callstack_test.go b/pkg/kevent/stackwalk_test.go similarity index 54% rename from pkg/kevent/callstack_test.go rename to pkg/kevent/stackwalk_test.go index 39468b8fb..63d8d1fb8 100644 --- a/pkg/kevent/callstack_test.go +++ b/pkg/kevent/stackwalk_test.go @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 by Nedim Sabic Sabic + * Copyright 2021-present by Nedim Sabic Sabic * https://www.fibratus.io * All Rights Reserved. * @@ -24,57 +24,13 @@ import ( "github.com/rabbitstack/fibratus/pkg/kevent/ktypes" "github.com/rabbitstack/fibratus/pkg/util/va" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "testing" "time" ) -func TestCallstack(t *testing.T) { - e := &Kevent{ - Type: ktypes.CreateProcess, - Tid: 2484, - PID: 859, - CPU: 1, - Seq: 2, - Name: "CreateProcess", - Timestamp: time.Now(), - Category: ktypes.Process, - } - - e.Callstack.Init(9) - assert.Equal(t, 9, cap(e.Callstack)) - - e.Callstack.PushFrame(Frame{Addr: 0x2638e59e0a5, Offset: 0, Symbol: "?", Module: "unbacked"}) - e.Callstack.PushFrame(Frame{Addr: 0x7ffb313853b2, Offset: 0x10a, Symbol: "Java_java_lang_ProcessImpl_create", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"}) - e.Callstack.PushFrame(Frame{Addr: 0x7ffb3138592e, Offset: 0x3a2, Symbol: "Java_java_lang_ProcessImpl_waitForTimeoutInterruptibly", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"}) - e.Callstack.PushFrame(Frame{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}) - e.Callstack.PushFrame(Frame{Addr: 0x7ffb5d8e61f4, Offset: 0x54, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNEL32.DLL"}) - e.Callstack.PushFrame(Frame{Addr: 0x7ffb5c1d0396, Offset: 0x66, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}) - e.Callstack.PushFrame(Frame{Addr: 0xfffff8015662a605, Offset: 0x9125, Symbol: "setjmpex", Module: "C:\\WINDOWS\\system32\\ntoskrnl.exe"}) - e.Callstack.PushFrame(Frame{Addr: 0xfffff801568e9c33, Offset: 0x2ef3, Symbol: "LpcRequestPort", Module: "C:\\WINDOWS\\system32\\ntoskrnl.exe"}) - e.Callstack.PushFrame(Frame{Addr: 0xfffff8015690b644, Offset: 0x45b4, Symbol: "ObDeleteCapturedInsertInfo", Module: "C:\\WINDOWS\\system32\\ntoskrnl.exe"}) - - assert.True(t, e.Callstack.ContainsUnbacked()) - assert.Equal(t, 9, e.Callstack.Depth()) - assert.Equal(t, "0xfffff8015690b644 C:\\WINDOWS\\system32\\ntoskrnl.exe!ObDeleteCapturedInsertInfo+0x45b4|0xfffff801568e9c33 C:\\WINDOWS\\system32\\ntoskrnl.exe!LpcRequestPort+0x2ef3|0xfffff8015662a605 C:\\WINDOWS\\system32\\ntoskrnl.exe!setjmpex+0x9125|0x7ffb5c1d0396 C:\\WINDOWS\\System32\\KERNELBASE.dll!CreateProcessW+0x66|0x7ffb5d8e61f4 C:\\WINDOWS\\System32\\KERNEL32.DLL!CreateProcessW+0x54|0x7ffb5c1d0396 C:\\WINDOWS\\System32\\KERNELBASE.dll!CreateProcessW+0x61|0x7ffb3138592e C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll!Java_java_lang_ProcessImpl_waitForTimeoutInterruptibly+0x3a2|0x7ffb313853b2 C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll!Java_java_lang_ProcessImpl_create+0x10a|0x2638e59e0a5 unbacked!?", e.Callstack.String()) - assert.Equal(t, "KERNELBASE.dll|KERNEL32.DLL|KERNELBASE.dll|java.dll|unbacked", e.Callstack.Summary()) - - uframe := e.Callstack.FinalUserFrame() - require.NotNil(t, uframe) - assert.Equal(t, "7ffb5c1d0396", uframe.Addr.String()) - assert.Equal(t, "CreateProcessW", uframe.Symbol) - assert.Equal(t, "C:\\WINDOWS\\System32\\KERNELBASE.dll", uframe.Module) - - kframe := e.Callstack.FinalKernelFrame() - require.NotNil(t, kframe) - assert.Equal(t, "fffff8015690b644", kframe.Addr.String()) - assert.Equal(t, "ObDeleteCapturedInsertInfo", kframe.Symbol) - assert.Equal(t, "C:\\WINDOWS\\system32\\ntoskrnl.exe", kframe.Module) -} - -func TestCallstackDecorator(t *testing.T) { +func TestStackwalkDecorator(t *testing.T) { q := NewQueue(50, false, true) - cd := NewCallstackDecorator(q) + cd := NewStackwalkDecorator(q) e := &Kevent{ Type: ktypes.CreateFile, @@ -140,10 +96,10 @@ func init() { flusherInterval = time.Second } -func TestCallstackDecoratorFlush(t *testing.T) { +func TestStackwalkDecoratorFlush(t *testing.T) { q := NewQueue(50, false, true) q.RegisterListener(&DummyListener{}) - cd := NewCallstackDecorator(q) + cd := NewStackwalkDecorator(q) defer cd.Stop() e := &Kevent{ diff --git a/pkg/symbolize/symbolizer.go b/pkg/symbolize/symbolizer.go index b139dc7b5..41587fbc2 100644 --- a/pkg/symbolize/symbolizer.go +++ b/pkg/symbolize/symbolizer.go @@ -21,6 +21,7 @@ package symbolize import ( "expvar" "fmt" + "github.com/rabbitstack/fibratus/pkg/callstack" "github.com/rabbitstack/fibratus/pkg/config" "github.com/rabbitstack/fibratus/pkg/kevent" "github.com/rabbitstack/fibratus/pkg/kevent/kparams" @@ -436,8 +437,8 @@ func (s *Symbolizer) pushFrames(addrs []va.Address, e *kevent.Kevent) { // PE export directory entries. If either the // symbol or module are not resolved, then we // fall back to Debug API. -func (s *Symbolizer) produceFrame(addr va.Address, e *kevent.Kevent) kevent.Frame { - frame := kevent.Frame{PID: e.PID, Addr: addr} +func (s *Symbolizer) produceFrame(addr va.Address, e *kevent.Kevent) callstack.Frame { + frame := callstack.Frame{PID: e.PID, Addr: addr} if addr.InSystemRange() { if s.config.SymbolizeKernelAddresses { frame.Module = s.r.GetModuleName(windows.CurrentProcess(), addr) @@ -547,7 +548,7 @@ func (s *Symbolizer) produceFrame(addr va.Address, e *kevent.Kevent) kevent.Fram return frame } -func (s *Symbolizer) cacheSymbol(pid uint32, addr va.Address, frame *kevent.Frame) { +func (s *Symbolizer) cacheSymbol(pid uint32, addr va.Address, frame *callstack.Frame) { if sym, ok := s.symbols[pid]; ok { if _, ok := sym[addr]; !ok { symCachedSymbols.Add(1)