[SPARK-48298][Core] Add TCP mode to StatsD sink #46604
base: master
Conversation
Hi @cloud-fan @dongjoon-hyun @yaooqinn Could you take a look when you get a chance, or perhaps pull other people in for review? Thank you 🙇
```scala
private object DataSender {
  def get(host: String, port: Int, protocol: DataSenderType, connTimeoutMs: Int): DataSender = {
    val ds = protocol match {
      case TCP => new TCPDataSender(host, port, connTimeoutMs)
```
I'm not using TCP long connections here. The consideration is:
- For metrics we do not heavily use the connection: it is usually a single shot of a few metrics every period of a few seconds, 10s by default. It is very costly, even overwhelming, for the StatsD server side to keep a large number of long-lived connections, given that in production environments we usually have a ton of Spark applications/drivers/executors.
- The drawback to consider is that with a short TCP connection, the port cannot be reused for a short while due to TCP TIME_WAIT. This is a tradeoff, and it somewhat depends on what the `interval` is.
- There is also a cost to initializing and destroying a TCP connection.
- Simplicity. To maintain a long connection, we may have to handle a lot more failure scenarios, e.g., detecting and reconnecting after a server-side glitch or transient network issues, and possibly a retry mechanism, perhaps with backoff and jitter, etc. Not sure if this complexity is expected, though, given that the current UDP mode is just fire-and-forget.
- Other considerations: a short-lived TCP connection is better for load balancing, automatic failover, etc. when, let's say, the StatsD service is hosted by multiple hosts behind a load balancer (e.g., a DNS, or pods behind a Service in Kubernetes). That can be another long story in itself.

So I don't see an ideal solution for all scenarios (this is probably why we are offering both UDP and TCP modes); we might provide both options (short-lived and long-lived) for TCP mode in case it's worth it.
Please feel free to discuss if you have different opinions.
What changes were proposed in this pull request?
A new trait `DataSender` is added in this PR to provide a unified data sender interface for the StatsdReporter. Under this trait we have implementations for UDP mode (`UDPDataSender`, which keeps the existing UDP-based implementation) and TCP mode (`TCPDataSender`). Which mode/sender to use is configurable, and the default mode is still UDP.

The unit tests are refactored to add TCP mode testing (context provided by `withSocketAndSinkTCP`) in addition to the existing tests for UDP mode (renamed from `withSocketAndSink` to `withSocketAndSinkUDP`).
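To make the shape of the change concrete, here is a minimal, self-contained sketch of what such a trait hierarchy could look like. This is an illustrative reconstruction, not the PR's actual code: the `DataSender`, `UDPDataSender`, and `TCPDataSender` names and the `get` factory signature come from the PR, but the method bodies are assumptions (the TCP sender opens a short-lived connection per send, matching the discussion above):

```scala
import java.io.OutputStreamWriter
import java.net.{DatagramPacket, DatagramSocket, InetAddress, InetSocketAddress, Socket}
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical protocol enum; the PR's actual DataSenderType may differ.
sealed trait DataSenderType
case object UDP extends DataSenderType
case object TCP extends DataSenderType

trait DataSender {
  def send(data: String): Unit
  def close(): Unit
}

// Fire-and-forget UDP, mirroring the pre-existing StatsdReporter behavior.
class UDPDataSender(host: String, port: Int) extends DataSender {
  private val socket = new DatagramSocket()
  private val address = InetAddress.getByName(host)

  override def send(data: String): Unit = {
    val bytes = data.getBytes(UTF_8)
    socket.send(new DatagramPacket(bytes, bytes.length, address, port))
  }

  override def close(): Unit = socket.close()
}

// Short-lived TCP connection per send, as discussed in the review thread.
class TCPDataSender(host: String, port: Int, connTimeoutMs: Int) extends DataSender {
  override def send(data: String): Unit = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), connTimeoutMs)
      val out = new OutputStreamWriter(socket.getOutputStream, UTF_8)
      out.write(data + "\n")
      out.flush()
    } finally {
      socket.close()
    }
  }

  override def close(): Unit = ()
}

object DataSender {
  def get(host: String, port: Int, protocol: DataSenderType, connTimeoutMs: Int): DataSender = {
    protocol match {
      case TCP => new TCPDataSender(host, port, connTimeoutMs)
      case UDP => new UDPDataSender(host, port)
    }
  }
}
```

The reporter would then call `DataSender.get(...)` once with the configured protocol and use the resulting sender uniformly, so the UDP/TCP distinction stays out of the reporting logic.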
)Why are the changes needed?
As mentioned in the Jira ticket: https://issues.apache.org/jira/browse/SPARK-48298
Currently, the StatsdSink in Spark supports UDP mode only, which is the default mode of StatsD. However, in real production environments, we often find that a more reliable transmission of metrics is needed to avoid metrics loss in high-traffic systems, and also provide more flexibility in network configurations.
TCP mode is already supported by StatsD (https://github.com/statsd/statsd/blob/master/docs/server.md), by Prometheus' statsd_exporter (https://github.com/prometheus/statsd_exporter), and also by many other StatsD-based metrics proxies/receivers.
Does this PR introduce any user-facing change?
Yes.
The following new config options are added to `conf/metrics.properties.template`:
- `*.sink.statsd.protocol`
- `*.sink.statsd.connTimeoutMs`
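For illustration, a configuration enabling TCP mode might look like the fragment below. The `class`, `host`, and `port` keys are the pre-existing StatsD sink options; the host/port values and the exact accepted value for `protocol` (assumed here to be `tcp`/`udp`) are placeholders, not confirmed by this description:

```properties
# Enable the StatsD sink for all instances (existing options).
*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
*.sink.statsd.host=127.0.0.1
*.sink.statsd.port=8125

# New options from this PR: transport protocol and TCP connect timeout.
*.sink.statsd.protocol=tcp
*.sink.statsd.connTimeoutMs=1000
```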
A new error condition is defined in error-conditions.json for protocol configuration error.
How was this patch tested?
Added/Refactored unit tests. Also manually tested with metric configurations sending metrics to a Netcat server in TCP and UDP modes.
Was this patch authored or co-authored using generative AI tooling?
No.