/
ovsdb_client_session.cc
120 lines (100 loc) · 3.93 KB
/
ovsdb_client_session.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
* Copyright (c) 2014 Juniper Networks, Inc. All rights reserved.
*/
#include <assert.h>
extern "C" {
#include <ovsdb_wrapper.h>
};
#include <cmn/agent.h>
#include <ovsdb_types.h>
#include <ovsdb_client_connection_state.h>
#include <ovsdb_client_idl.h>
#include <ovsdb_client_session.h>
#include <ovsdb_route_peer.h>
#include <cstddef>
using OVSDB::OvsdbClientIdl;
using OVSDB::OvsdbClientSession;
using OVSDB::ConnectionStateTable;
int OvsdbClientSession::ovsdb_io_task_id_ = -1;
OvsdbClientSession::OvsdbClientSession(Agent *agent, OvsPeerManager *manager) :
client_idl_(NULL), agent_(agent), manager_(manager), parser_(NULL),
monitor_req_timer_(TimerManager::CreateTimer(
*(agent->event_manager())->io_service(),
"OVSDB Client Send Monitor Request Wait",
TaskScheduler::GetInstance()->GetTaskId("Agent::KSync"), 0)) {
idl_inited_ = false;
// initialize ovsdb_io task id on first constructor.
if (ovsdb_io_task_id_ == -1) {
ovsdb_io_task_id_ =
TaskScheduler::GetInstance()->GetTaskId("OVSDB::IO");
}
}
OvsdbClientSession::~OvsdbClientSession() {
TimerManager::DeleteTimer(monitor_req_timer_);
}
// This is invoked from OVSDB::IO task context. Handle the keepalive messages
// in The OVSDB::IO task context itself. OVSDB::IO should not have exclusion
// with any of the tasks
void OvsdbClientSession::MessageProcess(const u_int8_t *buf, std::size_t len) {
std::size_t used = 0;
// Multiple json message may be clubbed together, need to keep reading
// the buffer till whole message is consumed.
while (used != len) {
if (parser_ == NULL) {
parser_ = ovsdb_wrapper_json_parser_create(0);
}
const u_int8_t *pkt = buf + used;
std::size_t pkt_len = len - used;
std::size_t read;
read = ovsdb_wrapper_json_parser_feed(parser_, (const char *)pkt,
pkt_len);
used +=read;
/* If we have complete JSON, attempt to parse it as JSON-RPC. */
if (ovsdb_wrapper_json_parser_is_done(parser_)) {
struct json *json = ovsdb_wrapper_json_parser_finish(parser_);
parser_ = NULL;
struct jsonrpc_msg *msg;
char *error = ovsdb_wrapper_jsonrpc_msg_from_json(json, &msg);
if (error) {
assert(0);
free(error);
}
if (ovsdb_wrapper_msg_echo_req(msg)) {
// Echo request from ovsdb-server, reply inline so that
// ovsdb-server knows that connection is still active
struct jsonrpc_msg *reply;
reply = ovsdb_wrapper_jsonrpc_create_reply(msg);
SendJsonRpc(reply);
}
// If idl is inited and active, handover msg to IDL for processing
// we even enqueue processed echo req message to workqueue, to
// track session activity in IDL.
if (idl_inited_ == true && !client_idl_->deleted()) {
client_idl_->MessageProcess(msg);
continue;
}
ovsdb_wrapper_jsonrpc_msg_destroy(msg);
}
}
}
void OvsdbClientSession::SendJsonRpc(struct jsonrpc_msg *msg) {
struct json *json_msg = ovsdb_wrapper_jsonrpc_msg_to_json(msg);
char *s = ovsdb_wrapper_json_to_string(json_msg, 0);
ovsdb_wrapper_json_destroy(json_msg);
SendMsg((u_int8_t *)s, strlen(s));
// release the memory allocated by ovsdb_wrapper_json_to_string
free(s);
}
void OvsdbClientSession::OnEstablish() {
OVSDB_TRACE(Trace, "Connection to client established");
client_idl_ = new OvsdbClientIdl(this, agent_, manager_);
idl_inited_ = true;
client_idl_->OnEstablish();
}
void OvsdbClientSession::OnClose() {
OVSDB_TRACE(Trace, "Connection to client Closed");
client_idl_->TriggerDeletion();
}
OvsdbClientIdl *OvsdbClientSession::client_idl() {
return client_idl_.get();
}