-
Notifications
You must be signed in to change notification settings - Fork 390
/
tcp_server.h
191 lines (145 loc) · 5.61 KB
/
tcp_server.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
/*
* Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
*/
#ifndef __TCPSERVER_H__
#define __TCPSERVER_H__
#include <map>
#include <set>
#include <boost/asio/ip/tcp.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <tbb/mutex.h>
#include <tbb/compat/condition_variable>
#include "base/util.h"
#include "io/server_manager.h"
#include "io/io_utils.h"
// Forward declarations: within this header these types appear only as
// pointers, references, or boost::intrusive_ptr targets.
class EventManager;
class TcpSession;
class SocketIOStats;
// TcpServer declares the listen/accept/connect interface and the session
// bookkeeping (session_ref_, session_map_) for TcpSession objects.
//
// The class is abstract: derived classes must implement
// AllocSession(Socket *). Instances carry an atomic reference count
// (refcount_) that is manipulated by the intrusive_ptr_add_ref() and
// intrusive_ptr_release() friends, so they can be held in a
// boost::intrusive_ptr<TcpServer>. Copying is disabled through
// DISALLOW_COPY_AND_ASSIGN.
class TcpServer {
public:
typedef boost::asio::ip::tcp::endpoint Endpoint;
typedef boost::asio::ip::tcp::socket Socket;
// explicit: an EventManager * never converts implicitly to a TcpServer.
explicit TcpServer(EventManager *evm);
virtual ~TcpServer();
// Bind a listening socket and register it with the event manager.
virtual bool Initialize(unsigned short port);
// Returns a copy of name_.
const std::string ToString() const { return name_; }
void SetAcceptor();
void ResetAcceptor();
// Shut down the listening socket.
void Shutdown();
// Close all existing sessions and delete them.
void ClearSessions();
// Helper function that allocates a socket and calls the virtual method
// AllocSession. The session object is owned by the TcpServer and must
// be deallocated via DeleteSession.
virtual TcpSession *CreateSession();
// Delete a session object.
virtual void DeleteSession(TcpSession *session);
// NOTE(review): presumably starts a connect of |session| to |remote|;
// completion is expected to arrive through ConnectHandler() - confirm in
// the implementation file.
virtual void Connect(TcpSession *session, Endpoint remote);
// Returns false in this base class; virtual so derived servers can override.
virtual bool DisableSandeshLogMessages() { return false; }
int GetPort() const;
// Read-only access to the stats_ member.
const io::SocketStats &GetSocketStats() const { return stats_; }
//
// Return the number of tcp sessions held in session_ref_ (the set of
// session references, not session_map_).
//
// NOTE(review): this reads session_ref_ without taking mutex_, although
// mutex_ is documented below as protecting the session containers -
// confirm that callers tolerate an unsynchronized read.
//
size_t GetSessionCount() const {
return session_ref_.size();
}
// Returns the stored EventManager pointer (evm_).
EventManager *event_manager() { return evm_; }
// Returns true if any of the sessions on this server has read available
// data.
bool HasSessionReadAvailable() const;
bool HasSessions() const;
// NOTE(review): presumably looks up a session by its remote endpoint in
// session_map_; behaviour when no session matches is not visible here.
TcpSession *GetSession(Endpoint remote);
// Wait until the server has deleted all sessions.
void WaitForEmpty();
// Fill |socket_stats| with receive-side / transmit-side statistics.
// NOTE(review): presumably derived from stats_ - confirm.
void GetRxSocketStats(SocketIOStats &socket_stats) const;
void GetTxSocketStats(SocketIOStats &socket_stats) const;
// MD5 socket option helpers: the first takes an explicit descriptor |fd|,
// the second has no descriptor argument and, going by its name, targets the
// listening socket.
// NOTE(review): the meaning of the int return value is not visible in this
// header - confirm against the definitions.
int SetMd5SocketOption(int fd, uint32_t peer_ip,
const std::string &md5_password);
int SetListenSocketMd5Option(uint32_t peer_ip,
const std::string &md5_password);
protected:
typedef boost::intrusive_ptr<TcpServer> TcpServerPtr;
typedef boost::intrusive_ptr<TcpSession> TcpSessionPtr;
// Create a session object. Pure virtual: every concrete server supplies it.
virtual TcpSession *AllocSession(Socket *socket) = 0;
// Only SslServer overrides this method, to manage server with SSL
// socket instead of TCP socket
virtual TcpSession *AllocSession(bool server_session);
// Accessor / setter pair for the socket used by the accept path; virtual so
// a derived server can substitute its own socket.
virtual Socket *accept_socket() const;
virtual void set_accept_socket();
//
// Passively accepted a new session. Returns true if the session is
// accepted, false otherwise.
//
// If the session is not accepted, tcp_server.cc deletes the newly
// created session.
//
virtual bool AcceptSession(TcpSession *session);
// For testing - will typically be used by derived class.
void set_socket_open_failure(bool flag) { socket_open_failure_ = flag; }
bool socket_open_failure() const { return socket_open_failure_; }
Endpoint LocalEndpoint() const;
// Virtual completion hooks that receive the affected session.
// NOTE(review): presumably invoked at the end of the accept / connect
// handlers - confirm in tcp_server.cc.
virtual void AcceptHandlerComplete(TcpSessionPtr &session);
virtual void ConnectHandlerComplete(TcpSessionPtr &session);
private:
friend class TcpSession;
friend class TcpMessageWriter;
friend class BgpServerUnitTest;
// The reference-counting hooks need access to refcount_.
friend void intrusive_ptr_add_ref(TcpServer *server);
friend void intrusive_ptr_release(TcpServer *server);
// Strict weak ordering for SessionSet: orders session references by the
// address of the TcpSession they point to.
struct TcpSessionPtrCmp {
bool operator()(const TcpSessionPtr &lhs,
const TcpSessionPtr &rhs) const {
return lhs.get() < rhs.get();
}
};
// SessionSet holds counted references; SessionMap holds raw pointers keyed
// by Endpoint and, being a multimap, allows several sessions per key.
typedef std::set<TcpSessionPtr, TcpSessionPtrCmp> SessionSet;
typedef std::multimap<Endpoint, TcpSession *> SessionMap;
void InsertSessionToMap(Endpoint remote, TcpSession *session);
bool RemoveSessionFromMap(Endpoint remote, TcpSession *session);
// Called by the asio service.
void AcceptHandlerInternal(TcpServerPtr server,
const boost::system::error_code &error);
void ConnectHandler(TcpServerPtr server, TcpSessionPtr session,
const boost::system::error_code &error);
// Trigger the async accept operation.
void AsyncAccept();
void OnSessionClose(TcpSession *session);
void SetName(Endpoint local_endpoint);
// Exposed read-only through GetSocketStats().
io::SocketStats stats_;
// Exposed through event_manager().
EventManager *evm_;
// mutex protects the session maps; declared mutable so that const member
// functions are able to lock it.
mutable tbb::mutex mutex_;
// NOTE(review): presumably paired with mutex_ by WaitForEmpty() - confirm.
tbb::interface5::condition_variable cond_var_;
SessionSet session_ref_;
SessionMap session_map_;
std::auto_ptr<Socket> so_accept_; // socket used in async_accept
boost::scoped_ptr<boost::asio::ip::tcp::acceptor> acceptor_;
// Reference count adjusted by intrusive_ptr_add_ref/intrusive_ptr_release.
tbb::atomic<int> refcount_;
// Returned (by value) from ToString().
std::string name_;
// Test hook flag; see set_socket_open_failure() / socket_open_failure().
bool socket_open_failure_;
DISALLOW_COPY_AND_ASSIGN(TcpServer);
};
// Namespace-scope alias for a reference-counting pointer to TcpServer.
typedef boost::intrusive_ptr<TcpServer> TcpServerPtr;
// boost::intrusive_ptr hook: atomically adds one reference to |server|.
// The value returned by the atomic operation is not needed and is ignored.
inline void intrusive_ptr_add_ref(TcpServer *server) {
    tbb::atomic<int> &ref_counter = server->refcount_;
    ref_counter.fetch_and_increment();
}
// boost::intrusive_ptr hook: atomically drops one reference on |server| and
// deletes the server when the count observed before the decrement was 1,
// i.e. when the reference being released was the last one.
inline void intrusive_ptr_release(TcpServer *server) {
    const bool was_last_reference =
        (server->refcount_.fetch_and_decrement() == 1);
    if (was_last_reference) {
        delete server;
    }
}
// Static-only front end for tracking TcpServer instances. Every member is
// static; the single data member is a ServerManager instantiated for
// TcpServer and TcpServerPtr. The member functions are only declared here.
// NOTE(review): they presumably forward to impl_ - confirm in the
// implementation file.
class TcpServerManager {
public:
    // Registration and removal of a server.
    static void AddServer(TcpServer *server);
    static void DeleteServer(TcpServer *server);

    // Number of servers, as reported by the manager.
    static size_t GetServerCount();

private:
    static ServerManager<TcpServer, TcpServerPtr> impl_;
};
#endif // __TCPSERVER_H__