-
Notifications
You must be signed in to change notification settings - Fork 390
/
generator.h
169 lines (142 loc) · 5.77 KB
/
generator.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
/*
* Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
*/
#ifndef _GENERATOR_H_
#define _GENERATOR_H_
#include <boost/shared_ptr.hpp>
#include <string>
#include <boost/tuple/tuple.hpp>
#include "viz_message.h"
#include "base/queue_task.h"
#include <sandesh/sandesh.h>
#include <sandesh/sandesh_uve_types.h>
#include <sandesh/sandesh_state_machine.h>
#include "collector_uve_types.h"
#include "syslog_collector.h"
class DbHandler;
class Sandesh;
class VizSession;
class Collector;
class SandeshStateMachineStats;
// Abstract base class for the generator kinds declared in this header.
//
// It holds the message statistics (statistics_) together with a mutex
// (smutex_), and exposes the non-virtual entry points that receive messages
// and report statistics. Subclasses supply the name, the rule processing and
// the database handler.
class Generator {
public:
    virtual ~Generator() {}

    // Name of this generator as a string.
    virtual const std::string ToString() const = 0;

    // Subclass hook invoked with a message and the `rsc` flag.
    // NOTE(review): the meaning of `rsc` and of the boolean result is defined
    // by the implementations, which are not part of this header.
    virtual bool ProcessRules(const VizMsg *vmsg, bool rsc) = 0;

    // Raw pointer to the database handler associated with this generator.
    virtual DbHandler *GetDbHandler() const = 0;

    // Entry point for a received message; takes the same arguments as
    // ProcessRules().
    bool ReceiveSandeshMsg(const VizMsg *vmsg, bool rsc);

    void SendSandeshMessageStatistics();

    // Two overloads that write statistics into the caller-supplied vector;
    // they differ only in the element type requested.
    void GetStatistics(std::vector<SandeshStats> *ssv) const;
    void GetStatistics(std::vector<SandeshLogLevelStats> *lsv) const;

private:
    void UpdateStatistics(const VizMsg *vmsg);

    VizMsgStatistics statistics_;
    // Declared mutable so that const member functions can lock it.
    // NOTE(review): presumably guards statistics_ -- confirm in generator.cc.
    mutable tbb::mutex smutex_;
};
// Generator associated with a VizSession and a SandeshStateMachine.
//
// It is identified by source, module, instance id and node type, keeps a
// database handler (db_handler_) and two timers (database connect and state
// machine back pressure), and exposes accessors for state-machine and
// database statistics.
class SandeshGenerator : public Generator {
public:
    // Identity of a generator as a 4-tuple of strings.
    typedef boost::tuple<
        std::string,  // Source
        std::string,  // Module
        std::string,  // Instance id
        std::string   // Node type
    > GeneratorId;

    SandeshGenerator(Collector * const collector,
                     VizSession *session,
                     SandeshStateMachine *state_machine,
                     const std::string &source,
                     const std::string &module,
                     const std::string &instance_id,
                     const std::string &node_type,
                     DbHandlerPtr global_db_handler,
                     bool use_global_dbhandler);
    ~SandeshGenerator();

    void ReceiveSandeshCtrlMsg(uint32_t connects);

    // Session attach / detach.
    void DisconnectSession(VizSession *vsession);
    void ConnectSession(VizSession *vsession,
                        SandeshStateMachine *state_machine);

    // State-machine and database statistics. Results are written through the
    // reference / pointer arguments.
    // NOTE(review): the bool result presumably reports whether the values
    // could be obtained -- confirm against the definitions.
    bool GetSandeshStateMachineQueueCount(uint64_t &queue_count) const;
    bool GetSandeshStateMachineDropLevel(std::string &drop_level) const;
    bool GetSandeshStateMachineStats(
        SandeshStateMachineStats &sm_stats,
        SandeshGeneratorBasicStats &sm_msg_stats) const;
    bool GetDbStats(uint64_t *queue_count,
                    uint64_t *enqueues,
                    std::string *drop_level,
                    std::vector<SandeshStats> *vdropmstats) const;
    bool IsStateMachineBackPressureTimerRunning() const;
    void SendDbStatistics();

    // Identity accessors; each returns a reference to the matching member.
    const std::string &instance_id() const { return instance_id_; }
    const std::string &node_type() const { return node_type_; }

    // Current session pointer, read while holding mutex_.
    VizSession *session() const {
        tbb::mutex::scoped_lock guard(mutex_);
        return viz_session_;
    }

    const std::string &module() const { return module_; }
    const std::string &source() const { return source_; }

    // Returns name_.
    virtual const std::string ToString() const { return name_; }

    // Current state machine pointer. Unlike session(), this does not take
    // mutex_.
    SandeshStateMachine *get_state_machine() {
        return state_machine_;
    }

    const std::string State() const;
    void GetGeneratorInfo(ModuleServerState &genlist) const;

    // Queue watermark configuration for the database ("Db") queue and the
    // state-machine ("Sm") queue.
    void SetDbQueueWaterMarkInfo(Sandesh::QueueWaterMarkInfo &wm);
    void ResetDbQueueWaterMarkInfo();
    void SetSmQueueWaterMarkInfo(Sandesh::QueueWaterMarkInfo &wm);
    void ResetSmQueueWaterMarkInfo();

    void StartDbifReinit();

    // Raw pointer obtained from db_handler_.get().
    virtual DbHandler *GetDbHandler() const { return db_handler_.get(); }

    bool UseGlobalDbHandler() const { return use_global_dbhandler_; }

    // Forwards to the state machine's GetMaxQueueCount().
    // NOTE(review): state_machine_ is dereferenced without a null check --
    // confirm callers only use this while a state machine is attached.
    uint32_t GetSandeshStateMachineMaxQueueCount() const {
        return state_machine_->GetMaxQueueCount();
    }

private:
    virtual bool ProcessRules(const VizMsg *vmsg, bool rsc);

    void set_session(VizSession *session);

    // Stores the new state machine, then passes this generator's name_ to it
    // via SetGeneratorKey(). The pointer is dereferenced unconditionally.
    void set_state_machine(SandeshStateMachine *state_machine) {
        state_machine_ = state_machine;
        state_machine_->SetGeneratorKey(name_);
    }

    void HandleSeqRedisReply(const std::map<std::string, int32_t> &typeMap);
    void HandleDelRedisReply(bool res);
    void TimerErrorHandler(std::string name, std::string error);

    // Database connect timer and database connection management.
    bool DbConnectTimerExpired();
    void Create_Db_Connect_Timer();
    void Start_Db_Connect_Timer();
    void Stop_Db_Connect_Timer();
    void Delete_Db_Connect_Timer();
    void Db_Connection_Uninit();
    bool Db_Connection_Init();

    void ProcessRulesCb(GenDb::DbOpResult::type dresult);

    // State machine back pressure timer management.
    bool StateMachineBackPressureTimerExpired();
    void CreateStateMachineBackPressureTimer();
    void DeleteStateMachineBackPressureTimer();
    void StartStateMachineBackPressureTimer();
    void StopStateMachineBackPressureTimer();

    // NOTE(review): the "Sec" suffix suggests these are in seconds -- confirm
    // where the timers are started.
    static const uint32_t kWaitTimerSec = 10;
    static const uint32_t kDbConnectTimerSec = 10;

    // Data members. Declaration order is significant: it is the order in
    // which the constructor initialises them.
    Collector * const collector_;
    SandeshStateMachine *state_machine_;
    VizSession *viz_session_;
    GeneratorInfoAttr gen_attr_;
    const std::string instance_id_;
    const std::string node_type_;
    const std::string source_;
    const std::string module_;
    const std::string name_;
    int instance_;
    Timer *db_connect_timer_;
    tbb::atomic<bool> disconnected_;
    DbHandlerPtr db_handler_;
    bool use_global_dbhandler_;
    GenDb::GenDbIf::DbAddColumnCb process_rules_cb_;
    Timer *sm_back_pressure_timer_;
    // Mutable so that const accessors such as session() can lock it.
    mutable tbb::mutex mutex_;
};
// Generator constructed from a SyslogListeners pointer plus a source and a
// module name. It keeps its own database handler (db_handler_).
class SyslogGenerator : public Generator {
public:
    SyslogGenerator(SyslogListeners *const listeners,
                    const std::string &source,
                    const std::string &module);

    // Identity accessors; each returns a reference to the matching member.
    const std::string &module() const { return module_; }
    const std::string &source() const { return source_; }

    // Returns name_.
    virtual const std::string ToString() const { return name_; }

    // Raw pointer obtained from db_handler_.get().
    virtual DbHandler *GetDbHandler() const { return db_handler_.get(); }

private:
    virtual bool ProcessRules(const VizMsg *vmsg, bool rsc);

    // Data members. Declaration order is significant: it is the order in
    // which the constructor initialises them.
    SyslogListeners * const syslog_;
    const std::string source_;
    const std::string module_;
    const std::string name_;
    DbHandlerPtr db_handler_;
};
#endif