Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record the number of bytes read from storage in a metric #1206

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 48 additions & 0 deletions clients/object_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,51 @@ func DownloadOSURL(osURL string) (io.ReadCloser, error) {
return fileInfoReader.Body, nil
}

func NewTeeReadCloser(readCloser io.ReadCloser, writer io.Writer) io.ReadCloser {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding a unit test for this?

return &TeeReadCloser{
reader: io.TeeReader(readCloser, writer),
closer: readCloser,
}
}

type TeeReadCloser struct {
reader io.Reader
closer io.Closer
}

func (tr *TeeReadCloser) Read(p []byte) (n int, err error) {
return tr.reader.Read(p)
}

func (tr *TeeReadCloser) Close() error {
return tr.closer.Close()
}

func newByteReporter(host, bucket string) *byteReporter {
return &byteReporter{
host: host,
bucket: bucket,
}
}

type byteReporter struct {
count int64
host string
bucket string
}

func (c *byteReporter) Write(p []byte) (int, error) {
n := len(p)
c.count += int64(n)
if c.count%1024*1024 == 0 {
// report count every 1MB, accept that accuracy may be up to 1MB incorrect
metrics.Metrics.ObjectStoreClient.BytesTransferred.WithLabelValues(c.host, "read", c.bucket).Add(float64(c.count))
c.count = 0
}

return n, nil
}

func GetOSURL(osURL, byteRange string) (*drivers.FileInfoReader, error) {
storageDriver, err := drivers.ParseOSURL(osURL, true)
if err != nil {
Expand Down Expand Up @@ -57,6 +102,9 @@ func GetOSURL(osURL, byteRange string) (*drivers.FileInfoReader, error) {

metrics.Metrics.ObjectStoreClient.RequestDuration.WithLabelValues(host, "read", bucket).Observe(duration.Seconds())

// wrap the ReadCloser in a TeeReadCloser so that we can record the egress bytes
fileInfoReader.Body = NewTeeReadCloser(fileInfoReader.Body, newByteReporter(host, bucket))

return fileInfoReader, nil
}

Expand Down
11 changes: 8 additions & 3 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
)

type ClientMetrics struct {
RetryCount *prometheus.GaugeVec
FailureCount *prometheus.CounterVec
RequestDuration *prometheus.HistogramVec
RetryCount *prometheus.GaugeVec
FailureCount *prometheus.CounterVec
RequestDuration *prometheus.HistogramVec
BytesTransferred *prometheus.CounterVec
}

type VODPipelineMetrics struct {
Expand Down Expand Up @@ -176,6 +177,10 @@ func NewMetrics() *CatalystAPIMetrics {
Help: "Time taken to send transcoding status updates",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
}, []string{"host", "operation", "bucket"}),
BytesTransferred: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "object_store_bytes_transferred",
Help: "The total number of bytes transferred from storage",
}, []string{"host", "operation", "bucket"}),
},

VODPipelineMetrics: VODPipelineMetrics{
Expand Down