package perf import ( "encoding/binary" "errors" "fmt" "io" "os" "runtime" "sync" "time" "github.com/cilium/ebpf" "github.com/cilium/ebpf/internal" "github.com/cilium/ebpf/internal/epoll" "github.com/cilium/ebpf/internal/unix" ) var ( ErrClosed = os.ErrClosed errEOR = errors.New("end of ring") ) var perfEventHeaderSize = binary.Size(perfEventHeader{}) // perfEventHeader must match 'struct perf_event_header` in . type perfEventHeader struct { Type uint32 Misc uint16 Size uint16 } func cpuForEvent(event *unix.EpollEvent) int { return int(event.Pad) } // Record contains either a sample or a counter of the // number of lost samples. type Record struct { // The CPU this record was generated on. CPU int // The data submitted via bpf_perf_event_output. // Due to a kernel bug, this can contain between 0 and 7 bytes of trailing // garbage from the ring depending on the input sample's length. RawSample []byte // The number of samples which could not be output, since // the ring buffer was full. LostSamples uint64 } // Read a record from a reader and tag it as being from the given CPU. // // buf must be at least perfEventHeaderSize bytes long. func readRecord(rd io.Reader, rec *Record, buf []byte, overwritable bool) error { // Assert that the buffer is large enough. buf = buf[:perfEventHeaderSize] _, err := io.ReadFull(rd, buf) if errors.Is(err, io.EOF) { return errEOR } else if err != nil { return fmt.Errorf("read perf event header: %v", err) } header := perfEventHeader{ internal.NativeEndian.Uint32(buf[0:4]), internal.NativeEndian.Uint16(buf[4:6]), internal.NativeEndian.Uint16(buf[6:8]), } switch header.Type { case unix.PERF_RECORD_LOST: rec.RawSample = rec.RawSample[:0] rec.LostSamples, err = readLostRecords(rd) return err case unix.PERF_RECORD_SAMPLE: rec.LostSamples = 0 // We can reuse buf here because perfEventHeaderSize > perfEventSampleSize. rec.RawSample, err = readRawSample(rd, buf, rec.RawSample) return err default: return &unknownEventError{header.Type} } } func readLostRecords(rd io.Reader) (uint64, error) { // lostHeader must match 'struct perf_event_lost in kernel sources. var lostHeader struct { ID uint64 Lost uint64 } err := binary.Read(rd, internal.NativeEndian, &lostHeader) if err != nil { return 0, fmt.Errorf("can't read lost records header: %v", err) } return lostHeader.Lost, nil } var perfEventSampleSize = binary.Size(uint32(0)) // This must match 'struct perf_event_sample in kernel sources. type perfEventSample struct { Size uint32 } func readRawSample(rd io.Reader, buf, sampleBuf []byte) ([]byte, error) { buf = buf[:perfEventSampleSize] if _, err := io.ReadFull(rd, buf); err != nil { return nil, fmt.Errorf("read sample size: %w", err) } sample := perfEventSample{ internal.NativeEndian.Uint32(buf), } var data []byte if size := int(sample.Size); cap(sampleBuf) < size { data = make([]byte, size) } else { data = sampleBuf[:size] } if _, err := io.ReadFull(rd, data); err != nil { return nil, fmt.Errorf("read sample: %w", err) } return data, nil } // Reader allows reading bpf_perf_event_output // from user space. type Reader struct { poller *epoll.Poller deadline time.Time // mu protects read/write access to the Reader structure with the // exception of 'pauseFds', which is protected by 'pauseMu'. // If locking both 'mu' and 'pauseMu', 'mu' must be locked first. mu sync.Mutex // Closing a PERF_EVENT_ARRAY removes all event fds // stored in it, so we keep a reference alive. array *ebpf.Map rings []*perfEventRing epollEvents []unix.EpollEvent epollRings []*perfEventRing eventHeader []byte // pauseFds are a copy of the fds in 'rings', protected by 'pauseMu'. // These allow Pause/Resume to be executed independently of any ongoing // Read calls, which would otherwise need to be interrupted. pauseMu sync.Mutex pauseFds []int paused bool overwritable bool } // ReaderOptions control the behaviour of the user // space reader. type ReaderOptions struct { // The number of written bytes required in any per CPU buffer before // Read will process data. Must be smaller than PerCPUBuffer. // The default is to start processing as soon as data is available. Watermark int // This perf ring buffer is overwritable, once full the oldest event will be // overwritten by newest. Overwritable bool } // NewReader creates a new reader with default options. // // array must be a PerfEventArray. perCPUBuffer gives the size of the // per CPU buffer in bytes. It is rounded up to the nearest multiple // of the current page size. func NewReader(array *ebpf.Map, perCPUBuffer int) (*Reader, error) { return NewReaderWithOptions(array, perCPUBuffer, ReaderOptions{}) } // NewReaderWithOptions creates a new reader with the given options. func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions) (pr *Reader, err error) { if perCPUBuffer < 1 { return nil, errors.New("perCPUBuffer must be larger than 0") } var ( fds []int nCPU = int(array.MaxEntries()) rings = make([]*perfEventRing, 0, nCPU) pauseFds = make([]int, 0, nCPU) ) poller, err := epoll.New() if err != nil { return nil, err } defer func() { if err != nil { poller.Close() for _, fd := range fds { unix.Close(fd) } for _, ring := range rings { if ring != nil { ring.Close() } } } }() // bpf_perf_event_output checks which CPU an event is enabled on, // but doesn't allow using a wildcard like -1 to specify "all CPUs". // Hence we have to create a ring for each CPU. for i := 0; i < nCPU; i++ { ring, err := newPerfEventRing(i, perCPUBuffer, opts.Watermark, opts.Overwritable) if errors.Is(err, unix.ENODEV) { // The requested CPU is currently offline, skip it. rings = append(rings, nil) pauseFds = append(pauseFds, -1) continue } if err != nil { return nil, fmt.Errorf("failed to create perf ring for CPU %d: %v", i, err) } rings = append(rings, ring) pauseFds = append(pauseFds, ring.fd) if err := poller.Add(ring.fd, i); err != nil { return nil, err } } array, err = array.Clone() if err != nil { return nil, err } pr = &Reader{ array: array, rings: rings, poller: poller, deadline: time.Time{}, epollEvents: make([]unix.EpollEvent, len(rings)), epollRings: make([]*perfEventRing, 0, len(rings)), eventHeader: make([]byte, perfEventHeaderSize), pauseFds: pauseFds, overwritable: opts.Overwritable, } if err = pr.Resume(); err != nil { return nil, err } runtime.SetFinalizer(pr, (*Reader).Close) return pr, nil } // Close frees resources used by the reader. // // It interrupts calls to Read. // // Calls to perf_event_output from eBPF programs will return // ENOENT after calling this method. func (pr *Reader) Close() error { if err := pr.poller.Close(); err != nil { if errors.Is(err, os.ErrClosed) { return nil } return fmt.Errorf("close poller: %w", err) } // Trying to poll will now fail, so Read() can't block anymore. Acquire the // lock so that we can clean up. pr.mu.Lock() defer pr.mu.Unlock() for _, ring := range pr.rings { if ring != nil { ring.Close() } } pr.rings = nil pr.pauseFds = nil pr.array.Close() return nil } // SetDeadline controls how long Read and ReadInto will block waiting for samples. // // Passing a zero time.Time will remove the deadline. Passing a deadline in the // past will prevent the reader from blocking if there are no records to be read. func (pr *Reader) SetDeadline(t time.Time) { pr.mu.Lock() defer pr.mu.Unlock() pr.deadline = t } // Read the next record from the perf ring buffer. // // The function blocks until there are at least Watermark bytes in one // of the per CPU buffers. Records from buffers below the Watermark // are not returned. // // Records can contain between 0 and 7 bytes of trailing garbage from the ring // depending on the input sample's length. // // Calling Close interrupts the function. // // Returns os.ErrDeadlineExceeded if a deadline was set. func (pr *Reader) Read() (Record, error) { var r Record return r, pr.ReadInto(&r) } var errMustBePaused = fmt.Errorf("perf ringbuffer: must have been paused before reading overwritable buffer") // ReadInto is like Read except that it allows reusing Record and associated buffers. func (pr *Reader) ReadInto(rec *Record) error { pr.mu.Lock() defer pr.mu.Unlock() pr.pauseMu.Lock() defer pr.pauseMu.Unlock() if pr.overwritable && !pr.paused { return errMustBePaused } if pr.rings == nil { return fmt.Errorf("perf ringbuffer: %w", ErrClosed) } for { if len(pr.epollRings) == 0 { // NB: The deferred pauseMu.Unlock will panic if Wait panics, which // might obscure the original panic. pr.pauseMu.Unlock() nEvents, err := pr.poller.Wait(pr.epollEvents, pr.deadline) pr.pauseMu.Lock() if err != nil { return err } // Re-validate pr.paused since we dropped pauseMu. if pr.overwritable && !pr.paused { return errMustBePaused } for _, event := range pr.epollEvents[:nEvents] { ring := pr.rings[cpuForEvent(&event)] pr.epollRings = append(pr.epollRings, ring) // Read the current head pointer now, not every time // we read a record. This prevents a single fast producer // from keeping the reader busy. ring.loadHead() } } // Start at the last available event. The order in which we // process them doesn't matter, and starting at the back allows // resizing epollRings to keep track of processed rings. err := pr.readRecordFromRing(rec, pr.epollRings[len(pr.epollRings)-1]) if err == errEOR { // We've emptied the current ring buffer, process // the next one. pr.epollRings = pr.epollRings[:len(pr.epollRings)-1] continue } return err } } // Pause stops all notifications from this Reader. // // While the Reader is paused, any attempts to write to the event buffer from // BPF programs will return -ENOENT. // // Subsequent calls to Read will block until a call to Resume. func (pr *Reader) Pause() error { pr.pauseMu.Lock() defer pr.pauseMu.Unlock() if pr.pauseFds == nil { return fmt.Errorf("%w", ErrClosed) } for i := range pr.pauseFds { if err := pr.array.Delete(uint32(i)); err != nil && !errors.Is(err, ebpf.ErrKeyNotExist) { return fmt.Errorf("could't delete event fd for CPU %d: %w", i, err) } } pr.paused = true return nil } // Resume allows this perf reader to emit notifications. // // Subsequent calls to Read will block until the next event notification. func (pr *Reader) Resume() error { pr.pauseMu.Lock() defer pr.pauseMu.Unlock() if pr.pauseFds == nil { return fmt.Errorf("%w", ErrClosed) } for i, fd := range pr.pauseFds { if fd == -1 { continue } if err := pr.array.Put(uint32(i), uint32(fd)); err != nil { return fmt.Errorf("couldn't put event fd %d for CPU %d: %w", fd, i, err) } } pr.paused = false return nil } // NB: Has to be preceded by a call to ring.loadHead. func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error { defer ring.writeTail() rec.CPU = ring.cpu err := readRecord(ring, rec, pr.eventHeader, pr.overwritable) if pr.overwritable && (errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)) { return errEOR } return err } type unknownEventError struct { eventType uint32 } func (uev *unknownEventError) Error() string { return fmt.Sprintf("unknown event type: %d", uev.eventType) } // IsUnknownEvent returns true if the error occurred // because an unknown event was submitted to the perf event ring. func IsUnknownEvent(err error) bool { var uee *unknownEventError return errors.As(err, &uee) }