-
Notifications
You must be signed in to change notification settings - Fork 390
/
config_cassandra_client.h
282 lines (228 loc) · 10.5 KB
/
config_cassandra_client.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
/*
* Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
*/
#ifndef ctrlplane_config_cass_client_h
#define ctrlplane_config_cass_client_h
#include <boost/shared_ptr.hpp>
#include <tbb/spin_rw_mutex.h>
#include "base/queue_task.h"
#include "config_db_client.h"
#include "database/gendb_if.h"
#include "json_adapter_data.h"
class EventManager;
class ConfigJsonParser;
class ConfigClientManager;
struct ConfigDBConnInfo;
class TaskTrigger;
class ConfigCassandraClient;
struct ConfigCassandraParseContext;
class ConfigDBFQNameCacheEntry;
class ConfigDBUUIDCacheEntry;
// Record describing a single object-processing request: an operation name,
// an object type and a UUID, all carried as strings. Pointers to these
// records are what ConfigCassandraPartition's work queue holds. The class is
// marked non-copyable via DISALLOW_COPY_AND_ASSIGN.
class ObjectProcessReq {
public:
    // Builds a request from the three strings.
    //
    // The arguments are taken by const reference (matching the constructor of
    // ConfigCassandraPartition::ObjectProcessRequestType) so that each string
    // is copied exactly once, directly into its member, instead of once into
    // a by-value parameter and a second time into the member.
    ObjectProcessReq(const std::string &oper, const std::string &obj_type,
                     const std::string &uuid_str)
        : oper_(oper), obj_type_(obj_type), uuid_str_(uuid_str) {
    }

    std::string oper_;      // operation name
    std::string obj_type_;  // object type name
    std::string uuid_str_;  // object UUID in string form

private:
    DISALLOW_COPY_AND_ASSIGN(ObjectProcessReq);
};
// One partition of the Cassandra config client. A partition is constructed
// from its owning ConfigCassandraClient plus an index and keeps its own
// request work queue, set of UUIDs waiting to be read and per-UUID field
// cache (see the data members at the end of the class).
class ConfigCassandraPartition {
public:
    // Shared handle to a work queue of ObjectProcessReq pointers.
    typedef boost::shared_ptr<WorkQueue<ObjectProcessReq *> >
        ObjProcessWorkQType;

    ConfigCassandraPartition(ConfigCassandraClient *client, size_t idx);
    ~ConfigCassandraPartition();

    // Returns (a copy of the shared handle to) this partition's work queue.
    ObjProcessWorkQType obj_process_queue() {
        return obj_process_queue_;
    }

    // NOTE(review): the following members are defined out of line; their
    // exact behaviour is not visible from this header. The names suggest
    // delete-request generation, per-field timestamp bookkeeping and cache
    // invalidation for a given UUID -- confirm against the implementation.
    void FormDeleteRequestList(const std::string &uuid,
                               ConfigClientManager::RequestList *req_list,
                               IFMapTable::RequestKey *key, bool add_change);
    bool StoreKeyIfUpdated(const std::string &uuid, const std::string &key,
                           const std::string &value, uint64_t timestamp,
                           ConfigCassandraParseContext &context);
    void MarkCacheDirty(const std::string &uuid);

    // Hands a request to this partition. Ownership of 'req' is not expressed
    // by the signature -- TODO confirm who deletes it.
    void Enqueue(ObjectProcessReq *req);

    // Introspection helpers: fill 'entry' for a single UUID, or 'entries'
    // for up to 'num_entries' items starting at 'start_uuid'.
    // NOTE(review): the meaning of the bool result is defined in the
    // implementation -- confirm there.
    bool UUIDToObjCacheShow(const std::string &uuid,
                            ConfigDBUUIDCacheEntry &entry) const;
    bool UUIDToObjCacheShow(const std::string &start_uuid,
                            uint32_t num_entries,
                            std::vector<ConfigDBUUIDCacheEntry> &entries) const;

private:
    friend class ConfigCassandraClient;

    // Internal copy of a request (operation, object type, UUID).
    struct ObjectProcessRequestType {
        ObjectProcessRequestType(const std::string &in_oper,
                                 const std::string &in_obj_type,
                                 const std::string &in_uuid)
            : oper(in_oper), obj_type(in_obj_type), uuid(in_uuid) {
        }
        std::string oper;
        std::string obj_type;
        std::string uuid;
    };

    // String key -> request record; entries are held by raw pointer.
    // NOTE(review): the key is presumably the UUID -- confirm.
    typedef std::map<std::string, ObjectProcessRequestType *> UUIDProcessSet;
    // (timestamp, flag) pair kept per field.
    // NOTE(review): meaning of the bool is not visible here -- confirm.
    typedef std::pair<uint64_t, bool> FieldTimeStampInfo;
    // Field name -> FieldTimeStampInfo.
    typedef std::map<std::string, FieldTimeStampInfo> FieldDetailMap;
    // Map of UUID to Field mapping
    typedef std::map<std::string, FieldDetailMap> ObjectCacheMap;

    // Work-queue / reader plumbing, all defined out of line.
    bool RequestHandler(ObjectProcessReq *req);
    void AddUUIDToRequestList(const std::string &oper,
                              const std::string &obj_type,
                              const std::string &uuid_str);
    bool ConfigReader();
    bool BunchReadReq(const std::set<std::string> &req_list);
    void RemoveObjReqEntries(std::set<std::string> &req_list);
    void RemoveObjReqEntry(std::string &uuid);
    void UpdatePropertyDeleteToReqList(IFMapTable::RequestKey * key,
                                       ObjectCacheMap::iterator uuid_iter,
                                       const std::string &lookup_key,
                                       ConfigClientManager::RequestList *req_list);

    // Owning client this partition was created for.
    ConfigCassandraClient *client() {
        return config_client_;
    }

    void FillUUIDToObjCacheInfo(const std::string &uuid,
                                ObjectCacheMap::const_iterator uuid_iter,
                                ConfigDBUUIDCacheEntry &entry) const;

    ObjProcessWorkQType obj_process_queue_;
    UUIDProcessSet uuid_read_set_;
    ObjectCacheMap object_cache_map_;
    boost::shared_ptr<TaskTrigger> config_reader_;
    ConfigCassandraClient *config_client_;
    // NOTE(review): presumably derived from the constructor's 'idx' -- confirm.
    int worker_id_;
};
/*
 * This class has the functionality to interact with the cassandra servers that
 * store the user configuration.
 *
 * It derives from ConfigDbClient, holds a list of ConfigCassandraPartition
 * objects and a UUID -> (object type, FQ name) cache.
 */
class ConfigCassandraClient : public ConfigDbClient {
public:
    // Cassandra table names
    static const std::string kUuidTableName;
    static const std::string kFqnTableName;

    // Task names
    static const std::string kCassClientTaskId;
    static const std::string kObjectProcessTaskId;

    // Wait time before retrying, in microseconds (5000000 usec = 5 seconds)
    static const uint64_t kInitRetryTimeUSec = 5000000;

    // Number of UUID requests to handle in one config reader task execution
    static const int kMaxRequestsToYield = 512;

    // Number of UUIDs to read in one read request
    static const int kMaxNumUUIDToRead = 64;

    // Number of FQName entries to read in one read request
    static const int kNumFQNameEntriesToRead = 4096;

    typedef boost::scoped_ptr<GenDb::GenDbIf> GenDbIfPtr;
    // (object type, FQ name) pair, as returned by UUIDToFQName().
    typedef std::pair<std::string, std::string> ObjTypeFQNPair;
    typedef std::vector<ConfigCassandraPartition *> PartitionList;

    ConfigCassandraClient(ConfigClientManager *mgr, EventManager *evm,
                          const IFMapConfigOptions &options,
                          ConfigJsonParser *in_parser, int num_workers);
    virtual ~ConfigCassandraClient();

    virtual void InitDatabase();
    void BulkSyncDone();
    virtual void GetConnectionInfo(ConfigDBConnInfo &status) const;
    virtual uint32_t GetNumReadRequestToBunch() const;

    // Simple accessors for the collaborators passed to the constructor.
    ConfigJsonParser *json_parser() const { return parser_; }
    ConfigClientManager *mgr() { return mgr_; }
    const ConfigClientManager *mgr() const { return mgr_; }

    // Partition lookup, either by UUID or directly by worker id.
    // NOTE(review): the UUID overloads presumably go through HashUUID() --
    // confirm against the implementation.
    ConfigCassandraPartition *GetPartition(const std::string &uuid);
    const ConfigCassandraPartition *GetPartition(const std::string &uuid) const;
    const ConfigCassandraPartition *GetPartition(int worker_id) const;

    void EnqueueUUIDRequest(std::string oper, std::string obj_type,
                            std::string uuid_str);
    virtual void FormDeleteRequestList(const std::string &uuid,
                                ConfigClientManager::RequestList *req_list,
                                IFMapTable::RequestKey *key, bool add_change);

    // FQ Name Cache
    ObjTypeFQNPair UUIDToFQName(const std::string &uuid_str,
                                bool deleted_ok = true) const;
    virtual void AddFQNameCache(const std::string &uuid,
                                const std::string &obj_name,
                                const std::string &obj_type);
    virtual void InvalidateFQNameCache(const std::string &uuid);
    void PurgeFQNameCache(const std::string &uuid);

    // Introspection helpers for the FQ name cache and for the per-partition
    // object cache ('inst_num' selects the partition instance).
    virtual bool UUIDToFQNameShow(const std::string &uuid,
                                  ConfigDBFQNameCacheEntry &entry) const;
    virtual bool UUIDToFQNameShow(const std::string &start_uuid,
                        uint32_t num_entries,
                        std::vector<ConfigDBFQNameCacheEntry> &entries) const;
    virtual bool UUIDToObjCacheShow(int inst_num, const std::string &uuid,
                                    ConfigDBUUIDCacheEntry &entry) const;
    virtual bool UUIDToObjCacheShow(int inst_num,
                        const std::string &start_uuid,
                        uint32_t num_entries,
                        std::vector<ConfigDBUUIDCacheEntry> &entries) const;

protected:
    // (object type, UUID) pair and a list of such pairs.
    // NOTE(review): element order is inferred from the typedef name --
    // confirm against the code that fills the list.
    typedef std::pair<std::string, std::string> ObjTypeUUIDType;
    typedef std::list<ObjTypeUUIDType> ObjTypeUUIDList;

    virtual bool ReadObjUUIDTable(std::set<std::string> *uuid_list);
    bool ProcessObjUUIDTableEntry(const std::string &uuid_key,
                                  const GenDb::ColList &col_list);
    void ParseObjUUIDTableEachColumnBuildContext(const std::string &uuid,
                        const std::string &key, const std::string &value,
                        uint64_t timestamp, CassColumnKVVec *cass_data_vec,
                        ConfigCassandraParseContext &context);
    virtual void ParseContextAndPopulateIFMapTable(
                        const std::string &uuid_key,
                        const ConfigCassandraParseContext &context,
                        const CassColumnKVVec &cass_data_vec);
    void UpdateFQNameCache(const std::string &key, const std::string &obj_type,
                           ObjTypeUUIDList &uuid_list);
    virtual bool BulkDataSync();
    bool EnqueueDBSyncRequest(const ObjTypeUUIDList &uuid_list);
    virtual std::string FetchUUIDFromFQNameEntry(const std::string &key) const;

    // 'uuid' is spelled std::string explicitly, like every other declaration
    // in this header, so the header does not depend on a using-declaration
    // leaking in from an earlier include.
    virtual void EnqueueDelete(const std::string &uuid,
                        ConfigClientManager::RequestList req_list) const;
    virtual int HashUUID(const std::string &uuid_str) const;
    virtual void HandleObjectDelete(const std::string &uuid);

    // Default hooks; declared virtual so derived classes can override them.
    // The base implementations return the key unchanged, 'true', and the
    // class constants above respectively.
    virtual std::string GetUUID(const std::string &key) const { return key; }
    virtual bool SkipTimeStampCheckForTypeAndFQName() const { return true; }
    virtual uint32_t GetFQNameEntriesToRead() const {
        return kNumFQNameEntriesToRead;
    }
    virtual const int GetMaxRequestsToYield() const {
        return kMaxRequestsToYield;
    }
    virtual const uint64_t GetInitRetryTimeUSec() const {
        return kInitRetryTimeUSec;
    }

    int num_workers() const { return num_workers_; }
    PartitionList &partitions() {
        return partitions_;
    }

private:
    friend class ConfigCassandraPartition;

    // UUID to FQName mapping
    struct FQNameCacheType {
        // New entries always start with deleted == false.
        FQNameCacheType(std::string in_obj_type, std::string in_obj_name)
            : obj_type(in_obj_type), obj_name(in_obj_name), deleted(false) {
        }
        std::string obj_type;
        std::string obj_name;
        bool deleted;
    };
    typedef std::map<std::string, FQNameCacheType> FQNameCacheMap;

    void InitRetry();
    virtual void ParseObjUUIDTableEntry(const std::string &uuid,
                        const GenDb::ColList &col_list,
                        CassColumnKVVec *cass_data_vec,
                        ConfigCassandraParseContext &context);
    bool FQNameReader();
    bool ParseFQNameRowGetUUIDList(const std::string &obj_type,
                        const GenDb::ColList &col_list,
                        ObjTypeUUIDList &uuid_list,
                        std::string *last_column);
    void HandleCassandraConnectionStatus(bool success);
    void FillFQNameCacheInfo(const std::string &uuid,
                        FQNameCacheMap::const_iterator it,
                        ConfigDBFQNameCacheEntry &entry) const;

    ConfigClientManager *mgr_;
    EventManager *evm_;
    GenDbIfPtr dbif_;
    ConfigJsonParser *parser_;
    int num_workers_;
    PartitionList partitions_;
    boost::scoped_ptr<TaskTrigger> fq_name_reader_;
    // NOTE(review): presumably guards fq_name_cache_ (mutable so that const
    // members can take the lock) -- confirm against the implementation.
    mutable tbb::spin_rw_mutex rw_mutex_;
    FQNameCacheMap fq_name_cache_;
    tbb::atomic<long> bulk_sync_status_;
    tbb::atomic<bool> cassandra_connection_up_;
    tbb::atomic<uint64_t> connection_status_change_at_;
};
#endif // ctrlplane_config_cass_client_h