YAMI4 Core
channel.h
1 // Copyright Maciej Sobczak 2008-2019.
2 // This file is part of YAMI4.
3 //
4 // YAMI4 is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // YAMI4 is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with YAMI4. If not, see <http://www.gnu.org/licenses/>.
16 
17 #ifndef YAMICORE_CHANNEL_H_INCLUDED
18 #define YAMICORE_CHANNEL_H_INCLUDED
19 
20 #include "core.h"
21 #include "details-fwd.h"
22 #include "options.h"
23 #include <cstddef>
24 
25 #ifdef YAMI4_WITH_OPEN_SSL
26 #ifdef _WIN32
27 // this is necessary to avoid conflicts between
28 // WinSock2.h (used by YAMI4) and winsock.h (used by OpenSSL):
29 #include <WinSock2.h>
30 #endif // _WIN32
31 #include <openssl/ssl.h>
32 #endif // YAMI4_WITH_OPEN_SSL
33 
34 // selected per platform
35 #include <details-types.h>
36 
37 namespace yami
38 {
39 
40 namespace details
41 {
42 
43 enum incoming_fsm { read_frame_header, read_frame_payload };
44 
45 const std::size_t word_size = 4;
46 const std::size_t frame_head_size = 4 * word_size;
47 
48 class selector;
49 
50 class channel
51 {
52 public:
53 
54  // for initializing newly created channels
55  // this function attempts to create a physical connection
56  // according to the given target
57  // should be called outside of critical section
58  core::result init(allocator & alloc, mutex & mtx,
59  const options & configuration_options,
60  const core::parameters * overriding_options,
61  const char * target,
62  core::incoming_message_dispatch_function incoming_message_callback,
63  void * incoming_message_hint,
64  core::event_notification_function event_notification_callback,
65  void * event_notification_hint,
66  core::io_error_function io_error_callback,
67  void * io_error_callback_hint);
68 
69  // for initializing channels that are already physically created
70  // by listeners
71  // target is transfered from listener to this channel
72  core::result init(allocator & alloc, mutex & mtx,
73  const options & configuration_options,
74  char * target, io_descriptor_type fd, protocol prot,
75  std::size_t preferred_frame_size,
76  core::incoming_message_dispatch_function incoming_message_callback,
77  void * incoming_message_hint,
78  core::event_notification_function event_notification_callback,
79  void * event_notification_hint,
80  core::io_error_function io_error_callback,
81  void * io_error_callback_hint);
82 
83 #ifdef YAMI4_WITH_OPEN_SSL
84  void set_client_ssl(SSL * ssl);
85  void set_server_ssl(SSL * ssl);
86 #endif // YAMI4_WITH_OPEN_SSL
87 
88  void close_connection();
89  void clean();
90 
91  std::size_t get_frame_size() const;
92 
93  // buffers are transferred into the internally managed outgoing queue
94  core::result post(std::size_t priority,
95  char * * buffers, const std::size_t * buffer_sizes,
96  std::size_t number_of_frames,
97  bool & first_frame,
98  core::message_progress_function progress_callback,
99  void * progress_hint);
100 
101  // intended for cases when the incoming frame is physically read
102  // outside of the channel, but also used for datagrams (UDP)
103  core::result process_complete_incoming_frame(
104  const char * buffer, const std::size_t buffer_size);
105 
106  core::result do_some_work(io_direction direction, bool & close_me);
107 
108 #ifdef YAMI4_WITH_QNX
109  // used for QNX channels to send, in a blocking way,
110  // a single outgoing frame (if there is any)
111  core::result send_blocking_datagram(bool & close_me);
112 
113  // used to establish the channel id for receiving responses
114  // from remote peers, this id is sent together with each outgoing message
115  void set_default_qnx_listening_channel_id(int chid);
116 
117  // used by channel to interrupt the selector
118  // after successful send of the QNX frame,
119  // so that the worker thread can wake up if it happened to be waiting
120  void set_selector(selector & slc);
121 #endif // YAMI4_WITH_QNX
122 
123  core::result post_close(std::size_t priority, bool & close_me);
124 
125  const char * get_target() const { return target_; }
126  const char * move_target();
127 
128  io_descriptor_type get_io_descriptor() const { return fd_; }
129 
130  void get_io_descriptor(
131  io_descriptor_type & fd, io_direction & direction) const;
132 
133  bool has_buffered_data() const { return buffer_available_ > buffer_consumed_; }
134 
135 #ifdef YAMI4_WITH_OPEN_SSL
136  SSL * get_ssl() const { return ssl_; }
137 
138  void set_pending_read(bool pending) { pending_read_ = pending; }
139  bool get_pending_read() const { return pending_read_; }
140 #endif // YAMI4_WITH_OPEN_SSL
141 
142  protocol get_protocol() const { return protocol_; }
143 
144  void set_selector_index(int index) { selector_index_ = index; }
145  int get_selector_index() const { return selector_index_; }
146 
147  void inc_ref() { ++ref_count_; }
148  void dec_ref() { --ref_count_; }
149  bool can_be_removed() const { return ref_count_ == 0; }
150 
151  // for unit tests
152  outgoing_frame * get_first_outgoing_frame() const;
153  outgoing_frame * get_last_outgoing_frame() const;
154  incoming_message_frame_list * get_first_incoming_frame_list() const;
155  incoming_message_frame_list * get_last_incoming_frame_list() const;
156  incoming_fsm get_incoming_state() const;
157 
158  std::size_t get_pending_outgoing_bytes() const;
159 
160 private:
161 
162  void common_init(allocator & alloc, mutex & mtx,
163  const options & configuration_options,
164  const core::parameters * overriding_options,
165  core::incoming_message_dispatch_function incoming_message_callback,
166  void * incoming_message_hint,
167  core::event_notification_function event_notification_callback,
168  void * event_notification_hint,
169  core::io_error_function io_error_callback,
170  void * io_error_callback_hint);
171 
172  outgoing_frame * * find_outgoing_insertion_point(std::size_t priority);
173 
174  // cleans all frames starting with the given one
175  void clean_outgoing_frames(outgoing_frame * frame);
176 
177  void notify_cancellation(
178  core::message_progress_function progress_callback,
179  void * progress_hint);
180 
181  void parse_incoming_frame_header();
182 
183  core::result store_incoming_frame();
184 
185  core::result dispatch_short_incoming_message();
186  core::result dispatch_incoming_message(
187  incoming_message_frame_list * list);
188 
189  // cleans all incoming frame lists and their frames
190  void clean_incoming_messages();
191 
192  // cleans the given list
193  void clean_incoming_frames(incoming_message_frame_list * list);
194 
195  void notify_progress_and_consume_frame_list();
196 
197  core::result do_some_input();
198  core::result do_some_output(bool & close_me);
199 
200  core::result connect();
201  core::result connect_to_file(const char * file_name);
202  core::result connect_to_tcp(const char * tcp_address);
203  core::result connect_to_udp(const char * udp_address);
204  core::result connect_to_unix(const char * path);
205 #ifdef YAMI4_WITH_QNX
206  core::result connect_to_qnx(const char * qnx_endpoint);
207 #endif // YAMI4_WITH_QNX
208 #ifdef YAMI4_WITH_OPEN_SSL
209  core::result connect_to_tcps(const char * tcp_address);
210 #endif // YAMI4_WITH_OPEN_SSL
211 
212  core::result read_bytes(char * buf, std::size_t size,
213  std::size_t & readn);
214  core::result write_bytes(const char * buf, std::size_t size,
215  std::size_t & written);
216 
217  options configuration_options_;
218  allocator * alloc_;
219 
220 #ifdef YAMI4_WITH_OPEN_SSL
221  SSL * ssl_;
222  bool pending_read_;
223 #endif // YAMI4_WITH_OPEN_SSL
224 
225  mutex * mtx_; // provided by channel group
226 
227  char * target_;
228  protocol protocol_;
229  io_direction direction_;
230  io_descriptor_type fd_;
231  int selector_index_;
232 
233  // used only for reading with TCP and allocated lazily
234  char * frame_buffer_;
235  std::size_t buffer_capacity_;
236  std::size_t buffer_available_;
237  std::size_t buffer_consumed_;
238 
239  std::size_t preferred_frame_size_;
240 
241  // used only with UDP, as it is needed with every send operation
242  void * target_address_;
243  int target_address_size_;
244  char * datagram_buffer_;
245 
246 #ifdef YAMI4_WITH_QNX
247  // used only with QNX channels
248  int connection_id_;
249  selector * selector_;
250  // buffer for serialized chid prefix
251  char qnx_listener_channel_id_serialized_[4u];
252 #endif // YAMI4_WITH_QNX
253 
254  std::size_t ref_count_;
255 
256  // soft close allows output of remaining existing frames,
257  // hard close means dropping everything
258  enum mode { operational, soft_close, hard_close };
259  mode mode_;
260 
261  // state of outgoing queue
262 
263  std::size_t output_frame_offset_;
264  outgoing_frame * first_outgoing_frame_;
265  outgoing_frame * last_outgoing_frame_;
266 
267  // state of incoming queue
268 
269  incoming_fsm incoming_state_;
270  std::size_t read_offset_; // within a given state
271 
272  char frame_head_buffer_[frame_head_size];
273  int current_message_id_;
274  std::size_t current_frame_number_;
275  std::size_t current_message_header_size_;
276  std::size_t current_frame_payload_size_;
277  char * current_frame_payload_;
278  bool last_frame_;
279 
280  incoming_message_frame_list * first_incoming_frame_list_;
281  incoming_message_frame_list * last_incoming_frame_list_;
282 
283  std::size_t pending_outgoing_bytes_;
284 
285  core::incoming_message_dispatch_function incoming_message_callback_;
286  void * incoming_message_hint_;
287 
288  core::event_notification_function event_notification_callback_;
289  void * event_notification_hint_;
290 
291  core::io_error_function io_error_callback_;
292  void * io_error_callback_hint_;
293 };
294 
295 } // namespace details
296 
297 } // namespace yami
298 
299 #endif // YAMICORE_CHANNEL_H_INCLUDED
void(* io_error_function)(void *hint, int error_code, const char *description)
Type of function callback for internal I/O error logging.
Definition: core.h:149
void(* message_progress_function)(void *hint, std::size_t sent_bytes, std::size_t total_byte_count)
Definition: core.h:121
Namespace devoted for everything related to YAMI4.
Definition: agent.h:25
void(* incoming_message_dispatch_function)(void *hint, const char *source, const char *header_buffers[], std::size_t header_buffer_sizes[], std::size_t num_of_header_buffers, const char *body_buffers[], std::size_t body_buffer_sizes[], std::size_t num_of_body_buffers)
Definition: core.h:70
void(* event_notification_function)(void *hint, event_notification e, const char *str, std::size_t size)
Definition: core.h:142
result
General type for reporting success and error states.
Definition: core.h:32