Skip to content

Commit a872c4d

Browse files
authored
chore: add ability to limit the number of events that are output (#15)
1 parent 2fad0cb commit a872c4d

File tree

4 files changed

+67
-27
lines changed

4 files changed

+67
-27
lines changed

dns.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
"github.com/google/gopacket/layers"
1717
)
1818

19-
func processUDPPackets(ctx context.Context, reader *ringbuf.Reader) {
19+
func processUDPPackets(ctx context.Context, reader *ringbuf.Reader, outputCh chan<- []statEntry) {
2020
seenDNSPackets := map[statEntry]struct{}{}
2121
resetSeenPacketsTick := time.NewTicker(time.Minute)
2222
defer resetSeenPacketsTick.Stop()
@@ -32,11 +32,9 @@ func processUDPPackets(ctx context.Context, reader *ringbuf.Reader) {
3232
seenDNSPackets = map[statEntry]struct{}{}
3333
default:
3434
record, err := reader.Read()
35-
if err != nil {
36-
if errors.Is(err, ringbuf.ErrClosed) {
37-
return
38-
}
39-
35+
if errors.Is(err, ringbuf.ErrClosed) {
36+
return
37+
} else if err != nil {
4038
log.Printf("Error reading UDP Packet: %v", err)
4139
continue
4240
}
@@ -97,7 +95,7 @@ func processUDPPackets(ctx context.Context, reader *ringbuf.Reader) {
9795
}
9896

9997
entry.Timestamp = time.Now().UTC()
100-
fmt.Print(outputJSON([]statEntry{entry}))
98+
outputCh <- []statEntry{entry}
10199
}
102100
}
103101
}

flags.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
var (
3333
uniqueOutput, version, help, externalOnly *bool
3434
kubeconfig *string
35+
maxEvents int
3536
)
3637

3738
func parseFlags() {
@@ -41,6 +42,7 @@ func parseFlags() {
4142
uniqueOutput = fs.Bool('u', "unique", "if true, only show the first instance of each connection")
4243
kubeconfig = fs.StringLong("kubeconfig", "", "path to kubeconfig file (if not set, dynamic kubeconfig discovery is used)")
4344
externalOnly = fs.BoolDefault('e', "external", true, "if true, only show traffic to external destinations")
45+
maxEventsPtr := fs.Int('m', "max-events", 10000, "max number of events to output, 0 for unlimited")
4446
version = fs.BoolLong("version", "display program version")
4547

4648
var err error
@@ -63,4 +65,6 @@ func parseFlags() {
6365

6466
os.Exit(0)
6567
}
68+
69+
maxEvents = *maxEventsPtr
6670
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ module github.com/dkorunic/pktstat-bpf
22

33
go 1.24.0
44

5-
toolchain go1.24.1
5+
toolchain go1.25.4
66

77
require (
88
github.com/cilium/ebpf v0.18.0

main.go

Lines changed: 57 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -142,31 +142,51 @@ func main() {
142142
signalCh := make(chan os.Signal, 1)
143143
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
144144

145-
// Create a ticker to process the map every second
146-
ticker := time.NewTicker(1 * time.Second)
147-
defer ticker.Stop()
148-
149145
// Set up signal handler
150146
go func() {
151147
s := <-signalCh
152148
_, _ = fmt.Fprintf(os.Stderr, "Received %v signal, exiting...\n", s)
153149
cancel()
154150
}()
155151

156-
// Create DNS lookup map and mutex for sharing between goroutines
157-
dnsLookupMap := make(map[uint32]string)
158-
dnsLookupMapMutex := &sync.RWMutex{}
152+
wg := &sync.WaitGroup{}
153+
outputCh := make(chan []statEntry)
154+
go outputEvents(outputCh)
155+
defer close(outputCh)
159156

160157
udpPktReader, err := ringbuf.NewReader(objs.UdpPkts)
161158
if err != nil {
162159
log.Printf("Failed to create ringbuf reader for UDP packets: %v", err)
163160
} else {
164161
log.Printf("Created UDP packet ringbuf reader successfully")
165-
go processUDPPackets(ctx, udpPktReader)
166-
defer udpPktReader.Close()
162+
wg.Go(func() {
163+
go func() {
164+
<-ctx.Done()
165+
udpPktReader.Close()
166+
}()
167+
168+
processUDPPackets(ctx, udpPktReader, outputCh)
169+
})
167170
}
168171

169172
// Run the main loop
173+
wg.Go(func() {
174+
processPktCountMap(ctx, &objs, outputCh)
175+
})
176+
177+
wg.Wait()
178+
}
179+
180+
// processPktCountMap starts processing the PktCount map for connection events
181+
func processPktCountMap(ctx context.Context, objs *counterObjects, outputCh chan<- []statEntry) {
182+
// Create a ticker to process the map every second
183+
ticker := time.NewTicker(1 * time.Second)
184+
defer ticker.Stop()
185+
186+
// Create DNS lookup map and mutex for sharing between goroutines
187+
dnsLookupMap := make(map[uint32]string)
188+
dnsLookupMapMutex := &sync.RWMutex{}
189+
170190
seenEntries := make(map[string]bool)
171191
for {
172192
select {
@@ -226,23 +246,41 @@ func main() {
226246
continue
227247
}
228248

229-
// Format output as JSON Lines
230-
output := outputJSON(newEntries)
231-
232-
// Add newline if needed
233-
if output != "" && !strings.HasSuffix(output, "\n") {
234-
output += "\n"
235-
}
236-
237-
// Write output to stdout
238-
fmt.Print(output)
249+
outputCh <- newEntries
239250

240251
case <-ctx.Done():
241252
return
242253
}
243254
}
244255
}
245256

257+
func outputEvents(ch <-chan []statEntry) {
258+
numEvents := 0
259+
260+
for entries := range ch {
261+
if maxEvents > 0 && numEvents >= maxEvents {
262+
continue
263+
}
264+
// Format output as JSON Lines
265+
output := outputJSON(entries)
266+
267+
// Add newline if needed
268+
if output != "" && !strings.HasSuffix(output, "\n") {
269+
output += "\n"
270+
}
271+
272+
// Write output to stdout
273+
fmt.Print(output)
274+
275+
if maxEvents > 0 {
276+
numEvents += len(entries)
277+
if numEvents >= maxEvents {
278+
log.Printf("Max number of events has been met, no longer outputting any events")
279+
}
280+
}
281+
}
282+
}
283+
246284
// startKProbes attaches a series of eBPF programs to kernel functions using KProbes.
247285
//
248286
// This function iterates over a slice of kprobeHook structs, each containing a kernel function

0 commit comments

Comments
 (0)