-
Notifications
You must be signed in to change notification settings - Fork 390
/
cql_if.h
141 lines (130 loc) · 5.64 KB
/
cql_if.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
//
// Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
//
#ifndef DATABASE_CASSANDRA_CQL_CQL_IF_H_
#define DATABASE_CASSANDRA_CQL_CQL_IF_H_
#include <string>
#include <vector>
#include <tbb/atomic.h>
#include <database/gendb_if.h>
#include <database/gendb_statistics.h>
#include <database/cassandra/cql/cql_types.h>
#include <database/cassandra/cql/cql_lib_if.h>
#include <database/cassandra/cql/cql_if_impl.h>
class EventManager;
namespace cass {
namespace cql {
// CqlIf derives from GenDb::GenDbIf and declares the Db_* operation set
// (init, tablespace, column family, write, read, queue, stats) in the
// cass::cql namespace. It owns a CassLibrary wrapper (cci_) and a CqlIfImpl
// (impl_) through boost::scoped_ptr members.
// NOTE(review): the virtual Db_* methods presumably override the pure
// virtuals of GenDb::GenDbIf (declared in database/gendb_if.h, not visible
// here); keep the signatures in sync with the base class.
class CqlIf : public GenDb::GenDbIf {
public:
// Constructs the interface from an event manager, the list of Cassandra
// node addresses, the port, and the user/password credentials.
// NOTE(review): whether any connection is opened here or deferred to
// Db_Init() cannot be told from this header -- confirm in the .cc file.
CqlIf(EventManager *evm,
const std::vector<std::string> &cassandra_ips,
int cassandra_port,
const std::string &cassandra_user,
const std::string &cassandra_password);
// Default constructor taking no connection parameters.
// NOTE(review): looks intended for tests/mocks -- confirm.
CqlIf();
virtual ~CqlIf();
// Init/Uninit
// Each bool-returning Db_* method below reports its outcome through the
// return value; presumably true means success -- confirm against the base
// class documentation.
virtual bool Db_Init();
virtual void Db_Uninit();
// NOTE(review): the "Unlocked" suffix suggests the caller is expected to
// hold (or not need) a lock -- confirm the locking contract in the .cc file.
virtual void Db_UninitUnlocked();
virtual void Db_SetInitDone(bool);
// Tablespace
virtual bool Db_SetTablespace(const std::string &tablespace);
// replication_factor is passed as a string and defaults to "1".
// Caution: a default argument on a virtual function is selected from the
// static type used at the call site, not from the dynamic type, so this
// default only applies when calling through a CqlIf-typed expression.
virtual bool Db_AddSetTablespace(const std::string &tablespace,
const std::string &replication_factor = "1");
// Column family
virtual bool Db_AddColumnfamily(const GenDb::NewCf &cf,
const std::string &compaction_strategy);
// Two overloads: one takes the column family description, the other only
// its name.
virtual bool Db_UseColumnfamily(const GenDb::NewCf &cf);
virtual bool Db_UseColumnfamily(const std::string &cfname);
// Column
// Both write entry points take the column list as std::auto_ptr by value,
// so ownership of the GenDb::ColList is transferred from the caller to the
// callee; the caller's auto_ptr is left null after the call.
// The first overload additionally takes a completion callback; the
// "Sync" variant takes none.
virtual bool Db_AddColumn(std::auto_ptr<GenDb::ColList> cl,
GenDb::DbConsistency::type dconsistency,
GenDb::GenDbIf::DbAddColumnCb cb);
virtual bool Db_AddColumnSync(std::auto_ptr<GenDb::ColList> cl,
GenDb::DbConsistency::type dconsistency);
// Read
// Results of the non-async reads are written through the caller-provided
// `out` pointer, which is not owned by this class.
virtual bool Db_GetRow(GenDb::ColList *out, const std::string &cfname,
const GenDb::DbDataValueVec &rowkey,
GenDb::DbConsistency::type dconsistency);
// Multi-row reads: one row key per element of v_rowkey. The second
// overload additionally takes a column name range.
// Note that, unlike Db_GetRow, these overloads take no consistency
// argument.
virtual bool Db_GetMultiRow(GenDb::ColListVec *out,
const std::string &cfname,
const std::vector<GenDb::DbDataValueVec> &v_rowkey);
virtual bool Db_GetMultiRow(GenDb::ColListVec *out,
const std::string &cfname,
const std::vector<GenDb::DbDataValueVec> &v_rowkey,
const GenDb::ColumnNameRange &crange);
// Asynchronous single-row reads. Four overloads: with or without a
// column name range, and with or without an explicit task_id /
// task_instance pair. All take a DbGetRowCb callback.
// NOTE(review): task_id/task_instance presumably select the task on which
// the callback runs -- confirm in the .cc file.
virtual bool Db_GetRowAsync(const std::string &cfname,
const GenDb::DbDataValueVec &rowkey,
GenDb::DbConsistency::type dconsistency,
GenDb::GenDbIf::DbGetRowCb cb);
virtual bool Db_GetRowAsync(const std::string &cfname,
const GenDb::DbDataValueVec &rowkey,
GenDb::DbConsistency::type dconsistency, int task_id,
int task_instance, GenDb::GenDbIf::DbGetRowCb cb);
virtual bool Db_GetRowAsync(const std::string &cfname,
const GenDb::DbDataValueVec &rowkey,
const GenDb::ColumnNameRange &crange,
GenDb::DbConsistency::type dconsistency,
GenDb::GenDbIf::DbGetRowCb cb);
virtual bool Db_GetRowAsync(const std::string &cfname,
const GenDb::DbDataValueVec &rowkey,
const GenDb::ColumnNameRange &crange,
GenDb::DbConsistency::type dconsistency, int task_id,
int task_instance, GenDb::GenDbIf::DbGetRowCb cb);
virtual bool Db_GetAllRows(GenDb::ColListVec *out,
const std::string &cfname, GenDb::DbConsistency::type dconsistency);
// Queue
// Outputs are written through the queue_count / enqueues pointers.
virtual bool Db_GetQueueStats(uint64_t *queue_count,
uint64_t *enqueues) const;
// `high` distinguishes high from low water marks.
// DbQueueWaterMarkCb is unqualified here; presumably it is inherited from
// GenDb::GenDbIf -- confirm.
virtual void Db_SetQueueWaterMark(bool high, size_t queue_count,
DbQueueWaterMarkCb cb);
virtual void Db_ResetQueueWaterMarks();
// Stats
// Db_GetStats is non-const while Db_GetCumulativeStats is const.
// NOTE(review): presumably Db_GetStats resets or diffs counters while the
// cumulative variant only reads them -- confirm in the .cc file.
virtual bool Db_GetStats(std::vector<GenDb::DbTableInfo> *vdbti,
GenDb::DbErrors *dbe);
virtual bool Db_GetCumulativeStats(std::vector<GenDb::DbTableInfo> *vdbti,
GenDb::DbErrors *dbe) const;
// Metrics and DbStats are unqualified types resolved from an enclosing or
// base scope (their declarations are not visible in this header).
virtual void Db_GetCqlMetrics(Metrics *metrics) const;
virtual void Db_GetCqlStats(DbStats *db_stats) const;
// Connection
// Returns the endpoint list by value (a copy).
virtual std::vector<GenDb::Endpoint> Db_GetEndpoints() const;
private:
// Completion handlers for asynchronous operations. Each receives the
// operation result code, the row (ownership passed in via std::auto_ptr),
// the column family name by value, and the user callback.
// NOTE(review): presumably bound as callbacks handed to impl_ -- confirm.
void OnAsyncColumnAddCompletion(GenDb::DbOpResult::type drc,
std::auto_ptr<GenDb::ColList> row,
std::string cfname, GenDb::GenDbIf::DbAddColumnCb cb);
void OnAsyncRowGetCompletion(GenDb::DbOpResult::type drc,
std::auto_ptr<GenDb::ColList> row,
std::string cfname, GenDb::GenDbIf::DbGetRowCb cb);
// Overload carrying use_worker plus the task_id / task_instance pair that
// the task-aware Db_GetRowAsync overloads accept.
void OnAsyncRowGetCompletion(GenDb::DbOpResult::type drc,
std::auto_ptr<GenDb::ColList> row,
std::string cfname, GenDb::GenDbIf::DbGetRowCb cb,
bool use_worker, int task_id, int task_instance);
// Per-table statistics helpers. Each counter has an overload taking only
// the table name and (except the back-pressure variants) one that also
// takes an explicit count.
// NOTE(review): presumably these update stats_ under stats_mutex_ and the
// single-argument forms count one operation -- confirm in the .cc file.
void IncrementTableWriteStats(const std::string &table_name);
void IncrementTableWriteStats(const std::string &table_name,
uint64_t num_writes);
void IncrementTableWriteFailStats(const std::string &table_name);
void IncrementTableWriteFailStats(const std::string &table_name,
uint64_t num_writes);
void IncrementTableWriteBackPressureFailStats(
const std::string &table_name);
void IncrementTableReadStats(const std::string &table_name);
void IncrementTableReadStats(const std::string &table_name,
uint64_t num_reads);
void IncrementTableReadFailStats(const std::string &table_name);
void IncrementTableReadFailStats(const std::string &table_name,
uint64_t num_reads);
void IncrementTableReadBackPressureFailStats(
const std::string &table_name);
void IncrementErrors(GenDb::IfErrors::Type err_type);
// Data members. Declaration order is significant: members are constructed
// in this order and destroyed in reverse, so impl_ is destroyed before
// cci_. Do not reorder without checking the constructor and destructor.
// Owned Cassandra library wrapper; released when this object is destroyed.
boost::scoped_ptr<interface::CassLibrary> cci_;
// Owned implementation object; released when this object is destroyed.
boost::scoped_ptr<CqlIfImpl> impl_;
// Atomic flag; presumably set through Db_SetInitDone() -- confirm.
tbb::atomic<bool> initialized_;
std::vector<GenDb::Endpoint> endpoints_;
// Declared mutable so it can be locked from const member functions.
// NOTE(review): presumably guards stats_ -- confirm in the .cc file.
mutable tbb::mutex stats_mutex_;
GenDb::GenDbIfStats stats_;
// NOTE(review): name suggests inserts use prepared statements when true;
// where it is set is not visible in this header -- confirm.
bool use_prepared_for_insert_;
};
} // namespace cql
} // namespace cass
#endif // DATABASE_CASSANDRA_CQL_CQL_IF_H_