42#include <sys/socket.h>
47#include <netinet/in.h>
67#define LOGSYS_UTILS_ONLY 1
77#define MCAST_SOCKET_BUFFER_SIZE (TRANSMITS_ALLOWED * UDP_RECEIVE_FRAME_SIZE_MAX)
78#define NETIF_STATE_REPORT_UP 1
79#define NETIF_STATE_REPORT_DOWN 2
81#define BIND_STATE_UNBOUND 0
82#define BIND_STATE_REGULAR 1
83#define BIND_STATE_LOOPBACK 2
106 unsigned int msg_len,
112 unsigned int ring_no);
134 const char *function,
191static int totemudpu_build_sockets (
196static int totemudpu_create_sending_socket(
203static void totemudpu_start_merge_detect_timeout(
206static void totemudpu_stop_merge_detect_timeout(
227#define log_printf(level, format, args...) \
229 instance->totemudpu_log_printf ( \
230 level, instance->totemudpu_subsys_id, \
231 __FUNCTION__, __FILE__, __LINE__, \
232 (const char *)format, ##args); \
234#define LOGSYS_PERROR(err_num, level, fmt, args...) \
236 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
237 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
238 instance->totemudpu_log_printf ( \
239 level, instance->totemudpu_subsys_id, \
240 __FUNCTION__, __FILE__, __LINE__, \
241 fmt ": %s (%d)", ##args, _error_ptr, err_num); \
246 const char *cipher_type,
247 const char *hash_type)
254static inline void ucast_sendmsg (
258 unsigned int msg_len)
260 struct msghdr msg_ucast;
262 struct sockaddr_storage sockaddr;
267 iovec.iov_base = (
void *)msg;
268 iovec.iov_len = msg_len;
275 memset(&msg_ucast, 0,
sizeof(msg_ucast));
276 msg_ucast.msg_name = &sockaddr;
277 msg_ucast.msg_namelen = addrlen;
278 msg_ucast.msg_iov = (
void *)&iovec;
279 msg_ucast.msg_iovlen = 1;
285 msg_ucast.msg_name = NULL;
286 msg_ucast.msg_namelen = 0;
297 "sendmsg(ucast) failed (non-critical)");
301static inline void mcast_sendmsg (
304 unsigned int msg_len,
307 struct msghdr msg_mcast;
310 struct sockaddr_storage sockaddr;
312 struct qb_list_head *list;
315 iovec.iov_base = (
void *)msg;
316 iovec.iov_len = msg_len;
318 memset(&msg_mcast, 0,
sizeof(msg_mcast));
334 memset(&msg_mcast, 0,
sizeof(msg_mcast));
335 msg_mcast.msg_name = &sockaddr;
336 msg_mcast.msg_namelen = addrlen;
337 msg_mcast.msg_iov = (
void *)&iovec;
338 msg_mcast.msg_iovlen = 1;
347 "sendmsg(mcast) failed (non-critical)");
363 msg_mcast.msg_name = NULL;
364 msg_mcast.msg_namelen = 0;
365 msg_mcast.msg_iov = (
void *)&iovec;
366 msg_mcast.msg_iovlen = 1;
372 "sendmsg(local mcast loop) failed (non-critical)");
396 totemudpu_stop_merge_detect_timeout(instance);
402 const void *udpu_context,
403 const struct sockaddr *sa)
405 struct qb_list_head *list;
413 member = qb_list_entry (list,
427static int net_deliver_fn (
433 struct msghdr msg_recv;
443 memset(&msg_recv, 0,
sizeof(msg_recv));
445 msg_recv.msg_namelen =
sizeof (
struct sockaddr_storage);
446 msg_recv.msg_iov = iovec;
447 msg_recv.msg_iovlen = 1;
449 bytes_received = recvmsg (fd, &msg_recv,
MSG_NOSIGNAL | MSG_DONTWAIT);
450 if (bytes_received == -1) {
463 "Received too big message. This may be because something bad is happening "
464 "on the network (attack?), or you tried join more nodes than corosync is "
465 "compiled with (%u) or bug in the code (bad estimation of "
472 find_member_by_sockaddr(instance, (
const struct sockaddr *)&
system_from) == NULL) {
479 iovec->iov_len = bytes_received;
494static int netif_determine (
504 interface_up, interface_num,
516static void timer_function_netif_check_timeout (
526 netif_determine (instance,
529 &interface_up, &interface_num);
535 interface_up == 0) ||
539 interface_up == 1)) {
545 timer_function_netif_check_timeout,
546 &instance->timer_netif_check_timeout);
561 if (interface_up == 0) {
564 "One of your ip addresses are now bound to localhost. "
565 "Corosync would not work correctly.");
581 timer_function_netif_check_timeout,
582 &instance->timer_netif_check_timeout);
592 totemudpu_build_sockets (instance,
600 POLLIN, instance, net_deliver_fn);
611 "The network interface [%s] is now up.",
624 timer_function_netif_check_timeout,
625 &instance->timer_netif_check_timeout);
631 "The network interface is down.");
641static void totemudpu_traffic_control_set(
struct totemudpu_instance *instance,
int sock)
646 if (setsockopt(sock, SOL_SOCKET, SO_PRIORITY, &prio,
sizeof(
int))) {
648 "Could not set traffic priority");
653static int totemudpu_build_sockets_ip (
659 struct sockaddr_storage sockaddr;
662 unsigned int recvbuf_size;
663 unsigned int optlen =
sizeof (recvbuf_size);
664 unsigned int retries = 0;
677 res = fcntl (instance->
token_socket, F_SETFL, O_NONBLOCK);
680 "Could not set non-blocking operation on token socket");
690 res = bind (instance->
token_socket, (
struct sockaddr *)&sockaddr, addrlen);
695 "bind token socket failed");
715 res = setsockopt (instance->
token_socket, SOL_SOCKET, SO_RCVBUF,
716 &recvbuf_size, optlen);
719 "Could not set recvbuf size");
726 "Could not set IP_TOS bits");
736 struct qb_list_head *list;
763 unsigned int *iface_count)
776static int totemudpu_build_local_sockets(
780 unsigned int sendbuf_size;
781 unsigned int recvbuf_size;
782 unsigned int optlen =
sizeof (sendbuf_size);
788 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, instance->
local_loop_sock) == -1) {
794 for (i = 0; i < 2; i++) {
799 "Could not set non-blocking operation on multicast socket");
807 res = setsockopt (instance->
local_loop_sock[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
810 "Unable to set SO_RCVBUF size on UDP local mcast loop socket");
813 res = setsockopt (instance->
local_loop_sock[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
816 "Unable to set SO_SNDBUF size on UDP local mcast loop socket");
820 res = getsockopt (instance->
local_loop_sock[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
823 "Local receive multicast loop socket recv buffer size (%d bytes).", recvbuf_size);
826 res = getsockopt (instance->
local_loop_sock[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
829 "Local transmit multicast loop socket send buffer size (%d bytes).", sendbuf_size);
835static int totemudpu_build_sockets (
847 res = netif_determine (instance,
859 res = totemudpu_build_sockets_ip (instance,
860 bindnet_address, bound_to, interface_num);
865 "Unable to create sockets, exiting");
870 totemudpu_traffic_control_set(instance, instance->
token_socket);
889 qb_loop_t *poll_handle,
898 unsigned int msg_len,
901 int (*iface_change_fn) (
904 unsigned int ring_no),
906 void (*mtu_changed) (
910 void (*target_set_completed) (
916 if (instance == NULL) {
920 totemudpu_instance_initialize (instance);
956 if (totemudpu_build_local_sockets(instance) == -1) {
965 POLLIN, instance, net_deliver_fn);
973 100*QB_TIME_NS_IN_MSEC,
975 timer_function_netif_check_timeout,
978 totemudpu_start_merge_detect_timeout((
void*)instance);
1004 if (processor_count == 1) {
1009 timer_function_netif_check_timeout,
1033 unsigned int msg_len)
1038 ucast_sendmsg (instance, &instance->
token_target, msg, msg_len);
1045 unsigned int msg_len)
1050 mcast_sendmsg (instance, msg, msg_len, 0);
1058 unsigned int msg_len)
1063 mcast_sendmsg (instance, msg, msg_len, 1);
1073 timer_function_netif_check_timeout (instance);
1090 struct qb_list_head *list;
1116 struct msghdr msg_recv;
1119 int msg_processed = 0;
1125 memset(&msg_recv, 0,
sizeof(msg_recv));
1127 msg_recv.msg_namelen =
sizeof (
struct sockaddr_storage);
1129 msg_recv.msg_iovlen = 1;
1131 for (i = 0; i < 2; i++) {
1147 ufd.events = POLLIN;
1148 nfds = poll (&ufd, 1, 0);
1149 if (nfds == 1 && ufd.revents & POLLIN) {
1150 res = recvmsg (sock, &msg_recv,
MSG_NOSIGNAL | MSG_DONTWAIT);
1157 }
while (nfds == 1);
1160 return (msg_processed);
1163static int totemudpu_create_sending_socket(
1170 unsigned int sendbuf_size;
1171 unsigned int optlen =
sizeof (sendbuf_size);
1172 struct sockaddr_storage sockaddr;
1175 fd = socket (member->
family, SOCK_DGRAM, 0);
1178 "Could not create socket for new member");
1182 res = fcntl (fd, F_SETFL, O_NONBLOCK);
1185 "Could not set non-blocking operation on token socket");
1186 goto error_close_fd;
1194 res = setsockopt (fd, SOL_SOCKET, SO_SNDBUF,
1195 &sendbuf_size, optlen);
1198 "Could not set sendbuf size");
1207 "Could not set IP_TOS bits");
1214 res = bind (fd, (
struct sockaddr *)&sockaddr, addrlen);
1217 "bind token socket failed");
1218 goto error_close_fd;
1230 unsigned short ip_port,
1231 unsigned int iface_no)
1248 if (new_member == NULL) {
1252 memset(new_member, 0,
sizeof(*new_member));
1256 qb_list_init (&new_member->
list);
1259 new_member->
fd = totemudpu_create_sending_socket(udpu_context,
member);
1271 struct qb_list_head *list;
1279 qb_list_for_each(list, &(instance->
member_list)) {
1280 member = qb_list_entry (list,
1286 "removing UDPU member {%s}",
1289 if (member->
fd > 0) {
1291 "Closing socket to: {%s}",
1316 struct qb_list_head *list;
1321 qb_list_for_each(list, &(instance->
member_list)) {
1322 member = qb_list_entry (list,
1326 if (member->
fd > 0) {
1337static void timer_function_merge_detect_timeout (
1348 totemudpu_start_merge_detect_timeout(instance);
1351static void totemudpu_start_merge_detect_timeout(
1360 timer_function_merge_detect_timeout,
1361 &instance->timer_merge_detect_timeout);
1365static void totemudpu_stop_merge_detect_timeout(
#define PROCESSOR_COUNT_MAX
@ COROSYNC_DONE_FATAL_ERR
#define LOGSYS_LEVEL_NOTICE
#define LOGSYS_LEVEL_DEBUG
struct totem_logging_configuration totem_logging_configuration
unsigned int downcheck_timeout
struct totem_interface * interfaces
unsigned int clear_node_high_bit
unsigned int merge_timeout
unsigned int block_unlisted_ips
struct totem_ip_address boundto
struct totem_ip_address bindnet
The totem_ip_address struct.
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
struct knet_link_status link_status[KNET_MAX_LINK]
int(* totemudpu_deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
struct totem_config * totem_config
qb_loop_t * totemudpu_poll_handle
void(* totemudpu_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
char iov_buffer[UDP_RECEIVE_FRAME_SIZE_MAX+1]
struct totem_ip_address my_id
int totemudpu_log_level_notice
qb_loop_timer_handle timer_netif_check_timeout
int(* totemudpu_iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no)
qb_loop_timer_handle timer_merge_detect_timeout
unsigned int my_memb_entries
int totemudpu_log_level_security
struct totem_ip_address token_target
unsigned int merge_detect_messages_sent_before_timeout
int send_merge_detect_message
struct timeval stats_tv_start
int totemudpu_log_level_warning
struct iovec totemudpu_iov_recv
struct totem_interface * totem_interface
int totemudpu_log_level_debug
int totemudpu_log_level_error
void(* totemudpu_target_set_completed)(void *context)
struct qb_list_head member_list
struct totem_ip_address member
struct totemknet_instance * instance
#define BIND_RETRIES_INTERVAL
#define UDP_RECEIVE_FRAME_SIZE_MAX
const char * totemip_sa_print(const struct sockaddr *sa)
int totemip_sa_equal(const struct totem_ip_address *totem_ip, const struct sockaddr *sa)
int totemip_iface_check(struct totem_ip_address *bindnet, struct totem_ip_address *boundto, int *interface_up, int *interface_num, int mask_high_bit)
#define totemip_nosigpipe(s)
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
size_t totemip_udpip_header_size(int family)
int totemip_compare(const void *a, const void *b)
int totemip_totemip_to_sockaddr_convert(struct totem_ip_address *ip_addr, uint16_t port, struct sockaddr_storage *saddr, int *addrlen)
const char * totemip_print(const struct totem_ip_address *addr)
struct srp_addr system_from
#define NETIF_STATE_REPORT_UP
#define BIND_STATE_LOOPBACK
#define NETIF_STATE_REPORT_DOWN
#define MCAST_SOCKET_BUFFER_SIZE
#define BIND_STATE_UNBOUND
#define BIND_STATE_REGULAR
void totemudpu_buffer_release(void *ptr)
int totemudpu_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
#define log_printf(level, format, args...)
int totemudpu_send_flush(void *udpu_context)
int totemudpu_token_target_set(void *udpu_context, unsigned int nodeid)
void * totemudpu_buffer_alloc(void)
int totemudpu_member_list_rebind_ip(void *udpu_context)
int totemudpu_recv_mcast_empty(void *udpu_context)
int totemudpu_processor_count_set(void *udpu_context, int processor_count)
int totemudpu_token_send(void *udpu_context, const void *msg, unsigned int msg_len)
int totemudpu_iface_set(void *net_context, const struct totem_ip_address *local_addr, unsigned short ip_port, unsigned int iface_no)
int totemudpu_crypto_set(void *udpu_context, const char *cipher_type, const char *hash_type)
int totemudpu_recv_flush(void *udpu_context)
int totemudpu_mcast_noflush_send(void *udpu_context, const void *msg, unsigned int msg_len)
int totemudpu_member_remove(void *udpu_context, const struct totem_ip_address *token_target, int ring_no)
int totemudpu_finalize(void *udpu_context)
int totemudpu_mcast_flush_send(void *udpu_context, const void *msg, unsigned int msg_len)
int totemudpu_iface_check(void *udpu_context)
int totemudpu_reconfigure(void *udpu_context, struct totem_config *totem_config)
#define LOGSYS_PERROR(err_num, level, fmt, args...)
int totemudpu_initialize(qb_loop_t *poll_handle, void **udpu_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, int(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), int(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Create an instance.
void totemudpu_net_mtu_adjust(void *udpu_context, struct totem_config *totem_config)
int totemudpu_nodestatus_get(void *udpu_context, unsigned int nodeid, struct totem_node_status *node_status)
int totemudpu_member_add(void *udpu_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
int set_socket_dscp(int socket, unsigned char dscp)