Eris  1.4.0
StreamSocket_impl.h
1 /*
2  Copyright (C) 2014 Erik Ogenvik
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; either version 2 of the License, or
7  (at your option) any later version.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software Foundation,
16  Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17  */
18 
19 #ifndef STREAMSOCKET_IMPL_H_
20 #define STREAMSOCKET_IMPL_H_
21 
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25 
26 #include "StreamSocket.h"
27 
28 #include <Atlas/Codec.h>
29 
/// Number of seconds used to arm the connect timer when a connection attempt is started.
static constexpr int CONNECT_TIMEOUT_SECONDS = 5;
31 
32 namespace Eris
33 {
34 
35 template<typename ProtocolT>
36 AsioStreamSocket<ProtocolT>::AsioStreamSocket(
37  boost::asio::io_service& io_service, const std::string& client_name,
38  Atlas::Bridge& bridge, StreamSocket::Callbacks& callbacks) :
39  StreamSocket(io_service, client_name, bridge, callbacks), m_socket(
40  io_service)
41 {
42 }
43 
44 template<typename ProtocolT>
45 AsioStreamSocket<ProtocolT>::~AsioStreamSocket()
46 {
47  if (m_socket.is_open()) {
48  if (m_is_connected) {
49  try {
50  m_socket.shutdown(ProtocolT::socket::shutdown_both);
51  } catch (const std::exception& e) {
52  warning() << "Error when shutting down socket: " << e.what();
53  }
54  }
55  try {
56  m_socket.close();
57  } catch (const std::exception& e) {
58  warning() << "Error when closing socket.";
59  }
60  }
61 }
62 
63 template<typename ProtocolT>
64 typename ProtocolT::socket& AsioStreamSocket<ProtocolT>::getAsioSocket()
65 {
66  return m_socket;
67 }
68 
69 template<typename ProtocolT>
70 ResolvableAsioStreamSocket<ProtocolT>::ResolvableAsioStreamSocket(
71  boost::asio::io_service& io_service, const std::string& client_name,
72  Atlas::Bridge& bridge, StreamSocket::Callbacks& callbacks) :
73  AsioStreamSocket<ProtocolT>(io_service, client_name, bridge, callbacks),
74  m_resolver(io_service)
75 {
76 }
77 
78 
79 
80 template<typename ProtocolT>
81 void ResolvableAsioStreamSocket<ProtocolT>::connectWithQuery(
82  const typename ProtocolT::resolver::query& query)
83 {
84  auto self(this->shared_from_this());
85  m_resolver.async_resolve(query,
86  [&, self](const boost::system::error_code& ec, typename ProtocolT::resolver::iterator iterator) {
87  if (this->_callbacks.stateChanged) {
88  if (!ec && iterator != typename ProtocolT::resolver::iterator()) {
89  this->connect(*iterator);
90  } else {
91  this->_callbacks.stateChanged(StreamSocket::CONNECTING_FAILED);
92  }
93  }
94  });
95 }
96 
97 template<typename ProtocolT>
98 void AsioStreamSocket<ProtocolT>::connect(
99  const typename ProtocolT::endpoint& endpoint)
100 {
101  _connectTimer.expires_from_now(
102  boost::posix_time::seconds(CONNECT_TIMEOUT_SECONDS));
103  auto self(this->shared_from_this());
104  _connectTimer.async_wait([&, self](boost::system::error_code ec) {
105  if (!ec) {
106  if (_callbacks.stateChanged) {
107  _callbacks.stateChanged(CONNECTING_TIMEOUT);
108  }
109  }
110  });
111 
112  m_socket.async_connect(endpoint,
113  [this, self](boost::system::error_code ec) {
114  if (_callbacks.stateChanged) {
115  if (!ec) {
116  this->_connectTimer.cancel();
117  m_is_connected = true;
118  this->startNegotiation();
119  } else {
120  _callbacks.stateChanged(CONNECTING_FAILED);
121  }
122  }
123  });
124 }
125 
126 template<typename ProtocolT>
127 void AsioStreamSocket<ProtocolT>::negotiate_read()
128 {
129  auto self(this->shared_from_this());
130  m_socket.async_read_some(mReadBuffer.prepare(read_buffer_size),
131  [this, self](boost::system::error_code ec, std::size_t length)
132  {
133  if (_callbacks.stateChanged) {
134  if (!ec)
135  {
136  mReadBuffer.commit(length);
137  if (length > 0) {
138  auto negotiateResult = this->negotiate();
139  if (negotiateResult == Atlas::Negotiate::FAILED) {
140  m_socket.close();
141  _callbacks.stateChanged(NEGOTIATE_FAILED);
142  return;
143  }
144  }
145 
146  //If the _sc instance is removed we're done with negotiation and should start the main loop.
147  if (_sc == nullptr) {
148  this->write();
149  this->do_read();
150  } else {
151  this->negotiate_write();
152  this->negotiate_read();
153  }
154  } else {
155  if (ec != boost::asio::error::operation_aborted) {
156  _callbacks.stateChanged(CONNECTION_FAILED);
157  } else {
158  warning() << "Error when reading from socket while negotiating: (" << ec << ") " << ec.message();
159  }
160  }
161  }
162  });
163 }
164 
165 template<typename ProtocolT>
166 void AsioStreamSocket<ProtocolT>::do_read()
167 {
168  auto self(this->shared_from_this());
169  m_socket.async_read_some(mReadBuffer.prepare(read_buffer_size),
170  [this, self](boost::system::error_code ec, std::size_t length)
171  {
172  if (_callbacks.stateChanged) {
173  if (!ec)
174  {
175  mReadBuffer.commit(length);
176  m_codec->poll(true);
177  _callbacks.dispatch();
178  this->do_read();
179  } else {
180  if (ec != boost::asio::error::operation_aborted) {
181  _callbacks.stateChanged(CONNECTION_FAILED);
182  } else {
183  warning() << "Error when reading from socket: (" << ec << ") " << ec.message();
184  }
185  }
186  }
187  });
188 }
189 
190 template<typename ProtocolT>
192 {
193  if (mWriteBuffer->size() != 0) {
194  if (mIsSending) {
195  //We're already sending in the background.
196  //Make that we should send again once we've completed sending.
197  mShouldSend = true;
198  return;
199  }
200 
201  mShouldSend = false;
202 
203  //We'll use a self reference to make sure that the client isn't deleted while sending.
204  auto self(this->shared_from_this());
205  //Swap places between writing buffer and sending buffer, and attach new write buffer to the out stream.
206  std::swap(mWriteBuffer, mSendBuffer);
207  mOutStream.rdbuf(mWriteBuffer);
208  mIsSending = true;
209 
210  async_write(m_socket, mSendBuffer->data(),
211  [this, self](boost::system::error_code ec, std::size_t length)
212  {
213  mSendBuffer->consume(length);
214  mIsSending = false;
215  if (!ec) {
216  //Is there data queued for transmission which we should send right away?
217  if (mShouldSend) {
218  this->write();
219  }
220  } else {
221  if (ec != boost::asio::error::operation_aborted) {
222  if (_callbacks.stateChanged) {
223  _callbacks.stateChanged(CONNECTION_FAILED);
224  }
225  } else {
226  warning() << "Error when writing to socket: (" << ec << ") " << ec.message();
227  }
228  }
229  });
230  }
231 
232 }
233 
234 template<typename ProtocolT>
236 {
237 
238  if (mWriteBuffer->size() != 0) {
239  auto self(this->shared_from_this());
240  boost::asio::async_write(m_socket, mWriteBuffer->data(),
241  [this, self](boost::system::error_code ec, std::size_t length)
242  {
243  if (!ec)
244  {
245  this->mWriteBuffer->consume(length);
246  } else {
247  warning() << "Error when writing to socket while negotiating: (" << ec << ") " << ec.message();
248  }
249  });
250  }
251 }
252 
253 }
254 
255 #endif /* STREAMSOCKET_IMPL_H_ */
256 
Definition: LogStream.h:55
boost::asio::streambuf mReadBuffer
Buffer for data being read from the socket.
Definition: StreamSocket.h:142
bool mShouldSend
True if we should send again as soon as an ongoing async_write operation completes.
Definition: StreamSocket.h:157
Atlas::Net::StreamConnect * _sc
negotiation object (NULL after connection!)
Definition: StreamSocket.h:164
Every Eris class and type lives inside the Eris namespace; certain utility functions live in the Util...
Definition: Account.cpp:34
connection failed
Definition: StreamSocket.h:67
boost::asio::streambuf * mWriteBuffer
Buffer used to write data to be sent.
Definition: StreamSocket.h:131
std::ostream mOutStream
Stream for data being sent out.
Definition: StreamSocket.h:152
virtual void write()
Send any unsent data.
Definition: StreamSocket_impl.h:191
Template specialization which uses boost::asio sockets.
Definition: StreamSocket.h:182
bool mIsSending
True if we're currently sending through an async_write (and thus shouldn't touch mSendBuffer).
Definition: StreamSocket.h:162
std::function< void(Status)> stateChanged
Called when the internal state has changed.
Definition: StreamSocket.h:85
boost::asio::streambuf * mSendBuffer
Buffer of data which is being sent.
Definition: StreamSocket.h:137
std::function< void()> dispatch
Called when operations have arrived and needs dispatching.
Definition: StreamSocket.h:80
failure when negotiating
Definition: StreamSocket.h:65