// Header guard, includes, and the connect timeout (in seconds) that connect()
// passes to _connectTimer.expires_from_now() via boost::posix_time::seconds.
19 #ifndef STREAMSOCKET_IMPL_H_ 20 #define STREAMSOCKET_IMPL_H_ 26 #include "StreamSocket.h" 28 #include <Atlas/Codec.h> 30 static const int CONNECT_TIMEOUT_SECONDS = 5;
// Constructor: forwards io_service, client_name, bridge and callbacks unchanged
// to the StreamSocket base class, then initializes m_socket.
// NOTE(review): the arguments of the m_socket initializer and the constructor
// body are missing from this listing.
35 template<
typename ProtocolT>
36 AsioStreamSocket<ProtocolT>::AsioStreamSocket(
37 boost::asio::io_service& io_service,
const std::string& client_name,
38 Atlas::Bridge& bridge, StreamSocket::Callbacks& callbacks) :
39 StreamSocket(io_service, client_name, bridge, callbacks), m_socket(
// Destructor: if the socket is still open, shuts down both directions
// (ProtocolT::socket::shutdown_both). std::exception thrown by the guarded
// calls is caught and reported through warning() instead of propagating.
44 template<
typename ProtocolT>
45 AsioStreamSocket<ProtocolT>::~AsioStreamSocket()
47 if (m_socket.is_open()) {
50 m_socket.shutdown(ProtocolT::socket::shutdown_both);
51 }
catch (
const std::exception& e) {
52 warning() <<
"Error when shutting down socket: " << e.what();
// NOTE(review): the statement guarded by the handler below is missing from this
// listing (presumably the socket close - confirm). Unlike the handler above,
// this one does not include e.what() in its message.
57 }
catch (
const std::exception& e) {
58 warning() <<
"Error when closing socket.";
// Accessor returning a mutable reference to a ProtocolT::socket.
// NOTE(review): the body is not part of this listing; presumably it returns
// m_socket - confirm.
63 template<
typename ProtocolT>
64 typename ProtocolT::socket& AsioStreamSocket<ProtocolT>::getAsioSocket()
// Constructor: forwards all arguments to AsioStreamSocket<ProtocolT> and
// constructs m_resolver on the same io_service.
// NOTE(review): the constructor body is missing from this listing.
69 template<
typename ProtocolT>
70 ResolvableAsioStreamSocket<ProtocolT>::ResolvableAsioStreamSocket(
71 boost::asio::io_service& io_service,
const std::string& client_name,
72 Atlas::Bridge& bridge, StreamSocket::Callbacks& callbacks) :
73 AsioStreamSocket<ProtocolT>(io_service, client_name, bridge, callbacks),
74 m_resolver(io_service)
// Starts an asynchronous resolve of `query` on m_resolver. When the handler
// sees no error and a non-end iterator, it calls connect() with the first
// resolved endpoint; the handler also contains a path that reports
// StreamSocket::CONNECTING_FAILED through _callbacks.stateChanged.
// NOTE(review): the line joining the two paths is missing from this listing;
// presumably the CONNECTING_FAILED call is the else-branch - confirm.
80 template<
typename ProtocolT>
81 void ResolvableAsioStreamSocket<ProtocolT>::connectWithQuery(
82 const typename ProtocolT::resolver::query& query)
// `self` is copied into the handler, so the handler owns a reference to this
// object for as long as it is pending.
84 auto self(this->shared_from_this());
85 m_resolver.async_resolve(query,
86 [&,
self](
const boost::system::error_code& ec,
typename ProtocolT::resolver::iterator iterator) {
// A default-constructed resolver iterator is the end marker.
88 if (!ec && iterator != typename ProtocolT::resolver::iterator()) {
89 this->connect(*iterator);
91 this->_callbacks.stateChanged(StreamSocket::CONNECTING_FAILED);
// Starts an asynchronous connect to `endpoint`, bounded by a timer.
// _connectTimer is armed to expire CONNECT_TIMEOUT_SECONDS from now and its
// handler reports CONNECTING_TIMEOUT. The connect handler cancels the timer,
// sets m_is_connected and calls startNegotiation(), and also contains a path
// that reports CONNECTING_FAILED.
// NOTE(review): the conditions selecting between these paths are missing from
// this listing; presumably they test `ec` - confirm against the full source.
97 template<
typename ProtocolT>
98 void AsioStreamSocket<ProtocolT>::connect(
99 const typename ProtocolT::endpoint& endpoint)
101 _connectTimer.expires_from_now(
102 boost::posix_time::seconds(CONNECT_TIMEOUT_SECONDS));
// Both handlers below hold a copy of `self`.
103 auto self(this->shared_from_this());
104 _connectTimer.async_wait([&,
self](boost::system::error_code ec) {
// NOTE(review): the guard around this call (listing lines 105-106) is missing;
// presumably it separates a real expiry from a cancel() - confirm.
107 _callbacks.stateChanged(CONNECTING_TIMEOUT);
112 m_socket.async_connect(endpoint,
113 [
this,
self](boost::system::error_code ec) {
116 this->_connectTimer.cancel();
117 m_is_connected = true;
118 this->startNegotiation();
120 _callbacks.stateChanged(CONNECTING_FAILED);
// Issues one asynchronous read into mReadBuffer (up to read_buffer_size bytes)
// while negotiation is in progress. The handler calls negotiate() and tests
// the result against Atlas::Negotiate::FAILED, tests whether _sc is nullptr,
// and calls negotiate_write() followed by negotiate_read(). Read errors other
// than operation_aborted are logged through warning().
// NOTE(review): the bodies of the FAILED and `_sc == nullptr` branches, and the
// conditions around the other calls, are missing from this listing.
126 template<
typename ProtocolT>
127 void AsioStreamSocket<ProtocolT>::negotiate_read()
// The handler holds a copy of `self` while the read is pending.
129 auto self(this->shared_from_this());
130 m_socket.async_read_some(
mReadBuffer.prepare(read_buffer_size),
131 [
this,
self](boost::system::error_code ec, std::size_t length)
138 auto negotiateResult = this->negotiate();
139 if (negotiateResult == Atlas::Negotiate::FAILED) {
147 if (
_sc ==
nullptr) {
151 this->negotiate_write();
152 this->negotiate_read();
// operation_aborted is deliberately not logged.
155 if (ec != boost::asio::error::operation_aborted) {
158 warning() <<
"Error when reading from socket while negotiating: (" << ec <<
") " << ec.message();
// Issues one asynchronous read into mReadBuffer (up to read_buffer_size bytes).
// Read errors other than operation_aborted are logged through warning().
// NOTE(review): the handler's success path (listing lines 171-179) is missing
// from this listing, so what happens to the received bytes is not shown here.
165 template<
typename ProtocolT>
166 void AsioStreamSocket<ProtocolT>::do_read()
// The handler holds a copy of `self` while the read is pending.
168 auto self(this->shared_from_this());
169 m_socket.async_read_some(
mReadBuffer.prepare(read_buffer_size),
170 [
this,
self](boost::system::error_code ec, std::size_t length)
// operation_aborted is deliberately not logged.
180 if (ec != boost::asio::error::operation_aborted) {
183 warning() <<
"Error when reading from socket: (" << ec <<
") " << ec.message();
// Fragment of a member template whose signature is missing from this listing.
// NOTE(review): presumably AsioStreamSocket<ProtocolT>::write(), which the
// cross-reference text places at StreamSocket_impl.h:191 - confirm.
// Visible behaviour: a completion handler holding a copy of `self` logs write
// errors other than operation_aborted through warning(). The call that starts
// the write and the handler's success path are not shown.
190 template<
typename ProtocolT>
204 auto self(this->shared_from_this());
211 [
this,
self](boost::system::error_code ec, std::size_t length)
// operation_aborted is deliberately not logged.
221 if (ec != boost::asio::error::operation_aborted) {
226 warning() <<
"Error when writing to socket: (" << ec <<
") " << ec.message();
// Fragment of a member template whose signature is missing from this listing.
// NOTE(review): presumably AsioStreamSocket<ProtocolT>::negotiate_write(),
// judging by the log text and the call made from negotiate_read() - confirm.
// Visible behaviour: writes the contents of mWriteBuffer to m_socket with
// boost::asio::async_write; the handler logs a warning containing `ec`.
// NOTE(review): the condition guarding the warning (listing lines 242-246) is
// missing from this listing.
234 template<
typename ProtocolT>
// The handler holds a copy of `self` while the write is pending.
239 auto self(this->shared_from_this());
240 boost::asio::async_write(m_socket,
mWriteBuffer->data(),
241 [
this,
self](boost::system::error_code ec, std::size_t length)
247 warning() <<
"Error when writing to socket while negotiating: (" << ec <<
") " << ec.message();
Definition: LogStream.h:55
boost::asio::streambuf mReadBuffer
Buffer for data being read from the socket.
Definition: StreamSocket.h:142
bool mShouldSend
True if we should send again as soon as an ongoing async_write operation completes.
Definition: StreamSocket.h:157
Atlas::Net::StreamConnect * _sc
negotiation object (NULL after connection!)
Definition: StreamSocket.h:164
Every Eris class and type lives inside the Eris namespace; certain utility functions live in the Util...
Definition: Account.cpp:34
connection failed
Definition: StreamSocket.h:67
boost::asio::streambuf * mWriteBuffer
Buffer used to write data to be sent.
Definition: StreamSocket.h:131
std::ostream mOutStream
Stream for data being sent out.
Definition: StreamSocket.h:152
virtual void write()
Send any unsent data.
Definition: StreamSocket_impl.h:191
Template specialization which uses boost::asio sockets.
Definition: StreamSocket.h:182
bool mIsSending
True if we're currently sending through an async_write (and thus shouldn't touch mSendBuffer).
Definition: StreamSocket.h:162
std::function< void(Status)> stateChanged
Called when the internal state has changed.
Definition: StreamSocket.h:85
boost::asio::streambuf * mSendBuffer
Buffer of data which is being sent.
Definition: StreamSocket.h:137
std::function< void()> dispatch
Called when operations have arrived and need dispatching.
Definition: StreamSocket.h:80
failure when negotiating
Definition: StreamSocket.h:65