
URGENT: Multiple Requests Handle Issue #66

Open
datalatics-official opened this issue Jan 19, 2022 · 4 comments

Comments

@datalatics-official

Hello @mpenick

Currently I'm facing another issue, related to a timeout. Here is the error message:

Query (SELECT * FROM table_name WHERE id=?;) failed com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: host:9042 (com.datastax.driver.core.exceptions.DriverException: Timeout while trying to acquire available connection (you may want to increase the driver number of per-host connections)))

I've even increased the per-host connections. Let me share that part of the code with you here:

private val cluster = Cluster.builder()
  .addContactPoints(configuration.astraHostnames: _*)
  .withPort(9042)
  .withPoolingOptions(new PoolingOptions()
    .setConnectionsPerHost(HostDistance.LOCAL, 40, 10000)
    .setConnectionsPerHost(HostDistance.REMOTE, 20, 5000))
  .withSocketOptions(new SocketOptions().setReadTimeoutMillis(configuration.cassandraReadTimeout))
  .withLoadBalancingPolicy(LatencyAwarePolicy.builder(new TokenAwarePolicy(new RoundRobinPolicy())).build)
  .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
  .withSpeculativeExecutionPolicy(new ConstantSpeculativeExecutionPolicy(500, 2))
  .build()

I think the driver (v2.1.10.3) is working properly, but my cql-proxy service is unable to handle hundreds of requests at the same time.
Could you please share how many requests cql-proxy can handle at a time, and how I can change that number?

@mpenick
Contributor

mpenick commented Jan 19, 2022

Thanks for the report. I'll work on reproducing the issue tomorrow.

@mpenick
Contributor

mpenick commented Jan 21, 2022

@datalatics-official Could you please describe your workload?

Are you sending multiple requests asynchronously?

Also, those numbers for connections per host are really high. The driver coalesces requests, and having more connections can diminish the benefit of that.
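
For illustration, a more conservative pooling configuration might look like the sketch below. The specific values are assumptions and should be tuned for the actual workload; with protocol v3+ a single connection can already multiplex many in-flight requests, so a few connections per host usually goes a long way.

package org.example;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;

// A minimal sketch with modest pooling values (assumed, not taken from this thread).
public class ModestPooling {
    public static void main(String[] args) {
        PoolingOptions pooling = new PoolingOptions()
            .setConnectionsPerHost(HostDistance.LOCAL, 1, 4)   // assumed values
            .setConnectionsPerHost(HostDistance.REMOTE, 1, 2); // assumed values

        try (Cluster cluster = Cluster.builder()
                .addContactPoint("127.0.0.1")                  // cql-proxy address in this setup
                .withPort(9042)
                .withPoolingOptions(pooling)
                .withSocketOptions(new SocketOptions().setReadTimeoutMillis(5000))
                .build();
             Session session = cluster.connect()) {
            System.out.println("Connected with core/max LOCAL connections: "
                + pooling.getCoreConnectionsPerHost(HostDistance.LOCAL) + "/"
                + pooling.getMaxConnectionsPerHost(HostDistance.LOCAL));
        }
    }
}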

@mpenick
Contributor

mpenick commented Jan 21, 2022

I'm unable to reproduce this with driver v2.1.10.3, using the following code for load:

package org.example;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.ConstantSpeculativeExecutionPolicy;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.LatencyAwarePolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.google.common.util.concurrent.Futures;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class App
{
    private static final int  numThreads = 10;
    private static final MetricRegistry metrics = new MetricRegistry();
    private static final Timer requests = metrics.timer("requests");
    private static final ExecutorService executor = Executors.newFixedThreadPool(numThreads);

    public static void main( String[] args ) throws InterruptedException {
        ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics)
            .convertRatesTo(TimeUnit.SECONDS)
            .convertDurationsTo(TimeUnit.MILLISECONDS)
            .build();
        reporter.start(2, TimeUnit.SECONDS);

        Cluster cluster = null;
        try {
            cluster = Cluster.builder()
                .addContactPoint("127.0.0.1")
                .withPort(9042)
                .withPoolingOptions(new PoolingOptions().setConnectionsPerHost(HostDistance.LOCAL,  40, 10000).setConnectionsPerHost(HostDistance.REMOTE, 20, 5000))
                .withSocketOptions(new SocketOptions().setReadTimeoutMillis(5000))
                .withLoadBalancingPolicy(LatencyAwarePolicy.builder(new TokenAwarePolicy(new RoundRobinPolicy())).build())
                .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
                .withSpeculativeExecutionPolicy(new ConstantSpeculativeExecutionPolicy(500, 2))
                .build();
            Session session = cluster.connect("testks1");

            PreparedStatement preparedStatement = session.prepare("INSERT INTO testks1.test (pk) VALUES(?)").setConsistencyLevel(
                ConsistencyLevel.LOCAL_QUORUM);

            // Each worker thread repeatedly submits batches of asynchronous inserts.
            for (int i = 0; i < numThreads; ++i) {
                final long value = i;
                executor.submit(() -> {
                    while (true) {
                        try {
                            // Time one batch of async inserts and wait for all of them to complete.
                            try (Context ignored = requests.time()) {
                                final int numRequests = 100;
                                List<ResultSetFuture> futures = new ArrayList<>(numRequests);
                                for (int j = 0; j < numRequests; ++j) {
                                    futures.add(
                                        session.executeAsync(preparedStatement.bind(value)));
                                }
                                Futures.successfulAsList(futures).get();
                            }
                        } catch (Exception e) {
                            System.err.println(e);
                        }
                    }
                });
            }

            while (!executor.awaitTermination(9999, TimeUnit.DAYS)) {
            }
        } finally {
            if (cluster != null) cluster.close();
        }
    }
}
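
For reference, a minimal setup sketch for the table this load test assumes (not shown above; the pk type is guessed to be a bigint since the test binds a long, and a local single-node cluster is assumed):

package org.example;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

// Hypothetical one-off schema setup for the load test above. The pk type (bigint)
// and the replication settings are assumptions, not taken from this thread.
public class Schema {
    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(9042).build();
             Session session = cluster.connect()) {
            session.execute("CREATE KEYSPACE IF NOT EXISTS testks1 "
                + "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE IF NOT EXISTS testks1.test (pk bigint PRIMARY KEY)");
        }
    }
}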

@datalatics-official
Author

datalatics-official commented Jan 25, 2022

@mpenick Thanks for the quick response. Let me check this in detail. I'll get back to you soon with my feedback.
