Non-blocking S3AsyncClient usage #3467

Open
pomadchin opened this issue Jun 27, 2022 · 0 comments

pomadchin commented Jun 27, 2022

This task is to replace the blocking S3Client with its non-blocking counterpart, S3AsyncClient, wherever that makes sense, i.e. in the {RDD | Collection}{Readers | Writers}.

To keep the migration gradual, we may want to keep both clients (sync and async) available; e.g. it makes more sense for AttributeStore to keep using the blocking S3Client for now.

Below is an S3ClientProducer that exposes both clients, to support the idea.

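import java.time.Duration
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.core.retry.RetryPolicy
import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy
import software.amazon.awssdk.core.retry.conditions.{OrRetryCondition, RetryCondition, RetryOnErrorCodeCondition}
import software.amazon.awssdk.services.s3.{S3AsyncClient, S3Client}

object S3ClientProducer {
  // Retry configuration shared by the sync and async clients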
  @transient private lazy val overrideConfig: ClientOverrideConfiguration = {
    val retryCondition =
      OrRetryCondition.create(
        RetryCondition.defaultRetryCondition(),
        RetryOnErrorCodeCondition.create("RequestTimeout")
      )
    val backoffStrategy =
      FullJitterBackoffStrategy.builder()
        .baseDelay(Duration.ofMillis(50))
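        // NOTE: maxBackoffTime (15 ms) is below baseDelay (50 ms), so the delay is effectively capped at 15 ms; confirm this is intended
        .maxBackoffTime(Duration.ofMillis(15))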
        .build()
    val retryPolicy =
      RetryPolicy.defaultRetryPolicy()
        .toBuilder
        .retryCondition(retryCondition)
        .backoffStrategy(backoffStrategy)
        .build()
    ClientOverrideConfiguration.builder()
      .retryPolicy(retryPolicy)
      .build()
  }

  @transient private lazy val client: S3Client =
    S3Client.builder()
      .overrideConfiguration(overrideConfig)
      .build()

  @transient private lazy val asyncClient: S3AsyncClient =
    S3AsyncClient.builder()
      .overrideConfiguration(overrideConfig)
      .build()

  private var summonClient: () => S3Client = () => client

  private var summonAsyncClient: () => S3AsyncClient = () => asyncClient

  /** Set an alternative default function for summoning S3Clients */
  def set(getClient: () => S3Client): Unit = summonClient = getClient

  /** Set an alternative default function for summoning S3AsyncClients */
  def setAsync(getClient: () => S3AsyncClient): Unit = summonAsyncClient = getClient

  /** Get the current function registered as default for summoning S3Clients */
  def get: () => S3Client = summonClient

  /** Get the current function registered as default for summoning S3AsyncClients */
  def getAsync: () => S3AsyncClient = summonAsyncClient
}
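
S3AsyncClient operations return a java.util.concurrent.CompletableFuture, so the readers and writers would need a way to consume that result without parking a thread. A minimal sketch of one way to bridge it into a Scala Future is below; it uses only the standard library, and the names AsyncBridge, toScalaFuture and AsyncBridgeDemo are hypothetical, not part of GeoTrellis or the AWS SDK. The demo completes a plain CompletableFuture by hand as a stand-in for a real S3 call.

```scala
import java.util.concurrent.{CompletableFuture, CompletionException}
import java.util.function.BiConsumer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical helper, not part of GeoTrellis or the AWS SDK.
object AsyncBridge {
  /** Completes a Scala Future from a callback when `cf` completes; no thread blocks while waiting. */
  def toScalaFuture[A](cf: CompletableFuture[A]): Future[A] = {
    val promise = Promise[A]()
    cf.whenComplete(new BiConsumer[A, Throwable] {
      override def accept(value: A, error: Throwable): Unit = {
        if (error == null) promise.success(value)
        else promise.failure(error match {
          // Dependent stages wrap failures in CompletionException; surface the cause
          case e: CompletionException if e.getCause != null => e.getCause
          case e => e
        })
        ()
      }
    })
    promise.future
  }
}

object AsyncBridgeDemo extends App {
  // Stand-in for the CompletableFuture an S3AsyncClient call would return.
  // With the AWS SDK v2 on the classpath, the call site might look like:
  //   val cf = S3ClientProducer.getAsync().getObject(request, AsyncResponseTransformer.toBytes[GetObjectResponse]())
  //   val bytes = AsyncBridge.toScalaFuture(cf).map(_.asByteArray())
  val cf = new CompletableFuture[String]()
  val future = AsyncBridge.toScalaFuture(cf)
  cf.complete("payload")

  // Await appears here only so the demo can print before exiting
  println(Await.result(future, 1.second))
}
```

On Scala 2.13 the standard library's scala.jdk.FutureConverters offers an equivalent conversion; the hand-written version above is shown only because it also compiles on 2.12.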

Connects #2923, #2306
