Refactor allocation calculation #2554
base: develop
Conversation
Signed-off-by: r2k1 <yokree@gmail.com>
Nice! Let's get some of the other @opencost/opencost-maintainers to weigh in.
Need to do a bit more review, and definitely welcome insight from other contributors here. I do generally like the idea of separating the query and processing steps, so the idea is sound. We just want to ensure that we stay as semantically equivalent as possible.
pkg/costmodel/allocation.go (Outdated)
```go
func (cm *CostModel) execAllPromQueries(queries []promQuery, time time.Time) error {
	ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)

	// Run all queries concurrently
	errCh := make(chan error, len(queries))
	for _, q := range queries {
		go func(q string, out *[]*prom.QueryResult) {
			var err error
			*out, _, err = ctx.QuerySync(q, time)
			errCh <- err
		}(q.query, q.out)
	}

	// Collect errors from the queries
	errs := make([]error, 0, len(queries))
	for range queries {
		if err := <-errCh; err != nil {
			log.Errorf("CostModel.ComputeAllocation: query context error %s", err)
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}
```

Surrounding context in ComputeAllocation from the same diff:

```go
// TODO:CLEANUP remove "max batch" idea and clusterStart/End
err := cm.buildPodMap(window, resolution, env.GetETLMaxPrometheusQueryDuration(), podMap, clusterStart, clusterEnd, ingestPodUID, podUIDKeyMap)
if err != nil {
	log.Errorf("CostModel.ComputeAllocation: failed to build pod map: %s", err.Error())
}

// (2) Run and apply remaining queries
```
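For orientation, here is a hypothetical call site (not from the PR) showing how the `out` pointers are meant to be used; the query variables are placeholder PromQL strings, not existing identifiers:

```go
// Hypothetical call site: each query writes its results into the slice
// that the corresponding out pointer refers to.
var (
	podCPUResults []*prom.QueryResult
	podRAMResults []*prom.QueryResult
)
queries := []promQuery{
	{query: queryPodCPU, out: &podCPUResults}, // queryPodCPU is a placeholder PromQL string
	{query: queryPodRAM, out: &podRAMResults}, // queryPodRAM is a placeholder PromQL string
}
if err := cm.execAllPromQueries(queries, queryTime); err != nil {
	return err
}
// podCPUResults and podRAMResults are now populated
```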
Concerning this segment of code:
- The code this replaced was already executing the queries concurrently, and correctly awaiting and collecting results/errors. There is an internal error collection mechanism on the prometheus context that handles HTTP request errors as well as errors and warnings coming from prometheus (lines 177 to 178 in 3c75f52):

```go
// report all warnings, request, and parse errors (nils will be ignored)
ctx.errorCollector.Report(query, warnings, requestError, results.Error)
```

- This change appears to be mimicking what was already occurring, except the key difference is that a sync request is being made inside a goroutine instead of just calling `QueryAtTime`, which is already an async request. If you would like to remove the unrolled code in favor of a loop, please follow the exact semantics that existed before. This is off the cuff, but something similar to the following:
```go
// execute the queries concurrently
for _, q := range queries {
	q.result = ctx.QueryAtTime(q.query, time)
}

// await results for each query
for _, q := range queries {
	*q.out, _ = q.result.Await()
}

// use the ctx to retrieve the error collection if it exists
if ctx.HasErrors() {
	for _, err := range ctx.Errors() {
		log.Errorf("CostModel.ComputeAllocation: query context error %s", err)
	}
	return ctx.ErrorCollection()
}

return nil
```
- The ErrorCollection error type is important for upstream projects, just want to make sure it's not removed (opencost/pkg/costmodel/allocation.go, lines 594 to 600 in 3c75f52):

```go
if ctx.HasErrors() {
	for _, err := range ctx.Errors() {
		log.Errorf("CostModel.ComputeAllocation: query context error %s", err)
	}
	return allocSet, nil, ctx.ErrorCollection()
}
```

- A quick note about prometheus query concurrency: opencost uses a max-concurrency-throttled prometheus client which limits the total concurrent outbound queries. This avoids the prometheus memory bloat that can occur when making many concurrent query requests. See the env option (opencost/pkg/env/costmodelenv.go, lines 492 to 494 in 3c75f52) and the illustrative sketch after this list:

```go
func GetMaxQueryConcurrency() int {
	return env.GetInt(MaxQueryConcurrencyEnvVar, 5)
}
```
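To make the throttling idea concrete, here is a minimal, generic sketch (not OpenCost's actual implementation) of bounding in-flight queries with a buffered-channel semaphore; `runThrottled`, `maxConcurrency`, and `runQuery` are illustrative names, with `maxConcurrency` standing in for the value of `GetMaxQueryConcurrency()`:

```go
// Illustrative only, not OpenCost's implementation. Bound the number of
// in-flight queries with a buffered channel used as a semaphore; the
// capacity plays the role of GetMaxQueryConcurrency(). runQuery is a
// hypothetical stand-in for one Prometheus HTTP query. Uses errors.Join
// from the standard library (Go 1.20+).
func runThrottled(queries []string, maxConcurrency int, runQuery func(string) error) error {
	sem := make(chan struct{}, maxConcurrency)
	errCh := make(chan error, len(queries))

	for _, q := range queries {
		sem <- struct{}{} // acquire a slot; blocks while maxConcurrency queries are in flight
		go func(q string) {
			defer func() { <-sem }() // release the slot when the query returns
			errCh <- runQuery(q)
		}(q)
	}

	// collect one result per query
	var errs []error
	for range queries {
		if err := <-errCh; err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}
```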
Reverted to the original approach of async execution. I had been struggling to figure out how to apply it when I began, but after a few iterations and your suggestion it's clear now.
@mbolt35 While we are here, do you know why there is a custom implementation of the prometheus client that performs HTTP response parsing? Why isn't the original prometheus client used?
I should've been more specific. We are wrapping the prometheus.Client with concurrent query control logic as well as response handling for rate-limiting errors that come from managed prometheus. There is also a query file logger hook and metric tracking for concurrent query state. A simplified sketch of the wrapping idea follows the struct below.
Lines 113 to 129 in 3c75f52:

```go
//--------------------------------------------------------------------------
//  RateLimitedPrometheusClient
//--------------------------------------------------------------------------

// RateLimitedPrometheusClient is a prometheus client which limits the total number of
// concurrent outbound requests allowed at a given moment.
type RateLimitedPrometheusClient struct {
	id                string
	client            prometheus.Client
	auth              *ClientAuth
	queue             collections.BlockingQueue[*workRequest]
	decorator         QueryParamsDecorator
	rateLimitRetry    *RateLimitRetryOpts
	outbound          atomic.Int32
	fileLogger        *golog.Logger
	headerXScopeOrgId string
}
```
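For illustration, here is a heavily simplified sketch of the wrapping idea against the upstream `prometheus.Client` interface. This is not the actual `RateLimitedPrometheusClient` (which also covers the queue, retry, decorator, logging, and metric concerns above); `throttledClient` and `slots` are invented names:

```go
import (
	"context"
	"net/http"
	"net/url"
	"sync/atomic"

	prometheus "github.com/prometheus/client_golang/api"
)

// throttledClient is a simplified, invented example (not the real
// RateLimitedPrometheusClient): it decorates a prometheus.Client so that
// Do blocks until a concurrency slot is free, tracks the number of
// outbound requests, and then delegates to the inner client.
type throttledClient struct {
	inner    prometheus.Client
	slots    chan struct{} // capacity = max concurrent outbound requests
	outbound atomic.Int32
}

func (c *throttledClient) URL(ep string, args map[string]string) *url.URL {
	return c.inner.URL(ep, args)
}

func (c *throttledClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
	select {
	case c.slots <- struct{}{}: // acquire a slot
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	}
	defer func() { <-c.slots }() // release the slot

	c.outbound.Add(1)
	defer c.outbound.Add(-1)

	return c.inner.Do(ctx, req)
}
```

Because the wrapper satisfies the same interface, callers don't need to know they are talking to a throttled client.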
```go
type promQuery struct {
	out   *[]*prom.QueryResult
	query string
}
```
To accomplish the looped queries, you'll likely want to add a `result prom.QueryResultsChan` field.
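Putting the pieces together, here is a sketch of how the looped version could read once that field is added. The `QueryAtTime`/`Await` signatures are inferred from the discussion above, so treat them as assumptions to verify against the prom package; note that `queries` holds pointers so the `result` assignment inside the range loop persists:

```go
type promQuery struct {
	out    *[]*prom.QueryResult
	query  string
	result prom.QueryResultsChan // channel returned by the async QueryAtTime call
}

// Sketch only: issue every query asynchronously, then await each result.
// Errors are gathered by the context's internal error collector, as in the
// reviewer's snippet above.
func (cm *CostModel) execAllPromQueries(queries []*promQuery, time time.Time) error {
	ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)

	// execute the queries concurrently
	for _, q := range queries {
		q.result = ctx.QueryAtTime(q.query, time)
	}

	// await results for each query
	for _, q := range queries {
		*q.out, _ = q.result.Await()
	}

	// surface the collected errors, preserving the ErrorCollection type
	if ctx.HasErrors() {
		for _, err := range ctx.Errors() {
			log.Errorf("CostModel.ComputeAllocation: query context error %s", err)
		}
		return ctx.ErrorCollection()
	}
	return nil
}
```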
In general, I agree that this code, like many places in the code, could benefit from a careful clean up. The tough thing about that, and the reason we haven't done it, is that we don't have good tests. I would love to see more robust testing here, e.g., before/after compared over a period of time on a dynamic cluster, or significantly more complex unit test cases. Open to ideas.

In lieu of having good tests, we have gone with the conservative approach of "don't change it if it isn't broken." I'm still supportive of that approach, but understand that others may want to make changes here.

As far as a review goes, this one would require time that I'm not going to be able to offer until mid April. If this is still open then, I'll check back in and see how I can help.
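On the before/after testing idea, here is a rough sketch of what an equivalence test could look like; `computeAllocationOld`, `computeAllocationNew`, and `loadPromFixture` are hypothetical helpers, not existing functions, and the sketch assumes the standard `testing` and `reflect` packages:

```go
// Sketch of a before/after equivalence test. computeAllocationOld,
// computeAllocationNew, and loadPromFixture are hypothetical helpers: the
// first two would wrap the pre- and post-refactor code paths, the last
// would replay recorded Prometheus responses for a window of time.
func TestAllocationRefactorEquivalence(t *testing.T) {
	fixtures := []string{
		"testdata/small-cluster-1h.json",
		"testdata/dynamic-cluster-24h.json",
	}

	for _, f := range fixtures {
		t.Run(f, func(t *testing.T) {
			promData := loadPromFixture(t, f)

			oldSet, err := computeAllocationOld(promData)
			if err != nil {
				t.Fatalf("old path failed: %v", err)
			}
			newSet, err := computeAllocationNew(promData)
			if err != nil {
				t.Fatalf("new path failed: %v", err)
			}

			// The refactor is meant to be semantically equivalent, so the
			// resulting allocation sets should match exactly.
			if !reflect.DeepEqual(oldSet, newSet) {
				t.Errorf("allocation sets differ for fixture %s", f)
			}
		})
	}
}
```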
+1 I support this change. Test cases look good, maybe a bit more testing? I will take a closer look.
Quality Gate passed.
This pull request has been marked as stale because it has been open for 90 days with no activity. Please remove the stale label or comment, or this pull request will be closed in 5 days.
@r2k1 I think you still want to merge this, right?
What does this PR change?
Does this PR relate to any other PRs?
No
How will this PR impact users?
How was this PR tested?
Added unit tests. Ran a few endpoints locally without noticeable issues.
Does this PR require changes to documentation?
No
Have you labeled this PR and its corresponding Issue as "next release" if it should be part of the next OpenCost release? If not, why not?
I don't understand the meaning of the label. Are there PRs that aren't "next release"?