corosync 3.1.10
totemudpu.c
Go to the documentation of this file.
1/*
2 * Copyright (c) 2005 MontaVista Software, Inc.
3 * Copyright (c) 2006-2018 Red Hat, Inc.
4 *
5 * All rights reserved.
6 *
7 * Author: Steven Dake (sdake@redhat.com)
8
9 * This software licensed under BSD license, the text of which follows:
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions are met:
13 *
14 * - Redistributions of source code must retain the above copyright notice,
15 * this list of conditions and the following disclaimer.
16 * - Redistributions in binary form must reproduce the above copyright notice,
17 * this list of conditions and the following disclaimer in the documentation
18 * and/or other materials provided with the distribution.
19 * - Neither the name of the MontaVista Software, Inc. nor the names of its
20 * contributors may be used to endorse or promote products derived from this
21 * software without specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33 * THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#include <config.h>
37
38#include <assert.h>
39#include <sys/mman.h>
40#include <sys/types.h>
41#include <sys/stat.h>
42#include <sys/socket.h>
43#include <netdb.h>
44#include <sys/un.h>
45#include <sys/ioctl.h>
46#include <sys/param.h>
47#include <netinet/in.h>
48#include <arpa/inet.h>
49#include <unistd.h>
50#include <fcntl.h>
51#include <stdlib.h>
52#include <stdio.h>
53#include <errno.h>
54#include <sched.h>
55#include <time.h>
56#include <sys/time.h>
57#include <sys/poll.h>
58#include <sys/uio.h>
59#include <limits.h>
60
61#include <qb/qblist.h>
62#include <qb/qbdefs.h>
63#include <qb/qbloop.h>
64
65#include <corosync/sq.h>
66#include <corosync/swab.h>
67#define LOGSYS_UTILS_ONLY 1
68#include <corosync/logsys.h>
69#include "totemudpu.h"
70
71#include "util.h"
72
73#ifndef MSG_NOSIGNAL
74#define MSG_NOSIGNAL 0
75#endif
76
77#define MCAST_SOCKET_BUFFER_SIZE (TRANSMITS_ALLOWED * UDP_RECEIVE_FRAME_SIZE_MAX)
78#define NETIF_STATE_REPORT_UP 1
79#define NETIF_STATE_REPORT_DOWN 2
80
81#define BIND_STATE_UNBOUND 0
82#define BIND_STATE_REGULAR 1
83#define BIND_STATE_LOOPBACK 2
84
86 struct qb_list_head list;
88 int fd;
89 int active;
90};
91
94
96
98
100
101 void *context;
102
104 void *context,
105 const void *msg,
106 unsigned int msg_len,
107 const struct sockaddr_storage *system_from);
108
110 void *context,
111 const struct totem_ip_address *iface_address,
112 unsigned int ring_no);
113
115
116 /*
117 * Function and data used to log messages
118 */
120
122
124
126
128
130
132 int level,
133 int subsys,
134 const char *function,
135 const char *file,
136 int line,
137 const char *format,
138 ...)__attribute__((format(printf, 6, 7)));
139
141
143
144 struct iovec totemudpu_iov_recv;
145
146 struct qb_list_head member_list;
147
149
151
153
155
157
158 struct timeval stats_tv_start;
159
161
163
164 qb_loop_timer_handle timer_netif_check_timeout;
165
166 unsigned int my_memb_entries;
167
169
171
173
175
177
178 qb_loop_timer_handle timer_merge_detect_timeout;
179
181
183};
184
185struct work_item {
186 const void *msg;
187 unsigned int msg_len;
189};
190
191static int totemudpu_build_sockets (
192 struct totemudpu_instance *instance,
193 struct totem_ip_address *bindnet_address,
194 struct totem_ip_address *bound_to);
195
196static int totemudpu_create_sending_socket(
197 void *udpu_context,
198 const struct totem_ip_address *member);
199
201 void *udpu_context);
202
203static void totemudpu_start_merge_detect_timeout(
204 void *udpu_context);
205
206static void totemudpu_stop_merge_detect_timeout(
207 void *udpu_context);
208
209static void totemudpu_instance_initialize (struct totemudpu_instance *instance)
210{
211 memset (instance, 0, sizeof (struct totemudpu_instance));
212
214
215 instance->totemudpu_iov_recv.iov_base = instance->iov_buffer;
216
217 instance->totemudpu_iov_recv.iov_len = UDP_RECEIVE_FRAME_SIZE_MAX + 1; //sizeof (instance->iov_buffer) + 1;
218
219 /*
220 * There is always at least 1 processor
221 */
222 instance->my_memb_entries = 1;
223
224 qb_list_init (&instance->member_list);
225}
226
/*
 * Forward a formatted message to the logging callback of the in-scope
 * `instance` pointer.
 *
 * level  - log priority, handed to the callback unchanged
 * format - printf-style format string, followed by its arguments
 *
 * The call site must have a variable named `instance` in scope; its
 * totemudpu_log_printf callback and totemudpu_subsys_id members are read
 * here. The calling function name, file and line are passed along too.
 *
 * The expansion intentionally carries no trailing semicolon, so that
 * `log_printf (...);` is exactly one statement and remains legal as the
 * unbraced body of an `if` that is followed by an `else`.
 */
#define log_printf(level, format, args...) \
do { \
	instance->totemudpu_log_printf ( \
		level, instance->totemudpu_subsys_id, \
		__FUNCTION__, __FILE__, __LINE__, \
		(const char *)format, ##args); \
} while (0)
/*
 * Log a message with an error code appended to it.
 *
 * err_num - error code (call sites in this file pass errno); note that it
 *           is expanded twice
 * level   - log priority, handed to the callback unchanged
 * fmt     - string literal format; ": %s (%d)" is concatenated onto it
 * args    - optional arguments consumed by fmt
 *
 * The text for err_num is obtained from qb_strerror_r() using a scratch
 * buffer of LOGSYS_MAX_PERROR_MSG_LEN bytes local to the expansion. The
 * message is emitted through the totemudpu_log_printf callback of the
 * in-scope `instance` pointer, so the call site must provide `instance`.
 */
#define LOGSYS_PERROR(err_num, level, fmt, args...) \
do { \
	char perror_text_[LOGSYS_MAX_PERROR_MSG_LEN]; \
	const char *perror_msg_ = \
		qb_strerror_r(err_num, perror_text_, sizeof(perror_text_)); \
	instance->totemudpu_log_printf (level, \
		instance->totemudpu_subsys_id, \
		__FUNCTION__, __FILE__, __LINE__, \
		fmt ": %s (%d)", ##args, perror_msg_, err_num); \
} while (0)
243
245 void *udpu_context,
246 const char *cipher_type,
247 const char *hash_type)
248{
249
250 return (0);
251}
252
253
254static inline void ucast_sendmsg (
255 struct totemudpu_instance *instance,
256 struct totem_ip_address *system_to,
257 const void *msg,
258 unsigned int msg_len)
259{
260 struct msghdr msg_ucast;
261 int res = 0;
262 struct sockaddr_storage sockaddr;
263 struct iovec iovec;
264 int addrlen;
265 int send_sock;
266
267 iovec.iov_base = (void *)msg;
268 iovec.iov_len = msg_len;
269
270 /*
271 * Build unicast message
272 */
274 instance->totem_interface->ip_port, &sockaddr, &addrlen);
275 memset(&msg_ucast, 0, sizeof(msg_ucast));
276 msg_ucast.msg_name = &sockaddr;
277 msg_ucast.msg_namelen = addrlen;
278 msg_ucast.msg_iov = (void *)&iovec;
279 msg_ucast.msg_iovlen = 1;
280
281 if (instance->netif_bind_state == BIND_STATE_REGULAR) {
282 send_sock = instance->token_socket;
283 } else {
284 send_sock = instance->local_loop_sock[1];
285 msg_ucast.msg_name = NULL;
286 msg_ucast.msg_namelen = 0;
287 }
288
289
290 /*
291 * Transmit unicast message
292 * An error here is recovered by totemsrp
293 */
294 res = sendmsg (send_sock, &msg_ucast, MSG_NOSIGNAL);
295 if (res < 0) {
297 "sendmsg(ucast) failed (non-critical)");
298 }
299}
300
301static inline void mcast_sendmsg (
302 struct totemudpu_instance *instance,
303 const void *msg,
304 unsigned int msg_len,
305 int only_active)
306{
307 struct msghdr msg_mcast;
308 int res = 0;
309 struct iovec iovec;
310 struct sockaddr_storage sockaddr;
311 int addrlen;
312 struct qb_list_head *list;
313 struct totemudpu_member *member;
314
315 iovec.iov_base = (void *)msg;
316 iovec.iov_len = msg_len;
317
318 memset(&msg_mcast, 0, sizeof(msg_mcast));
319 /*
320 * Build multicast message
321 */
322 if (instance->netif_bind_state == BIND_STATE_REGULAR) {
323 qb_list_for_each(list, &(instance->member_list)) {
324 member = qb_list_entry (list, struct totemudpu_member, list);
325 /*
326 * Do not send the multicast message if the message is not "flush", the member
327 * is inactive and the timeout for sending a merge message has not expired.
328 */
329 if (only_active && !member->active && !instance->send_merge_detect_message)
330 continue ;
331
333 instance->totem_interface->ip_port, &sockaddr, &addrlen);
334 memset(&msg_mcast, 0, sizeof(msg_mcast));
335 msg_mcast.msg_name = &sockaddr;
336 msg_mcast.msg_namelen = addrlen;
337 msg_mcast.msg_iov = (void *)&iovec;
338 msg_mcast.msg_iovlen = 1;
339
340 /*
341 * Transmit multicast message
342 * An error here is recovered by totemsrp
343 */
344 res = sendmsg (member->fd, &msg_mcast, MSG_NOSIGNAL);
345 if (res < 0) {
347 "sendmsg(mcast) failed (non-critical)");
348 }
349 }
350
351 if (!only_active || instance->send_merge_detect_message) {
352 /*
353 * Current message was sent to all nodes
354 */
356 instance->send_merge_detect_message = 0;
357 }
358 } else {
359 /*
360 * Transmit multicast message to local unix mcast loop
361 * An error here is recovered by totemsrp
362 */
363 msg_mcast.msg_name = NULL;
364 msg_mcast.msg_namelen = 0;
365 msg_mcast.msg_iov = (void *)&iovec;
366 msg_mcast.msg_iovlen = 1;
367
368 res = sendmsg (instance->local_loop_sock[1], &msg_mcast,
370 if (res < 0) {
372 "sendmsg(local mcast loop) failed (non-critical)");
373 }
374 }
375}
376
378 void *udpu_context)
379{
380 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
381 int res = 0;
382
383 if (instance->token_socket > 0) {
384 qb_loop_poll_del (instance->totemudpu_poll_handle,
385 instance->token_socket);
386 close (instance->token_socket);
387 }
388
389 if (instance->local_loop_sock[0] > 0) {
390 qb_loop_poll_del (instance->totemudpu_poll_handle,
391 instance->local_loop_sock[0]);
392 close (instance->local_loop_sock[0]);
393 close (instance->local_loop_sock[1]);
394 }
395
396 totemudpu_stop_merge_detect_timeout(instance);
397
398 return (res);
399}
400
401static struct totemudpu_member *find_member_by_sockaddr(
402 const void *udpu_context,
403 const struct sockaddr *sa)
404{
405 struct qb_list_head *list;
406 struct totemudpu_member *member;
407 struct totemudpu_member *res_member;
408 const struct totemudpu_instance *instance = (const struct totemudpu_instance *)udpu_context;
409
410 res_member = NULL;
411
412 qb_list_for_each(list, &(instance->member_list)) {
413 member = qb_list_entry (list,
414 struct totemudpu_member,
415 list);
416
417 if (totemip_sa_equal(&member->member, sa)) {
418 res_member = member;
419 break ;
420 }
421 }
422
423 return (res_member);
424}
425
426
427static int net_deliver_fn (
428 int fd,
429 int revents,
430 void *data)
431{
432 struct totemudpu_instance *instance = (struct totemudpu_instance *)data;
433 struct msghdr msg_recv;
434 struct iovec *iovec;
435 struct sockaddr_storage system_from;
436 int bytes_received;
437
438 iovec = &instance->totemudpu_iov_recv;
439
440 /*
441 * Receive datagram
442 */
443 memset(&msg_recv, 0, sizeof(msg_recv));
444 msg_recv.msg_name = &system_from;
445 msg_recv.msg_namelen = sizeof (struct sockaddr_storage);
446 msg_recv.msg_iov = iovec;
447 msg_recv.msg_iovlen = 1;
448
449 bytes_received = recvmsg (fd, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
450 if (bytes_received == -1) {
451 return (0);
452 } else {
453 instance->stats_recv += bytes_received;
454 }
455
456 if (bytes_received >= UDP_RECEIVE_FRAME_SIZE_MAX + 1) {
457 /*
458 * Maximum packet size should be UDP_RECEIVE_FRAME_SIZE_MAX.
459 * If received packet is UDP_RECEIVE_FRAME_SIZE_MAX + 1 it means packet was truncated
460 * (iov_buffer size and iov_len are intentionally set to UDP_RECEIVE_FRAME_SIZE_MAX + 1).
461 */
463 "Received too big message. This may be because something bad is happening "
464 "on the network (attack?), or you tried join more nodes than corosync is "
465 "compiled with (%u) or bug in the code (bad estimation of "
466 "the UDP_RECEIVE_FRAME_SIZE_MAX). Dropping packet.", PROCESSOR_COUNT_MAX);
467 return (0);
468 }
469
470 if (instance->totem_config->block_unlisted_ips &&
472 find_member_by_sockaddr(instance, (const struct sockaddr *)&system_from) == NULL) {
473 log_printf(instance->totemudpu_log_level_debug, "Packet rejected from %s",
474 totemip_sa_print((const struct sockaddr *)&system_from));
475
476 return (0);
477 }
478
479 iovec->iov_len = bytes_received;
480
481 /*
482 * Handle incoming message
483 */
484 instance->totemudpu_deliver_fn (
485 instance->context,
486 iovec->iov_base,
487 iovec->iov_len,
488 &system_from);
489
490 iovec->iov_len = UDP_RECEIVE_FRAME_SIZE_MAX + 1;
491 return (0);
492}
493
494static int netif_determine (
495 struct totemudpu_instance *instance,
496 struct totem_ip_address *bindnet,
497 struct totem_ip_address *bound_to,
498 int *interface_up,
499 int *interface_num)
500{
501 int res;
502
503 res = totemip_iface_check (bindnet, bound_to,
504 interface_up, interface_num,
506
507
508 return (res);
509}
510
511
512/*
513 * If the interface is up, the sockets for totem are built. If the interface is down
514 * this function is requeued in the timer list to retry building the sockets later.
515 */
516static void timer_function_netif_check_timeout (
517 void *data)
518{
519 struct totemudpu_instance *instance = (struct totemudpu_instance *)data;
520 int interface_up;
521 int interface_num;
522
523 /*
524 * Build sockets for every interface
525 */
526 netif_determine (instance,
527 &instance->totem_interface->bindnet,
528 &instance->totem_interface->boundto,
529 &interface_up, &interface_num);
530 /*
531 * If the network interface isn't back up and we are already
532 * in loopback mode, add timer to check again and return
533 */
534 if ((instance->netif_bind_state == BIND_STATE_LOOPBACK &&
535 interface_up == 0) ||
536
537 (instance->my_memb_entries == 1 &&
539 interface_up == 1)) {
540
541 qb_loop_timer_add (instance->totemudpu_poll_handle,
542 QB_LOOP_MED,
543 instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
544 (void *)instance,
545 timer_function_netif_check_timeout,
546 &instance->timer_netif_check_timeout);
547
548 /*
549 * Add a timer to check for a downed regular interface
550 */
551 return;
552 }
553
554 if (instance->token_socket > 0) {
555 qb_loop_poll_del (instance->totemudpu_poll_handle,
556 instance->token_socket);
557 close (instance->token_socket);
558 instance->token_socket = -1;
559 }
560
561 if (interface_up == 0) {
562 if (instance->netif_bind_state == BIND_STATE_UNBOUND) {
564 "One of your ip addresses are now bound to localhost. "
565 "Corosync would not work correctly.");
567 }
568
569 /*
570 * Interface is not up
571 */
573
574 /*
575 * Add a timer to retry building interfaces and request memb_gather_enter
576 */
577 qb_loop_timer_add (instance->totemudpu_poll_handle,
578 QB_LOOP_MED,
579 instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
580 (void *)instance,
581 timer_function_netif_check_timeout,
582 &instance->timer_netif_check_timeout);
583 } else {
584 /*
585 * Interface is up
586 */
588 }
589 /*
590 * Create and bind the multicast and unicast sockets
591 */
592 totemudpu_build_sockets (instance,
593 &instance->totem_interface->bindnet,
594 &instance->totem_interface->boundto);
595
596 if (instance->netif_bind_state == BIND_STATE_REGULAR) {
597 qb_loop_poll_add (instance->totemudpu_poll_handle,
598 QB_LOOP_MED,
599 instance->token_socket,
600 POLLIN, instance, net_deliver_fn);
601 }
602
603 totemip_copy (&instance->my_id, &instance->totem_interface->boundto);
604
605 /*
606 * This reports changes in the interface to the user and totemsrp
607 */
608 if (instance->netif_bind_state == BIND_STATE_REGULAR) {
611 "The network interface [%s] is now up.",
614 instance->totemudpu_iface_change_fn (instance->context, &instance->my_id, 0);
615 }
616 /*
617 * Add a timer to check for interface going down in single membership
618 */
619 if (instance->my_memb_entries == 1) {
620 qb_loop_timer_add (instance->totemudpu_poll_handle,
621 QB_LOOP_MED,
622 instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
623 (void *)instance,
624 timer_function_netif_check_timeout,
625 &instance->timer_netif_check_timeout);
626 }
627
628 } else {
631 "The network interface is down.");
632 instance->totemudpu_iface_change_fn (instance->context, &instance->my_id, 0);
633 }
635
636 }
637}
638
639/* Set the socket priority to INTERACTIVE to ensure
640 that our messages don't get queued behind anything else */
641static void totemudpu_traffic_control_set(struct totemudpu_instance *instance, int sock)
642{
643#ifdef SO_PRIORITY
644 int prio = 6; /* TC_PRIO_INTERACTIVE */
645
646 if (setsockopt(sock, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(int))) {
648 "Could not set traffic priority");
649 }
650#endif
651}
652
653static int totemudpu_build_sockets_ip (
654 struct totemudpu_instance *instance,
655 struct totem_ip_address *bindnet_address,
656 struct totem_ip_address *bound_to,
657 int interface_num)
658{
659 struct sockaddr_storage sockaddr;
660 int addrlen;
661 int res;
662 unsigned int recvbuf_size;
663 unsigned int optlen = sizeof (recvbuf_size);
664 unsigned int retries = 0;
665
666 /*
667 * Setup unicast socket
668 */
669 instance->token_socket = socket (bindnet_address->family, SOCK_DGRAM, 0);
670 if (instance->token_socket == -1) {
672 "socket() failed");
673 return (-1);
674 }
675
677 res = fcntl (instance->token_socket, F_SETFL, O_NONBLOCK);
678 if (res == -1) {
680 "Could not set non-blocking operation on token socket");
681 return (-1);
682 }
683
684 /*
685 * Bind to unicast socket used for token send/receives
686 * This has the side effect of binding to the correct interface
687 */
688 totemip_totemip_to_sockaddr_convert(bound_to, instance->totem_interface->ip_port, &sockaddr, &addrlen);
689 while (1) {
690 res = bind (instance->token_socket, (struct sockaddr *)&sockaddr, addrlen);
691 if (res == 0) {
692 break;
693 }
695 "bind token socket failed");
696 if (++retries > BIND_MAX_RETRIES) {
697 break;
698 }
699
700 /*
701 * Wait for a while
702 */
703 (void)poll(NULL, 0, BIND_RETRIES_INTERVAL * retries);
704 }
705
706 if (res == -1) {
707 return (-1);
708 }
709
710 /*
711 * the token_socket can receive many messages. Allow a large number
712 * of receive messages on this socket
713 */
714 recvbuf_size = MCAST_SOCKET_BUFFER_SIZE;
715 res = setsockopt (instance->token_socket, SOL_SOCKET, SO_RCVBUF,
716 &recvbuf_size, optlen);
717 if (res == -1) {
719 "Could not set recvbuf size");
720 }
721
722 res = set_socket_dscp(instance->token_socket,
723 instance->totem_config->ip_dscp);
724 if (res == -1) {
726 "Could not set IP_TOS bits");
727 }
728
729 return 0;
730}
731
732int totemudpu_nodestatus_get (void *udpu_context, unsigned int nodeid,
733 struct totem_node_status *node_status)
734{
735 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
736 struct qb_list_head *list;
737 struct totemudpu_member *member;
738
739 qb_list_for_each(list, &(instance->member_list)) {
740 member = qb_list_entry (list,
741 struct totemudpu_member,
742 list);
743
744 if (member->member.nodeid == nodeid) {
745 node_status->nodeid = nodeid;
746 /* reachable is filled in by totemsrp */
747 if (instance->netif_bind_state == BIND_STATE_REGULAR) {
748 node_status->link_status[0].enabled = 1;
749 } else {
750 node_status->link_status[0].enabled = 0;
751 }
752 node_status->link_status[0].connected = node_status->reachable;
753 node_status->link_status[0].mtu = instance->totem_config->net_mtu;
754 strncpy(node_status->link_status[0].src_ipaddr, totemip_print(&member->member), KNET_MAX_HOST_LEN-1);
755 }
756 }
757 return (0);
758}
759
761 void *net_context,
762 char ***status,
763 unsigned int *iface_count)
764{
765 static char *statuses[INTERFACE_MAX] = {(char*)"OK"};
766
767 if (status) {
768 *status = statuses;
769 }
770 *iface_count = 1;
771
772 return (0);
773}
774
775
776static int totemudpu_build_local_sockets(
777 struct totemudpu_instance *instance)
778{
779 int i;
780 unsigned int sendbuf_size;
781 unsigned int recvbuf_size;
782 unsigned int optlen = sizeof (sendbuf_size);
783 int res;
784
785 /*
786 * Create local multicast loop socket
787 */
788 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, instance->local_loop_sock) == -1) {
790 "socket() failed");
791 return (-1);
792 }
793
794 for (i = 0; i < 2; i++) {
795 totemip_nosigpipe (instance->local_loop_sock[i]);
796 res = fcntl (instance->local_loop_sock[i], F_SETFL, O_NONBLOCK);
797 if (res == -1) {
799 "Could not set non-blocking operation on multicast socket");
800 return (-1);
801 }
802 }
803
804 recvbuf_size = MCAST_SOCKET_BUFFER_SIZE;
805 sendbuf_size = MCAST_SOCKET_BUFFER_SIZE;
806
807 res = setsockopt (instance->local_loop_sock[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
808 if (res == -1) {
810 "Unable to set SO_RCVBUF size on UDP local mcast loop socket");
811 return (-1);
812 }
813 res = setsockopt (instance->local_loop_sock[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
814 if (res == -1) {
816 "Unable to set SO_SNDBUF size on UDP local mcast loop socket");
817 return (-1);
818 }
819
820 res = getsockopt (instance->local_loop_sock[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
821 if (res == 0) {
823 "Local receive multicast loop socket recv buffer size (%d bytes).", recvbuf_size);
824 }
825
826 res = getsockopt (instance->local_loop_sock[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
827 if (res == 0) {
829 "Local transmit multicast loop socket send buffer size (%d bytes).", sendbuf_size);
830 }
831
832 return (0);
833}
834
835static int totemudpu_build_sockets (
836 struct totemudpu_instance *instance,
837 struct totem_ip_address *bindnet_address,
838 struct totem_ip_address *bound_to)
839{
840 int interface_num;
841 int interface_up;
842 int res;
843
844 /*
845 * Determine the ip address bound to and the interface name
846 */
847 res = netif_determine (instance,
848 bindnet_address,
849 bound_to,
850 &interface_up,
851 &interface_num);
852
853 if (res == -1) {
854 return (-1);
855 }
856
857 totemip_copy(&instance->my_id, bound_to);
858
859 res = totemudpu_build_sockets_ip (instance,
860 bindnet_address, bound_to, interface_num);
861
862 if (res == -1) {
863 /* if we get here, corosync won't work anyway, so better leaving than faking to work */
865 "Unable to create sockets, exiting");
866 exit(EXIT_FAILURE);
867 }
868
869 /* We only send out of the token socket */
870 totemudpu_traffic_control_set(instance, instance->token_socket);
871
872 /*
873 * Rebind all members to new ips
874 */
876
877 return res;
878}
879
880/*
881 * Totem Network interface
882 * depends on poll abstraction, POSIX, IPV4
883 */
884
885/*
886 * Create an instance
887 */
889 qb_loop_t *poll_handle,
890 void **udpu_context,
892 totemsrp_stats_t *stats,
893 void *context,
894
895 int (*deliver_fn) (
896 void *context,
897 const void *msg,
898 unsigned int msg_len,
899 const struct sockaddr_storage *system_from),
900
901 int (*iface_change_fn) (
902 void *context,
903 const struct totem_ip_address *iface_address,
904 unsigned int ring_no),
905
906 void (*mtu_changed) (
907 void *context,
908 int net_mtu),
909
910 void (*target_set_completed) (
911 void *context))
912{
913 struct totemudpu_instance *instance;
914
915 instance = malloc (sizeof (struct totemudpu_instance));
916 if (instance == NULL) {
917 return (-1);
918 }
919
920 totemudpu_instance_initialize (instance);
921
922 instance->totem_config = totem_config;
923 instance->stats = stats;
924
925 /*
926 * Configure logging
927 */
928 instance->totemudpu_log_level_security = 1; //totem_config->totem_logging_configuration.log_level_security;
935
936 /*
937 * Initialize local variables for totemudpu
938 */
939 instance->totem_interface = &totem_config->interfaces[0];
940 memset (instance->iov_buffer, 0, UDP_RECEIVE_FRAME_SIZE_MAX + 1);
941
942 instance->totemudpu_poll_handle = poll_handle;
943
944 instance->totem_interface->bindnet.nodeid = instance->totem_config->node_id;
945
946 instance->context = context;
947 instance->totemudpu_deliver_fn = deliver_fn;
948
949 instance->totemudpu_iface_change_fn = iface_change_fn;
950
951 instance->totemudpu_target_set_completed = target_set_completed;
952
953 /*
954 * Create static local mcast sockets
955 */
956 if (totemudpu_build_local_sockets(instance) == -1) {
957 free(instance);
958 return (-1);
959 }
960
961 qb_loop_poll_add (
962 instance->totemudpu_poll_handle,
963 QB_LOOP_MED,
964 instance->local_loop_sock[0],
965 POLLIN, instance, net_deliver_fn);
966
967 /*
968 * RRP layer isn't ready to receive message because it hasn't
969 * initialized yet. Add short timer to check the interfaces.
970 */
971 qb_loop_timer_add (instance->totemudpu_poll_handle,
972 QB_LOOP_MED,
973 100*QB_TIME_NS_IN_MSEC,
974 (void *)instance,
975 timer_function_netif_check_timeout,
976 &instance->timer_netif_check_timeout);
977
978 totemudpu_start_merge_detect_timeout((void*)instance);
979
980 *udpu_context = instance;
981 return (0);
982}
983
985{
986 return malloc (FRAME_SIZE_MAX);
987}
988
990{
991 return free (ptr);
992}
993
995 void *udpu_context,
996 int processor_count)
997{
998 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
999 int res = 0;
1000
1001 instance->my_memb_entries = processor_count;
1002 qb_loop_timer_del (instance->totemudpu_poll_handle,
1003 instance->timer_netif_check_timeout);
1004 if (processor_count == 1) {
1005 qb_loop_timer_add (instance->totemudpu_poll_handle,
1006 QB_LOOP_MED,
1007 instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC,
1008 (void *)instance,
1009 timer_function_netif_check_timeout,
1010 &instance->timer_netif_check_timeout);
1011 }
1012
1013 return (res);
1014}
1015
1017{
1018 int res = 0;
1019
1020 return (res);
1021}
1022
1024{
1025 int res = 0;
1026
1027 return (res);
1028}
1029
1031 void *udpu_context,
1032 const void *msg,
1033 unsigned int msg_len)
1034{
1035 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1036 int res = 0;
1037
1038 ucast_sendmsg (instance, &instance->token_target, msg, msg_len);
1039
1040 return (res);
1041}
1043 void *udpu_context,
1044 const void *msg,
1045 unsigned int msg_len)
1046{
1047 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1048 int res = 0;
1049
1050 mcast_sendmsg (instance, msg, msg_len, 0);
1051
1052 return (res);
1053}
1054
1056 void *udpu_context,
1057 const void *msg,
1058 unsigned int msg_len)
1059{
1060 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1061 int res = 0;
1062
1063 mcast_sendmsg (instance, msg, msg_len, 1);
1064
1065 return (res);
1066}
1067
1069{
1070 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1071 int res = 0;
1072
1073 timer_function_netif_check_timeout (instance);
1074
1075 return (res);
1076}
1077
1082
1083
1085 void *udpu_context,
1086 unsigned int nodeid)
1087{
1088
1089 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1090 struct qb_list_head *list;
1091 struct totemudpu_member *member;
1092 int res = 0;
1093
1094 qb_list_for_each(list, &(instance->member_list)) {
1095 member = qb_list_entry (list,
1096 struct totemudpu_member,
1097 list);
1098
1099 if (member->member.nodeid == nodeid) {
1100 memcpy (&instance->token_target, &member->member,
1101 sizeof (struct totem_ip_address));
1102
1103 instance->totemudpu_target_set_completed (instance->context);
1104 break;
1105 }
1106 }
1107 return (res);
1108}
1109
1111 void *udpu_context)
1112{
1113 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1114 unsigned int res;
1115 struct sockaddr_storage system_from;
1116 struct msghdr msg_recv;
1117 struct pollfd ufd;
1118 int nfds, i;
1119 int msg_processed = 0;
1120 int sock;
1121
1122 /*
1123 * Receive datagram
1124 */
1125 memset(&msg_recv, 0, sizeof(msg_recv));
1126 msg_recv.msg_name = &system_from;
1127 msg_recv.msg_namelen = sizeof (struct sockaddr_storage);
1128 msg_recv.msg_iov = &instance->totemudpu_iov_recv;
1129 msg_recv.msg_iovlen = 1;
1130
1131 for (i = 0; i < 2; i++) {
1132 sock = -1;
1133 if (i == 0) {
1134 if (instance->netif_bind_state == BIND_STATE_REGULAR) {
1135 sock = instance->token_socket;
1136 } else {
1137 continue;
1138 }
1139 }
1140 if (i == 1) {
1141 sock = instance->local_loop_sock[0];
1142 }
1143 assert(sock != -1);
1144
1145 do {
1146 ufd.fd = sock;
1147 ufd.events = POLLIN;
1148 nfds = poll (&ufd, 1, 0);
1149 if (nfds == 1 && ufd.revents & POLLIN) {
1150 res = recvmsg (sock, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
1151 if (res != -1) {
1152 msg_processed = 1;
1153 } else {
1154 msg_processed = -1;
1155 }
1156 }
1157 } while (nfds == 1);
1158 }
1159
1160 return (msg_processed);
1161}
1162
1163static int totemudpu_create_sending_socket(
1164 void *udpu_context,
1165 const struct totem_ip_address *member)
1166{
1167 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1168 int fd;
1169 int res;
1170 unsigned int sendbuf_size;
1171 unsigned int optlen = sizeof (sendbuf_size);
1172 struct sockaddr_storage sockaddr;
1173 int addrlen;
1174
1175 fd = socket (member->family, SOCK_DGRAM, 0);
1176 if (fd == -1) {
1178 "Could not create socket for new member");
1179 return (-1);
1180 }
1181 totemip_nosigpipe (fd);
1182 res = fcntl (fd, F_SETFL, O_NONBLOCK);
1183 if (res == -1) {
1185 "Could not set non-blocking operation on token socket");
1186 goto error_close_fd;
1187 }
1188
1189 /*
1190 * These sockets are used to send multicast messages, so their buffers
1191 * should be large
1192 */
1193 sendbuf_size = MCAST_SOCKET_BUFFER_SIZE;
1194 res = setsockopt (fd, SOL_SOCKET, SO_SNDBUF,
1195 &sendbuf_size, optlen);
1196 if (res == -1) {
1197 LOGSYS_PERROR (errno, instance->totemudpu_log_level_notice,
1198 "Could not set sendbuf size");
1199 /*
1200 * Fail in setting sendbuf size is not fatal -> don't exit
1201 */
1202 }
1203
1204 res = set_socket_dscp(fd, instance->totem_config->ip_dscp);
1205 if (res == -1) {
1206 LOGSYS_PERROR (errno, instance->totemudpu_log_level_notice,
1207 "Could not set IP_TOS bits");
1208 }
1209
1210 /*
1211 * Bind to sending interface
1212 */
1213 totemip_totemip_to_sockaddr_convert(&instance->my_id, 0, &sockaddr, &addrlen);
1214 res = bind (fd, (struct sockaddr *)&sockaddr, addrlen);
1215 if (res == -1) {
1217 "bind token socket failed");
1218 goto error_close_fd;
1219 }
1220
1221 return (fd);
1222
1223error_close_fd:
1224 close(fd);
1225 return (-1);
1226}
1227
/*
 * Interface-address setter for the UDPU transport.
 *
 * None of the arguments are examined: the operation is not supported
 * here and the call fails every time.
 *
 * Returns -1 unconditionally.
 */
int totemudpu_iface_set (void *net_context,
	const struct totem_ip_address *local_addr,
	unsigned short ip_port,
	unsigned int iface_no)
{
	const int not_supported = -1;

	(void)net_context;
	(void)local_addr;
	(void)ip_port;
	(void)iface_no;

	return (not_supported);
}
1236
1238 void *udpu_context,
1239 const struct totem_ip_address *local,
1240 const struct totem_ip_address *member,
1241 int ring_no)
1242{
1243 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1244
1245 struct totemudpu_member *new_member;
1246
1247 new_member = malloc (sizeof (struct totemudpu_member));
1248 if (new_member == NULL) {
1249 return (-1);
1250 }
1251
1252 memset(new_member, 0, sizeof(*new_member));
1253
1254 log_printf (LOGSYS_LEVEL_NOTICE, "adding new UDPU member {%s}",
1256 qb_list_init (&new_member->list);
1257 qb_list_add_tail (&new_member->list, &instance->member_list);
1258 memcpy (&new_member->member, member, sizeof (struct totem_ip_address));
1259 new_member->fd = totemudpu_create_sending_socket(udpu_context, member);
1260 new_member->active = 1;
1261
1262 return (0);
1263}
1264
1266 void *udpu_context,
1267 const struct totem_ip_address *token_target,
1268 int ring_no)
1269{
1270 int found = 0;
1271 struct qb_list_head *list;
1272 struct totemudpu_member *member;
1273
1274 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1275
1276 /*
1277 * Find the member to remove and close its socket
1278 */
1279 qb_list_for_each(list, &(instance->member_list)) {
1280 member = qb_list_entry (list,
1281 struct totemudpu_member,
1282 list);
1283
1284 if (totemip_compare (token_target, &member->member)==0) {
1286 "removing UDPU member {%s}",
1287 totemip_print(&member->member));
1288
1289 if (member->fd > 0) {
1291 "Closing socket to: {%s}",
1292 totemip_print(&member->member));
1293 qb_loop_poll_del (instance->totemudpu_poll_handle,
1294 member->fd);
1295 close (member->fd);
1296 }
1297 found = 1;
1298 break;
1299 }
1300 }
1301
1302 /*
1303 * Delete the member from the list
1304 */
1305 if (found) {
1306 qb_list_del (list);
1307 }
1308
1309 instance = NULL;
1310 return (0);
1311}
1312
1314 void *udpu_context)
1315{
1316 struct qb_list_head *list;
1317 struct totemudpu_member *member;
1318
1319 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1320
1321 qb_list_for_each(list, &(instance->member_list)) {
1322 member = qb_list_entry (list,
1323 struct totemudpu_member,
1324 list);
1325
1326 if (member->fd > 0) {
1327 close (member->fd);
1328 }
1329
1330 member->fd = totemudpu_create_sending_socket(udpu_context, &member->member);
1331 }
1332
1333 return (0);
1334}
1335
1336
1337static void timer_function_merge_detect_timeout (
1338 void *data)
1339{
1340 struct totemudpu_instance *instance = (struct totemudpu_instance *)data;
1341
1342 if (instance->merge_detect_messages_sent_before_timeout == 0) {
1343 instance->send_merge_detect_message = 1;
1344 }
1345
1347
1348 totemudpu_start_merge_detect_timeout(instance);
1349}
1350
1351static void totemudpu_start_merge_detect_timeout(
1352 void *udpu_context)
1353{
1354 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1355
1356 qb_loop_timer_add(instance->totemudpu_poll_handle,
1357 QB_LOOP_MED,
1358 instance->totem_config->merge_timeout * 2 * QB_TIME_NS_IN_MSEC,
1359 (void *)instance,
1360 timer_function_merge_detect_timeout,
1361 &instance->timer_merge_detect_timeout);
1362
1363}
1364
1365static void totemudpu_stop_merge_detect_timeout(
1366 void *udpu_context)
1367{
1368 struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context;
1369
1370 qb_loop_timer_del(instance->totemudpu_poll_handle,
1371 instance->timer_merge_detect_timeout);
1372}
1373
1375 void *udpu_context,
1376 struct totem_config *totem_config)
1377{
1378 /* Not supported */
1379 return (-1);
1380}
#define INTERFACE_MAX
Definition coroapi.h:88
unsigned int nodeid
Definition coroapi.h:0
#define PROCESSOR_COUNT_MAX
Definition coroapi.h:96
@ COROSYNC_DONE_FATAL_ERR
Definition exec/util.h:55
#define LOGSYS_LEVEL_NOTICE
Definition logsys.h:74
#define LOGSYS_LEVEL_DEBUG
Definition logsys.h:76
unsigned int node_id
Definition totem.h:167
struct totem_logging_configuration totem_logging_configuration
Definition totem.h:208
unsigned int downcheck_timeout
Definition totem.h:200
struct totem_interface * interfaces
Definition totem.h:165
unsigned int clear_node_high_bit
Definition totem.h:168
unsigned int merge_timeout
Definition totem.h:198
unsigned char ip_dscp
Definition totem.h:250
unsigned int net_mtu
Definition totem.h:210
unsigned int block_unlisted_ips
Definition totem.h:246
struct totem_ip_address boundto
Definition totem.h:84
uint16_t ip_port
Definition totem.h:87
struct totem_ip_address bindnet
Definition totem.h:83
The totem_ip_address struct.
Definition coroapi.h:111
unsigned int nodeid
Definition coroapi.h:112
unsigned short family
Definition coroapi.h:113
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition totem.h:101
uint8_t reachable
Definition totem.h:270
unsigned int nodeid
Definition totem.h:269
struct knet_link_status link_status[KNET_MAX_LINK]
Definition totem.h:276
int(* totemudpu_deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Definition totemudpu.c:103
struct totem_config * totem_config
Definition totemudpu.c:168
qb_loop_t * totemudpu_poll_handle
Definition totemudpu.c:93
void(* totemudpu_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
Definition totemudpu.c:131
char iov_buffer[UDP_RECEIVE_FRAME_SIZE_MAX+1]
Definition totemudpu.c:142
struct totem_ip_address my_id
Definition totemudpu.c:160
int totemudpu_log_level_notice
Definition totemudpu.c:125
qb_loop_timer_handle timer_netif_check_timeout
Definition totemudpu.c:164
int(* totemudpu_iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no)
Definition totemudpu.c:109
void void * udpu_context
Definition totemudpu.c:140
qb_loop_timer_handle timer_merge_detect_timeout
Definition totemudpu.c:178
unsigned int my_memb_entries
Definition totemudpu.c:166
int totemudpu_log_level_security
Definition totemudpu.c:119
struct totem_ip_address token_target
Definition totemudpu.c:172
unsigned int merge_detect_messages_sent_before_timeout
Definition totemudpu.c:182
int send_merge_detect_message
Definition totemudpu.c:180
struct timeval stats_tv_start
Definition totemudpu.c:158
totemsrp_stats_t * stats
Definition totemudpu.c:170
int totemudpu_log_level_warning
Definition totemudpu.c:123
struct iovec totemudpu_iov_recv
Definition totemudpu.c:144
struct totem_interface * totem_interface
Definition totemudpu.c:95
int totemudpu_log_level_debug
Definition totemudpu.c:127
int totemudpu_log_level_error
Definition totemudpu.c:121
void(* totemudpu_target_set_completed)(void *context)
Definition totemudpu.c:114
struct qb_list_head member_list
Definition totemudpu.c:146
struct totem_ip_address member
Definition totemudpu.c:87
struct qb_list_head list
Definition totemudpu.c:86
const void * msg
Definition totemknet.c:191
unsigned int msg_len
Definition totemknet.c:192
struct totemknet_instance * instance
Definition totemknet.c:193
typedef __attribute__
#define FRAME_SIZE_MAX
Definition totem.h:52
#define BIND_RETRIES_INTERVAL
Definition totem.h:71
#define BIND_MAX_RETRIES
Definition totem.h:70
#define UDP_RECEIVE_FRAME_SIZE_MAX
Definition totem.h:62
const char * totemip_sa_print(const struct sockaddr *sa)
Definition totemip.c:234
int totemip_sa_equal(const struct totem_ip_address *totem_ip, const struct sockaddr *sa)
Definition totemip.c:95
int totemip_iface_check(struct totem_ip_address *bindnet, struct totem_ip_address *boundto, int *interface_up, int *interface_num, int mask_high_bit)
Definition totemip.c:529
#define totemip_nosigpipe(s)
Definition totemip.h:56
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition totemip.c:123
size_t totemip_udpip_header_size(int family)
Definition totemip.c:608
int totemip_compare(const void *a, const void *b)
Definition totemip.c:150
int totemip_totemip_to_sockaddr_convert(struct totem_ip_address *ip_addr, uint16_t port, struct sockaddr_storage *saddr, int *addrlen)
Definition totemip.c:264
const char * totemip_print(const struct totem_ip_address *addr)
Definition totemip.c:256
#define MSG_NOSIGNAL
Definition totemknet.c:82
struct srp_addr system_from
Definition totemsrp.c:1
#define NETIF_STATE_REPORT_UP
Definition totemudp.c:77
#define BIND_STATE_LOOPBACK
Definition totemudp.c:82
#define NETIF_STATE_REPORT_DOWN
Definition totemudp.c:78
#define MCAST_SOCKET_BUFFER_SIZE
Definition totemudp.c:76
#define BIND_STATE_UNBOUND
Definition totemudp.c:80
#define BIND_STATE_REGULAR
Definition totemudp.c:81
void totemudpu_buffer_release(void *ptr)
Definition totemudpu.c:989
int totemudpu_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
Definition totemudpu.c:760
#define log_printf(level, format, args...)
Definition totemudpu.c:227
int totemudpu_send_flush(void *udpu_context)
Definition totemudpu.c:1023
int totemudpu_token_target_set(void *udpu_context, unsigned int nodeid)
Definition totemudpu.c:1084
void * totemudpu_buffer_alloc(void)
Definition totemudpu.c:984
int totemudpu_member_list_rebind_ip(void *udpu_context)
Definition totemudpu.c:1313
int totemudpu_recv_mcast_empty(void *udpu_context)
Definition totemudpu.c:1110
int totemudpu_processor_count_set(void *udpu_context, int processor_count)
Definition totemudpu.c:994
int totemudpu_token_send(void *udpu_context, const void *msg, unsigned int msg_len)
Definition totemudpu.c:1030
int totemudpu_iface_set(void *net_context, const struct totem_ip_address *local_addr, unsigned short ip_port, unsigned int iface_no)
Definition totemudpu.c:1228
int totemudpu_crypto_set(void *udpu_context, const char *cipher_type, const char *hash_type)
Definition totemudpu.c:244
int totemudpu_recv_flush(void *udpu_context)
Definition totemudpu.c:1016
int totemudpu_mcast_noflush_send(void *udpu_context, const void *msg, unsigned int msg_len)
Definition totemudpu.c:1055
int totemudpu_member_remove(void *udpu_context, const struct totem_ip_address *token_target, int ring_no)
Definition totemudpu.c:1265
int totemudpu_finalize(void *udpu_context)
Definition totemudpu.c:377
int totemudpu_mcast_flush_send(void *udpu_context, const void *msg, unsigned int msg_len)
Definition totemudpu.c:1042
int totemudpu_iface_check(void *udpu_context)
Definition totemudpu.c:1068
int totemudpu_reconfigure(void *udpu_context, struct totem_config *totem_config)
Definition totemudpu.c:1374
#define LOGSYS_PERROR(err_num, level, fmt, args...)
Definition totemudpu.c:234
int totemudpu_initialize(qb_loop_t *poll_handle, void **udpu_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, int(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), int(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Create an instance.
Definition totemudpu.c:888
void totemudpu_net_mtu_adjust(void *udpu_context, struct totem_config *totem_config)
Definition totemudpu.c:1078
int totemudpu_nodestatus_get(void *udpu_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition totemudpu.c:732
int totemudpu_member_add(void *udpu_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
Definition totemudpu.c:1237
int set_socket_dscp(int socket, unsigned char dscp)
Definition util.c:358