17 #ifndef YAMICPP_AGENT_IMPL_H_INCLUDED 18 #define YAMICPP_AGENT_IMPL_H_INCLUDED 20 #include "agent_impl_base.h" 21 #include "id_generator.h" 22 #include "incoming_message_info.h" 23 #include "incoming_message_queue.h" 24 #include "name_resolver.h" 26 #include "outgoing_message.h" 27 #include "outgoing_message_manager.h" 28 #include "water_flow_manager.h" 29 #include <yami4-core/agent.h> 48 class incoming_message_dispatcher_base;
// Implementation class of the messaging agent; publicly derives from
// agent_impl_base.
// NOTE(review): this text is an extract of a rendered source listing. The
// leading numbers are the listing's own line numbers, and braces, access
// specifiers and several parameter lines are missing. Gaps are flagged below.
50 class agent_impl :
public agent_impl_base
// Constructs the agent from a set of configuration parameters.
54 agent_impl(
const parameters & options);
// Same as above, additionally taking an event_callback by reference.
// NOTE(review): presumably stored in event_listener_ — confirm in the .cpp.
56 agent_impl(
const parameters & options, event_callback & event_listener);
// Adds a listener for the given address string and returns a string.
// NOTE(review): the returned string is presumably the resolved listener
// address — confirm.
60 std::string add_listener(
const std::string & listener);
// Removes a listener identified by the given string.
61 void remove_listener(
const std::string & listener);
// Opens a connection to the given target.
63 void open_connection(
const std::string & target);
// Overload of open_connection that also takes a parameters object.
// NOTE(review): presumably per-connection option overrides — confirm.
64 void open_connection(
const std::string & target,
65 const parameters & options);
// send overload returning a newly created outgoing_message through
// std::unique_ptr.
// NOTE(review): listing lines 72-75 are missing, so the tail of this
// declaration and the head of the next one are not visible here.
67 std::unique_ptr<outgoing_message> send(
68 const std::string & target,
69 const std::string & object_name,
70 const std::string & message_name,
71 const serializable & content,
// Parameter list of a declaration whose first line is missing; it takes a
// caller-provided outgoing_message by reference.
// NOTE(review): presumably another send overload — confirm. Listing lines
// 81-83 (the rest of the parameter list) are missing as well.
76 outgoing_message & message,
77 const std::string & target,
78 const std::string & object_name,
79 const std::string & message_name,
80 const serializable & content,
// Takes a message id and returns bool.
// NOTE(review): presumably drops the callback registered for that message
// and reports whether one existed — confirm.
84 bool clean_outgoing_message_callback(
long long id);
// Sends a message without returning a message handle.
// NOTE(review): listing lines 90-92 (remaining parameters) are missing.
86 void send_one_way(
const std::string & target,
87 const std::string & object_name,
88 const std::string & message_name,
89 const serializable & content,
// Sends a reply carrying `body` for the message `message_id` to `source`,
// with the given priority.
93 void send_reply(
const std::string & source,
94 long long message_id,
const serializable & body,
95 std::size_t priority);
// Sends a rejection with a textual reason for the message `message_id` to
// `source`, with the given priority.
97 void send_rejection(
const std::string & source,
98 long long message_id,
const std::string & reason,
99 std::size_t priority);
// Closes the connection to `target`; takes a priority argument.
101 void close_connection(
const std::string & target, std::size_t priority);
// Closes the connection to `target`; unlike close_connection it takes no
// priority.
// NOTE(review): presumably closes immediately, without waiting for pending
// messages — confirm.
103 void hard_close_connection(
const std::string & target);
// Registers a message dispatcher under `object_name`. The dispatcher is
// passed as a non-const reference to std::unique_ptr.
// NOTE(review): presumably ownership is taken from the caller — confirm.
105 virtual void register_object(
106 const std::string & object_name,
107 std::unique_ptr<incoming_message_dispatcher_base> &
object);
// Unregisters the object known under `object_name`.
109 void unregister_object(
const std::string & object_name);
// Callback-based send overload returning a long long.
// NOTE(review): the result is presumably the message id. Listing lines
// 119-120 (remaining parameters) are missing.
111 virtual long long send(
112 std::unique_ptr<outgoing_message_dispatcher_base> &
113 outgoing_message_callback,
114 const std::string & target,
115 const std::string & object_name,
116 const std::string & message_name,
117 const serializable & content,
118 std::size_t priority,
// Registers the connection event monitor (see connection_event_monitor_).
121 virtual void register_connection_event_monitor(
122 std::unique_ptr<connection_event_dispatcher_base> & monitor);
// Registers the I/O error logger (see io_error_logger_).
124 virtual void register_io_error_logger(
125 std::unique_ptr<io_error_dispatcher_base> & logger);
// NOTE(review): the parameter list (listing lines 128-130) is missing.
127 void report_connection_event(
// Accepts an incoming message description by reference to std::unique_ptr.
// NOTE(review): presumably appends it to incoming_queue_ — confirm.
131 void queue_incoming(std::unique_ptr<incoming_message_info> & incoming);
// Parameter lists of two declarations whose first lines (function names) are
// missing from this extract. One takes a parsed parameters body, the other a
// raw byte buffer, both keyed by message id.
135 long long message_id, std::unique_ptr<parameters> & body);
137 long long message_id, std::unique_ptr<std::vector<char> > & raw_buffer);
// Reports that the message `message_id` was rejected, with a textual reason.
140 void report_rejected(
long long message_id,
const std::string & reason);
// Entry point taking a dispatcher index.
// NOTE(review): presumably the body of one dispatcher thread — confirm.
143 void do_message_dispatching(std::size_t dispatcher_index);
// NOTE(review): presumably the body of the worker thread — confirm.
146 void do_event_loop();
// Writes the outgoing flow state into the three output references; const
// member function.
148 void get_outgoing_flow_state(std::size_t & current_level,
149 std::size_t & high_water_mark, std::size_t & low_water_mark)
const;
// Counterpart of increase_outgoing_flow() declared further below.
151 void decrease_outgoing_flow();
// Writes channel usage figures into the two output references.
153 void get_channel_usage(
int & max_allowed,
int & used);
// Returns a byte count for the given target.
155 std::size_t get_pending_outgoing_bytes(
const std::string & target);
// Agent options held by value.
157 cpp_options options_;
// Copy constructor and copy assignment, declared only.
// NOTE(review): presumably private and left undefined to forbid copying; the
// access specifier is not visible in this extract.
160 agent_impl(
const agent_impl &);
161 void operator=(
const agent_impl &);
// Initialization helper taking the same parameters type as the constructors.
163 void init(
const parameters & options);
// NOTE(review): the enumerators (listing lines 167-179) are missing from this
// extract.
166 enum message_creation_policy
// Internal send helper. `message` is a raw pointer, `message_create` selects
// a message_creation_policy, and `out_message_id` is an optional output
// pointer defaulting to NULL.
// NOTE(review): listing lines 186-187 (two or more parameters) are missing.
180 std::unique_ptr<outgoing_message> do_send(
181 const std::string & target,
182 const std::string & object_name,
183 const std::string & message_name,
184 const serializable & content,
185 std::size_t priority,
188 std::unique_ptr<outgoing_message_dispatcher_base> &
189 outgoing_message_callback,
190 outgoing_message * message,
191 message_creation_policy message_create,
192 long long * out_message_id = NULL);
// Internal helper sending to one target, with an already built header and an
// explicit message id.
// NOTE(review): listing line 200 is missing between message_id and one_way.
194 std::unique_ptr<outgoing_message> do_send_to_single_target(
195 const std::string & target,
196 const core::parameters & header,
197 const serializable & content,
198 std::size_t priority,
199 long long message_id,
201 bool one_way,
bool wait_for_transmission,
bool wait_for_completion,
202 std::unique_ptr<outgoing_message_dispatcher_base> &
203 outgoing_message_callback,
204 outgoing_message * message,
205 message_creation_policy message_create);
// Returns a channel descriptor for `target`. `overriding_options` is optional
// and defaults to NULL.
// NOTE(review): presumably opens the channel when auto_connect is true and
// none exists yet — confirm.
207 core::channel_descriptor make_sure_channel_exists(
208 const std::string & target,
bool auto_connect,
209 const parameters * overriding_options = NULL);
// Flow accounting helpers for the outgoing and incoming directions.
211 void increase_outgoing_flow();
212 void increase_incoming_flow();
213 void decrease_incoming_flow();
// Thread shutdown helpers.
215 void stop_worker_thread();
216 void stop_dispatcher_threads(std::size_t num_of_threads);
// Data members: outgoing message bookkeeping and outgoing flow control.
220 outgoing_message_manager outgoing_manager_;
222 water_flow_manager outgoing_flow_manager_;
223 flag outgoing_flow_allowed_;
// Incoming message queue.
225 incoming_message_queue incoming_queue_;
// Raw pointer to flag.
// NOTE(review): presumably an array with one flag per dispatcher thread;
// ownership and lifetime are not visible here.
226 flag * dispatcher_stopped_flags_;
// Incoming flow control.
// NOTE(review): allow_incoming_traffic_ is presumably guarded by
// allow_incoming_traffic_mtx_ — confirm.
228 water_flow_manager incoming_flow_manager_;
229 bool allow_incoming_traffic_;
230 mutex allow_incoming_traffic_mtx_;
// Dispatchers held by std::unique_ptr; see the register_* functions above.
232 std::unique_ptr<connection_event_dispatcher_base> connection_event_monitor_;
233 std::unique_ptr<io_error_dispatcher_base> io_error_logger_;
// Raw pointer to event_callback; ownership is not visible in this extract.
234 event_callback * event_listener_;
236 name_resolver resolver_;
238 id_generator id_gen_;
// Worker thread shutdown state.
// NOTE(review): worker_stop_request_ is presumably guarded by
// worker_stop_mtx_ — confirm.
240 bool worker_stop_request_;
241 mutex worker_stop_mtx_;
242 flag worker_stopped_;
249 #endif // YAMICPP_AGENT_IMPL_H_INCLUDED Namespace devoted to everything related to YAMI4.
Definition: activity_statistics_monitor.cpp:27
connection_event
Kind of connection event.
Definition: connection_event.h:24