Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 1 addition & 145 deletions pkg/kevent/callstack.go → pkg/callstack/callstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,18 @@
* limitations under the License.
*/

package kevent
package callstack

import (
"expvar"
"github.com/rabbitstack/fibratus/pkg/kevent/kparams"
"github.com/rabbitstack/fibratus/pkg/util/multierror"
"github.com/rabbitstack/fibratus/pkg/util/va"
log "github.com/sirupsen/logrus"
"golang.org/x/arch/x86/x86asm"
"golang.org/x/sys/windows"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)

// maxDequeFlushPeriod specifies the maximum period
// for the events to reside in the deque.
var maxDequeFlushPeriod = time.Second * 30
var flusherInterval = time.Second * 5

// callstackFlushes computes overall callstack dequeue flushes
var callstackFlushes = expvar.NewInt("callstack.flushes")

// unbacked represents the identifier for unbacked regions in stack frames
const unbacked = "unbacked"

Expand Down Expand Up @@ -370,133 +356,3 @@ func (s Callstack) CallsiteInsns(pid uint32, leading bool) []string {
}
return opcodes
}

// CallstackDecorator maintains a FIFO queue where events
// eligible for stack enrichment are queued. Upon arrival
// of the respective stack walk event, the acting event is
// popped from the queue and enriched with return addresses
// which are later subject to symbolization.
type CallstackDecorator struct {
buckets map[uint64][]*Kevent
q *Queue
mux sync.Mutex

flusher *time.Ticker
quit chan struct{}
}

// NewCallstackDecorator creates a new callstack decorator
// which receives the event queue for long-standing event
// flushing.
func NewCallstackDecorator(q *Queue) *CallstackDecorator {
c := &CallstackDecorator{
q: q,
buckets: make(map[uint64][]*Kevent),
flusher: time.NewTicker(flusherInterval),
quit: make(chan struct{}, 1),
}

go c.doFlush()

return c
}

// Push pushes a new event to the queue.
func (cd *CallstackDecorator) Push(e *Kevent) {
cd.mux.Lock()
defer cd.mux.Unlock()

// append the event to the bucket indexed by stack id
id := e.StackID()
q, ok := cd.buckets[id]
if !ok {
cd.buckets[id] = []*Kevent{e}
} else {
cd.buckets[id] = append(q, e)
}
}

// Pop receives the stack walk event and pops the oldest
// originating event with the same pid,tid tuple formerly
// coined as stack identifier. The originating event is then
// decorated with callstack return addresses.
func (cd *CallstackDecorator) Pop(e *Kevent) *Kevent {
cd.mux.Lock()
defer cd.mux.Unlock()

id := e.StackID()
q, ok := cd.buckets[id]
if !ok {
return e
}

var evt *Kevent
if len(q) > 0 {
evt, cd.buckets[id] = q[0], q[1:]
}

if evt == nil {
return e
}

callstack := e.Kparams.MustGetSlice(kparams.Callstack)
evt.AppendParam(kparams.Callstack, kparams.Slice, callstack)

return evt
}

// Stop shutdowns the callstack decorator flusher.
func (cd *CallstackDecorator) Stop() {
cd.quit <- struct{}{}
}

// RemoveBucket removes the bucket and all enqueued events.
func (cd *CallstackDecorator) RemoveBucket(id uint64) {
cd.mux.Lock()
defer cd.mux.Unlock()
delete(cd.buckets, id)
}

func (cd *CallstackDecorator) doFlush() {
for {
select {
case <-cd.flusher.C:
errs := cd.flush()
if len(errs) > 0 {
log.Warnf("callstack: unable to flush queued events: %v", multierror.Wrap(errs...))
}
case <-cd.quit:
return
}
}
}

// flush pushes events to the event queue if they have
// been living in the deque more than the maximum allowed
// flush period.
func (cd *CallstackDecorator) flush() []error {
cd.mux.Lock()
defer cd.mux.Unlock()

if len(cd.buckets) == 0 {
return nil
}

errs := make([]error, 0)

for id, q := range cd.buckets {
for i, evt := range q {
if time.Since(evt.Timestamp) < maxDequeFlushPeriod {
continue
}
callstackFlushes.Add(1)
err := cd.q.push(evt)
if err != nil {
errs = append(errs, err)
}
cd.buckets[id] = append(q[:i], q[i+1:]...)
}
}

return errs
}
59 changes: 59 additions & 0 deletions pkg/callstack/callstack_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2021-present by Nedim Sabic Sabic
* https://www.fibratus.io
* All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package callstack

import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
)

func TestCallstack(t *testing.T) {
var callstack Callstack
callstack.Init(9)

assert.Equal(t, 9, cap(callstack))

callstack.PushFrame(Frame{Addr: 0x2638e59e0a5, Offset: 0, Symbol: "?", Module: "unbacked"})
callstack.PushFrame(Frame{Addr: 0x7ffb313853b2, Offset: 0x10a, Symbol: "Java_java_lang_ProcessImpl_create", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"})
callstack.PushFrame(Frame{Addr: 0x7ffb3138592e, Offset: 0x3a2, Symbol: "Java_java_lang_ProcessImpl_waitForTimeoutInterruptibly", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"})
callstack.PushFrame(Frame{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"})
callstack.PushFrame(Frame{Addr: 0x7ffb5d8e61f4, Offset: 0x54, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNEL32.DLL"})
callstack.PushFrame(Frame{Addr: 0x7ffb5c1d0396, Offset: 0x66, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"})
callstack.PushFrame(Frame{Addr: 0xfffff8015662a605, Offset: 0x9125, Symbol: "setjmpex", Module: "C:\\WINDOWS\\system32\\ntoskrnl.exe"})
callstack.PushFrame(Frame{Addr: 0xfffff801568e9c33, Offset: 0x2ef3, Symbol: "LpcRequestPort", Module: "C:\\WINDOWS\\system32\\ntoskrnl.exe"})
callstack.PushFrame(Frame{Addr: 0xfffff8015690b644, Offset: 0x45b4, Symbol: "ObDeleteCapturedInsertInfo", Module: "C:\\WINDOWS\\system32\\ntoskrnl.exe"})

assert.True(t, callstack.ContainsUnbacked())
assert.Equal(t, 9, callstack.Depth())
assert.Equal(t, "0xfffff8015690b644 C:\\WINDOWS\\system32\\ntoskrnl.exe!ObDeleteCapturedInsertInfo+0x45b4|0xfffff801568e9c33 C:\\WINDOWS\\system32\\ntoskrnl.exe!LpcRequestPort+0x2ef3|0xfffff8015662a605 C:\\WINDOWS\\system32\\ntoskrnl.exe!setjmpex+0x9125|0x7ffb5c1d0396 C:\\WINDOWS\\System32\\KERNELBASE.dll!CreateProcessW+0x66|0x7ffb5d8e61f4 C:\\WINDOWS\\System32\\KERNEL32.DLL!CreateProcessW+0x54|0x7ffb5c1d0396 C:\\WINDOWS\\System32\\KERNELBASE.dll!CreateProcessW+0x61|0x7ffb3138592e C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll!Java_java_lang_ProcessImpl_waitForTimeoutInterruptibly+0x3a2|0x7ffb313853b2 C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll!Java_java_lang_ProcessImpl_create+0x10a|0x2638e59e0a5 unbacked!?", callstack.String())
assert.Equal(t, "KERNELBASE.dll|KERNEL32.DLL|KERNELBASE.dll|java.dll|unbacked", callstack.Summary())

uframe := callstack.FinalUserFrame()
require.NotNil(t, uframe)
assert.Equal(t, "7ffb5c1d0396", uframe.Addr.String())
assert.Equal(t, "CreateProcessW", uframe.Symbol)
assert.Equal(t, "C:\\WINDOWS\\System32\\KERNELBASE.dll", uframe.Module)

kframe := callstack.FinalKernelFrame()
require.NotNil(t, kframe)
assert.Equal(t, "fffff8015690b644", kframe.Addr.String())
assert.Equal(t, "ObDeleteCapturedInsertInfo", kframe.Symbol)
assert.Equal(t, "C:\\WINDOWS\\system32\\ntoskrnl.exe", kframe.Module)
}
9 changes: 5 additions & 4 deletions pkg/filter/accessor_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package filter

import (
"github.com/rabbitstack/fibratus/pkg/callstack"
"github.com/rabbitstack/fibratus/pkg/kevent"
"github.com/rabbitstack/fibratus/pkg/kevent/ktypes"
ptypes "github.com/rabbitstack/fibratus/pkg/ps/types"
Expand Down Expand Up @@ -107,22 +108,22 @@ func TestIsFieldAccessible(t *testing.T) {
},
{
newThreadAccessor(),
&kevent.Kevent{Type: ktypes.CreateProcess, Category: ktypes.Process, Callstack: []kevent.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}},
&kevent.Kevent{Type: ktypes.CreateProcess, Category: ktypes.Process, Callstack: []callstack.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}},
true,
},
{
newThreadAccessor(),
&kevent.Kevent{Type: ktypes.RegSetValue, Category: ktypes.Registry, Callstack: []kevent.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}},
&kevent.Kevent{Type: ktypes.RegSetValue, Category: ktypes.Registry, Callstack: []callstack.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}},
true,
},
{
newRegistryAccessor(),
&kevent.Kevent{Type: ktypes.RegSetValue, Category: ktypes.Registry, Callstack: []kevent.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}},
&kevent.Kevent{Type: ktypes.RegSetValue, Category: ktypes.Registry, Callstack: []callstack.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}},
true,
},
{
newNetworkAccessor(),
&kevent.Kevent{Type: ktypes.RegSetValue, Category: ktypes.Registry, Callstack: []kevent.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}},
&kevent.Kevent{Type: ktypes.RegSetValue, Category: ktypes.Registry, Callstack: []callstack.Frame{{Addr: 0x7ffb5c1d0396, Offset: 0x61, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"}}},
false,
},
{
Expand Down
19 changes: 10 additions & 9 deletions pkg/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package filter

import (
"github.com/rabbitstack/fibratus/internal/etw/processors"
"github.com/rabbitstack/fibratus/pkg/callstack"
"github.com/rabbitstack/fibratus/pkg/config"
"github.com/rabbitstack/fibratus/pkg/filter/fields"
"github.com/rabbitstack/fibratus/pkg/fs"
Expand Down Expand Up @@ -377,14 +378,14 @@ func TestThreadFilter(t *testing.T) {
require.NoError(t, windows.WriteProcessMemory(windows.CurrentProcess(), base, &insns[0], uintptr(len(insns)), nil))

kevt.Callstack.Init(8)
kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: 0x2638e59e0a5, Offset: 0, Symbol: "?", Module: "unbacked"})
kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: va.Address(base), Offset: 0, Symbol: "?", Module: "unbacked"})
kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: 0x7ffb313853b2, Offset: 0x10a, Symbol: "Java_java_lang_ProcessImpl_create", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"})
kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: 0x7ffb3138592e, Offset: 0x3a2, Symbol: "Java_java_lang_ProcessImpl_waitForTimeoutInterruptibly", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"})
kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: 0x7ffb5d8e61f4, Offset: 0x54, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNEL32.DLL"})
kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: 0x7ffb5c1d0396, ModuleAddress: 0x7ffb5c1d0396, Offset: 0x66, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"})
kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: 0xfffff8072ebc1f6f, Offset: 0x4ef, Symbol: "FltRequestFileInfoOnCreateCompletion", Module: "C:\\WINDOWS\\System32\\drivers\\FLTMGR.SYS"})
kevt.Callstack.PushFrame(kevent.Frame{PID: kevt.PID, Addr: 0xfffff8072eb8961b, Offset: 0x20cb, Symbol: "FltGetStreamContext", Module: "C:\\WINDOWS\\System32\\drivers\\FLTMGR.SYS"})
kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: 0x2638e59e0a5, Offset: 0, Symbol: "?", Module: "unbacked"})
kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: va.Address(base), Offset: 0, Symbol: "?", Module: "unbacked"})
kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: 0x7ffb313853b2, Offset: 0x10a, Symbol: "Java_java_lang_ProcessImpl_create", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"})
kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: 0x7ffb3138592e, Offset: 0x3a2, Symbol: "Java_java_lang_ProcessImpl_waitForTimeoutInterruptibly", Module: "C:\\Program Files\\JetBrains\\GoLand 2021.2.3\\jbr\\bin\\java.dll"})
kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: 0x7ffb5d8e61f4, Offset: 0x54, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNEL32.DLL"})
kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: 0x7ffb5c1d0396, ModuleAddress: 0x7ffb5c1d0396, Offset: 0x66, Symbol: "CreateProcessW", Module: "C:\\WINDOWS\\System32\\KERNELBASE.dll"})
kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: 0xfffff8072ebc1f6f, Offset: 0x4ef, Symbol: "FltRequestFileInfoOnCreateCompletion", Module: "C:\\WINDOWS\\System32\\drivers\\FLTMGR.SYS"})
kevt.Callstack.PushFrame(callstack.Frame{PID: kevt.PID, Addr: 0xfffff8072eb8961b, Offset: 0x20cb, Symbol: "FltGetStreamContext", Module: "C:\\WINDOWS\\System32\\drivers\\FLTMGR.SYS"})

var tests = []struct {
filter string
Expand Down Expand Up @@ -504,7 +505,7 @@ func TestThreadFilter(t *testing.T) {
var n uintptr
require.NoError(t, windows.WriteProcessMemory(pi.Process, ntdll, &insns[0], uintptr(len(insns)), &n))

kevt.Callstack[0] = kevent.Frame{PID: kevt.PID, Addr: va.Address(ntdll), Offset: 0, Symbol: "?", Module: "C:\\Windows\\System32\\ntdll.dll"}
kevt.Callstack[0] = callstack.Frame{PID: kevt.PID, Addr: va.Address(ntdll), Offset: 0, Symbol: "?", Module: "C:\\Windows\\System32\\ntdll.dll"}

var tests1 = []struct {
filter string
Expand Down
6 changes: 3 additions & 3 deletions pkg/filter/ql/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package ql

import (
"fmt"
"github.com/rabbitstack/fibratus/pkg/callstack"
"github.com/rabbitstack/fibratus/pkg/filter/fields"
"github.com/rabbitstack/fibratus/pkg/kevent"
"github.com/rabbitstack/fibratus/pkg/pe"
pstypes "github.com/rabbitstack/fibratus/pkg/ps/types"
"github.com/rabbitstack/fibratus/pkg/util/signature"
Expand Down Expand Up @@ -243,7 +243,7 @@ func (f *Foreach) Call(args []interface{}) (interface{}, bool) {
return true, true
}
}
case kevent.Callstack:
case callstack.Callstack:
var pid uint32
var proc windows.Handle
var err error
Expand Down Expand Up @@ -536,7 +536,7 @@ func (f *Foreach) mmapMapValuer(segments []*BoundSegmentLiteral, mmap pstypes.Mm
}

// callstackMapValuer returns map valuer with thread stack frame data.
func (f *Foreach) callstackMapValuer(segments []*BoundSegmentLiteral, frame kevent.Frame, proc windows.Handle) MapValuer {
func (f *Foreach) callstackMapValuer(segments []*BoundSegmentLiteral, frame callstack.Frame, proc windows.Handle) MapValuer {
var valuer = MapValuer{}
for _, seg := range segments {
key := seg.Value
Expand Down
4 changes: 2 additions & 2 deletions pkg/kevent/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const (
host = ".Host"
pe = ".PE"
kparsAccessor = ".Kparams."
callstack = ".Callstack"
cstack = ".Callstack"
)

var (
Expand Down Expand Up @@ -90,7 +90,7 @@ var kfields = map[string]bool{
meta: true,
host: true,
pe: true,
callstack: true,
cstack: true,
}

func hintFields() string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kevent/formatter_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (f *Formatter) Format(kevt *Kevent) []byte {
}
// add callstack summary
if !kevt.Callstack.IsEmpty() {
values[callstack] = kevt.Callstack.String()
values[cstack] = kevt.Callstack.String()
}

if f.expandKparamsDot {
Expand Down
3 changes: 2 additions & 1 deletion pkg/kevent/kevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kevent

import (
"fmt"
"github.com/rabbitstack/fibratus/pkg/callstack"
kcapver "github.com/rabbitstack/fibratus/pkg/kcap/version"
"github.com/rabbitstack/fibratus/pkg/kevent/kparams"
"github.com/rabbitstack/fibratus/pkg/kevent/ktypes"
Expand Down Expand Up @@ -98,7 +99,7 @@ type Kevent struct {
// PS represents process' metadata and its allocated resources such as handles, DLLs, etc.
PS *pstypes.PS `json:"ps,omitempty"`
// Callstack represents the call stack for the thread that generated the event.
Callstack Callstack `json:"callstack"`
Callstack callstack.Callstack `json:"callstack"`
// WaitEnqueue indicates if this event should temporarily defer pushing to
// the consumer output queue. This is usually required in event processors
// to propagate certain events stored in processor's state when the related
Expand Down
Loading