corosync 3.1.10
totemsrp.c
Go to the documentation of this file.
1/*
2 * Copyright (c) 2003-2006 MontaVista Software, Inc.
3 * Copyright (c) 2006-2018 Red Hat, Inc.
4 *
5 * All rights reserved.
6 *
7 * Author: Steven Dake (sdake@redhat.com)
8 *
9 * This software licensed under BSD license, the text of which follows:
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions are met:
13 *
14 * - Redistributions of source code must retain the above copyright notice,
15 * this list of conditions and the following disclaimer.
16 * - Redistributions in binary form must reproduce the above copyright notice,
17 * this list of conditions and the following disclaimer in the documentation
18 * and/or other materials provided with the distribution.
19 * - Neither the name of the MontaVista Software, Inc. nor the names of its
20 * contributors may be used to endorse or promote products derived from this
21 * software without specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33 * THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
/*
 * The first version of this code was based upon Yair Amir's PhD thesis:
 * https://corosync.github.io/corosync/doc/Yair_phd.ps.gz (ch4,5).
 *
 * The current version of totemsrp implements the Totem protocol specified in:
 * https://corosync.github.io/corosync/doc/tocssrp95.ps.gz
 *
 * The deviations from the above published protocols are:
 * - token hold mode where token doesn't rotate on unused ring - reduces cpu
 *   usage on 1.6ghz xeon from 35% to less than 0.1% as measured by top
 */
47
48#include <config.h>
49
50#include <assert.h>
51#ifdef HAVE_ALLOCA_H
52#include <alloca.h>
53#endif
54#include <sys/mman.h>
55#include <sys/types.h>
56#include <sys/stat.h>
57#include <sys/socket.h>
58#include <netdb.h>
59#include <sys/un.h>
60#include <sys/ioctl.h>
61#include <sys/param.h>
62#include <netinet/in.h>
63#include <arpa/inet.h>
64#include <unistd.h>
65#include <fcntl.h>
66#include <stdlib.h>
67#include <stdio.h>
68#include <errno.h>
69#include <sched.h>
70#include <time.h>
71#include <sys/time.h>
72#include <sys/poll.h>
73#include <sys/uio.h>
74#include <limits.h>
75
76#include <qb/qblist.h>
77#include <qb/qbdefs.h>
78#include <qb/qbutil.h>
79#include <qb/qbloop.h>
80
81#include <corosync/swab.h>
82#include <corosync/sq.h>
83
84#define LOGSYS_UTILS_ONLY 1
85#include <corosync/logsys.h>
86
87#include "totemsrp.h"
88#include "totemnet.h"
89
90#include "icmap.h"
91#include "totemconfig.h"
92
93#include "cs_queue.h"
94
#define LOCALHOST_IP inet_addr("127.0.0.1")
#define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued for retransmission */
#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
#define MAXIOVS 5
#define RETRANSMIT_ENTRIES_MAX 30
#define TOKEN_SIZE_MAX 64000 /* bytes */
#define LEAVE_DUMMY_NODEID 0
103
/*
 * SRP address: how the membership algorithm identifies a processor.
 * It carries nothing but the node id.
 */
struct srp_addr {
	unsigned int nodeid;
};
110
/*
 * Rollover handling:
 * SEQNO_START_MSG is the starting sequence number after a new configuration.
 * This should remain zero, unless testing overflow, in which case
 * 0x7ffff000 and 0xfffff000 are good starting values.
 *
 * SEQNO_START_TOKEN is the starting sequence number after a new configuration
 * for a token. This should remain zero, unless testing overflow, in which
 * case 0x7fffff00 or 0xffffff00 are good starting values.
 */
121#define SEQNO_START_MSG 0x0
122#define SEQNO_START_TOKEN 0x0
123
/*
 * These can be used to test different rollover points
 * #define SEQNO_START_MSG 0xfffffe00
 * #define SEQNO_START_TOKEN 0xfffffe00
 */
129
130/*
131 * These can be used to test the error recovery algorithms
132 * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
133 * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
134 * #define TEST_DROP_MCAST_PERCENTAGE 50
135 * #define TEST_RECOVERY_MSG_COUNT 300
136 */
137
138/*
139 * we compare incoming messages to determine if their endian is
140 * different - if so convert them
141 *
142 * do not change
143 */
144#define ENDIAN_LOCAL 0xff22
145
147 MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
148 MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
149 MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
150 MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
151 MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
152 MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
153};
154
159
160/*
161 * New membership algorithm local variables
162 */
165 int set;
166};
167
168
170 struct qb_list_head list;
171 int (*callback_fn) (enum totem_callback_token_type type, const void *);
173 int delete;
174 void *data;
175};
176
177
179 int mcast;
180 int token;
181};
182
183struct mcast {
186 unsigned int seq;
189 unsigned int node_id;
191} __attribute__((packed));
192
193
194struct rtr_item {
196 unsigned int seq;
197}__attribute__((packed));
198
199
200struct orf_token {
202 unsigned int seq;
203 unsigned int token_seq;
204 unsigned int aru;
205 unsigned int aru_addr;
207 unsigned int backlog;
208 unsigned int fcc;
212}__attribute__((packed));
213
214
215struct memb_join {
218 unsigned int proc_list_entries;
220 unsigned long long ring_seq;
221 unsigned char end_of_memb_join[0];
222/*
223 * These parts of the data structure are dynamic:
224 * struct srp_addr proc_list[];
225 * struct srp_addr failed_list[];
226 */
227} __attribute__((packed));
228
229
235
236
241
242
245 unsigned int aru;
246 unsigned int high_delivered;
247 unsigned int received_flg;
248}__attribute__((packed));
249
250
253 unsigned int token_seq;
255 unsigned int retrans_flg;
258 unsigned char end_of_commit_token[0];
259/*
260 * These parts of the data structure are dynamic:
261 *
262 * struct srp_addr addr[PROCESSOR_COUNT_MAX];
263 * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
264 */
265}__attribute__((packed));
266
268 struct mcast *mcast;
269 unsigned int msg_len;
271
273 struct mcast *mcast;
274 unsigned int msg_len;
275};
276
283
286
288
289 /*
290 * Flow control mcasts and remcasts on last and current orf_token
291 */
293
295
297
299
301
303
305
307
309
311
313
315
317
319
321
323
325
327
329
331
333
335
337
339
341
343
345
347
348 unsigned int my_last_aru;
349
351
353
355
356 unsigned int my_install_seq;
357
359
361
363
365
367
368 /*
369 * Queues used to order, deliver, and recover messages
370 */
372
374
376
378
380
381 /*
382 * Received up to and including
383 */
384 unsigned int my_aru;
385
386 unsigned int my_high_delivered;
387
389
390 struct qb_list_head token_callback_sent_listhead;
391
393
395
396 unsigned int my_token_seq;
397
398 /*
399 * Timers
400 */
401 qb_loop_timer_handle timer_pause_timeout;
402
403 qb_loop_timer_handle timer_orf_token_timeout;
404
405 qb_loop_timer_handle timer_orf_token_warning;
406
408
410
411 qb_loop_timer_handle timer_merge_detect_timeout;
412
414
416
418
419 qb_loop_timer_handle timer_heartbeat_timeout;
420
421 /*
422 * Function and data used to log messages
423 */
425
427
429
431
433
435
437
439 int level,
440 int subsys,
441 const char *function,
442 const char *file,
443 int line,
444 const char *format, ...)__attribute__((format(printf, 6, 7)));;
445
447
448//TODO struct srp_addr next_memb;
449
451
453
455 unsigned int nodeid,
456 const void *msg,
457 unsigned int msg_len,
458 int endian_conversion_required);
459
461 enum totem_configuration_type configuration_type,
462 const unsigned int *member_list, size_t member_list_entries,
463 const unsigned int *left_list, size_t left_list_entries,
464 const unsigned int *joined_list, size_t joined_list_entries,
465 const struct memb_ring_id *ring_id);
466
468
471
474 unsigned int nodeid);
475
477 const struct memb_ring_id *memb_ring_id,
478 unsigned int nodeid);
479
481
483
484 unsigned long long token_ring_id_seq;
485
486 unsigned int last_released;
487
488 unsigned int set_aru;
489
491
493
495
496 unsigned int my_last_seq;
497
498 struct timeval tv_old;
499
501
503
504 unsigned int use_heartbeat;
505
506 unsigned int my_trc;
507
508 unsigned int my_pbl;
509
510 unsigned int my_cbl;
511
513
515
517
519
521
523
525
527
531};
532
534 int count;
536 struct totemsrp_instance *instance,
537 const void *msg,
538 size_t msg_len,
539 int endian_conversion_needed);
540};
541
561
562const char* gather_state_from_desc [] = {
563 [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
565 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
566 [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
567 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
568 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
569 [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
570 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
571 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
572 [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
573 [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
574 [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
575 [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
576 [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
577 [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
578 [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
579};
580
581/*
582 * forward decls
583 */
584static int message_handler_orf_token (
585 struct totemsrp_instance *instance,
586 const void *msg,
587 size_t msg_len,
588 int endian_conversion_needed);
589
590static int message_handler_mcast (
591 struct totemsrp_instance *instance,
592 const void *msg,
593 size_t msg_len,
594 int endian_conversion_needed);
595
596static int message_handler_memb_merge_detect (
597 struct totemsrp_instance *instance,
598 const void *msg,
599 size_t msg_len,
600 int endian_conversion_needed);
601
602static int message_handler_memb_join (
603 struct totemsrp_instance *instance,
604 const void *msg,
605 size_t msg_len,
606 int endian_conversion_needed);
607
608static int message_handler_memb_commit_token (
609 struct totemsrp_instance *instance,
610 const void *msg,
611 size_t msg_len,
612 int endian_conversion_needed);
613
614static int message_handler_token_hold_cancel (
615 struct totemsrp_instance *instance,
616 const void *msg,
617 size_t msg_len,
618 int endian_conversion_needed);
619
620static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
621
622static void srp_addr_to_nodeid (
623 struct totemsrp_instance *instance,
624 unsigned int *nodeid_out,
625 struct srp_addr *srp_addr_in,
626 unsigned int entries);
627
628static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
629
630static void memb_leave_message_send (struct totemsrp_instance *instance);
631
632static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
633static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
634static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
635static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
636 int fcc_mcasts_allowed);
637static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
638
639static void memb_ring_id_set (struct totemsrp_instance *instance,
640 const struct memb_ring_id *ring_id);
641static void target_set_completed (void *context);
642static void memb_state_commit_token_update (struct totemsrp_instance *instance);
643static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
644static int memb_state_commit_token_send (struct totemsrp_instance *instance);
645static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
646static void memb_state_commit_token_create (struct totemsrp_instance *instance);
647static int token_hold_cancel_send (struct totemsrp_instance *instance);
648static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
649static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
650static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
651static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
652static void memb_merge_detect_endian_convert (
653 const struct memb_merge_detect *in,
654 struct memb_merge_detect *out);
655static struct srp_addr srp_addr_endian_convert (struct srp_addr in);
656static void timer_function_orf_token_timeout (void *data);
657static void timer_function_orf_token_warning (void *data);
658static void timer_function_pause_timeout (void *data);
659static void timer_function_heartbeat_timeout (void *data);
660static void timer_function_token_retransmit_timeout (void *data);
661static void timer_function_token_hold_retransmit_timeout (void *data);
662static void timer_function_merge_detect_timeout (void *data);
663static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
664static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
665static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
666
667int main_deliver_fn (
668 void *context,
669 const void *msg,
670 unsigned int msg_len,
671 const struct sockaddr_storage *system_from);
672
674 void *context,
675 const struct totem_ip_address *iface_address,
676 unsigned int iface_no);
677
679 6,
680 {
681 message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
682 message_handler_mcast, /* MESSAGE_TYPE_MCAST */
683 message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
684 message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
685 message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
686 message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
687 }
688};
689
/*
 * Log through the instance's printf-style logger, tagging the message with
 * this subsystem id and the call site.  Requires a variable named
 * `instance` (struct totemsrp_instance *) in scope at the call site.
 *
 * There is deliberately no semicolon after `while (0)`: the caller
 * supplies it, so the macro expands to exactly one statement and can be
 * used as the body of an unbraced if/else.
 */
#define log_printf(level, format, args...)				\
do {									\
	instance->totemsrp_log_printf (					\
		level, instance->totemsrp_subsys_id,			\
		__FUNCTION__, __FILE__, __LINE__,			\
		format, ##args);					\
} while (0)

/*
 * Like log_printf, but appends ": <error text> (<err_num>)" to the message,
 * where the text comes from qb_strerror_r.  err_num is evaluated exactly
 * once and captured first, so passing `errno` reports the caller's value
 * even if qb_strerror_r itself modifies errno.
 */
#define LOGSYS_PERROR(err_num, level, fmt, args...)			\
do {									\
	int _err_num = (err_num);					\
	char _error_str[LOGSYS_MAX_PERROR_MSG_LEN];			\
	const char *_error_ptr = qb_strerror_r(_err_num, _error_str, sizeof(_error_str)); \
	instance->totemsrp_log_printf (					\
		level, instance->totemsrp_subsys_id,			\
		__FUNCTION__, __FILE__, __LINE__,			\
		fmt ": %s (%d)\n", ##args, _error_ptr, _err_num);	\
} while(0)
706
707static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
708{
709 if (gsfrom <= TOTEMSRP_GSFROM_MAX) {
710 return gather_state_from_desc[gsfrom];
711 }
712 else {
713 return "UNKNOWN";
714 }
715}
716
717static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
718{
719 memset (instance, 0, sizeof (struct totemsrp_instance));
720
721 qb_list_init (&instance->token_callback_received_listhead);
722
723 qb_list_init (&instance->token_callback_sent_listhead);
724
725 instance->my_received_flg = 1;
726
727 instance->my_token_seq = SEQNO_START_TOKEN - 1;
728
730
731 instance->set_aru = -1;
732
733 instance->my_aru = SEQNO_START_MSG;
734
736
738
739 instance->orf_token_discard = 0;
740
741 instance->originated_orf_token = 0;
742
743 instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
744
745 instance->waiting_trans_ack = 1;
746}
747
748static int pause_flush (struct totemsrp_instance *instance)
749{
750 uint64_t now_msec;
751 uint64_t timestamp_msec;
752 int res = 0;
753
754 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
755 timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
756
757 if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
759 "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
760 /*
761 * -1 indicates an error from recvmsg
762 */
763 do {
765 } while (res == -1);
766 }
767 return (res);
768}
769
770static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
771{
772 struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
773 uint64_t time_now;
774
775 time_now = (qb_util_nano_current_get() / QB_TIME_NS_IN_MSEC);
776
778 /* incr latest token the index */
779 if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
780 instance->stats.latest_token = 0;
781 else
782 instance->stats.latest_token++;
783
784 if (instance->stats.earliest_token == instance->stats.latest_token) {
785 /* we have filled up the array, start overwriting */
786 if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
787 instance->stats.earliest_token = 0;
788 else
789 instance->stats.earliest_token++;
790
791 instance->stats.token[instance->stats.earliest_token].rx = 0;
792 instance->stats.token[instance->stats.earliest_token].tx = 0;
793 instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
794 }
795
796 instance->stats.token[instance->stats.latest_token].rx = time_now;
797 instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
798 } else {
799 instance->stats.token[instance->stats.latest_token].tx = time_now;
800 }
801 return 0;
802}
803
804static void totempg_mtu_changed(void *context, int net_mtu)
805{
806 struct totemsrp_instance *instance = context;
807
808 instance->totem_config->net_mtu = net_mtu - 2 * sizeof (struct mcast);
809
811 "Net MTU changed to %d, new value is %d",
812 net_mtu, instance->totem_config->net_mtu);
813}
814
815/*
816 * Exported interfaces
817 */
819 qb_loop_t *poll_handle,
820 void **srp_context,
822 totempg_stats_t *stats,
823
824 void (*deliver_fn) (
825 unsigned int nodeid,
826 const void *msg,
827 unsigned int msg_len,
828 int endian_conversion_required),
829
830 void (*confchg_fn) (
831 enum totem_configuration_type configuration_type,
832 const unsigned int *member_list, size_t member_list_entries,
833 const unsigned int *left_list, size_t left_list_entries,
834 const unsigned int *joined_list, size_t joined_list_entries,
835 const struct memb_ring_id *ring_id),
836 void (*waiting_trans_ack_cb_fn) (
837 int waiting_trans_ack))
838{
839 struct totemsrp_instance *instance;
840 int res;
841
842 instance = malloc (sizeof (struct totemsrp_instance));
843 if (instance == NULL) {
844 goto error_exit;
845 }
846
847 totemsrp_instance_initialize (instance);
848
849 instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
851
852 stats->srp = &instance->stats;
853 instance->stats.latest_token = 0;
854 instance->stats.earliest_token = 0;
855
856 instance->totem_config = totem_config;
857
858 /*
859 * Configure logging
860 */
869
870 /*
871 * Configure totem store and load functions
872 */
875
876 /*
877 * Initialize local variables for totemsrp
878 */
880
881 /*
882 * Display totem configuration
883 */
885 "Token Timeout (%d ms) retransmit timeout (%d ms)",
888 uint32_t token_warning_ms = totem_config->token_warning * totem_config->token_timeout / 100;
890 "Token warning every %d ms (%d%% of Token Timeout)",
891 token_warning_ms, totem_config->token_warning);
892 if (token_warning_ms < totem_config->token_retransmit_timeout)
894 "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
895 "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
896 token_warning_ms, totem_config->token_retransmit_timeout);
897 } else {
899 "Token warnings disabled");
900 }
902 "token hold (%d ms) retransmits before loss (%d retrans)",
905 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
909
912 "downcheck (%d ms) fail to recv const (%d msgs)",
915 "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
916
918 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
920
922 "missed count const (%d messages)",
924
926 "send threads (%d threads)", totem_config->threads);
927
929 "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
931 "max_network_delay (%d ms)", totem_config->max_network_delay);
932
933
934 cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
935 sizeof (struct message_item), instance->threaded_mode_enabled);
936
937 sq_init (&instance->regular_sort_queue,
938 QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
939
940 sq_init (&instance->recovery_sort_queue,
941 QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
942
943 instance->totemsrp_poll_handle = poll_handle;
944
945 instance->totemsrp_deliver_fn = deliver_fn;
946
947 instance->totemsrp_confchg_fn = confchg_fn;
948 instance->use_heartbeat = 1;
949
950 timer_function_pause_timeout (instance);
951
954 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
955 instance->use_heartbeat = 0;
956 }
957
958 if (instance->use_heartbeat) {
959 instance->heartbeat_timeout
962
963 if (instance->heartbeat_timeout >= totem_config->token_timeout) {
965 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
966 instance->heartbeat_timeout,
969 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
971 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
972 instance->use_heartbeat = 0;
973 }
974 else {
976 "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
977 }
978 }
979
980 res = totemnet_initialize (
981 poll_handle,
982 &instance->totemnet_context,
984 stats->srp,
985 instance,
988 totempg_mtu_changed,
989 target_set_completed);
990 if (res == -1) {
991 goto error_exit;
992 }
993
994 instance->my_id.nodeid = instance->totem_config->interfaces[instance->lowest_active_if].boundto.nodeid;
995
996 /*
997 * Must have net_mtu adjusted by totemnet_initialize first
998 */
999 cs_queue_init (&instance->new_message_queue,
1001 sizeof (struct message_item), instance->threaded_mode_enabled);
1002
1003 cs_queue_init (&instance->new_message_queue_trans,
1005 sizeof (struct message_item), instance->threaded_mode_enabled);
1006
1008 &instance->token_recv_event_handle,
1010 0,
1011 token_event_stats_collector,
1012 instance);
1014 &instance->token_sent_event_handle,
1016 0,
1017 token_event_stats_collector,
1018 instance);
1019 *srp_context = instance;
1020 return (0);
1021
1022error_exit:
1023 return (-1);
1024}
1025
1027 void *srp_context)
1028{
1029 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1030
1031 memb_leave_message_send (instance);
1033 cs_queue_free (&instance->new_message_queue);
1034 cs_queue_free (&instance->new_message_queue_trans);
1035 cs_queue_free (&instance->retrans_message_queue);
1036 sq_free (&instance->regular_sort_queue);
1037 sq_free (&instance->recovery_sort_queue);
1038 free (instance);
1039}
1040
1042 void *srp_context,
1043 unsigned int nodeid,
1044 struct totem_node_status *node_status)
1045{
1046 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1047 int i;
1048
1050
1051 /* Fill in 'reachable' here as the lower level UDP[u] layers don't know */
1052 for (i = 0; i < instance->my_proc_list_entries; i++) {
1053 if (instance->my_proc_list[i].nodeid == nodeid) {
1054 node_status->reachable = 1;
1055 }
1056 }
1057
1058 return totemnet_nodestatus_get(instance->totemnet_context, nodeid, node_status);
1059}
1060
1061
/*
 * Return configured interfaces. interfaces is an array of totem_ip addresses allocated by the caller,
 * with interfaces_size number of items. iface_count is the final number of interfaces filled by this
 * function.
 *
 * Function returns 0 on success, otherwise if the interfaces array is not big enough, -2 is returned.
 * Note that -1 is never returned: if no interface matches nodeid, 0 is returned and *iface_count
 * is set to 0.
 */
1071 void *srp_context,
1072 unsigned int nodeid,
1073 unsigned int *interface_id,
1074 struct totem_ip_address *interfaces,
1075 unsigned int interfaces_size,
1076 char ***status,
1077 unsigned int *iface_count)
1078{
1079 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1080 struct totem_ip_address *iface_ptr = interfaces;
1081 int res = 0;
1082 int i,n;
1083 int num_ifs = 0;
1084
1085 memset(interfaces, 0, sizeof(struct totem_ip_address) * interfaces_size);
1086 *iface_count = INTERFACE_MAX;
1087
1088 for (i=0; i<INTERFACE_MAX; i++) {
1089 for (n=0; n < instance->totem_config->interfaces[i].member_count; n++) {
1090 if (instance->totem_config->interfaces[i].configured &&
1091 instance->totem_config->interfaces[i].member_list[n].nodeid == nodeid) {
1092 memcpy(iface_ptr, &instance->totem_config->interfaces[i].member_list[n], sizeof(struct totem_ip_address));
1093 interface_id[num_ifs] = i;
1094 iface_ptr++;
1095 if (++num_ifs > interfaces_size) {
1096 res = -2;
1097 break;
1098 }
1099 }
1100 }
1101 }
1102
1103 totemnet_ifaces_get(instance->totemnet_context, status, iface_count);
1104 *iface_count = num_ifs;
1105 return (res);
1106}
1107
1109 void *srp_context,
1110 const char *cipher_type,
1111 const char *hash_type)
1112{
1113 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1114 int res;
1115
1116 res = totemnet_crypto_set(instance->totemnet_context, cipher_type, hash_type);
1117
1118 return (res);
1119}
1120
1121
1123 void *srp_context)
1124{
1125 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1126 unsigned int res;
1127
1128 res = instance->my_id.nodeid;
1129
1130 return (res);
1131}
1132
1134 void *srp_context)
1135{
1136 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1137 int res;
1138
1139 res = instance->totem_config->interfaces[instance->lowest_active_if].boundto.family;
1140
1141 return (res);
1142}
1143
1144
1145/*
1146 * Set operations for use by the membership algorithm
1147 */
1148static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1149{
1150 if (a->nodeid == b->nodeid) {
1151 return 1;
1152 }
1153 return 0;
1154}
1155
1156static void srp_addr_to_nodeid (
1157 struct totemsrp_instance *instance,
1158 unsigned int *nodeid_out,
1159 struct srp_addr *srp_addr_in,
1160 unsigned int entries)
1161{
1162 unsigned int i;
1163
1164 for (i = 0; i < entries; i++) {
1165 nodeid_out[i] = srp_addr_in[i].nodeid;
1166 }
1167}
1168
1169static struct srp_addr srp_addr_endian_convert (struct srp_addr in)
1170{
1171 struct srp_addr res;
1172
1173 res.nodeid = swab32 (in.nodeid);
1174
1175 return (res);
1176}
1177
1178static void memb_consensus_reset (struct totemsrp_instance *instance)
1179{
1180 instance->consensus_list_entries = 0;
1181}
1182
1183static void memb_set_subtract (
1184 struct srp_addr *out_list, int *out_list_entries,
1185 struct srp_addr *one_list, int one_list_entries,
1186 struct srp_addr *two_list, int two_list_entries)
1187{
1188 int found = 0;
1189 int i;
1190 int j;
1191
1192 *out_list_entries = 0;
1193
1194 for (i = 0; i < one_list_entries; i++) {
1195 for (j = 0; j < two_list_entries; j++) {
1196 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1197 found = 1;
1198 break;
1199 }
1200 }
1201 if (found == 0) {
1202 out_list[*out_list_entries] = one_list[i];
1203 *out_list_entries = *out_list_entries + 1;
1204 }
1205 found = 0;
1206 }
1207}
1208
1209/*
1210 * Set consensus for a specific processor
1211 */
1212static void memb_consensus_set (
1213 struct totemsrp_instance *instance,
1214 const struct srp_addr *addr)
1215{
1216 int found = 0;
1217 int i;
1218
1219 for (i = 0; i < instance->consensus_list_entries; i++) {
1220 if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1221 found = 1;
1222 break; /* found entry */
1223 }
1224 }
1225 instance->consensus_list[i].addr = *addr;
1226 instance->consensus_list[i].set = 1;
1227 if (found == 0) {
1228 instance->consensus_list_entries++;
1229 }
1230 return;
1231}
1232
1233/*
1234 * Is consensus set for a specific processor
1235 */
1236static int memb_consensus_isset (
1237 struct totemsrp_instance *instance,
1238 const struct srp_addr *addr)
1239{
1240 int i;
1241
1242 for (i = 0; i < instance->consensus_list_entries; i++) {
1243 if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1244 return (instance->consensus_list[i].set);
1245 }
1246 }
1247 return (0);
1248}
1249
1250/*
1251 * Is consensus agreed upon based upon consensus database
1252 */
1253static int memb_consensus_agreed (
1254 struct totemsrp_instance *instance)
1255{
1256 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1257 int token_memb_entries = 0;
1258 int agreed = 1;
1259 int i;
1260
1261 memb_set_subtract (token_memb, &token_memb_entries,
1262 instance->my_proc_list, instance->my_proc_list_entries,
1263 instance->my_failed_list, instance->my_failed_list_entries);
1264
1265 for (i = 0; i < token_memb_entries; i++) {
1266 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1267 agreed = 0;
1268 break;
1269 }
1270 }
1271
1272 if (agreed && instance->failed_to_recv == 1) {
1273 /*
1274 * Both nodes agreed on our failure. We don't care how many proc list items left because we
1275 * will create single ring anyway.
1276 */
1277
1278 return (agreed);
1279 }
1280
1281 assert (token_memb_entries >= 1);
1282
1283 return (agreed);
1284}
1285
1286static void memb_consensus_notset (
1287 struct totemsrp_instance *instance,
1288 struct srp_addr *no_consensus_list,
1289 int *no_consensus_list_entries,
1290 struct srp_addr *comparison_list,
1291 int comparison_list_entries)
1292{
1293 int i;
1294
1295 *no_consensus_list_entries = 0;
1296
1297 for (i = 0; i < instance->my_proc_list_entries; i++) {
1298 if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1299 no_consensus_list[*no_consensus_list_entries] = instance->my_proc_list[i];
1300 *no_consensus_list_entries = *no_consensus_list_entries + 1;
1301 }
1302 }
1303}
1304
1305/*
1306 * Is set1 equal to set2 Entries can be in different orders
1307 */
1308static int memb_set_equal (
1309 struct srp_addr *set1, int set1_entries,
1310 struct srp_addr *set2, int set2_entries)
1311{
1312 int i;
1313 int j;
1314
1315 int found = 0;
1316
1317 if (set1_entries != set2_entries) {
1318 return (0);
1319 }
1320 for (i = 0; i < set2_entries; i++) {
1321 for (j = 0; j < set1_entries; j++) {
1322 if (srp_addr_equal (&set1[j], &set2[i])) {
1323 found = 1;
1324 break;
1325 }
1326 }
1327 if (found == 0) {
1328 return (0);
1329 }
1330 found = 0;
1331 }
1332 return (1);
1333}
1334
1335/*
1336 * Is subset fully contained in fullset
1337 */
1338static int memb_set_subset (
1339 const struct srp_addr *subset, int subset_entries,
1340 const struct srp_addr *fullset, int fullset_entries)
1341{
1342 int i;
1343 int j;
1344 int found = 0;
1345
1346 if (subset_entries > fullset_entries) {
1347 return (0);
1348 }
1349 for (i = 0; i < subset_entries; i++) {
1350 for (j = 0; j < fullset_entries; j++) {
1351 if (srp_addr_equal (&subset[i], &fullset[j])) {
1352 found = 1;
1353 }
1354 }
1355 if (found == 0) {
1356 return (0);
1357 }
1358 found = 0;
1359 }
1360 return (1);
1361}
1362/*
1363 * merge subset into fullset taking care not to add duplicates
1364 */
1365static void memb_set_merge (
1366 const struct srp_addr *subset, int subset_entries,
1367 struct srp_addr *fullset, int *fullset_entries)
1368{
1369 int found = 0;
1370 int i;
1371 int j;
1372
1373 for (i = 0; i < subset_entries; i++) {
1374 for (j = 0; j < *fullset_entries; j++) {
1375 if (srp_addr_equal (&fullset[j], &subset[i])) {
1376 found = 1;
1377 break;
1378 }
1379 }
1380 if (found == 0) {
1381 fullset[*fullset_entries] = subset[i];
1382 *fullset_entries = *fullset_entries + 1;
1383 }
1384 found = 0;
1385 }
1386 return;
1387}
1388
1389static void memb_set_and_with_ring_id (
1390 struct srp_addr *set1,
1391 struct memb_ring_id *set1_ring_ids,
1392 int set1_entries,
1393 struct srp_addr *set2,
1394 int set2_entries,
1395 struct memb_ring_id *old_ring_id,
1396 struct srp_addr *and,
1397 int *and_entries)
1398{
1399 int i;
1400 int j;
1401 int found = 0;
1402
1403 *and_entries = 0;
1404
1405 for (i = 0; i < set2_entries; i++) {
1406 for (j = 0; j < set1_entries; j++) {
1407 if (srp_addr_equal (&set1[j], &set2[i])) {
1408 if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1409 found = 1;
1410 }
1411 break;
1412 }
1413 }
1414 if (found) {
1415 and[*and_entries] = set1[j];
1416 *and_entries = *and_entries + 1;
1417 }
1418 found = 0;
1419 }
1420 return;
1421}
1422
1423static void memb_set_log(
1424 struct totemsrp_instance *instance,
1425 int level,
1426 const char *string,
1427 struct srp_addr *list,
1428 int list_entries)
1429{
1430 char int_buf[32];
1431 char list_str[512];
1432 int i;
1433
1434 memset(list_str, 0, sizeof(list_str));
1435
1436 for (i = 0; i < list_entries; i++) {
1437 if (i == 0) {
1438 snprintf(int_buf, sizeof(int_buf), CS_PRI_NODE_ID, list[i].nodeid);
1439 } else {
1440 snprintf(int_buf, sizeof(int_buf), "," CS_PRI_NODE_ID, list[i].nodeid);
1441 }
1442
1443 if (strlen(list_str) + strlen(int_buf) >= sizeof(list_str)) {
1444 break ;
1445 }
1446 strcat(list_str, int_buf);
1447 }
1448
1449 log_printf(level, "List '%s' contains %d entries: %s", string, list_entries, list_str);
1450}
1451
1452static void my_leave_memb_clear(
1453 struct totemsrp_instance *instance)
1454{
1455 memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1456 instance->my_leave_memb_entries = 0;
1457}
1458
1459static unsigned int my_leave_memb_match(
1460 struct totemsrp_instance *instance,
1461 unsigned int nodeid)
1462{
1463 int i;
1464 unsigned int ret = 0;
1465
1466 for (i = 0; i < instance->my_leave_memb_entries; i++){
1467 if (instance->my_leave_memb_list[i] == nodeid){
1468 ret = nodeid;
1469 break;
1470 }
1471 }
1472 return ret;
1473}
1474
/*
 * Record nodeid in my_leave_memb_list unless it is already there.
 * When the list is at its limit the node is not recorded; the visible
 * string suggests a "Cannot set LEAVE" message is emitted instead.
 */
1475static void my_leave_memb_set(
1476 struct totemsrp_instance *instance,
1477 unsigned int nodeid)
1478{
1479 int i, found = 0;
1480 for (i = 0; i < instance->my_leave_memb_entries; i++){
1481 if (instance->my_leave_memb_list[i] == nodeid){
1482 found = 1;
1483 break;
1484 }
1485 }
1486 if (found == 1) {
1487 return;
1488 }
/*
 * NOTE(review): this accepts at most PROCESSOR_COUNT_MAX - 1 entries. If
 * my_leave_memb_list is declared with PROCESSOR_COUNT_MAX elements, the
 * last slot is never used -- confirm whether the "- 1" is intended.
 */
1489 if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1490 instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1491 instance->my_leave_memb_entries++;
1492 } else {
/* NOTE(review): source line 1493 (presumably the head of a log_printf call) is missing from this listing. */
1494 "Cannot set LEAVE nodeid=" CS_PRI_NODE_ID, nodeid);
1495 }
1496}
1497
1498
1499static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1500{
1501 assert (instance != NULL);
1502 return totemnet_buffer_alloc (instance->totemnet_context);
1503}
1504
/*
 * Release a buffer previously obtained from totemsrp_buffer_alloc().
 * NOTE(review): source line 1508 is missing from this listing, so the
 * actual release call is not visible; as shown, ptr is never used. By
 * symmetry with totemsrp_buffer_alloc() it presumably returns ptr to the
 * totemnet layer via instance->totemnet_context -- confirm.
 */
1505static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1506{
1507 assert (instance != NULL);
1509}
1510
/*
 * Delete the pending token-retransmit timer and re-arm it for
 * token_retransmit_timeout ms; on expiry it calls
 * timer_function_token_retransmit_timeout. A failure to add the timer is
 * only logged at the instance's error level.
 */
1511static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1512{
1513 int32_t res;
1514
/* NOTE(review): source line 1516 (the second argument, presumably instance->timer_orf_token_retransmit_timeout) is missing from this listing. */
1515 qb_loop_timer_del (instance->totemsrp_poll_handle,
1517 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1518 QB_LOOP_MED,
1519 instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
1520 (void *)instance,
1521 timer_function_token_retransmit_timeout,
1522 &instance->timer_orf_token_retransmit_timeout);
1523 if (res != 0) {
1524 log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1525 }
1526
1527}
1528
/*
 * Arm the merge-detect timer (merge_timeout ms, callback
 * timer_function_merge_detect_timeout), but only when
 * my_merge_detect_timeout_outstanding is 0. A failure to add the timer is
 * logged at the instance's error level.
 */
1529static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1530{
1531 int32_t res;
1532
1533 if (instance->my_merge_detect_timeout_outstanding == 0) {
1534 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1535 QB_LOOP_MED,
1536 instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1537 (void *)instance,
1538 timer_function_merge_detect_timeout,
1539 &instance->timer_merge_detect_timeout);
1540 if (res != 0) {
1541 log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1542 }
1543
/*
 * NOTE(review): source line 1544 is missing from this listing. As shown,
 * my_merge_detect_timeout_outstanding is never set here; the missing line
 * presumably marks it outstanding so the guard above works -- confirm.
 */
1545 }
1546}
1547
/*
 * Delete the pending merge-detect timer.
 * NOTE(review): source line 1551 is missing from this listing; it presumably
 * clears my_merge_detect_timeout_outstanding so start_merge_detect_timeout()
 * can arm the timer again -- confirm.
 */
1548static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1549{
1550 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
1552}
1553
1554/*
1555 * The old_ring_state_* helpers save and restore the sort queue
1556 * state when a recovery operation fails (and enters gather).
1557 */
/*
 * Snapshot the current ring state, at most once until
 * old_ring_state_reset() clears old_ring_state_saved: copies my_ring_id
 * into my_old_ring_id and my_aru into old_ring_state_aru.
 * NOTE(review): source lines 1565-1566 are missing from this listing. The
 * log text mentions "high seq received", so they presumably also save
 * my_high_seq_received and start the log call -- confirm.
 */
1558static void old_ring_state_save (struct totemsrp_instance *instance)
1559{
1560 if (instance->old_ring_state_saved == 0) {
1561 instance->old_ring_state_saved = 1;
1562 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1563 sizeof (struct memb_ring_id));
1564 instance->old_ring_state_aru = instance->my_aru;
1567 "Saving state aru %x high seq received %x",
1568 instance->my_aru, instance->my_high_seq_received);
1569 }
1570}
1571
/*
 * Roll my_aru back to the value captured by old_ring_state_save().
 * NOTE(review): source lines 1575-1576 are missing from this listing
 * (presumably another restored field plus the head of the log call).
 */
1572static void old_ring_state_restore (struct totemsrp_instance *instance)
1573{
1574 instance->my_aru = instance->old_ring_state_aru;
1577 "Restoring instance->my_aru %x my high seq received %x",
1578 instance->my_aru, instance->my_high_seq_received);
1579}
1580
/*
 * Clear old_ring_state_saved so the next old_ring_state_save() takes a
 * fresh snapshot.
 * NOTE(review): source line 1583 (presumably the head of a log call) is
 * missing from this listing.
 */
1581static void old_ring_state_reset (struct totemsrp_instance *instance)
1582{
1584 "Resetting old ring state");
1585 instance->old_ring_state_saved = 0;
1586}
1587
1588static void reset_pause_timeout (struct totemsrp_instance *instance)
1589{
1590 int32_t res;
1591
1592 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1593 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1594 QB_LOOP_MED,
1595 instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1596 (void *)instance,
1597 timer_function_pause_timeout,
1598 &instance->timer_pause_timeout);
1599 if (res != 0) {
1600 log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1601 }
1602}
1603
1604static void reset_token_warning (struct totemsrp_instance *instance) {
1605 int32_t res;
1606
1607 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1608 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1609 QB_LOOP_MED,
1610 instance->totem_config->token_warning * instance->totem_config->token_timeout / 100 * QB_TIME_NS_IN_MSEC,
1611 (void *)instance,
1612 timer_function_orf_token_warning,
1613 &instance->timer_orf_token_warning);
1614 if (res != 0) {
1615 log_printf(instance->totemsrp_log_level_error, "reset_token_warning - qb_loop_timer_add error : %d", res);
1616 }
1617}
1618
1619static void reset_token_timeout (struct totemsrp_instance *instance) {
1620 int32_t res;
1621
1622 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1623 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1624 QB_LOOP_MED,
1625 instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1626 (void *)instance,
1627 timer_function_orf_token_timeout,
1628 &instance->timer_orf_token_timeout);
1629 if (res != 0) {
1630 log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1631 }
1632
1633 if (instance->totem_config->token_warning)
1634 reset_token_warning(instance);
1635}
1636
1637static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1638 int32_t res;
1639
1640 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1641 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1642 QB_LOOP_MED,
1643 instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1644 (void *)instance,
1645 timer_function_heartbeat_timeout,
1646 &instance->timer_heartbeat_timeout);
1647 if (res != 0) {
1648 log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1649 }
1650}
1651
1652
1653static void cancel_token_warning (struct totemsrp_instance *instance) {
1654 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1655}
1656
1657static void cancel_token_timeout (struct totemsrp_instance *instance) {
1658 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1659
1660 if (instance->totem_config->token_warning)
1661 cancel_token_warning(instance);
1662}
1663
1664static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1665 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1666}
1667
1668static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1669{
1670 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
1671}
1672
1673static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1674{
1675 int32_t res;
1676
1677 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1678 QB_LOOP_MED,
1679 instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
1680 (void *)instance,
1681 timer_function_token_hold_retransmit_timeout,
1682 &instance->timer_orf_token_hold_retransmit_timeout);
1683 if (res != 0) {
1684 log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1685 }
1686}
1687
/*
 * Delete the pending token-hold retransmit timer.
 * NOTE(review): source line 1691 (the second argument, presumably
 * instance->timer_orf_token_hold_retransmit_timeout) is missing from this
 * listing.
 */
1688static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1689{
1690 qb_loop_timer_del (instance->totemsrp_poll_handle,
1692}
1693
1694static void memb_state_consensus_timeout_expired (
1695 struct totemsrp_instance *instance)
1696{
1697 struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1698 int no_consensus_list_entries;
1699
1700 instance->stats.consensus_timeouts++;
1701 if (memb_consensus_agreed (instance)) {
1702 memb_consensus_reset (instance);
1703
1704 memb_consensus_set (instance, &instance->my_id);
1705
1706 reset_token_timeout (instance); // REVIEWED
1707 } else {
1708 memb_consensus_notset (
1709 instance,
1710 no_consensus_list,
1711 &no_consensus_list_entries,
1712 instance->my_proc_list,
1713 instance->my_proc_list_entries);
1714
1715 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1716 instance->my_failed_list, &instance->my_failed_list_entries);
1717 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1718 }
1719}
1720
/* Forward declarations of static helpers that are defined later in this file. */
1721static void memb_join_message_send (struct totemsrp_instance *instance);
1722
1723static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1724
1725/*
1726 * Timers used for various states of the membership algorithm
1727 */
1728static void timer_function_pause_timeout (void *data)
1729{
1730 struct totemsrp_instance *instance = data;
1731
1732 instance->pause_timestamp = qb_util_nano_current_get ();
1733 reset_pause_timeout (instance);
1734}
1735
1736static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1737{
1738 old_ring_state_restore (instance);
1739 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1740 instance->stats.recovery_token_lost++;
1741}
1742
/*
 * Token-warning timer callback (data is the totemsrp instance).
 * While token_warning is non-zero it computes how long ago the latest token
 * was received and re-arms the warning timer. If token_warning has since
 * become 0, it cancels the warning timer instead.
 */
1743static void timer_function_orf_token_warning (void *data)
1744{
1745 struct totemsrp_instance *instance = data;
1746 uint64_t tv_diff;
1747
1748 /* need to protect against the case where token_warning is set to 0 dynamically */
1749 if (instance->totem_config->token_warning) {
/* now (ns -> ms) minus the rx stamp of the latest token stats entry; assumes rx is stored in ms -- confirm */
1750 tv_diff = qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC -
1751 instance->stats.token[instance->stats.latest_token].rx;
/* NOTE(review): source line 1752 (presumably the head of a log call) is missing from this listing. */
1753 "Token has not been received in %"PRIu64" ms", tv_diff);
1754 reset_token_warning(instance);
1755 } else {
1756 cancel_token_warning(instance);
1757 }
1758}
1759
/*
 * Token-loss timer callback (data is the totemsrp instance). What happens
 * depends on memb_state:
 *  - OPERATIONAL: enter GATHER and count stats.operational_token_lost.
 *  - GATHER: treat it as a consensus timeout, (re)enter GATHER, and count
 *    stats.gather_token_lost.
 *  - COMMIT: enter GATHER and count stats.commit_token_lost.
 *  - RECOVERY: restore the old ring state via
 *    memb_recovery_state_token_loss() and set orf_token_discard.
 * NOTE(review): source lines 1765-1766, 1768, 1773, 1779, 1788 and
 * 1794-1795 are missing from this listing. They presumably hold the case
 * labels for OPERATIONAL/RECOVERY and the heads of the log calls.
 */
1760static void timer_function_orf_token_timeout (void *data)
1761{
1762 struct totemsrp_instance *instance = data;
1763
1764 switch (instance->memb_state) {
1767 "The token was lost in the OPERATIONAL state.");
1769 "A processor failed, forming new configuration:"
1770 " token timed out (%ums), waiting %ums for consensus.",
1771 instance->totem_config->token_timeout,
1772 instance->totem_config->consensus_timeout);
1774 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1775 instance->stats.operational_token_lost++;
1776 break;
1777
1778 case MEMB_STATE_GATHER:
1780 "The consensus timeout expired (%ums).",
1781 instance->totem_config->consensus_timeout);
/*
 * NOTE(review): memb_state_consensus_timeout_expired() may itself call
 * memb_state_gather_enter(), so GATHER can be entered twice here (two join
 * messages, two timer restarts) -- confirm this is intended.
 */
1782 memb_state_consensus_timeout_expired (instance);
1783 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1784 instance->stats.gather_token_lost++;
1785 break;
1786
1787 case MEMB_STATE_COMMIT:
1789 "The token was lost in the COMMIT state.");
1790 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1791 instance->stats.commit_token_lost++;
1792 break;
1793
1796 "The token was lost in the RECOVERY state.");
1797 memb_recovery_state_token_loss (instance);
1798 instance->orf_token_discard = 1;
1799 break;
1800 }
1801}
1802
/*
 * Heartbeat timer callback: after a message (whose log-call head, source
 * line 1806, is missing from this listing), it runs the same handling as
 * a token-loss timeout.
 */
1803static void timer_function_heartbeat_timeout (void *data)
1804{
1805 struct totemsrp_instance *instance = data;
1807 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1808 timer_function_orf_token_timeout(data);
1809}
1810
/*
 * Join-timer callback (data is the totemsrp instance). In GATHER or COMMIT
 * it re-sends the join message and re-arms itself for join_timeout ms.
 * Firing in any other state is treated as impossible (assert).
 * NOTE(review): source lines 1817-1818 are missing from this listing; they
 * presumably are the case labels that lead to assert (0).
 */
1811static void memb_timer_function_state_gather (void *data)
1812{
1813 struct totemsrp_instance *instance = data;
1814 int32_t res;
1815
1816 switch (instance->memb_state) {
1819 assert (0); /* this should never happen */
1820 break;
1821 case MEMB_STATE_GATHER:
1822 case MEMB_STATE_COMMIT:
1823 memb_join_message_send (instance);
1824
1825 /*
1826 * Restart the join timeout
1827 */
1828 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1829
1830 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1831 QB_LOOP_MED,
1832 instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
1833 (void *)instance,
1834 memb_timer_function_state_gather,
1835 &instance->memb_timer_state_gather_join_timeout);
1836
1837 if (res != 0) {
1838 log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1839 }
1840 break;
1841 }
1842}
1843
/*
 * Consensus-timer callback: data is the totemsrp instance; forwards to
 * memb_state_consensus_timeout_expired().
 */
static void memb_timer_function_gather_consensus_timeout (void *data)
{
	memb_state_consensus_timeout_expired ((struct totemsrp_instance *)data);
}
1849
/*
 * Walk the recovery sort queue for sequence numbers SEQNO_START_MSG + 1 ..
 * my_aru. Each present item is unwrapped: the inner message starts
 * sizeof (struct mcast) bytes into the outer one. The item is added to the
 * regular sort queue at its inner seq, but only if it was originated on
 * my_old_ring_id and that slot is not already in use.
 * NOTE(review): source lines 1860, 1880, 1897, 1914 and 1918 are missing
 * from this listing. 1880 holds the condition that decides whether a
 * message is encapsulated. 1914 is the body of the sq_lt_compare() branch,
 * presumably advancing old_ring_state_high_seq_received to mcast->seq. The
 * others appear to be log-call heads.
 */
1850static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1851{
1852 unsigned int i;
1853 struct sort_queue_item *recovery_message_item;
1854 struct sort_queue_item regular_message_item;
1855 unsigned int range = 0;
1856 int res;
1857 void *ptr;
1858 struct mcast *mcast;
1859
1861 "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1862
1863 range = instance->my_aru - SEQNO_START_MSG;
1864 /*
1865 * Move messages from recovery to regular sort queue
1866 */
1867// i starts at 1 so the loop fetches SEQNO_START_MSG + 1 .. my_aru, the range logged above
1868 for (i = 1; i <= range; i++) {
1869 res = sq_item_get (&instance->recovery_sort_queue,
1870 i + SEQNO_START_MSG, &ptr);
1871 if (res != 0) {
1872 continue;
1873 }
1874 recovery_message_item = ptr;
1875
1876 /*
1877 * Convert recovery message into regular message
1878 */
1879 mcast = recovery_message_item->mcast;
1881 /*
1882 * Message is a recovery message encapsulated
1883 * in a new ring message
1884 */
1885 regular_message_item.mcast =
1886 (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1887 regular_message_item.msg_len =
1888 recovery_message_item->msg_len - sizeof (struct mcast);
1889 mcast = regular_message_item.mcast;
1890 } else {
1891 /*
1892 * TODO this case shouldn't happen
1893 */
1894 continue;
1895 }
1896
1898 "comparing if ring id is for this processors old ring seqno " CS_PRI_RING_ID_SEQ,
1899 (uint64_t)mcast->seq);
1900
1901 /*
1902 * Only add this message to the regular sort
1903 * queue if it was originated with the same ring
1904 * id as the previous ring
1905 */
1906 if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1907 sizeof (struct memb_ring_id)) == 0) {
1908
1909 res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1910 if (res == 0) {
1911 sq_item_add (&instance->regular_sort_queue,
1912 &regular_message_item, mcast->seq);
1913 if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1915 }
1916 }
1917 } else {
1919 "-not adding msg with seq no " CS_PRI_RING_ID_SEQ, (uint64_t)mcast->seq);
1920 }
1921 }
1922}
1923
1924/*
1925 * Change states in the state machine of the membership algorithm
1926 */
/*
 * Enter the OPERATIONAL state after a successful recovery. In order it:
 *  - resets consensus and old-ring state, then moves recovered old-ring
 *    messages into the regular sort queue;
 *  - delivers old-ring messages to the application with my_aru temporarily
 *    set to old_ring_state_aru;
 *  - computes the left list (old members not in the transitional set) and
 *    the joined list (new members not in the transitional set);
 *  - installs my_new_memb_list as my_memb_list;
 *  - performs the transitional and regular configuration deliveries (their
 *    call heads are missing from this listing, see below);
 *  - copies the recovery sort queue into the regular one and resets
 *    my_proc_list and my_failed_list;
 *  - releases regular-queue items up to the highest contiguous seq;
 *  - logs the membership summary, updates stats, re-arms the pause timer,
 *    and saves my_ring_id as my_old_ring_id.
 * NOTE(review): source lines 1950, 1987, 2010, 2021, 2130, 2132, 2140 and
 * 2145 are missing from this listing. 1987 and 2021 are the heads of the
 * transitional/regular configuration-change calls; 2010 is the totempg
 * buffer switch described by the comment above it; the rest appear to be
 * log-call heads or similar single statements.
 */
1927static void memb_state_operational_enter (struct totemsrp_instance *instance)
1928{
1929 struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1930 int joined_list_entries = 0;
1931 unsigned int aru_save;
1932 unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
1933 unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
1934 unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
1935 unsigned int left_list[PROCESSOR_COUNT_MAX];
1936 unsigned int i;
1937 unsigned int res;
1938 char left_node_msg[1024];
1939 char joined_node_msg[1024];
1940 char failed_node_msg[1024];
1941
1942 instance->originated_orf_token = 0;
1943
1944 memb_consensus_reset (instance);
1945
1946 old_ring_state_reset (instance);
1947
1948 deliver_messages_from_recovery_to_regular (instance);
1949
1951 "Delivering to app %x to %x",
1952 instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1953
/* deliver old-ring messages with my_aru temporarily set to the saved old-ring aru; restored below */
1954 aru_save = instance->my_aru;
1955 instance->my_aru = instance->old_ring_state_aru;
1956
1957 messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1958
1959 /*
1960 * Calculate joined and left list
1961 */
1962 memb_set_subtract (instance->my_left_memb_list,
1963 &instance->my_left_memb_entries,
1964 instance->my_memb_list, instance->my_memb_entries,
1965 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1966
1967 memb_set_subtract (joined_list, &joined_list_entries,
1968 instance->my_new_memb_list, instance->my_new_memb_entries,
1969 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1970
1971 /*
1972 * Install new membership
1973 */
1974 instance->my_memb_entries = instance->my_new_memb_entries;
1975 memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1976 sizeof (struct srp_addr) * instance->my_memb_entries);
1977 instance->last_released = 0;
1978 instance->my_set_retrans_flg = 0;
1979
1980 /*
1981 * Deliver transitional configuration to application
1982 */
1983 srp_addr_to_nodeid (instance, left_list, instance->my_left_memb_list,
1984 instance->my_left_memb_entries);
1985 srp_addr_to_nodeid (instance, trans_memb_list_totemip,
1986 instance->my_trans_memb_list, instance->my_trans_memb_entries);
/* NOTE(review): source line 1987 (head of the transitional configuration-change call) is missing from this listing. */
1988 trans_memb_list_totemip, instance->my_trans_memb_entries,
1989 left_list, instance->my_left_memb_entries,
1990 0, 0, &instance->my_ring_id);
1991 /*
1992 * Switch new totemsrp messages queue. Messages sent from now on are stored
1993 * in different queue so synchronization messages are delivered first. Totempg
1994 * buffers will be switched later.
1995 */
1996 instance->waiting_trans_ack = 1;
1997
1998// TODO we need to filter to ensure we only deliver those
1999// messages which are part of instance->my_deliver_memb
2000 messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
2001
2002 /*
2003 * Switch totempg buffers. This used to be right after
2004 * instance->waiting_trans_ack = 1;
2005 * line. This was causing problem, because there may be not yet
2006 * processed parts of messages in totempg buffers.
2007 * So when buffers were switched and recovered messages
2008 * got delivered it was not possible to assemble them.
2009 */
/* NOTE(review): source line 2010 (the buffer-switch statement described above) is missing from this listing. */
2011
2012 instance->my_aru = aru_save;
2013
2014 /*
2015 * Deliver regular configuration to application
2016 */
2017 srp_addr_to_nodeid (instance, new_memb_list_totemip,
2018 instance->my_new_memb_list, instance->my_new_memb_entries);
2019 srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
2020 joined_list_entries);
/* NOTE(review): source line 2021 (head of the regular configuration-change call) is missing from this listing. */
2022 new_memb_list_totemip, instance->my_new_memb_entries,
2023 0, 0,
2024 joined_list_totemip, joined_list_entries, &instance->my_ring_id);
2025
2026 /*
2027 * The recovery sort queue now becomes the regular
2028 * sort queue. It is necessary to copy the state
2029 * into the regular sort queue.
2030 */
2031 sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
2032 instance->my_last_aru = SEQNO_START_MSG;
2033
2034 /* When making my_proc_list smaller, ensure that the
2035 * now non-used entries are zero-ed out. There are some suspect
2036 * assert's that assume that there are always 2 entries in the list.
2037 * These fail when my_proc_list is reduced to 1 entry (and the
2038 * valid [0] entry is the same as the 'unused' [1] entry).
2039 */
2040 memset(instance->my_proc_list, 0,
2041 sizeof (struct srp_addr) * instance->my_proc_list_entries);
2042
/* my_memb_entries equals my_new_memb_entries here (assigned above), so the copy length matches the new entry count */
2043 instance->my_proc_list_entries = instance->my_new_memb_entries;
2044 memcpy (instance->my_proc_list, instance->my_new_memb_list,
2045 sizeof (struct srp_addr) * instance->my_memb_entries);
2046
2047 instance->my_failed_list_entries = 0;
2048 /*
2049 * TODO Not exactly to spec
2050 *
2051 * At the entry to this function all messages without a gap are
2052 * delivered.
2053 *
2054 * This code throws away messages from the last gap in the sort queue
2055 * to my_high_seq_received
2056 *
2057 * What should really happen is we should deliver all messages up to
2058 * a gap, then deliver the transitional configuration, then deliver
2059 * the messages between the first gap and my_high_seq_received, then
2060 * deliver a regular configuration, then deliver the regular
2061 * configuration
2062 *
2063 * Unfortunately totempg doesn't appear to like this operating mode
2064 * which needs more inspection
2065 */
/* scan downward from my_high_seq_received to the highest seq present in the regular queue (stopping at 0) */
2066 i = instance->my_high_seq_received + 1;
2067 do {
2068 void *ptr;
2069
2070 i -= 1;
2071 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2072 if (i == 0) {
2073 break;
2074 }
2075 } while (res);
2076
2077 instance->my_high_delivered = i;
2078
/* free the message payloads of every held item up to my_high_delivered, then release those queue slots */
2079 for (i = 0; i <= instance->my_high_delivered; i++) {
2080 void *ptr;
2081
2082 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2083 if (res == 0) {
2084 struct sort_queue_item *regular_message;
2085
2086 regular_message = ptr;
2087 free (regular_message->mcast);
2088 }
2089 }
2090 sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2091 instance->last_released = instance->my_high_delivered;
2092
/*
 * NOTE(review): snprintf returns the length it would have written. If the
 * accumulated node list exceeds the 1024-byte buffers (many nodes joining
 * or leaving at once), sptr/sptr2 pass sizeof(buf): "sizeof(buf)-sptr"
 * then converts to a huge size_t and "buf+sptr" points past the array.
 * The offsets should be clamped to sizeof(buf) - 1.
 */
2093 if (joined_list_entries) {
2094 int sptr = 0;
2095 sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2096 for (i=0; i< joined_list_entries; i++) {
2097 sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " " CS_PRI_NODE_ID, joined_list_totemip[i]);
2098 }
2099 }
2100 else {
2101 joined_node_msg[0] = '\0';
2102 }
2103
2104 if (instance->my_left_memb_entries) {
2105 int sptr = 0;
2106 int sptr2 = 0;
2107 sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2108 for (i=0; i< instance->my_left_memb_entries; i++) {
2109 sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " " CS_PRI_NODE_ID, left_list[i]);
2110 }
/* left nodes that never announced a LEAVE (not in my_leave_memb_list) are reported as failed */
2111 for (i=0; i< instance->my_left_memb_entries; i++) {
2112 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2113 if (sptr2 == 0) {
2114 sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2115 }
/* NOTE(review): sizes failed_node_msg with sizeof(left_node_msg); both are char[1024] so the value matches, but it should name failed_node_msg */
2116 sptr2 += snprintf(failed_node_msg+sptr2, sizeof(left_node_msg)-sptr2, " " CS_PRI_NODE_ID, left_list[i]);
2117 }
2118 }
2119 if (sptr2 == 0) {
2120 failed_node_msg[0] = '\0';
2121 }
2122 }
2123 else {
2124 left_node_msg[0] = '\0';
2125 failed_node_msg[0] = '\0';
2126 }
2127
2128 my_leave_memb_clear(instance);
2129
2131 "entering OPERATIONAL state.");
2133 "A new membership (" CS_PRI_RING_ID ") was formed. Members%s%s",
2134 instance->my_ring_id.rep,
2135 (uint64_t)instance->my_ring_id.seq,
2136 joined_node_msg,
2137 left_node_msg);
2138
2139 if (strlen(failed_node_msg)) {
2141 "Failed to receive the leave message.%s",
2142 failed_node_msg);
2143 }
2144
2146
2147 instance->stats.operational_entered++;
2148 instance->stats.continuous_gather = 0;
2149
2150 instance->my_received_flg = 1;
2151
2152 reset_pause_timeout (instance);
2153
2154 /*
2155 * Save ring id information from this configuration to determine
2156 * which processors are transitioning from old regular configuration
2157 * in to new regular configuration on the next configuration change
2158 */
2159 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2160 sizeof (struct memb_ring_id));
2161
2162 return;
2163}
2164
/*
 * Enter the GATHER state. gather_from records why, for logging.
 * It sets orf_token_discard, clears originated_orf_token, makes sure
 * my_id is in my_proc_list, and sends a join message. It then restarts the
 * join timer (join_timeout ms) and the consensus timer (consensus_timeout
 * ms), cancels the token retransmit, token-loss and merge-detect timers,
 * and resets the consensus state so only this node is marked. Finally it
 * sets memb_state to MEMB_STATE_GATHER and updates stats.
 * NOTE(review): source lines 2200, 2223 and 2230 are missing from this
 * listing. 2200 is the second argument of qb_loop_timer_del (presumably
 * instance->memb_timer_state_gather_consensus_timeout). 2223 is a log-call
 * head. 2230 opens the block that increments stats.continuous_gather, so
 * the condition for counting a "continuous" gather is not visible.
 */
2165static void memb_state_gather_enter (
2166 struct totemsrp_instance *instance,
2167 enum gather_state_from gather_from)
2168{
2169 int32_t res;
2170
2171 instance->orf_token_discard = 1;
2172
2173 instance->originated_orf_token = 0;
2174
2175 memb_set_merge (
2176 &instance->my_id, 1,
2177 instance->my_proc_list, &instance->my_proc_list_entries);
2178
2179 memb_join_message_send (instance);
2180
2181 /*
2182 * Restart the join timeout
2183 */
2184 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2185
2186 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2187 QB_LOOP_MED,
2188 instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
2189 (void *)instance,
2190 memb_timer_function_state_gather,
2191 &instance->memb_timer_state_gather_join_timeout);
2192 if (res != 0) {
2193 log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2194 }
2195
2196 /*
2197 * Restart the consensus timeout
2198 */
2199 qb_loop_timer_del (instance->totemsrp_poll_handle,
2201
2202 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2203 QB_LOOP_MED,
2204 instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2205 (void *)instance,
2206 memb_timer_function_gather_consensus_timeout,
2207 &instance->memb_timer_state_gather_consensus_timeout);
2208 if (res != 0) {
2209 log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2210 }
2211
2212 /*
2213 * Cancel the token loss and token retransmission timeouts
2214 */
2215 cancel_token_retransmit_timeout (instance); // REVIEWED
2216 cancel_token_timeout (instance); // REVIEWED
2217 cancel_merge_detect_timeout (instance);
2218
2219 memb_consensus_reset (instance);
2220
2221 memb_consensus_set (instance, &instance->my_id);
2222
2224 "entering GATHER state from %d(%s).",
2225 gather_from, gsfrom_to_msg(gather_from));
2226
2227 instance->memb_state = MEMB_STATE_GATHER;
2228 instance->stats.gather_entered++;
2229
2231 /*
2232 * State 3 means gather, so we are continuously gathering.
2233 */
2234 instance->stats.continuous_gather++;
2235 }
2236
2237 return;
2238}
2239
/* Forward declaration of the token-retransmit timer callback (a static function defined later in this file). */
2240static void timer_function_token_retransmit_timeout (void *data);
2241
/*
 * Callback whose context is the totemsrp instance: sends the commit token.
 * Per the closing comment of memb_state_commit_enter(), the commit token
 * is sent only after the token target has been set, i.e. from this
 * callback.
 */
static void target_set_completed (
	void *context)
{
	struct totemsrp_instance *instance = context;

	memb_state_commit_token_send (instance);
}
2250
/*
 * Enter the COMMIT state.
 * It saves the old ring state, updates the commit token, and requests that
 * the token target be set; the commit token itself is sent later from the
 * target-set callback. It then stops the gather join and consensus timers
 * and adopts the commit token's ring id, storing it via memb_ring_id_store
 * and recording its seq in token_ring_id_seq. Finally it sets memb_state
 * to MEMB_STATE_COMMIT, re-arms the token retransmit and token-loss
 * timers, updates stats, and zeroes the flow-control counters for the new
 * ring.
 * NOTE(review): source lines 2262, 2266 and 2274 are missing from this
 * listing. 2262 and 2266 follow the two qb_loop_timer_del calls,
 * presumably clearing the corresponding timer handles; 2274 is a log-call
 * head.
 */
2251static void memb_state_commit_enter (
2252 struct totemsrp_instance *instance)
2253{
2254 old_ring_state_save (instance);
2255
2256 memb_state_commit_token_update (instance);
2257
2258 memb_state_commit_token_target_set (instance);
2259
2260 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2261
2263
2264 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);
2265
2267
2268 memb_ring_id_set (instance, &instance->commit_token->ring_id);
2269
2270 instance->memb_ring_id_store (&instance->my_ring_id, instance->my_id.nodeid);
2271
2272 instance->token_ring_id_seq = instance->my_ring_id.seq;
2273
2275 "entering COMMIT state.");
2276
2277 instance->memb_state = MEMB_STATE_COMMIT;
2278 reset_token_retransmit_timeout (instance); // REVIEWED
2279 reset_token_timeout (instance); // REVIEWED
2280
2281 instance->stats.commit_entered++;
2282 instance->stats.continuous_gather = 0;
2283
2284 /*
2285 * reset all flow control variables since we are starting a new ring
2286 */
2287 instance->my_trc = 0;
2288 instance->my_pbl = 0;
2289 instance->my_cbl = 0;
2290 /*
2291 * commit token sent after callback that token target has been set
2292 */
2293}
2294
2295static void memb_state_recovery_enter (
2296 struct totemsrp_instance *instance,
2298{
2299 int i;
2300 int local_received_flg = 1;
2301 unsigned int low_ring_aru;
2302 unsigned int range = 0;
2303 unsigned int messages_originated = 0;
2304 const struct srp_addr *addr;
2305 struct memb_commit_token_memb_entry *memb_list;
2306 struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2307
2308 addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2309 memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2310
2312 "entering RECOVERY state.");
2313
2314 instance->orf_token_discard = 0;
2315
2316 instance->my_high_ring_delivered = 0;
2317
2318 sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2319 cs_queue_reinit (&instance->retrans_message_queue);
2320
2321 low_ring_aru = instance->old_ring_state_high_seq_received;
2322
2323 memb_state_commit_token_send_recovery (instance, commit_token);
2324
2325 instance->my_token_seq = SEQNO_START_TOKEN - 1;
2326
2327 /*
2328 * Build regular configuration
2329 */
2331 instance->totemnet_context,
2332 commit_token->addr_entries);
2333
2334 /*
2335 * Build transitional configuration
2336 */
2337 for (i = 0; i < instance->my_new_memb_entries; i++) {
2338 memcpy (&my_new_memb_ring_id_list[i],
2339 &memb_list[i].ring_id,
2340 sizeof (struct memb_ring_id));
2341 }
2342 memb_set_and_with_ring_id (
2343 instance->my_new_memb_list,
2344 my_new_memb_ring_id_list,
2345 instance->my_new_memb_entries,
2346 instance->my_memb_list,
2347 instance->my_memb_entries,
2348 &instance->my_old_ring_id,
2349 instance->my_trans_memb_list,
2350 &instance->my_trans_memb_entries);
2351
2352 for (i = 0; i < instance->my_trans_memb_entries; i++) {
2354 "TRANS [%d] member " CS_PRI_NODE_ID ":", i, instance->my_trans_memb_list[i].nodeid);
2355 }
2356 for (i = 0; i < instance->my_new_memb_entries; i++) {
2358 "position [%d] member " CS_PRI_NODE_ID ":", i, addr[i].nodeid);
2360 "previous ringid (" CS_PRI_RING_ID ")",
2361 memb_list[i].ring_id.rep, (uint64_t)memb_list[i].ring_id.seq);
2362
2364 "aru %x high delivered %x received flag %d",
2365 memb_list[i].aru,
2366 memb_list[i].high_delivered,
2367 memb_list[i].received_flg);
2368
2369 // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2370 }
2371 /*
2372 * Determine if any received flag is false
2373 */
2374 for (i = 0; i < commit_token->addr_entries; i++) {
2375 if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2376 instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2377
2378 memb_list[i].received_flg == 0) {
2379 instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2380 memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2381 sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2382 local_received_flg = 0;
2383 break;
2384 }
2385 }
2386 if (local_received_flg == 1) {
2387 goto no_originate;
2388 } /* Else originate messages if we should */
2389
2390 /*
2391 * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2392 */
2393 for (i = 0; i < commit_token->addr_entries; i++) {
2394 if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2395 instance->my_deliver_memb_list,
2396 instance->my_deliver_memb_entries) &&
2397
2398 memcmp (&instance->my_old_ring_id,
2399 &memb_list[i].ring_id,
2400 sizeof (struct memb_ring_id)) == 0) {
2401
2402 if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2403
2404 low_ring_aru = memb_list[i].aru;
2405 }
2406 if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2407 instance->my_high_ring_delivered = memb_list[i].high_delivered;
2408 }
2409 }
2410 }
2411
2412 /*
2413 * Copy all old ring messages to instance->retrans_message_queue
2414 */
2415 range = instance->old_ring_state_high_seq_received - low_ring_aru;
2416 if (range == 0) {
2417 /*
2418 * No messages to copy
2419 */
2420 goto no_originate;
2421 }
2422 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2423
2425 "copying all old ring messages from %x-%x.",
2426 low_ring_aru + 1, instance->old_ring_state_high_seq_received);
2427
2428 for (i = 1; i <= range; i++) {
2431 void *ptr;
2432 int res;
2433
2434 res = sq_item_get (&instance->regular_sort_queue,
2435 low_ring_aru + i, &ptr);
2436 if (res != 0) {
2437 continue;
2438 }
2439 sort_queue_item = ptr;
2440 messages_originated++;
2441 memset (&message_item, 0, sizeof (struct message_item));
2442 // TODO LEAK
2443 message_item.mcast = totemsrp_buffer_alloc (instance);
2444 assert (message_item.mcast);
2445 memset(message_item.mcast, 0, sizeof (struct mcast));
2449 message_item.mcast->system_from = instance->my_id;
2451
2453 assert (message_item.mcast->header.nodeid);
2454 memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
2455 sizeof (struct memb_ring_id));
2456 message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2457 memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2460 cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2461 }
2463 "Originated %d messages in RECOVERY.", messages_originated);
2464 goto originated;
2465
2466no_originate:
2468 "Did not need to originate any messages in recovery.");
2469
2470originated:
2471 instance->my_aru = SEQNO_START_MSG;
2472 instance->my_aru_count = 0;
2473 instance->my_seq_unchanged = 0;
2475 instance->my_install_seq = SEQNO_START_MSG;
2476 instance->last_released = SEQNO_START_MSG;
2477
2478 reset_token_timeout (instance); // REVIEWED
2479 reset_token_retransmit_timeout (instance); // REVIEWED
2480
2481 instance->memb_state = MEMB_STATE_RECOVERY;
2482 instance->stats.recovery_entered++;
2483 instance->stats.continuous_gather = 0;
2484
2485 return;
2486}
2487
2488void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
2489{
2490 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2491
2492 token_hold_cancel_send (instance);
2493
2494 return;
2495}
2496
2498 void *srp_context,
2499 struct iovec *iovec,
2500 unsigned int iov_len,
2501 int guarantee)
2502{
2503 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2504 int i;
2506 char *addr;
2507 unsigned int addr_idx;
2508 struct cs_queue *queue_use;
2509
2510 if (instance->waiting_trans_ack) {
2511 queue_use = &instance->new_message_queue_trans;
2512 } else {
2513 queue_use = &instance->new_message_queue;
2514 }
2515
2516 if (cs_queue_is_full (queue_use)) {
2517 log_printf (instance->totemsrp_log_level_debug, "queue full");
2518 return (-1);
2519 }
2520
2521 memset (&message_item, 0, sizeof (struct message_item));
2522
2523 /*
2524 * Allocate pending item
2525 */
2526 message_item.mcast = totemsrp_buffer_alloc (instance);
2527 if (message_item.mcast == 0) {
2528 goto error_mcast;
2529 }
2530
2531 /*
2532 * Set mcast header
2533 */
2534 memset(message_item.mcast, 0, sizeof (struct mcast));
2539
2541 assert (message_item.mcast->header.nodeid);
2542
2544 message_item.mcast->system_from = instance->my_id;
2545
2546 addr = (char *)message_item.mcast;
2547 addr_idx = sizeof (struct mcast);
2548 for (i = 0; i < iov_len; i++) {
2549 memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2550 addr_idx += iovec[i].iov_len;
2551 }
2552
2553 message_item.msg_len = addr_idx;
2554
2555 log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2556 instance->stats.mcast_tx++;
2557 cs_queue_item_add (queue_use, &message_item);
2558
2559 return (0);
2560
2561error_mcast:
2562 return (-1);
2563}
2564
2565/*
2566 * Determine if there is room to queue a new message
2567 */
2568int totemsrp_avail (void *srp_context)
2569{
2570 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2571 int avail;
2572 struct cs_queue *queue_use;
2573
2574 if (instance->waiting_trans_ack) {
2575 queue_use = &instance->new_message_queue_trans;
2576 } else {
2577 queue_use = &instance->new_message_queue;
2578 }
2579 cs_queue_avail (queue_use, &avail);
2580
2581 return (avail);
2582}
2583
2584/*
2585 * ORF Token Management
2586 */
2587/*
2588 * Recast message to mcast group if it is available
2589 */
2590static int orf_token_remcast (
2591 struct totemsrp_instance *instance,
2592 int seq)
2593{
2595 int res;
2596 void *ptr;
2597
2598 struct sq *sort_queue;
2599
2600 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2601 sort_queue = &instance->recovery_sort_queue;
2602 } else {
2603 sort_queue = &instance->regular_sort_queue;
2604 }
2605
2606 res = sq_in_range (sort_queue, seq);
2607 if (res == 0) {
2608 log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2609 return (-1);
2610 }
2611
2612 /*
2613 * Get RTR item at seq, if not available, return
2614 */
2615 res = sq_item_get (sort_queue, seq, &ptr);
2616 if (res != 0) {
2617 return -1;
2618 }
2619
2620 sort_queue_item = ptr;
2621
2623 instance->totemnet_context,
2626
2627 return (0);
2628}
2629
2630
2631/*
2632 * Free all freeable messages from ring
2633 */
2634static void messages_free (
2635 struct totemsrp_instance *instance,
2636 unsigned int token_aru)
2637{
2638 struct sort_queue_item *regular_message;
2639 unsigned int i;
2640 int res;
2641 int log_release = 0;
2642 unsigned int release_to;
2643 unsigned int range = 0;
2644
2645 release_to = token_aru;
2646 if (sq_lt_compare (instance->my_last_aru, release_to)) {
2647 release_to = instance->my_last_aru;
2648 }
2649 if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2650 release_to = instance->my_high_delivered;
2651 }
2652
2653 /*
2654 * Ensure we dont try release before an already released point
2655 */
2656 if (sq_lt_compare (release_to, instance->last_released)) {
2657 return;
2658 }
2659
2660 range = release_to - instance->last_released;
2661 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2662
2663 /*
2664 * Release retransmit list items if group aru indicates they are transmitted
2665 */
2666 for (i = 1; i <= range; i++) {
2667 void *ptr;
2668
2669 res = sq_item_get (&instance->regular_sort_queue,
2670 instance->last_released + i, &ptr);
2671 if (res == 0) {
2672 regular_message = ptr;
2673 totemsrp_buffer_release (instance, regular_message->mcast);
2674 }
2675 sq_items_release (&instance->regular_sort_queue,
2676 instance->last_released + i);
2677
2678 log_release = 1;
2679 }
2680 instance->last_released += range;
2681
2682 if (log_release) {
2684 "releasing messages up to and including %x", release_to);
2685 }
2686}
2687
2688static void update_aru (
2689 struct totemsrp_instance *instance)
2690{
2691 unsigned int i;
2692 int res;
2693 struct sq *sort_queue;
2694 unsigned int range;
2695 unsigned int my_aru_saved = 0;
2696
2697 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2698 sort_queue = &instance->recovery_sort_queue;
2699 } else {
2700 sort_queue = &instance->regular_sort_queue;
2701 }
2702
2703 range = instance->my_high_seq_received - instance->my_aru;
2704
2705 my_aru_saved = instance->my_aru;
2706 for (i = 1; i <= range; i++) {
2707
2708 void *ptr;
2709
2710 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2711 /*
2712 * If hole, stop updating aru
2713 */
2714 if (res != 0) {
2715 break;
2716 }
2717 }
2718 instance->my_aru += i - 1;
2719}
2720
2721/*
2722 * Multicasts pending messages onto the ring (requires orf_token possession)
2723 */
2724static int orf_token_mcast (
2725 struct totemsrp_instance *instance,
2726 struct orf_token *token,
2727 int fcc_mcasts_allowed)
2728{
2729 struct message_item *message_item = 0;
2730 struct cs_queue *mcast_queue;
2731 struct sq *sort_queue;
2733 struct mcast *mcast;
2734 unsigned int fcc_mcast_current;
2735
2736 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2737 mcast_queue = &instance->retrans_message_queue;
2738 sort_queue = &instance->recovery_sort_queue;
2739 reset_token_retransmit_timeout (instance); // REVIEWED
2740 } else {
2741 if (instance->waiting_trans_ack) {
2742 mcast_queue = &instance->new_message_queue_trans;
2743 } else {
2744 mcast_queue = &instance->new_message_queue;
2745 }
2746
2747 sort_queue = &instance->regular_sort_queue;
2748 }
2749
2750 for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2751 if (cs_queue_is_empty (mcast_queue)) {
2752 break;
2753 }
2754 message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2755
2756 message_item->mcast->seq = ++token->seq;
2757 message_item->mcast->this_seqno = instance->global_seqno++;
2758
2759 /*
2760 * Build IO vector
2761 */
2762 memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2765
2767
2768 memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2769
2770 /*
2771 * Add message to retransmit queue
2772 */
2773 sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2774
2776 instance->totemnet_context,
2779
2780 /*
2781 * Delete item from pending queue
2782 */
2783 cs_queue_item_remove (mcast_queue);
2784
2785 /*
2786 * If messages mcasted, deliver any new messages to totempg
2787 */
2788 instance->my_high_seq_received = token->seq;
2789 }
2790
2791 update_aru (instance);
2792
2793 /*
2794 * Return 1 if more messages are available for single node clusters
2795 */
2796 return (fcc_mcast_current);
2797}
2798
2799/*
2800 * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2801 * Modify's orf_token's rtr to include retransmits required by this process
2802 */
2803static int orf_token_rtr (
2804 struct totemsrp_instance *instance,
2805 struct orf_token *orf_token,
2806 unsigned int *fcc_allowed)
2807{
2808 unsigned int res;
2809 unsigned int i, j;
2810 unsigned int found;
2811 struct sq *sort_queue;
2812 struct rtr_item *rtr_list;
2813 unsigned int range = 0;
2814 char retransmit_msg[1024];
2815 char value[64];
2816
2817 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2818 sort_queue = &instance->recovery_sort_queue;
2819 } else {
2820 sort_queue = &instance->regular_sort_queue;
2821 }
2822
2824
2825 strcpy (retransmit_msg, "Retransmit List: ");
2828 "Retransmit List %d", orf_token->rtr_list_entries);
2829 for (i = 0; i < orf_token->rtr_list_entries; i++) {
2830 sprintf (value, "%x ", rtr_list[i].seq);
2831 strcat (retransmit_msg, value);
2832 }
2833 strcat (retransmit_msg, "");
2835 "%s", retransmit_msg);
2836 }
2837
2838 /*
2839 * Retransmit messages on orf_token's RTR list from RTR queue
2840 */
2841 for (instance->fcc_remcast_current = 0, i = 0;
2842 instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {
2843
2844 /*
2845 * If this retransmit request isn't from this configuration,
2846 * try next rtr entry
2847 */
2848 if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2849 sizeof (struct memb_ring_id)) != 0) {
2850
2851 i += 1;
2852 continue;
2853 }
2854
2855 res = orf_token_remcast (instance, rtr_list[i].seq);
2856 if (res == 0) {
2857 /*
2858 * Multicasted message, so no need to copy to new retransmit list
2859 */
2861 assert (orf_token->rtr_list_entries >= 0);
2862 memmove (&rtr_list[i], &rtr_list[i + 1],
2863 sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2864
2865 instance->stats.mcast_retx++;
2866 instance->fcc_remcast_current++;
2867 } else {
2868 i += 1;
2869 }
2870 }
2871 *fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;
2872
2873 /*
2874 * Add messages to retransmit to RTR list
2875 * but only retry if there is room in the retransmit list
2876 */
2877
2878 range = orf_token->seq - instance->my_aru;
2879 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2880
2882 (i <= range); i++) {
2883
2884 /*
2885 * Ensure message is within the sort queue range
2886 */
2887 res = sq_in_range (sort_queue, instance->my_aru + i);
2888 if (res == 0) {
2889 break;
2890 }
2891
2892 /*
2893 * Find if a message is missing from this processor
2894 */
2895 res = sq_item_inuse (sort_queue, instance->my_aru + i);
2896 if (res == 0) {
2897 /*
2898 * Determine how many times we have missed receiving
2899 * this sequence number. sq_item_miss_count increments
2900 * a counter for the sequence number. The miss count
2901 * will be returned and compared. This allows time for
2902 * delayed multicast messages to be received before
2903 * declaring the message is missing and requesting a
2904 * retransmit.
2905 */
2906 res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2907 if (res < instance->totem_config->miss_count_const) {
2908 continue;
2909 }
2910
2911 /*
2912 * Determine if missing message is already in retransmit list
2913 */
2914 found = 0;
2915 for (j = 0; j < orf_token->rtr_list_entries; j++) {
2916 if (instance->my_aru + i == rtr_list[j].seq) {
2917 found = 1;
2918 }
2919 }
2920 if (found == 0) {
2921 /*
2922 * Missing message not found in current retransmit list so add it
2923 */
2924 memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id,
2925 &instance->my_ring_id, sizeof (struct memb_ring_id));
2926 rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
2928 }
2929 }
2930 }
2931 return (instance->fcc_remcast_current);
2932}
2933
2934static void token_retransmit (struct totemsrp_instance *instance)
2935{
2936 instance->stats.orf_token_tx++;
2938 instance->orf_token_retransmit,
2939 instance->orf_token_retransmit_size);
2940}
2941
2942/*
2943 * If no mcast or token has been received within the token
2944 * retransmit period, retransmit the regular token to the
2945 * next processor
2946 */
2947static void timer_function_token_retransmit_timeout (void *data)
2948{
2949 struct totemsrp_instance *instance = data;
2950
2951 switch (instance->memb_state) {
2952 case MEMB_STATE_GATHER:
2953 break;
2954 case MEMB_STATE_COMMIT:
2957 token_retransmit (instance);
2958 reset_token_retransmit_timeout (instance); // REVIEWED
2959 break;
2960 }
2961}
2962
2963static void timer_function_token_hold_retransmit_timeout (void *data)
2964{
2965 struct totemsrp_instance *instance = data;
2966
2967 switch (instance->memb_state) {
2968 case MEMB_STATE_GATHER:
2969 break;
2970 case MEMB_STATE_COMMIT:
2971 break;
2974 token_retransmit (instance);
2975 break;
2976 }
2977}
2978
2979static void timer_function_merge_detect_timeout(void *data)
2980{
2981 struct totemsrp_instance *instance = data;
2982
2984
2985 switch (instance->memb_state) {
2987 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
2988 memb_merge_detect_transmit (instance);
2989 }
2990 break;
2991 case MEMB_STATE_GATHER:
2992 case MEMB_STATE_COMMIT:
2994 break;
2995 }
2996}
2997
2998/*
2999 * Send orf_token to next member (requires orf_token)
3000 */
3001static int token_send (
3002 struct totemsrp_instance *instance,
3003 struct orf_token *orf_token,
3004 int forward_token)
3005{
3006 int res = 0;
3007 unsigned int orf_token_size;
3008
3009 orf_token_size = sizeof (struct orf_token) +
3010 (orf_token->rtr_list_entries * sizeof (struct rtr_item));
3011
3012 orf_token->header.nodeid = instance->my_id.nodeid;
3013 memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
3014 instance->orf_token_retransmit_size = orf_token_size;
3015 assert (orf_token->header.nodeid);
3016
3017 if (forward_token == 0) {
3018 return (0);
3019 }
3020
3021 instance->stats.orf_token_tx++;
3023 orf_token,
3024 orf_token_size);
3025
3026 return (res);
3027}
3028
3029static int token_hold_cancel_send (struct totemsrp_instance *instance)
3030{
3032
3033 /*
3034 * Only cancel if the token is currently held
3035 */
3036 if (instance->my_token_held == 0) {
3037 return (0);
3038 }
3039 instance->my_token_held = 0;
3040
3041 /*
3042 * Build message
3043 */
3049 memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
3050 sizeof (struct memb_ring_id));
3052
3053 instance->stats.token_hold_cancel_tx++;
3054
3056 sizeof (struct token_hold_cancel));
3057
3058 return (0);
3059}
3060
3061static int orf_token_send_initial (struct totemsrp_instance *instance)
3062{
3063 struct orf_token orf_token;
3064 int res;
3065
3070 orf_token.header.nodeid = instance->my_id.nodeid;
3071 assert (orf_token.header.nodeid);
3075 instance->my_set_retrans_flg = 1;
3076
3077 if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3079 instance->my_set_retrans_flg = 0;
3080 } else {
3082 instance->my_set_retrans_flg = 1;
3083 }
3084
3085 orf_token.aru = 0;
3087 orf_token.aru_addr = instance->my_id.nodeid;
3088
3089 memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3090 orf_token.fcc = 0;
3091 orf_token.backlog = 0;
3092
3094
3095 res = token_send (instance, &orf_token, 1);
3096
3097 return (res);
3098}
3099
3100static void memb_state_commit_token_update (
3101 struct totemsrp_instance *instance)
3102{
3103 struct srp_addr *addr;
3104 struct memb_commit_token_memb_entry *memb_list;
3105 unsigned int high_aru;
3106 unsigned int i;
3107
3108 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3109 memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3110
3111 memcpy (instance->my_new_memb_list, addr,
3112 sizeof (struct srp_addr) * instance->commit_token->addr_entries);
3113
3114 instance->my_new_memb_entries = instance->commit_token->addr_entries;
3115
3116 memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
3117 &instance->my_old_ring_id, sizeof (struct memb_ring_id));
3118
3119 memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
3120 /*
3121 * TODO high delivered is really instance->my_aru, but with safe this
3122 * could change?
3123 */
3124 instance->my_received_flg =
3125 (instance->my_aru == instance->my_high_seq_received);
3126
3127 memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
3128
3129 memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
3130 /*
3131 * find high aru up to current memb_index for all matching ring ids
3132 * if any ring id matching memb_index has aru less then high aru set
3133 * received flag for that entry to false
3134 */
3135 high_aru = memb_list[instance->commit_token->memb_index].aru;
3136 for (i = 0; i <= instance->commit_token->memb_index; i++) {
3137 if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3138 &memb_list[i].ring_id,
3139 sizeof (struct memb_ring_id)) == 0) {
3140
3141 if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3142 high_aru = memb_list[i].aru;
3143 }
3144 }
3145 }
3146
3147 for (i = 0; i <= instance->commit_token->memb_index; i++) {
3148 if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3149 &memb_list[i].ring_id,
3150 sizeof (struct memb_ring_id)) == 0) {
3151
3152 if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3153 memb_list[i].received_flg = 0;
3154 if (i == instance->commit_token->memb_index) {
3155 instance->my_received_flg = 0;
3156 }
3157 }
3158 }
3159 }
3160
3161 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3162 instance->commit_token->memb_index += 1;
3163 assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3164 assert (instance->commit_token->header.nodeid);
3165}
3166
3167static void memb_state_commit_token_target_set (
3168 struct totemsrp_instance *instance)
3169{
3170 struct srp_addr *addr;
3171
3172 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3173
3174 /* Totemnet just looks at the node id */
3176 instance->totemnet_context,
3177 addr[instance->commit_token->memb_index %
3178 instance->commit_token->addr_entries].nodeid);
3179}
3180
3181static int memb_state_commit_token_send_recovery (
3182 struct totemsrp_instance *instance,
3183 struct memb_commit_token *commit_token)
3184{
3185 unsigned int commit_token_size;
3186
3187 commit_token->token_seq++;
3188 commit_token->header.nodeid = instance->my_id.nodeid;
3189 commit_token_size = sizeof (struct memb_commit_token) +
3190 ((sizeof (struct srp_addr) +
3191 sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
3192 /*
3193 * Make a copy for retransmission if necessary
3194 */
3195 memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3196 instance->orf_token_retransmit_size = commit_token_size;
3197
3198 instance->stats.memb_commit_token_tx++;
3199
3201 commit_token,
3202 commit_token_size);
3203
3204 /*
3205 * Request retransmission of the commit token in case it is lost
3206 */
3207 reset_token_retransmit_timeout (instance);
3208 return (0);
3209}
3210
3211static int memb_state_commit_token_send (
3212 struct totemsrp_instance *instance)
3213{
3214 unsigned int commit_token_size;
3215
3216 instance->commit_token->token_seq++;
3217 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3218 commit_token_size = sizeof (struct memb_commit_token) +
3219 ((sizeof (struct srp_addr) +
3220 sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
3221 /*
3222 * Make a copy for retransmission if necessary
3223 */
3224 memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
3225 instance->orf_token_retransmit_size = commit_token_size;
3226
3227 instance->stats.memb_commit_token_tx++;
3228
3230 instance->commit_token,
3231 commit_token_size);
3232
3233 /*
3234 * Request retransmission of the commit token in case it is lost
3235 */
3236 reset_token_retransmit_timeout (instance);
3237 return (0);
3238}
3239
3240
3241static int memb_lowest_in_config (struct totemsrp_instance *instance)
3242{
3243 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3244 int token_memb_entries = 0;
3245 int i;
3246 unsigned int lowest_nodeid;
3247
3248 memb_set_subtract (token_memb, &token_memb_entries,
3249 instance->my_proc_list, instance->my_proc_list_entries,
3250 instance->my_failed_list, instance->my_failed_list_entries);
3251
3252 /*
3253 * find representative by searching for smallest identifier
3254 */
3255 assert(token_memb_entries > 0);
3256
3257 lowest_nodeid = token_memb[0].nodeid;
3258 for (i = 1; i < token_memb_entries; i++) {
3259 if (lowest_nodeid > token_memb[i].nodeid) {
3260 lowest_nodeid = token_memb[i].nodeid;
3261 }
3262 }
3263 return (lowest_nodeid == instance->my_id.nodeid);
3264}
3265
3266static int srp_addr_compare (const void *a, const void *b)
3267{
3268 const struct srp_addr *srp_a = (const struct srp_addr *)a;
3269 const struct srp_addr *srp_b = (const struct srp_addr *)b;
3270
3271 if (srp_a->nodeid < srp_b->nodeid) {
3272 return -1;
3273 } else if (srp_a->nodeid > srp_b->nodeid) {
3274 return 1;
3275 } else {
3276 return 0;
3277 }
3278}
3279
3280static void memb_state_commit_token_create (
3281 struct totemsrp_instance *instance)
3282{
3283 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3284 struct srp_addr *addr;
3285 struct memb_commit_token_memb_entry *memb_list;
3286 int token_memb_entries = 0;
3287
3289 "Creating commit token because I am the rep.");
3290
3291 memb_set_subtract (token_memb, &token_memb_entries,
3292 instance->my_proc_list, instance->my_proc_list_entries,
3293 instance->my_failed_list, instance->my_failed_list_entries);
3294
3295 memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3299 instance->commit_token->header.encapsulated = 0;
3300 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3301 assert (instance->commit_token->header.nodeid);
3302
3303 instance->commit_token->ring_id.rep = instance->my_id.nodeid;
3304 instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3305
3306 /*
3307 * This qsort is necessary to ensure the commit token traverses
3308 * the ring in the proper order
3309 */
3310 qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3311 srp_addr_compare);
3312
3313 instance->commit_token->memb_index = 0;
3314 instance->commit_token->addr_entries = token_memb_entries;
3315
3316 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3317 memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3318
3319 memcpy (addr, token_memb,
3320 token_memb_entries * sizeof (struct srp_addr));
3321 memset (memb_list, 0,
3322 sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
3323}
3324
3325static void memb_join_message_send (struct totemsrp_instance *instance)
3326{
3327 char memb_join_data[40000];
3328 struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3329 char *addr;
3330 unsigned int addr_idx;
3331 size_t msg_len;
3332
3337 memb_join->header.nodeid = instance->my_id.nodeid;
3338 assert (memb_join->header.nodeid);
3339
3340 msg_len = sizeof(struct memb_join) +
3341 ((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3342
3343 if (msg_len > sizeof(memb_join_data)) {
3345 "memb_join_message too long. Ignoring message.");
3346
3347 return ;
3348 }
3349
3350 memb_join->ring_seq = instance->my_ring_id.seq;
3353 memb_join->system_from = instance->my_id;
3354
3355 /*
3356 * This mess adds the joined and failed processor lists into the join
3357 * message
3358 */
3359 addr = (char *)memb_join;
3360 addr_idx = sizeof (struct memb_join);
3361 memcpy (&addr[addr_idx],
3362 instance->my_proc_list,
3363 instance->my_proc_list_entries *
3364 sizeof (struct srp_addr));
3365 addr_idx +=
3366 instance->my_proc_list_entries *
3367 sizeof (struct srp_addr);
3368 memcpy (&addr[addr_idx],
3369 instance->my_failed_list,
3370 instance->my_failed_list_entries *
3371 sizeof (struct srp_addr));
3372 addr_idx +=
3373 instance->my_failed_list_entries *
3374 sizeof (struct srp_addr);
3375
3376 if (instance->totem_config->send_join_timeout) {
3377 // coverity[DC.WEAK_CRYPTO:SUPPRESS] random is not used in a security context
3378 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3379 }
3380
3381 instance->stats.memb_join_tx++;
3382
3384 instance->totemnet_context,
3385 memb_join,
3386 addr_idx);
3387}
3388
3389static void memb_leave_message_send (struct totemsrp_instance *instance)
3390{
3391 char memb_join_data[40000];
3392 struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3393 char *addr;
3394 unsigned int addr_idx;
3395 int active_memb_entries;
3396 struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3397 size_t msg_len;
3398
3400 "sending join/leave message");
3401
3402 /*
3403 * add us to the failed list, and remove us from
3404 * the members list
3405 */
3406 memb_set_merge(
3407 &instance->my_id, 1,
3408 instance->my_failed_list, &instance->my_failed_list_entries);
3409
3410 memb_set_subtract (active_memb, &active_memb_entries,
3411 instance->my_proc_list, instance->my_proc_list_entries,
3412 &instance->my_id, 1);
3413
3414 msg_len = sizeof(struct memb_join) +
3415 ((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3416
3417 if (msg_len > sizeof(memb_join_data)) {
3419 "memb_leave message too long. Ignoring message.");
3420
3421 return ;
3422 }
3423
3429
3430 memb_join->ring_seq = instance->my_ring_id.seq;
3431 memb_join->proc_list_entries = active_memb_entries;
3433 memb_join->system_from = instance->my_id;
3434
3435 // TODO: CC Maybe use the actual join send routine.
3436 /*
3437 * This mess adds the joined and failed processor lists into the join
3438 * message
3439 */
3440 addr = (char *)memb_join;
3441 addr_idx = sizeof (struct memb_join);
3442 memcpy (&addr[addr_idx],
3443 active_memb,
3444 active_memb_entries *
3445 sizeof (struct srp_addr));
3446 addr_idx +=
3447 active_memb_entries *
3448 sizeof (struct srp_addr);
3449 memcpy (&addr[addr_idx],
3450 instance->my_failed_list,
3451 instance->my_failed_list_entries *
3452 sizeof (struct srp_addr));
3453 addr_idx +=
3454 instance->my_failed_list_entries *
3455 sizeof (struct srp_addr);
3456
3457
3458 if (instance->totem_config->send_join_timeout) {
3459 // coverity[DC.WEAK_CRYPTO:SUPPRESS] random is not used in a security context
3460 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3461 }
3462 instance->stats.memb_join_tx++;
3463
3465 instance->totemnet_context,
3466 memb_join,
3467 addr_idx);
3468}
3469
3470static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3471{
3473
3480 memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3481 sizeof (struct memb_ring_id));
3483
3484 instance->stats.memb_merge_detect_tx++;
3487 sizeof (struct memb_merge_detect));
3488}
3489
3490static void memb_ring_id_set (
3491 struct totemsrp_instance *instance,
3492 const struct memb_ring_id *ring_id)
3493{
3494
3495 memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3496}
3497
3499 void *srp_context,
3500 void **handle_out,
3502 int delete,
3503 int (*callback_fn) (enum totem_callback_token_type type, const void *),
3504 const void *data)
3505{
3506 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3507 struct token_callback_instance *callback_handle;
3508
3509 token_hold_cancel_send (instance);
3510
3511 callback_handle = malloc (sizeof (struct token_callback_instance));
3512 if (callback_handle == 0) {
3513 return (-1);
3514 }
3515 *handle_out = (void *)callback_handle;
3516 qb_list_init (&callback_handle->list);
3517 callback_handle->callback_fn = callback_fn;
3518 callback_handle->data = (void *) data;
3519 callback_handle->callback_type = type;
3520 callback_handle->delete = delete;
3521 switch (type) {
3523 qb_list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3524 break;
3526 qb_list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3527 break;
3528 }
3529
3530 return (0);
3531}
3532
3533void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3534{
3535 struct token_callback_instance *h;
3536
3537 if (*handle_out) {
3538 h = (struct token_callback_instance *)*handle_out;
3539 qb_list_del (&h->list);
3540 free (h);
3541 h = NULL;
3542 *handle_out = 0;
3543 }
3544}
3545
/*
 * Run every callback registered for a token event.
 *
 * The list is chosen by type: token_callback_received_listhead or
 * token_callback_sent_listhead. Any other type trips the assert.
 *
 * Callbacks registered with delete == 1 are unlinked before they run. If
 * such a callback returns -1 it is linked back onto the list to be retried
 * on the next token; otherwise the entry is released.
 *
 * The _safe iterator is used because entries are unlinked (and re-linked)
 * while walking the list.
 *
 * NOTE(review): several lines are missing from this extract: the "type"
 * parameter, the token_callback_instance local, the case labels, the
 * assignment of del (presumably from ->delete), the callback invocation that
 * sets res, and the release in the "else if (del)" branch (presumably free()).
 */
3546static void token_callbacks_execute (
3547 struct totemsrp_instance *instance,
3549{
3550 struct qb_list_head *list, *tmp_iter;
3551 struct qb_list_head *callback_listhead = 0;
3553 int res;
3554 int del;
3555
3556 switch (type) {
3558 callback_listhead = &instance->token_callback_received_listhead;
3559 break;
3561 callback_listhead = &instance->token_callback_sent_listhead;
3562 break;
3563 default:
3564 assert (0);
3565 }
3566
3567 qb_list_for_each_safe(list, tmp_iter, callback_listhead) {
3568 token_callback_instance = qb_list_entry (list, struct token_callback_instance, list);
 /* One-shot callbacks are unlinked before running. */
3570 if (del == 1) {
3571 qb_list_del (list);
3572 }
3573
3577 /*
3578 * This callback failed to execute, try it again on the next token
3579 */
3580 if (res == -1 && del == 1) {
 /* Re-linked at the list head; tmp_iter already points past it, so it is not revisited in this pass (assuming Linux-style qb_list_add). */
3581 qb_list_add (list, callback_listhead);
3582 } else if (del) {
3584 }
3585 }
3586}
3587
3588/*
3589 * Flow control functions
3590 */
3591static unsigned int backlog_get (struct totemsrp_instance *instance)
3592{
3593 unsigned int backlog = 0;
3594 struct cs_queue *queue_use = NULL;
3595
3596 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3597 if (instance->waiting_trans_ack) {
3598 queue_use = &instance->new_message_queue_trans;
3599 } else {
3600 queue_use = &instance->new_message_queue;
3601 }
3602 } else
3603 if (instance->memb_state == MEMB_STATE_RECOVERY) {
3604 queue_use = &instance->retrans_message_queue;
3605 }
3606
3607 if (queue_use != NULL) {
3608 backlog = cs_queue_used (queue_use);
3609 }
3610
3611 instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3612 return (backlog);
3613}
3614
3615static int fcc_calculate (
3616 struct totemsrp_instance *instance,
3617 struct orf_token *token)
3618{
3619 unsigned int transmits_allowed;
3620 unsigned int backlog_calc;
3621
3622 transmits_allowed = instance->totem_config->max_messages;
3623
3624 if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3625 transmits_allowed = instance->totem_config->window_size - token->fcc;
3626 }
3627
3628 instance->my_cbl = backlog_get (instance);
3629
3630 /*
3631 * Only do backlog calculation if there is a backlog otherwise
3632 * we would result in div by zero
3633 */
3634 if (token->backlog + instance->my_cbl - instance->my_pbl) {
3635 backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3636 (token->backlog + instance->my_cbl - instance->my_pbl);
3637 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3638 transmits_allowed = backlog_calc;
3639 }
3640 }
3641
3642 return (transmits_allowed);
3643}
3644
3645/*
3646 * don't overflow the RTR sort queue
3647 */
3648static void fcc_rtr_limit (
3649 struct totemsrp_instance *instance,
3650 struct orf_token *token,
3651 unsigned int *transmits_allowed)
3652{
3653 int check = QUEUE_RTR_ITEMS_SIZE_MAX;
3654 check -= (*transmits_allowed + instance->totem_config->window_size);
3655 assert (check >= 0);
3656 if (sq_lt_compare (instance->last_released +
3657 QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
3658 instance->totem_config->window_size,
3659
3660 token->seq)) {
3661
3662 *transmits_allowed = 0;
3663 }
3664}
3665
3666static void fcc_token_update (
3667 struct totemsrp_instance *instance,
3668 struct orf_token *token,
3669 unsigned int msgs_transmitted)
3670{
3671 token->fcc += msgs_transmitted - instance->my_trc;
3672 token->backlog += instance->my_cbl - instance->my_pbl;
3673 instance->my_trc = msgs_transmitted;
3674 instance->my_pbl = instance->my_cbl;
3675}
3676
3677/*
3678 * Sanity checkers
3679 */
3680static int check_orf_token_sanity(
3681 const struct totemsrp_instance *instance,
3682 const void *msg,
3683 size_t msg_len,
3684 size_t max_msg_len,
3685 int endian_conversion_needed)
3686{
3687 int rtr_entries;
3688 const struct orf_token *token = (const struct orf_token *)msg;
3689 size_t required_len;
3690
3691 if (msg_len > max_msg_len) {
3693 "Received orf_token message is too long... ignoring.");
3694
3695 return (-1);
3696 }
3697
3698 if (msg_len < sizeof(struct orf_token)) {
3700 "Received orf_token message is too short... ignoring.");
3701
3702 return (-1);
3703 }
3704
3705 if (endian_conversion_needed) {
3706 rtr_entries = swab32(token->rtr_list_entries);
3707 } else {
3708 rtr_entries = token->rtr_list_entries;
3709 }
3710
3711 if (rtr_entries > RETRANSMIT_ENTRIES_MAX) {
3713 "Received orf_token message rtr_entries is corrupted... ignoring.");
3714
3715 return (-1);
3716 }
3717
3718 required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item);
3719 if (msg_len < required_len) {
3721 "Received orf_token message is too short... ignoring.");
3722
3723 return (-1);
3724 }
3725
3726 return (0);
3727}
3728
/*
 * Length check for a received mcast message: it must be at least as long
 * as the fixed struct mcast header.
 *
 * Only msg_len is examined; msg is not dereferenced and
 * endian_conversion_needed is unused.
 *
 * Returns -1 (after logging) if the message is too short, 0 otherwise.
 * NOTE(review): the line opening the log call that ends with the string
 * below is missing from this extract (presumably log_printf) - confirm.
 */
3729static int check_mcast_sanity(
3730 struct totemsrp_instance *instance,
3731 const void *msg,
3732 size_t msg_len,
3733 int endian_conversion_needed)
3734{
3735
3736 if (msg_len < sizeof(struct mcast)) {
3738 "Received mcast message is too short... ignoring.");
3739
3740 return (-1);
3741 }
3742
3743 return (0);
3744}
3745
/*
 * Length check for a received memb_merge_detect message: it must be at
 * least sizeof(struct memb_merge_detect) bytes.
 *
 * Only msg_len is examined; msg is not dereferenced and
 * endian_conversion_needed is unused.
 *
 * Returns -1 (after logging) if the message is too short, 0 otherwise.
 * NOTE(review): the line opening the log call that ends with the string
 * below is missing from this extract (presumably log_printf) - confirm.
 */
3746static int check_memb_merge_detect_sanity(
3747 struct totemsrp_instance *instance,
3748 const void *msg,
3749 size_t msg_len,
3750 int endian_conversion_needed)
3751{
3752
3753 if (msg_len < sizeof(struct memb_merge_detect)) {
3755 "Received memb_merge_detect message is too short... ignoring.");
3756
3757 return (-1);
3758 }
3759
3760 return (0);
3761}
3762
3763static int check_memb_join_sanity(
3764 struct totemsrp_instance *instance,
3765 const void *msg,
3766 size_t msg_len,
3767 int endian_conversion_needed)
3768{
3769 const struct memb_join *mj_msg = (const struct memb_join *)msg;
3770 unsigned int proc_list_entries;
3771 unsigned int failed_list_entries;
3772 size_t required_len;
3773
3774 if (msg_len < sizeof(struct memb_join)) {
3776 "Received memb_join message is too short... ignoring.");
3777
3778 return (-1);
3779 }
3780
3783
3784 if (endian_conversion_needed) {
3787 }
3788
3789 required_len = sizeof(struct memb_join) + ((proc_list_entries + failed_list_entries) * sizeof(struct srp_addr));
3790 if (msg_len < required_len) {
3792 "Received memb_join message is too short... ignoring.");
3793
3794 return (-1);
3795 }
3796
3797 return (0);
3798}
3799
3800static int check_memb_commit_token_sanity(
3801 struct totemsrp_instance *instance,
3802 const void *msg,
3803 size_t msg_len,
3804 int endian_conversion_needed)
3805{
3806 const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3807 unsigned int addr_entries;
3808 size_t required_len;
3809
3810 if (msg_len < sizeof(struct memb_commit_token)) {
3812 "Received memb_commit_token message is too short... ignoring.");
3813
3814 return (0);
3815 }
3816
3817 addr_entries= mct_msg->addr_entries;
3818 if (endian_conversion_needed) {
3820 }
3821
3822 required_len = sizeof(struct memb_commit_token) +
3823 (addr_entries * (sizeof(struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3824 if (msg_len < required_len) {
3826 "Received memb_commit_token message is too short... ignoring.");
3827
3828 return (-1);
3829 }
3830
3831 return (0);
3832}
3833
/*
 * Length check for a received token_hold_cancel message: it must be at
 * least sizeof(struct token_hold_cancel) bytes.
 *
 * Only msg_len is examined; msg is not dereferenced and
 * endian_conversion_needed is unused.
 *
 * Returns -1 (after logging) if the message is too short, 0 otherwise.
 * NOTE(review): the line opening the log call that ends with the string
 * below is missing from this extract (presumably log_printf) - confirm.
 */
3834static int check_token_hold_cancel_sanity(
3835 struct totemsrp_instance *instance,
3836 const void *msg,
3837 size_t msg_len,
3838 int endian_conversion_needed)
3839{
3840
3841 if (msg_len < sizeof(struct token_hold_cancel)) {
3843 "Received token_hold_cancel message is too short... ignoring.");
3844
3845 return (-1);
3846 }
3847
3848 return (0);
3849}
3850
3851/*
3852 * Message Handlers
3853 */
3854
/*
 * Debug-only (GIVEINFO) timestamp of the previous token event, as returned
 * by qb_util_nano_current_get() (divided by QB_TIME_NS_IN_MSEC when logged,
 * i.e. nanoseconds). message_handler_orf_token uses it to log the time
 * since the last token and how long the token was held.
 * NOTE(review): declared without static, so it has external linkage.
 */
3855#ifdef GIVEINFO
3856uint64_t tv_old;
3857#endif
3858/*
3859 * message handler called when TOKEN message type received
3860 */
/*
 * Handle a received ORF token.
 *
 * Input handling:
 *  - The token is checked by check_orf_token_sanity() (bounded by the local
 *    token buffer size) and dropped while orf_token_discard is set.
 *  - It is byte-swapped into token_convert if needed, then copied together
 *    with its retransmit list into token_storage.
 *
 * Token hold decision:
 *  - my_seq_unchanged counts rotations with no new seq; it drives the
 *    merge-detect timeout and whether the token is "held" (my_token_held).
 *  - A held token is not forwarded when this node is the ring rep
 *    (forward_token).
 *
 * Per membership state:
 *  - COMMIT: the token is discarded.
 *  - Otherwise: tokens for another ring id, or with token_seq <=
 *    my_token_seq, are discarded. Valid tokens run the RECEIVED callbacks,
 *    flow control, retransmissions and new multicasts, update aru /
 *    FAILED TO RECEIVE detection and recovery bookkeeping, send the token,
 *    deliver messages (OPERATIONAL), reset the token timers and run the
 *    SENT callbacks.
 *
 * Finally the heartbeat timeout is reset when the token is forwarded and
 * heartbeats are in use, and cancelled otherwise.
 *
 * Always returns 0.
 */
3861static int message_handler_orf_token (
3862 struct totemsrp_instance *instance,
3863 const void *msg,
3864 size_t msg_len,
3865 int endian_conversion_needed)
3866{
3867 char token_storage[1500];
3868 char token_convert[1500];
3869 struct orf_token *token = NULL;
3870 int forward_token;
3871 unsigned int transmits_allowed;
3872 unsigned int mcasted_retransmit;
3873 unsigned int mcasted_regular;
3874 unsigned int last_aru;
3875
3876#ifdef GIVEINFO
3877 uint64_t tv_current;
3878 uint64_t tv_diff;
3879
3880 tv_current = qb_util_nano_current_get ();
3881 tv_diff = tv_current - tv_old;
3882 tv_old = tv_current;
3883
3885 "Time since last token %0.4f ms", tv_diff / (float)QB_TIME_NS_IN_MSEC);
3886#endif
3887
 /* The max length is the local copy buffer size, so the copies below stay in bounds. */
3888 if (check_orf_token_sanity(instance, msg, msg_len, sizeof(token_storage),
3889 endian_conversion_needed) == -1) {
3890 return (0);
3891 }
3892
3893 if (instance->orf_token_discard) {
3894 return (0);
3895 }
3896#ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3897 if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3898 return (0);
3899 }
3900#endif
3901
3902 if (endian_conversion_needed) {
3903 orf_token_endian_convert ((struct orf_token *)msg,
3904 (struct orf_token *)token_convert);
3905 msg = (struct orf_token *)token_convert;
3906 }
3907
3908 /*
3909 * Make copy of token and retransmit list in case we have
3910 * to flush incoming messages from the kernel queue
3911 */
3912 token = (struct orf_token *)token_storage;
3913 memcpy (token, msg, sizeof (struct orf_token));
 /*
 * NOTE(review): this always copies RETRANSMIT_ENTRIES_MAX entries, not
 * rtr_list_entries. The sanity check only guarantees msg_len covers
 * rtr_list_entries items, so the tail of the copy may read past the
 * received message - confirm the receive buffer is always large enough.
 */
3914 memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3915 sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3916
3917
3918 /*
3919 * Handle merge detection timeout
3920 */
3921 if (token->seq == instance->my_last_seq) {
3922 start_merge_detect_timeout (instance);
3923 instance->my_seq_unchanged += 1;
3924 } else {
3925 cancel_merge_detect_timeout (instance);
3926 cancel_token_hold_retransmit_timeout (instance);
3927 instance->my_seq_unchanged = 0;
3928 }
3929
3930 instance->my_last_seq = token->seq;
3931
3932#ifdef TEST_RECOVERY_MSG_COUNT
3933 if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
3934 return (0);
3935 }
3936#endif
 /*
 * The call between these two lines is missing from this extract. Per the
 * "flush incoming messages" comment above, it presumably drains pending
 * input. memb_join_process discards JOIN/LEAVE messages while
 * flushing is set.
 */
3937 instance->flushing = 1;
3939 instance->flushing = 0;
3940
3941 /*
3942 * Determine if we should hold (in reality drop) the token
3943 */
 /* The ring rep needs one more unchanged rotation (>) than other nodes (>=). */
3944 instance->my_token_held = 0;
3945 if (instance->my_ring_id.rep == instance->my_id.nodeid &&
3946 instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
3947 instance->my_token_held = 1;
3948 } else {
3949 if (instance->my_ring_id.rep != instance->my_id.nodeid &&
3950 instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
3951 instance->my_token_held = 1;
3952 }
3953 }
3954
3955 /*
3956 * Hold onto token when there is no activity on ring and
3957 * this processor is the ring rep
3958 */
3959 forward_token = 1;
3960 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
3961 if (instance->my_token_held) {
3962 forward_token = 0;
3963 }
3964 }
3965
 /*
 * NOTE(review): two case labels are missing from this extract: one before
 * messages_free() (presumably MEMB_STATE_OPERATIONAL) and one before the
 * ring-id check (presumably MEMB_STATE_RECOVERY).
 */
3966 switch (instance->memb_state) {
3967 case MEMB_STATE_COMMIT:
3968 /* Discard token */
3969 break;
3970
3972 messages_free (instance, token->aru);
3973 /*
3974 * Do NOT add break, this case should also execute code in gather case.
3975 */
3976
3977 case MEMB_STATE_GATHER:
3978 /*
3979 * DO NOT add break, we use different free mechanism in recovery state
3980 */
3981
3983 /*
3984 * Discard tokens from another configuration
3985 */
3986 if (memcmp (&token->ring_id, &instance->my_ring_id,
3987 sizeof (struct memb_ring_id)) != 0) {
3988
3989 if ((forward_token)
3990 && instance->use_heartbeat) {
3991 reset_heartbeat_timeout(instance);
3992 }
3993 else {
3994 cancel_heartbeat_timeout(instance);
3995 }
3996
3997 return (0); /* discard token */
3998 }
3999
4000 /*
4001 * Discard retransmitted tokens
4002 */
4003 if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
4004 return (0); /* discard token */
4005 }
4006
4007 /*
4008 * Token is valid so trigger callbacks
4009 */
4010 token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
4011
4012 last_aru = instance->my_last_aru;
4013 instance->my_last_aru = token->aru;
4014
4015 transmits_allowed = fcc_calculate (instance, token);
4016 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
4017
 /*
 * Stop holding the token if there is retransmission work. The opening
 * line of this condition is missing from this extract (presumably a
 * ring-rep check like the one further below).
 */
4019 instance->my_token_held == 1 &&
4020 (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
4021 instance->my_token_held = 0;
4022 forward_token = 1;
4023 }
4024
4025 fcc_rtr_limit (instance, token, &transmits_allowed);
4026 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
4027/*
4028if (mcasted_regular) {
4029printf ("mcasted regular %d\n", mcasted_regular);
4030printf ("token seq %d\n", token->seq);
4031}
4032*/
4033 fcc_token_update (instance, token, mcasted_retransmit +
4034 mcasted_regular);
4035
 /* Lower the token's aru to ours if we are behind, or if we (or nobody) own it. */
4036 if (sq_lt_compare (instance->my_aru, token->aru) ||
4037 instance->my_id.nodeid == token->aru_addr ||
4038 token->aru_addr == 0) {
4039
4040 token->aru = instance->my_aru;
4041 if (token->aru == token->seq) {
4042 token->aru_addr = 0;
4043 } else {
4044 token->aru_addr = instance->my_id.nodeid;
4045 }
4046 }
 /* Count rotations in which the aru is stuck while some node still owns it. */
4047 if (token->aru == last_aru && token->aru_addr != 0) {
4048 instance->my_aru_count += 1;
4049 } else {
4050 instance->my_aru_count = 0;
4051 }
4052
4053 /*
4054 * We really don't follow specification there. In specification, OTHER nodes
4055 * detect failure of one node (based on aru_count) and my_id IS NEVER added
4056 * to failed list (so node never mark itself as failed)
4057 */
4058 if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
4059 token->aru_addr == instance->my_id.nodeid) {
4060
4062 "FAILED TO RECEIVE");
4063
4064 instance->failed_to_recv = 1;
4065
4066 memb_set_merge (&instance->my_id, 1,
4067 instance->my_failed_list,
4068 &instance->my_failed_list_entries);
4069
4070 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
4071 } else {
4072 instance->my_token_seq = token->token_seq;
4073 token->token_seq += 1;
4074
4075 if (instance->memb_state == MEMB_STATE_RECOVERY) {
4076 /*
4077 * instance->my_aru == instance->my_high_seq_received means this processor
4078 * has recovered all messages it can recover
4079 * (ie: its retrans queue is empty)
4080 */
4081 if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
4082
4083 if (token->retrans_flg == 0) {
4084 token->retrans_flg = 1;
4085 instance->my_set_retrans_flg = 1;
4086 }
4087 } else
4088 if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
4089 token->retrans_flg = 0;
4090 instance->my_set_retrans_flg = 0;
4091 }
4093 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4094 token->retrans_flg, instance->my_set_retrans_flg,
4095 cs_queue_is_empty (&instance->retrans_message_queue),
4096 instance->my_retrans_flg_count, token->aru);
4097 if (token->retrans_flg == 0) {
4098 instance->my_retrans_flg_count += 1;
4099 } else {
4100 instance->my_retrans_flg_count = 0;
4101 }
4102 if (instance->my_retrans_flg_count == 2) {
4103 instance->my_install_seq = token->seq;
4104 }
4106 "install seq %x aru %x high seq received %x",
4107 instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
4108 if (instance->my_retrans_flg_count >= 2 &&
4109 instance->my_received_flg == 0 &&
4110 sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
4111 instance->my_received_flg = 1;
4112 instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
 /*
 * NOTE(review): entries are sized as struct totem_ip_address, but
 * my_deliver_memb_list is passed to memb_set_subset() as a
 * struct srp_addr array in messages_deliver_to_app. Confirm the element
 * type; if the sizes differ, this copies the wrong number of bytes.
 */
4113 memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
4114 sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
4115 }
4116 if (instance->my_retrans_flg_count >= 3 &&
4117 sq_lte_compare (instance->my_install_seq, token->aru)) {
4118 instance->my_rotation_counter += 1;
4119 } else {
4120 instance->my_rotation_counter = 0;
4121 }
4122 if (instance->my_rotation_counter == 2) {
4124 "retrans flag count %x token aru %x install seq %x aru %x %x",
4125 instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
4126 instance->my_aru, token->seq);
4127
4128 memb_state_operational_enter (instance);
4129 instance->my_rotation_counter = 0;
4130 instance->my_retrans_flg_count = 0;
4131 }
4132 }
4133
4135 token_send (instance, token, forward_token);
4136
4137#ifdef GIVEINFO
4138 tv_current = qb_util_nano_current_get ();
4139 tv_diff = tv_current - tv_old;
4140 tv_old = tv_current;
4142 "I held %0.4f ms",
4143 tv_diff / (float)QB_TIME_NS_IN_MSEC);
4144#endif
4145 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4146 messages_deliver_to_app (instance, 0,
4147 instance->my_high_seq_received);
4148 }
4149
4150 /*
4151 * Deliver messages after token has been transmitted
4152 * to improve performance
4153 */
4154 reset_token_timeout (instance); // REVIEWED
4155 reset_token_retransmit_timeout (instance); // REVIEWED
4156 if (instance->my_id.nodeid == instance->my_ring_id.rep &&
4157 instance->my_token_held == 1) {
4158
4159 start_token_hold_retransmit_timeout (instance);
4160 }
4161
4162 token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
4163 }
4164 break;
4165 }
4166
4167 if ((forward_token)
4168 && instance->use_heartbeat) {
4169 reset_heartbeat_timeout(instance);
4170 }
4171 else {
4172 cancel_heartbeat_timeout(instance);
4173 }
4174
4175 return (0);
4176}
4177
/*
 * Deliver messages from regular_sort_queue to the application, in sequence
 * order, for sequence numbers after my_high_delivered up to end_point.
 *
 * skip == 0: stop at the first missing sequence number (a hole).
 * skip != 0: holes are passed over, and messages whose sender
 *            (system_from) is not in my_deliver_memb_list are not delivered.
 *
 * my_high_delivered advances past every sequence number that is delivered
 * or skipped. Delivery also stops at the first number outside the sort
 * queue's range.
 *
 * The data passed to totemsrp_deliver_fn is the bytes following the
 * struct mcast header. The endian flag is set when the stored header's
 * magic is not TOTEM_MH_MAGIC.
 *
 * NOTE(review): the first lines of the two log calls are missing from this
 * extract.
 */
4178static void messages_deliver_to_app (
4179 struct totemsrp_instance *instance,
4180 int skip,
4181 unsigned int end_point)
4182{
4183 struct sort_queue_item *sort_queue_item_p;
4184 unsigned int i;
4185 int res;
4186 struct mcast *mcast_in;
4187 struct mcast mcast_header;
4188 unsigned int range = 0;
4189 int endian_conversion_required;
4190 unsigned int my_high_delivered_stored = 0;
4191 struct srp_addr aligned_system_from;
4192
4193 range = end_point - instance->my_high_delivered;
4194
4195 if (range) {
4197 "Delivering %x to %x", instance->my_high_delivered,
4198 end_point);
4199 }
4200 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
4201 my_high_delivered_stored = instance->my_high_delivered;
4202
4203 /*
4204 * Deliver messages in order from rtr queue to pending delivery queue
4205 */
4206 for (i = 1; i <= range; i++) {
4207
4208 void *ptr = 0;
4209
4210 /*
4211 * If out of range of sort queue, stop assembly
4212 */
4213 res = sq_in_range (&instance->regular_sort_queue,
4214 my_high_delivered_stored + i);
4215 if (res == 0) {
4216 break;
4217 }
4218
4219 res = sq_item_get (&instance->regular_sort_queue,
4220 my_high_delivered_stored + i, &ptr);
4221 /*
4222 * If hole, stop assembly
4223 */
4224 if (res != 0 && skip == 0) {
4225 break;
4226 }
4227
4228 instance->my_high_delivered = my_high_delivered_stored + i;
4229
 /* Hole while skipping: move past it. */
4230 if (res != 0) {
4231 continue;
4232
4233 }
4234
4235 sort_queue_item_p = ptr;
4236
4237 mcast_in = sort_queue_item_p->mcast;
4238 assert (mcast_in != (struct mcast *)0xdeadbeef);
4239
4240 endian_conversion_required = 0;
4241 if (mcast_in->header.magic != TOTEM_MH_MAGIC) {
4242 endian_conversion_required = 1;
4243 mcast_endian_convert (mcast_in, &mcast_header);
4244 } else {
4245 memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
4246 }
4247
4248 aligned_system_from = mcast_header.system_from;
4249
4250 /*
4251 * Skip messages not originated in instance->my_deliver_memb
4252 */
4253 if (skip &&
4254 memb_set_subset (&aligned_system_from,
4255 1,
4256 instance->my_deliver_memb_list,
4257 instance->my_deliver_memb_entries) == 0) {
4258
 /* Redundant: already set to this value above. */
4259 instance->my_high_delivered = my_high_delivered_stored + i;
4260
4261 continue;
4262 }
4263
4264 /*
4265 * Message found
4266 */
4268 "Delivering MCAST message with seq %x to pending delivery queue",
4269 mcast_header.seq);
4270
 /*
 * NOTE(review): despite the comment below, the sender nodeid is taken
 * from the message header, so messages from any node are delivered here,
 * not only locally originated ones.
 */
4271 /*
4272 * Message is locally originated multicast
4273 */
4274 instance->totemsrp_deliver_fn (
4275 mcast_header.header.nodeid,
4276 ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
4277 sort_queue_item_p->msg_len - sizeof (struct mcast),
4278 endian_conversion_required);
4279 }
4280}
4281
4282/*
4283 * recv message handler called when MCAST message type received
4284 */
/*
 * Handle a received MCAST message.
 *
 * After the length check, the header is copied (byte-swapped if needed)
 * into mcast_header. Encapsulated messages use recovery_sort_queue; all
 * others use regular_sort_queue.
 *
 * Message for a different ring id (not stored):
 *  - OPERATIONAL: the sender is merged into my_proc_list and GATHER is
 *    entered.
 *  - GATHER: the same happens only if the sender was not already in
 *    my_proc_list.
 *  - COMMIT and RECOVERY: the message is counted in stats.rx_msg_dropped.
 *
 * Message for the current ring:
 *  - It is copied into a new buffer and added to the sort queue if its seq
 *    is in range and not yet present, and my_high_seq_received is advanced.
 *  - Then the aru is updated, and in OPERATIONAL state deliverable messages
 *    are passed to the application.
 *
 * Returns -1 only when totemsrp_buffer_alloc fails, otherwise 0.
 * NOTE(review): missing from this extract: the sort_queue_item local
 * declaration, the OPERATIONAL/RECOVERY case labels (presumably) and the
 * first line of the "Received ringid" log call.
 */
4285static int message_handler_mcast (
4286 struct totemsrp_instance *instance,
4287 const void *msg,
4288 size_t msg_len,
4289 int endian_conversion_needed)
4290{
4292 struct sq *sort_queue;
4293 struct mcast mcast_header;
4294 struct srp_addr aligned_system_from;
4295
4296 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4297 return (0);
4298 }
4299
4300 if (endian_conversion_needed) {
4301 mcast_endian_convert (msg, &mcast_header);
4302 } else {
4303 memcpy (&mcast_header, msg, sizeof (struct mcast));
4304 }
4305
4306 if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
4307 sort_queue = &instance->recovery_sort_queue;
4308 } else {
4309 sort_queue = &instance->regular_sort_queue;
4310 }
4311
 /*
 * NOTE(review): msg_len comes from the network. This assert aborts on an
 * oversized frame (unless NDEBUG), which also makes the msg_len <=
 * FRAME_SIZE_MAX test further down redundant. Rejecting the frame would
 * be gentler.
 */
4312 assert (msg_len <= FRAME_SIZE_MAX);
4313
4314#ifdef TEST_DROP_MCAST_PERCENTAGE
4315 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4316 return (0);
4317 }
4318#endif
4319
4320 /*
4321 * If the message is foreign execute the switch below
4322 */
4323 if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
4324 sizeof (struct memb_ring_id)) != 0) {
4325
4326 aligned_system_from = mcast_header.system_from;
4327
4328 switch (instance->memb_state) {
4330 memb_set_merge (
4331 &aligned_system_from, 1,
4332 instance->my_proc_list, &instance->my_proc_list_entries);
4333 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
4334 break;
4335
4336 case MEMB_STATE_GATHER:
4337 if (!memb_set_subset (
4338 &aligned_system_from,
4339 1,
4340 instance->my_proc_list,
4341 instance->my_proc_list_entries)) {
4342
4343 memb_set_merge (&aligned_system_from, 1,
4344 instance->my_proc_list, &instance->my_proc_list_entries);
4345 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
4346 return (0);
4347 }
4348 break;
4349
4350 case MEMB_STATE_COMMIT:
4351 /* discard message */
4352 instance->stats.rx_msg_dropped++;
4353 break;
4354
4356 /* discard message */
4357 instance->stats.rx_msg_dropped++;
4358 break;
4359 }
4360 return (0);
4361 }
4362
4364 "Received ringid (" CS_PRI_RING_ID ") seq %x",
4365 mcast_header.ring_id.rep,
4366 (uint64_t)mcast_header.ring_id.seq,
4367 mcast_header.seq);
4368
4369 /*
4370 * Add mcast message to rtr queue if not already in rtr queue
4371 * otherwise free io vectors
4372 */
4373 if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4374 sq_in_range (sort_queue, mcast_header.seq) &&
4375 sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4376
4377 /*
4378 * Allocate new multicast memory block
4379 */
4380// TODO LEAK
4381 sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4382 if (sort_queue_item.mcast == NULL) {
4383 return (-1); /* error here is corrected by the algorithm */
4384 }
 /* The stored copy keeps the sender's byte order; messages_deliver_to_app checks the magic. */
4385 memcpy (sort_queue_item.mcast, msg, msg_len);
4386 sort_queue_item.msg_len = msg_len;
4387
4388 if (sq_lt_compare (instance->my_high_seq_received,
4389 mcast_header.seq)) {
4390 instance->my_high_seq_received = mcast_header.seq;
4391 }
4392
4393 sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4394 }
4395
4396 update_aru (instance);
4397 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4398 messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4399 }
4400
4401/* TODO remove from retrans message queue for old ring in recovery state */
4402 return (0);
4403}
4404
/*
 * Handle a received MEMB_MERGE_DETECT message.
 *
 * The message is ignored if it carries this node's own ring id. Otherwise
 * the sender (system_from) is merged into my_proc_list and GATHER is
 * entered, either when OPERATIONAL, or when in GATHER and the sender was
 * not already in my_proc_list. COMMIT and RECOVERY ignore it.
 *
 * Always returns 0.
 * NOTE(review): missing from this extract: the local declaration of
 * memb_merge_detect and two case labels (presumably OPERATIONAL before the
 * first merge and RECOVERY before "do nothing in recovery").
 */
4405static int message_handler_memb_merge_detect (
4406 struct totemsrp_instance *instance,
4407 const void *msg,
4408 size_t msg_len,
4409 int endian_conversion_needed)
4410{
4412 struct srp_addr aligned_system_from;
4413
4414 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4415 return (0);
4416 }
4417
4418 if (endian_conversion_needed) {
4419 memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4420 } else {
4421 memcpy (&memb_merge_detect, msg,
4422 sizeof (struct memb_merge_detect));
4423 }
4424
4425 /*
4426 * do nothing if this is a merge detect from this configuration
4427 */
4428 if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4429 sizeof (struct memb_ring_id)) == 0) {
4430
4431 return (0);
4432 }
4433
4434 aligned_system_from = memb_merge_detect.system_from;
4435
4436 /*
4437 * Execute merge operation
4438 */
4439 switch (instance->memb_state) {
4441 memb_set_merge (&aligned_system_from, 1,
4442 instance->my_proc_list, &instance->my_proc_list_entries);
4443 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4444 break;
4445
4446 case MEMB_STATE_GATHER:
4447 if (!memb_set_subset (
4448 &aligned_system_from,
4449 1,
4450 instance->my_proc_list,
4451 instance->my_proc_list_entries)) {
4452
4453 memb_set_merge (&aligned_system_from, 1,
4454 instance->my_proc_list, &instance->my_proc_list_entries);
4455 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4456 return (0);
4457 }
4458 break;
4459
4460 case MEMB_STATE_COMMIT:
4461 /* do nothing in commit */
4462 break;
4463
4465 /* do nothing in recovery */
4466 break;
4467 }
4468 return (0);
4469}
4470
/*
 * Apply a received memb_join message to this node's membership view.
 *
 * The data after the fixed header is proc_list_entries srp_addr records
 * followed by failed_list_entries srp_addr records. The counts are read
 * directly, so the message is presumably already in host byte order.
 *
 * Outline:
 *  - LEAVE/JOIN while flushing (partly missing from this extract): the
 *    message is discarded. For a LEAVE, the last failed-list entry is still
 *    recorded with my_leave_memb_set().
 *  - Sender's proc and failed lists equal ours: the sender is recorded in
 *    the consensus set. Once consensus is agreed, either (after FAILED TO
 *    RECEIVE) this node resets its proc list to itself and enters COMMIT,
 *    or, if memb_lowest_in_config() is true, it creates the commit token
 *    and enters COMMIT.
 *  - Sender's lists are subsets of ours, or the sender is in our failed
 *    list: nothing to merge.
 *  - Otherwise the sender's lists are merged into ours and GATHER is
 *    entered.
 *  - Any path reaching "out" without having entered GATHER makes an
 *    OPERATIONAL node enter GATHER.
 */
4471static void memb_join_process (
4472 struct totemsrp_instance *instance,
4473 const struct memb_join *memb_join)
4474{
4475 struct srp_addr *proc_list;
4476 struct srp_addr *failed_list;
4477 int gather_entered = 0;
4478 int fail_minus_memb_entries = 0;
4479 struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4480 struct srp_addr aligned_system_from;
4481
4482 proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4483 failed_list = proc_list + memb_join->proc_list_entries;
4484 aligned_system_from = memb_join->system_from;
4485
4486 log_printf(instance->totemsrp_log_level_trace, "memb_join_process");
4487 memb_set_log(instance, instance->totemsrp_log_level_trace,
4488 "proclist", proc_list, memb_join->proc_list_entries);
4489 memb_set_log(instance, instance->totemsrp_log_level_trace,
4490 "faillist", failed_list, memb_join->failed_list_entries);
4491 memb_set_log(instance, instance->totemsrp_log_level_trace,
4492 "my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4493 memb_set_log(instance, instance->totemsrp_log_level_trace,
4494 "my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4495
 /* NOTE(review): several lines of the LEAVE/JOIN handling below are missing from this extract. */
4497 if (instance->flushing) {
4500 "Discarding LEAVE message during flush, nodeid=" CS_PRI_NODE_ID,
4502 if (memb_join->failed_list_entries > 0) {
4503 my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4504 }
4505 } else {
4507 "Discarding JOIN message during flush, nodeid=" CS_PRI_NODE_ID, memb_join->header.nodeid);
4508 }
4509 return;
4510 } else {
4513 "Received LEAVE message from " CS_PRI_NODE_ID, memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4514 if (memb_join->failed_list_entries > 0) {
4515 my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4516 }
4517 }
4518 }
4519
4520 }
4521
 /* Same view as ours: count the sender towards consensus. */
4522 if (memb_set_equal (proc_list,
4524 instance->my_proc_list,
4525 instance->my_proc_list_entries) &&
4526
4527 memb_set_equal (failed_list,
4529 instance->my_failed_list,
4530 instance->my_failed_list_entries)) {
4531
4533 memb_consensus_set (instance, &aligned_system_from);
4534 }
4535
4536 if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4537 instance->failed_to_recv = 0;
4538 instance->my_proc_list[0] = instance->my_id;
4539 instance->my_proc_list_entries = 1;
4540 instance->my_failed_list_entries = 0;
4541
4542 memb_state_commit_token_create (instance);
4543
4544 memb_state_commit_enter (instance);
4545 return;
4546 }
4547 if (memb_consensus_agreed (instance) &&
4548 memb_lowest_in_config (instance)) {
4549
4550 memb_state_commit_token_create (instance);
4551
4552 memb_state_commit_enter (instance);
4553 } else {
4554 goto out;
4555 }
4556 } else
4557 if (memb_set_subset (proc_list,
4559 instance->my_proc_list,
4560 instance->my_proc_list_entries) &&
4561
4562 memb_set_subset (failed_list,
4564 instance->my_failed_list,
4565 instance->my_failed_list_entries)) {
4566
4567 goto out;
4568 } else
4569 if (memb_set_subset (&aligned_system_from, 1,
4570 instance->my_failed_list, instance->my_failed_list_entries)) {
4571
4572 goto out;
4573 } else {
4574 memb_set_merge (proc_list,
4576 instance->my_proc_list, &instance->my_proc_list_entries);
4577
4578 if (memb_set_subset (
4579 &instance->my_id, 1,
4580 failed_list, memb_join->failed_list_entries)) {
4581
4582 memb_set_merge (
4583 &aligned_system_from, 1,
4584 instance->my_failed_list, &instance->my_failed_list_entries);
4585 } else {
4586 if (memb_set_subset (
4587 &aligned_system_from, 1,
4588 instance->my_memb_list,
4589 instance->my_memb_entries)) {
4590
 /*
 * NOTE(review): this branch is only reached when the sender is NOT in
 * my_failed_list (see the "else if" above), and my_failed_list is not
 * modified in between. The test below therefore appears always true,
 * making the memb_set_subtract() branch unreachable - confirm intent.
 */
4591 if (memb_set_subset (
4592 &aligned_system_from, 1,
4593 instance->my_failed_list,
4594 instance->my_failed_list_entries) == 0) {
4595
4596 memb_set_merge (failed_list,
4598 instance->my_failed_list, &instance->my_failed_list_entries);
4599 } else {
4600 memb_set_subtract (fail_minus_memb,
4601 &fail_minus_memb_entries,
4602 failed_list,
4604 instance->my_memb_list,
4605 instance->my_memb_entries);
4606
4607 memb_set_merge (fail_minus_memb,
4608 fail_minus_memb_entries,
4609 instance->my_failed_list,
4610 &instance->my_failed_list_entries);
4611 }
4612 }
4613 }
4614 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4615 gather_entered = 1;
4616 }
4617
4618out:
4619 if (gather_entered == 0 &&
4620 instance->memb_state == MEMB_STATE_OPERATIONAL) {
4621
4622 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4623 }
4624}
4625
/*
 * Convert a memb_join received in the opposite byte order into host order,
 * writing the result to out.
 *
 * Both trailing srp_addr lists (proc list, then failed list) are located
 * using the already-converted counts in out.
 *
 * NOTE(review): the lines setting out->header.magic/version and swapping
 * proc_list_entries / failed_list_entries (and possibly other fields) are
 * missing from this extract. The loops below rely on those counts already
 * being set in out.
 */
4626static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4627{
4628 int i;
4629 struct srp_addr *in_proc_list;
4630 struct srp_addr *in_failed_list;
4631 struct srp_addr *out_proc_list;
4632 struct srp_addr *out_failed_list;
4633
4636 out->header.type = in->header.type;
4637 out->header.nodeid = swab32 (in->header.nodeid);
4638 out->system_from = srp_addr_endian_convert(in->system_from);
4641 out->ring_seq = swab64 (in->ring_seq);
4642
4643 in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4644 in_failed_list = in_proc_list + out->proc_list_entries;
4645 out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4646 out_failed_list = out_proc_list + out->proc_list_entries;
4647
4648 for (i = 0; i < out->proc_list_entries; i++) {
4649 out_proc_list[i] = srp_addr_endian_convert (in_proc_list[i]);
4650 }
4651 for (i = 0; i < out->failed_list_entries; i++) {
4652 out_failed_list[i] = srp_addr_endian_convert (in_failed_list[i]);
4653 }
4654}
4655
4656static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4657{
4658 int i;
4659 struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4660 struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4661 struct memb_commit_token_memb_entry *in_memb_list;
4662 struct memb_commit_token_memb_entry *out_memb_list;
4663
4664 out->header.magic = TOTEM_MH_MAGIC;
4665 out->header.version = TOTEM_MH_VERSION;
4666 out->header.type = in->header.type;
4667 out->header.nodeid = swab32 (in->header.nodeid);
4668 out->token_seq = swab32 (in->token_seq);
4669 out->ring_id.rep = swab32(in->ring_id.rep);
4670 out->ring_id.seq = swab64 (in->ring_id.seq);
4671 out->retrans_flg = swab32 (in->retrans_flg);
4672 out->memb_index = swab32 (in->memb_index);
4673 out->addr_entries = swab32 (in->addr_entries);
4674
4675 in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4676 out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4677 for (i = 0; i < out->addr_entries; i++) {
4678 out_addr[i] = srp_addr_endian_convert (in_addr[i]);
4679
4680 /*
4681 * Only convert the memb entry if it has been set
4682 */
4683 if (in_memb_list[i].ring_id.rep != 0) {
4684 out_memb_list[i].ring_id.rep = swab32(in_memb_list[i].ring_id.rep);
4685
4686 out_memb_list[i].ring_id.seq =
4687 swab64 (in_memb_list[i].ring_id.seq);
4688 out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4689 out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4690 out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4691 }
4692 }
4693}
4694
/*
 * Convert an orf_token received in the opposite byte order into host order,
 * writing the result to out, including rtr_list_entries retransmit items.
 *
 * NOTE(review): the loop trusts out->rtr_list_entries. In this file the
 * caller (message_handler_orf_token) runs check_orf_token_sanity() first,
 * which is expected to bound it to RETRANSMIT_ENTRIES_MAX, so out must have
 * room for that many rtr_items.
 * NOTE(review): the lines setting header.magic/version and
 * out->rtr_list_entries are missing from this extract.
 */
4695static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4696{
4697 int i;
4698
4701 out->header.type = in->header.type;
4702 out->header.nodeid = swab32 (in->header.nodeid);
4703 out->seq = swab32 (in->seq);
4704 out->token_seq = swab32 (in->token_seq);
4705 out->aru = swab32 (in->aru);
4706 out->ring_id.rep = swab32(in->ring_id.rep);
4707 out->aru_addr = swab32(in->aru_addr);
4708 out->ring_id.seq = swab64 (in->ring_id.seq);
4709 out->fcc = swab32 (in->fcc);
4710 out->backlog = swab32 (in->backlog);
4711 out->retrans_flg = swab32 (in->retrans_flg);
4713 for (i = 0; i < out->rtr_list_entries; i++) {
4714 out->rtr_list[i].ring_id.rep = swab32(in->rtr_list[i].ring_id.rep);
4715 out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4716 out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4717 }
4718}
4719
4720static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4721{
4724 out->header.type = in->header.type;
4725 out->header.nodeid = swab32 (in->header.nodeid);
4727
4728 out->seq = swab32 (in->seq);
4729 out->this_seqno = swab32 (in->this_seqno);
4730 out->ring_id.rep = swab32(in->ring_id.rep);
4731 out->ring_id.seq = swab64 (in->ring_id.seq);
4732 out->node_id = swab32 (in->node_id);
4733 out->guarantee = swab32 (in->guarantee);
4734 out->system_from = srp_addr_endian_convert(in->system_from);
4735}
4736
4737static void memb_merge_detect_endian_convert (
4738 const struct memb_merge_detect *in,
4739 struct memb_merge_detect *out)
4740{
4743 out->header.type = in->header.type;
4744 out->header.nodeid = swab32 (in->header.nodeid);
4745 out->ring_id.rep = swab32(in->ring_id.rep);
4746 out->ring_id.seq = swab64 (in->ring_id.seq);
4747 out->system_from = srp_addr_endian_convert (in->system_from);
4748}
4749
4750static int ignore_join_under_operational (
4751 struct totemsrp_instance *instance,
4752 const struct memb_join *memb_join)
4753{
4754 struct srp_addr *proc_list;
4755 struct srp_addr *failed_list;
4756 unsigned long long ring_seq;
4757 struct srp_addr aligned_system_from;
4758
4759 proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4760 failed_list = proc_list + memb_join->proc_list_entries;
4762 aligned_system_from = memb_join->system_from;
4763
4764 if (memb_set_subset (&instance->my_id, 1,
4765 failed_list, memb_join->failed_list_entries)) {
4766 return (1);
4767 }
4768
4769 /*
4770 * In operational state, my_proc_list is exactly the same as
4771 * my_memb_list.
4772 */
4773 if ((memb_set_subset (&aligned_system_from, 1,
4774 instance->my_memb_list, instance->my_memb_entries)) &&
4775 (ring_seq < instance->my_ring_id.seq)) {
4776 return (1);
4777 }
4778
4779 return (0);
4780}
4781
4782static int message_handler_memb_join (
4783 struct totemsrp_instance *instance,
4784 const void *msg,
4785 size_t msg_len,
4786 int endian_conversion_needed)
4787{
4788 const struct memb_join *memb_join;
4789 struct memb_join *memb_join_convert = alloca (msg_len);
4790 struct srp_addr aligned_system_from;
4791
4792 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4793 return (0);
4794 }
4795
4796 if (endian_conversion_needed) {
4797 memb_join = memb_join_convert;
4798 memb_join_endian_convert (msg, memb_join_convert);
4799
4800 } else {
4801 memb_join = msg;
4802 }
4803
4804 aligned_system_from = memb_join->system_from;
4805
4806 /*
4807 * If the process paused because it wasn't scheduled in a timely
4808 * fashion, flush the join messages because they may be queued
4809 * entries
4810 */
4811 if (pause_flush (instance)) {
4812 return (0);
4813 }
4814
4815 if (instance->token_ring_id_seq < memb_join->ring_seq) {
4817 }
4818 switch (instance->memb_state) {
4820 if (!ignore_join_under_operational (instance, memb_join)) {
4821 memb_join_process (instance, memb_join);
4822 }
4823 break;
4824
4825 case MEMB_STATE_GATHER:
4826 memb_join_process (instance, memb_join);
4827 break;
4828
4829 case MEMB_STATE_COMMIT:
4830 if (memb_set_subset (&aligned_system_from,
4831 1,
4832 instance->my_new_memb_list,
4833 instance->my_new_memb_entries) &&
4834
4835 memb_join->ring_seq >= instance->my_ring_id.seq) {
4836
4837 memb_join_process (instance, memb_join);
4838 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4839 }
4840 break;
4841
4843 if (memb_set_subset (&aligned_system_from,
4844 1,
4845 instance->my_new_memb_list,
4846 instance->my_new_memb_entries) &&
4847
4848 memb_join->ring_seq >= instance->my_ring_id.seq) {
4849
4850 memb_join_process (instance, memb_join);
4851 memb_recovery_state_token_loss (instance);
4852 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4853 }
4854 break;
4855 }
4856 return (0);
4857}
4858
4859static int message_handler_memb_commit_token (
4860 struct totemsrp_instance *instance,
4861 const void *msg,
4862 size_t msg_len,
4863 int endian_conversion_needed)
4864{
4865 struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4867 struct srp_addr sub[PROCESSOR_COUNT_MAX];
4868 int sub_entries;
4869
4870 struct srp_addr *addr;
4871
4873 "got commit token");
4874
4875 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4876 return (0);
4877 }
4878
4879 if (endian_conversion_needed) {
4880 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4881 } else {
4882 memcpy (memb_commit_token_convert, msg, msg_len);
4883 }
4884 memb_commit_token = memb_commit_token_convert;
4886
4887#ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4888 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4889 return (0);
4890 }
4891#endif
4892 switch (instance->memb_state) {
4894 /* discard token */
4895 break;
4896
4897 case MEMB_STATE_GATHER:
4898 memb_set_subtract (sub, &sub_entries,
4899 instance->my_proc_list, instance->my_proc_list_entries,
4900 instance->my_failed_list, instance->my_failed_list_entries);
4901
4902 if (memb_set_equal (addr,
4904 sub,
4905 sub_entries) &&
4906
4908 memcpy (instance->commit_token, memb_commit_token, msg_len);
4909 memb_state_commit_enter (instance);
4910 }
4911 break;
4912
4913 case MEMB_STATE_COMMIT:
4914 /*
4915 * If retransmitted commit tokens are sent on this ring
4916 * filter them out and only enter recovery once the
4917 * commit token has traversed the array. This is
4918 * determined by :
4919 * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4920 */
4921 if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4923 memb_state_recovery_enter (instance, memb_commit_token);
4924 }
4925 break;
4926
4928 if (instance->my_id.nodeid == instance->my_ring_id.rep) {
4929
4930 /* Filter out duplicated tokens */
4931 if (instance->originated_orf_token) {
4932 break;
4933 }
4934
4935 instance->originated_orf_token = 1;
4936
4938 "Sending initial ORF token");
4939
4940 // TODO convert instead of initiate
4941 orf_token_send_initial (instance);
4942 reset_token_timeout (instance); // REVIEWED
4943 reset_token_retransmit_timeout (instance); // REVIEWED
4944 }
4945 break;
4946 }
4947 return (0);
4948}
4949
4950static int message_handler_token_hold_cancel (
4951 struct totemsrp_instance *instance,
4952 const void *msg,
4953 size_t msg_len,
4954 int endian_conversion_needed)
4955{
4956 const struct token_hold_cancel *token_hold_cancel = msg;
4957
4958 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4959 return (0);
4960 }
4961
4962 if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4963 sizeof (struct memb_ring_id)) == 0) {
4964
4965 instance->my_seq_unchanged = 0;
4966 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
4967 timer_function_token_retransmit_timeout (instance);
4968 }
4969 }
4970 return (0);
4971}
4972
4973static int check_message_header_validity(
4974 void *context,
4975 const void *msg,
4976 unsigned int msg_len,
4977 const struct sockaddr_storage *system_from)
4978{
4979 struct totemsrp_instance *instance = context;
4980 const struct totem_message_header *message_header = msg;
4981 const char *guessed_str;
4982 const char *msg_byte = msg;
4983
4984 if (msg_len < sizeof (struct totem_message_header)) {
4986 "Message received from %s is too short... Ignoring %u.",
4987 totemip_sa_print((struct sockaddr *)system_from), (unsigned int)msg_len);
4988 return (-1);
4989 }
4990
4991 if (message_header->magic != TOTEM_MH_MAGIC &&
4992 message_header->magic != swab16(TOTEM_MH_MAGIC)) {
4993 /*
4994 * We've received ether Knet, old version of Corosync,
4995 * or something else. Do some guessing to display (hopefully)
4996 * helpful message
4997 */
4998 guessed_str = NULL;
4999
5000 if (message_header->magic == 0xFFFF) {
5001 /*
5002 * Corosync 2.2 used header with two UINT8_MAX
5003 */
5004 guessed_str = "Corosync 2.2";
5005 } else if (message_header->magic == 0xFEFE) {
5006 /*
5007 * Corosync 2.3+ used header with two UINT8_MAX - 1
5008 */
5009 guessed_str = "Corosync 2.3+";
5010 } else if (msg_byte[0] == 0x01) {
5011 /*
5012 * Knet has stable1 with first byte of message == 1
5013 */
5014 guessed_str = "unencrypted Kronosnet";
5015 } else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
5016 /*
5017 * Unencrypted Corosync 1.x/OpenAIS has first byte
5018 * 0-5. Collision with Knet (but still worth the try)
5019 */
5020 guessed_str = "unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
5021 } else {
5022 /*
5023 * Encrypted Kronosned packet has a hash at the end of
5024 * the packet and nothing specific at the beginning of the
5025 * packet (just encrypted data).
5026 * Encrypted Corosync 1.x/OpenAIS is quite similar but hash_digest
5027 * is in the beginning of the packet.
5028 *
5029 * So it's not possible to reliably detect ether of them.
5030 */
5031 guessed_str = "encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
5032 }
5033
5035 "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
5036 totemip_sa_print((struct sockaddr *)system_from),
5037 guessed_str);
5038
5039 return (-1);
5040 }
5041
5042 if (message_header->version != TOTEM_MH_VERSION) {
5044 "Message received from %s has unsupported version %u... Ignoring",
5045 totemip_sa_print((struct sockaddr *)system_from),
5046 message_header->version);
5047
5048 return (-1);
5049 }
5050
5051 return (0);
5052}
5053
5054
5056 void *context,
5057 const void *msg,
5058 unsigned int msg_len,
5059 const struct sockaddr_storage *system_from)
5060{
5061 struct totemsrp_instance *instance = context;
5062 const struct totem_message_header *message_header = msg;
5063
5064 if (check_message_header_validity(context, msg, msg_len, system_from) == -1) {
5065 return -1;
5066 }
5067
5068 switch (message_header->type) {
5070 instance->stats.orf_token_rx++;
5071 break;
5072 case MESSAGE_TYPE_MCAST:
5073 instance->stats.mcast_rx++;
5074 break;
5076 instance->stats.memb_merge_detect_rx++;
5077 break;
5079 instance->stats.memb_join_rx++;
5080 break;
5082 instance->stats.memb_commit_token_rx++;
5083 break;
5085 instance->stats.token_hold_cancel_rx++;
5086 break;
5087 default:
5089 "Message received from %s has wrong type... ignoring %d.\n",
5090 totemip_sa_print((struct sockaddr *)system_from),
5091 (int)message_header->type);
5092
5093 instance->stats.rx_msg_dropped++;
5094 return 0;
5095 }
5096 /*
5097 * Handle incoming message
5098 */
5099 return totemsrp_message_handlers.handler_functions[(int)message_header->type] (
5100 instance,
5101 msg,
5102 msg_len,
5103 message_header->magic != TOTEM_MH_MAGIC);
5104}
5105
5107 void *context,
5108 const struct totem_ip_address *interface_addr,
5109 unsigned short ip_port,
5110 unsigned int iface_no)
5111{
5112 struct totemsrp_instance *instance = context;
5113 int res;
5114
5115 totemip_copy(&instance->my_addrs[iface_no], interface_addr);
5116
5117 res = totemnet_iface_set (
5118 instance->totemnet_context,
5119 interface_addr,
5120 ip_port,
5121 iface_no);
5122
5123 return (res);
5124}
5125
5126/* Contrary to its name, this only gets called when the interface is enabled */
5128 void *context,
5129 const struct totem_ip_address *iface_addr,
5130 unsigned int iface_no)
5131{
5132 struct totemsrp_instance *instance = context;
5133 int num_interfaces;
5134 int i;
5135 int res = 0;
5136
5137 if (!instance->my_id.nodeid) {
5138 instance->my_id.nodeid = iface_addr->nodeid;
5139 }
5140 totemip_copy (&instance->my_addrs[iface_no], iface_addr);
5141
5142 if (instance->iface_changes++ == 0) {
5143 instance->memb_ring_id_create_or_load (&instance->my_ring_id, instance->my_id.nodeid);
5144 /*
5145 * Increase the ring_id sequence number. This doesn't follow specification.
5146 * Solves problem with restarted leader node (node with lowest nodeid) before
5147 * rest of the cluster forms new membership and guarantees unique ring_id for
5148 * new singleton configuration.
5149 */
5150 instance->my_ring_id.seq++;
5151
5152 instance->token_ring_id_seq = instance->my_ring_id.seq;
5153 log_printf (
5154 instance->totemsrp_log_level_debug,
5155 "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.",
5156 instance->my_ring_id.rep,
5157 (uint64_t)instance->my_ring_id.seq);
5158
5159 if (instance->totemsrp_service_ready_fn) {
5160 instance->totemsrp_service_ready_fn ();
5161 }
5162
5163 }
5164
5165 num_interfaces = 0;
5166 for (i = 0; i < INTERFACE_MAX; i++) {
5167 if (instance->totem_config->interfaces[i].configured) {
5168 num_interfaces++;
5169 }
5170 }
5171
5172 if (instance->iface_changes >= num_interfaces) {
5173 /* We need to clear orig_interfaces so that 'commit' diffs against nothing */
5174 instance->totem_config->orig_interfaces = malloc (sizeof (struct totem_interface) * INTERFACE_MAX);
5175 assert(instance->totem_config->orig_interfaces != NULL);
5176 memset(instance->totem_config->orig_interfaces, 0, sizeof (struct totem_interface) * INTERFACE_MAX);
5177
5179
5180 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
5181 free(instance->totem_config->orig_interfaces);
5182 }
5183 return res;
5184}
5185
5187 totem_config->net_mtu -= 2 * sizeof (struct mcast);
5188}
5189
5191 void *context,
5192 void (*totem_service_ready) (void))
5193{
5194 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5195
5196 instance->totemsrp_service_ready_fn = totem_service_ready;
5197}
5198
5200 void *context,
5201 const struct totem_ip_address *member,
5202 int iface_no)
5203{
5204 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5205 int res;
5206
5207 res = totemnet_member_add (instance->totemnet_context, &instance->my_addrs[iface_no], member, iface_no);
5208
5209 return (res);
5210}
5211
5213 void *context,
5214 const struct totem_ip_address *member,
5215 int iface_no)
5216{
5217 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5218 int res;
5219
5220 res = totemnet_member_remove (instance->totemnet_context, member, iface_no);
5221
5222 return (res);
5223}
5224
5226{
5227 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5228
5229 instance->threaded_mode_enabled = 1;
5230}
5231
5232void totemsrp_trans_ack (void *context)
5233{
5234 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5235
5236 instance->waiting_trans_ack = 0;
5238}
5239
5240
5242{
5243 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5244 int res;
5245
5247 return (res);
5248}
5249
5251{
5252 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5253 int res;
5254
5256 return (res);
5257}
5258
5259void totemsrp_stats_clear (void *context, int flags)
5260{
5261 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5262
5263 memset(&instance->stats, 0, sizeof(totemsrp_stats_t));
5266 }
5267}
5268
/*
 * Run the ORF-token timeout handler right away, without waiting for the
 * token timer to expire. As the function name indicates, this is the way
 * to push the membership protocol into a new GATHER round.
 */
void totemsrp_force_gather (void *context)
{
	void *srp_context = context;

	timer_function_orf_token_timeout (srp_context);
}
unsigned long long seq
Definition coroapi.h:1
totem_configuration_type
The totem_configuration_type enum.
Definition coroapi.h:132
@ TOTEM_CONFIGURATION_REGULAR
Definition coroapi.h:133
@ TOTEM_CONFIGURATION_TRANSITIONAL
Definition coroapi.h:134
#define INTERFACE_MAX
Definition coroapi.h:88
#define MESSAGE_QUEUE_MAX
Definition coroapi.h:98
unsigned int nodeid
Definition coroapi.h:0
unsigned char addr[TOTEMIP_ADDRLEN]
Definition coroapi.h:2
totem_callback_token_type
The totem_callback_token_type enum.
Definition coroapi.h:142
@ TOTEM_CALLBACK_TOKEN_SENT
Definition coroapi.h:144
@ TOTEM_CALLBACK_TOKEN_RECEIVED
Definition coroapi.h:143
#define PROCESSOR_COUNT_MAX
Definition coroapi.h:96
#define CS_PRI_RING_ID_SEQ
Definition corotypes.h:61
#define CS_PRI_NODE_ID
Definition corotypes.h:59
#define CS_PRI_RING_ID
Definition corotypes.h:62
uint32_t flags
uint32_t value
icmap_map_t icmap_get_global_map(void)
Return global icmap.
Definition icmap.c:268
#define LOGSYS_LEVEL_DEBUG
Definition logsys.h:76
struct srp_addr addr
Definition totemsrp.c:164
int guarantee
Definition totemsrp.c:190
unsigned int node_id
Definition totemsrp.c:189
struct memb_ring_id ring_id
Definition totemsrp.c:188
struct totem_message_header header
Definition totemsrp.c:184
unsigned int seq
Definition totemsrp.c:186
int this_seqno
Definition totemsrp.c:187
struct srp_addr system_from
Definition totemsrp.c:185
Definition totemsrp.c:243
unsigned int aru
Definition totemsrp.c:245
unsigned int received_flg
Definition totemsrp.c:247
struct memb_ring_id ring_id
Definition totemsrp.c:244
unsigned int high_delivered
Definition totemsrp.c:246
unsigned int retrans_flg
Definition totemsrp.c:255
struct totem_message_header header
Definition totemsrp.c:252
unsigned char end_of_commit_token[0]
Definition totemsrp.c:258
unsigned int token_seq
Definition totemsrp.c:253
struct memb_ring_id ring_id
Definition totemsrp.c:254
struct srp_addr system_from
Definition totemsrp.c:217
struct totem_message_header header
Definition totemsrp.c:216
unsigned char end_of_memb_join[0]
Definition totemsrp.c:221
unsigned long long ring_seq
Definition totemsrp.c:220
unsigned int failed_list_entries
Definition totemsrp.c:219
unsigned int proc_list_entries
Definition totemsrp.c:218
struct totem_message_header header
Definition totemsrp.c:231
struct memb_ring_id ring_id
Definition totemsrp.c:233
struct srp_addr system_from
Definition totemsrp.c:232
The memb_ring_id struct.
Definition coroapi.h:122
unsigned long long seq
Definition coroapi.h:124
unsigned int rep
Definition totem.h:150
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
Definition totemsrp.c:535
unsigned int msg_len
Definition totemsrp.c:269
struct mcast * mcast
Definition totemsrp.c:268
unsigned int backlog
Definition totemsrp.c:207
unsigned int token_seq
Definition totemsrp.c:203
unsigned int aru_addr
Definition totemsrp.c:205
unsigned int fcc
Definition totemsrp.c:208
unsigned int aru
Definition totemsrp.c:204
int rtr_list_entries
Definition totemsrp.c:210
struct rtr_item rtr_list[0]
Definition totemsrp.c:211
int retrans_flg
Definition totemsrp.c:209
unsigned int seq
Definition totemsrp.c:202
struct totem_message_header header
Definition totemsrp.c:201
struct memb_ring_id ring_id
Definition totemsrp.c:206
struct memb_ring_id ring_id
Definition totemsrp.c:195
unsigned int seq
Definition totemsrp.c:196
unsigned int msg_len
Definition totemsrp.c:274
struct mcast * mcast
Definition totemsrp.c:273
The sq struct.
Definition sq.h:43
unsigned int nodeid
Definition totemsrp.c:108
struct qb_list_head list
Definition totemsrp.c:170
int(* callback_fn)(enum totem_callback_token_type type, const void *)
Definition totemsrp.c:171
enum totem_callback_token_type callback_type
Definition totemsrp.c:172
struct totem_message_header header
Definition totemsrp.c:238
struct memb_ring_id ring_id
Definition totemsrp.c:239
unsigned int max_messages
Definition totem.h:220
unsigned int heartbeat_failures_allowed
Definition totem.h:214
unsigned int token_timeout
Definition totem.h:182
unsigned int window_size
Definition totem.h:218
struct totem_logging_configuration totem_logging_configuration
Definition totem.h:208
unsigned int downcheck_timeout
Definition totem.h:200
unsigned int miss_count_const
Definition totem.h:242
struct totem_interface * interfaces
Definition totem.h:165
unsigned int cancel_token_hold_on_retransmit
Definition totem.h:248
unsigned int fail_to_recv_const
Definition totem.h:202
unsigned int merge_timeout
Definition totem.h:198
struct totem_interface * orig_interfaces
Definition totem.h:166
unsigned int net_mtu
Definition totem.h:210
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totem.h:252
unsigned int token_retransmits_before_loss_const
Definition totem.h:190
unsigned int max_network_delay
Definition totem.h:216
unsigned int seqno_unchanged_const
Definition totem.h:204
unsigned int consensus_timeout
Definition totem.h:196
unsigned int threads
Definition totem.h:212
unsigned int send_join_timeout
Definition totem.h:194
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totem.h:256
unsigned int token_retransmit_timeout
Definition totem.h:186
unsigned int token_warning
Definition totem.h:184
unsigned int join_timeout
Definition totem.h:192
unsigned int token_hold_timeout
Definition totem.h:188
struct totem_ip_address boundto
Definition totem.h:84
uint8_t configured
Definition totem.h:89
int member_count
Definition totem.h:90
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
Definition totem.h:97
struct totem_ip_address mcast_addr
Definition totem.h:85
The totem_ip_address struct.
Definition coroapi.h:111
unsigned int nodeid
Definition coroapi.h:112
unsigned short family
Definition coroapi.h:113
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition totem.h:101
unsigned int nodeid
Definition totem.h:131
unsigned short magic
Definition totem.h:127
uint8_t reachable
Definition totem.h:270
uint32_t version
Definition totem.h:268
struct totem_ip_address mcast_address
Definition totemsrp.c:452
totemsrp_stats_t stats
Definition totemsrp.c:516
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:320
int consensus_list_entries
Definition totemsrp.c:300
int my_merge_detect_timeout_outstanding
Definition totemsrp.c:346
unsigned int my_last_seq
Definition totemsrp.c:496
qb_loop_timer_handle timer_heartbeat_timeout
Definition totemsrp.c:419
unsigned int my_token_seq
Definition totemsrp.c:396
qb_loop_timer_handle memb_timer_state_gather_join_timeout
Definition totemsrp.c:413
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:298
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
Definition totemsrp.c:415
uint64_t pause_timestamp
Definition totemsrp.c:512
uint32_t threaded_mode_enabled
Definition totemsrp.c:522
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:316
void * totemnet_context
Definition totemsrp.c:500
int my_leave_memb_entries
Definition totemsrp.c:338
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:308
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:312
int my_failed_list_entries
Definition totemsrp.c:326
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:310
unsigned int use_heartbeat
Definition totemsrp.c:504
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totemsrp.c:472
qb_loop_timer_handle memb_timer_state_commit_timeout
Definition totemsrp.c:417
struct cs_queue new_message_queue
Definition totemsrp.c:371
int orf_token_retransmit_size
Definition totemsrp.c:394
unsigned int my_high_seq_received
Definition totemsrp.c:354
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition totemsrp.c:454
uint32_t orf_token_discard
Definition totemsrp.c:518
struct qb_list_head token_callback_sent_listhead
Definition totemsrp.c:390
unsigned int last_released
Definition totemsrp.c:486
unsigned int set_aru
Definition totemsrp.c:488
int totemsrp_log_level_notice
Definition totemsrp.c:430
struct cs_queue new_message_queue_trans
Definition totemsrp.c:373
int totemsrp_log_level_trace
Definition totemsrp.c:434
char orf_token_retransmit[TOKEN_SIZE_MAX]
Definition totemsrp.c:392
unsigned int my_trc
Definition totemsrp.c:506
struct cs_queue retrans_message_queue
Definition totemsrp.c:375
struct memb_ring_id my_ring_id
Definition totemsrp.c:340
int totemsrp_log_level_error
Definition totemsrp.c:426
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
Definition totemsrp.c:469
unsigned int old_ring_state_high_seq_received
Definition totemsrp.c:494
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
Definition totemsrp.c:409
qb_loop_timer_handle timer_pause_timeout
Definition totemsrp.c:401
unsigned int my_high_ring_delivered
Definition totemsrp.c:364
qb_loop_timer_handle timer_orf_token_retransmit_timeout
Definition totemsrp.c:407
struct totem_config * totem_config
Definition totemsrp.c:502
int my_deliver_memb_entries
Definition totemsrp.c:334
void(* totemsrp_service_ready_fn)(void)
Definition totemsrp.c:467
int my_trans_memb_entries
Definition totemsrp.c:330
uint32_t originated_orf_token
Definition totemsrp.c:520
void * token_recv_event_handle
Definition totemsrp.c:528
struct sq recovery_sort_queue
Definition totemsrp.c:379
qb_loop_timer_handle timer_orf_token_timeout
Definition totemsrp.c:403
qb_loop_timer_handle timer_merge_detect_timeout
Definition totemsrp.c:411
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:322
void * token_sent_event_handle
Definition totemsrp.c:529
unsigned int my_high_delivered
Definition totemsrp.c:386
void enum memb_state memb_state
Definition totemsrp.c:446
int totemsrp_log_level_security
Definition totemsrp.c:424
int totemsrp_log_level_warning
Definition totemsrp.c:428
struct memb_commit_token * commit_token
Definition totemsrp.c:514
char commit_token_storage[40000]
Definition totemsrp.c:530
struct memb_ring_id my_old_ring_id
Definition totemsrp.c:342
struct timeval tv_old
Definition totemsrp.c:498
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totemsrp.c:476
qb_loop_t * totemsrp_poll_handle
Definition totemsrp.c:450
unsigned int my_install_seq
Definition totemsrp.c:356
qb_loop_timer_handle timer_orf_token_warning
Definition totemsrp.c:405
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:314
struct srp_addr my_id
Definition totemsrp.c:304
unsigned int my_cbl
Definition totemsrp.c:510
struct qb_list_head token_callback_received_listhead
Definition totemsrp.c:388
unsigned int my_last_aru
Definition totemsrp.c:348
unsigned int my_aru
Definition totemsrp.c:384
uint32_t waiting_trans_ack
Definition totemsrp.c:524
void(* totemsrp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
Definition totemsrp.c:438
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition totemsrp.c:460
struct sq regular_sort_queue
Definition totemsrp.c:377
unsigned long long token_ring_id_seq
Definition totemsrp.c:484
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:318
int totemsrp_log_level_debug
Definition totemsrp.c:432
unsigned int my_pbl
Definition totemsrp.c:508
struct totem_ip_address my_addrs[INTERFACE_MAX]
Definition totemsrp.c:306
uint64_t memb_join_tx
Definition totemstats.h:59
uint32_t continuous_gather
Definition totemstats.h:78
uint64_t recovery_entered
Definition totemstats.h:74
uint64_t rx_msg_dropped
Definition totemstats.h:77
uint64_t gather_entered
Definition totemstats.h:70
uint64_t memb_commit_token_rx
Definition totemstats.h:65
uint64_t mcast_retx
Definition totemstats.h:62
uint64_t mcast_tx
Definition totemstats.h:61
uint64_t memb_commit_token_tx
Definition totemstats.h:64
uint64_t operational_token_lost
Definition totemstats.h:69
uint64_t operational_entered
Definition totemstats.h:68
uint64_t gather_token_lost
Definition totemstats.h:71
uint64_t commit_token_lost
Definition totemstats.h:73
uint64_t token_hold_cancel_tx
Definition totemstats.h:66
uint64_t orf_token_rx
Definition totemstats.h:56
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
Definition totemstats.h:90
uint64_t recovery_token_lost
Definition totemstats.h:75
uint64_t commit_entered
Definition totemstats.h:72
uint64_t memb_merge_detect_rx
Definition totemstats.h:58
uint64_t memb_join_rx
Definition totemstats.h:60
uint64_t orf_token_tx
Definition totemstats.h:55
uint64_t memb_merge_detect_tx
Definition totemstats.h:57
uint64_t mcast_rx
Definition totemstats.h:63
uint64_t token_hold_cancel_rx
Definition totemstats.h:67
uint64_t consensus_timeouts
Definition totemstats.h:76
#define swab64(x)
The swab64 macro.
Definition swab.h:65
#define swab16(x)
The swab16 macro.
Definition swab.h:39
#define swab32(x)
The swab32 macro.
Definition swab.h:51
totem_event_type
Definition totem.h:292
#define TOTEM_MH_VERSION
Definition totem.h:124
#define FRAME_SIZE_MAX
Definition totem.h:52
cfg_message_crypto_reconfig_phase_t
Definition totem.h:154
#define TOTEM_NODE_STATUS_STRUCTURE_VERSION
Definition totem.h:266
#define TOTEM_MH_MAGIC
Definition totem.h:123
char type
Definition totem.h:2
int totemconfig_commit_new_params(struct totem_config *totem_config, icmap_map_t map)
const char * totemip_sa_print(const struct sockaddr *sa)
Definition totemip.c:234
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition totemip.c:123
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, int(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), int(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Create an instance.
Definition totemnet.c:317
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition totemnet.c:471
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
Definition totemnet.c:553
void * totemnet_buffer_alloc(void *net_context)
Definition totemnet.c:367
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:414
int totemnet_send_flush(void *net_context)
Definition totemnet.c:404
void totemnet_buffer_release(void *net_context, void *ptr)
Definition totemnet.c:375
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:426
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
Definition totemnet.c:533
int totemnet_finalize(void *net_context)
Definition totemnet.c:306
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
Definition totemnet.c:292
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
Definition totemnet.c:497
int totemnet_processor_count_set(void *net_context, int processor_count)
Definition totemnet.c:383
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
Definition totemnet.c:510
int totemnet_recv_flush(void *net_context)
Definition totemnet.c:394
int totemnet_iface_check(void *net_context)
Definition totemnet.c:452
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:439
int totemnet_recv_mcast_empty(void *net_context)
Definition totemnet.c:522
int totemnet_nodestatus_get(void *net_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition totemnet.c:484
void totemnet_stats_clear(void *net_context)
Definition totemnet.c:619
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
Definition totemnet.c:589
int totemnet_crypto_reconfigure_phase(void *net_context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition totemnet.c:603
Totem Network interface - also does encryption/decryption.
int totemsrp_my_family_get(void *srp_context)
Definition totemsrp.c:1133
#define SEQNO_START_TOKEN
Definition totemsrp.c:122
int main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
Definition totemsrp.c:5127
unsigned long long ring_seq
Definition totemsrp.c:4
#define RETRANSMIT_ENTRIES_MAX
Definition totemsrp.c:100
#define log_printf(level, format, args...)
Definition totemsrp.c:690
void totemsrp_force_gather(void *context)
Definition totemsrp.c:5269
int rtr_list_entries
Definition totemsrp.c:9
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition totemsrp.c:5190
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition totemsrp.c:818
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition totemsrp.c:3498
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
Definition totemsrp.c:97
void totemsrp_threaded_mode_enable(void *context)
Definition totemsrp.c:5225
struct rtr_item rtr_list[0]
Definition totemsrp.c:10
message_type
Definition totemsrp.c:146
@ MESSAGE_TYPE_MEMB_COMMIT_TOKEN
Definition totemsrp.c:151
@ MESSAGE_TYPE_TOKEN_HOLD_CANCEL
Definition totemsrp.c:152
@ MESSAGE_TYPE_ORF_TOKEN
Definition totemsrp.c:147
@ MESSAGE_TYPE_MEMB_JOIN
Definition totemsrp.c:150
@ MESSAGE_TYPE_MEMB_MERGE_DETECT
Definition totemsrp.c:149
@ MESSAGE_TYPE_MCAST
Definition totemsrp.c:148
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition totemsrp.c:5186
#define TOKEN_SIZE_MAX
Definition totemsrp.c:101
encapsulation_type
Definition totemsrp.c:155
@ MESSAGE_NOT_ENCAPSULATED
Definition totemsrp.c:157
@ MESSAGE_ENCAPSULATED
Definition totemsrp.c:156
unsigned int failed_list_entries
Definition totemsrp.c:3
struct message_handlers totemsrp_message_handlers
Definition totemsrp.c:678
int totemsrp_nodestatus_get(void *srp_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition totemsrp.c:1041
#define LEAVE_DUMMY_NODEID
Definition totemsrp.c:102
#define QUEUE_RTR_ITEMS_SIZE_MAX
Definition totemsrp.c:96
int guarantee
Definition totemsrp.c:6
unsigned int aru
Definition totemsrp.c:3
gather_state_from
Definition totemsrp.c:542
@ TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED
Definition totemsrp.c:546
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE
Definition totemsrp.c:551
@ TOTEMSRP_GSFROM_FAILED_TO_RECEIVE
Definition totemsrp.c:549
@ TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT
Definition totemsrp.c:543
@ TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE
Definition totemsrp.c:552
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE
Definition totemsrp.c:545
@ TOTEMSRP_GSFROM_MERGE_DURING_JOIN
Definition totemsrp.c:554
@ TOTEMSRP_GSFROM_INTERFACE_CHANGE
Definition totemsrp.c:558
@ TOTEMSRP_GSFROM_GATHER_MISSING1
Definition totemsrp.c:544
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE
Definition totemsrp.c:547
@ TOTEMSRP_GSFROM_MAX
Definition totemsrp.c:559
@ TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE
Definition totemsrp.c:556
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE
Definition totemsrp.c:548
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE
Definition totemsrp.c:550
@ TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE
Definition totemsrp.c:555
@ TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE
Definition totemsrp.c:553
@ TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY
Definition totemsrp.c:557
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition totemsrp.c:1108
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition totemsrp.c:2568
void totemsrp_stats_clear(void *context, int flags)
Definition totemsrp.c:5259
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition totemsrp.c:5106
void totemsrp_finalize(void *srp_context)
Definition totemsrp.c:1026
struct memb_ring_id ring_id
Definition totemsrp.c:4
void totemsrp_trans_ack(void *context)
Definition totemsrp.c:5232
int totemsrp_crypto_reconfigure_phase(void *context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition totemsrp.c:5250
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition totemsrp.c:1122
int addr_entries
Definition totemsrp.c:5
unsigned int backlog
Definition totemsrp.c:6
#define SEQNO_START_MSG
Definition totemsrp.c:121
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition totemsrp.c:2488
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition totemsrp.c:3533
unsigned int received_flg
Definition totemsrp.c:3
struct message_item __attribute__
int main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Definition totemsrp.c:5055
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
Definition totemsrp.c:5199
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition totemsrp.c:1070
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
Definition totemsrp.c:5241
unsigned int high_delivered
Definition totemsrp.c:2
struct srp_addr system_from
Definition totemsrp.c:1
unsigned int proc_list_entries
Definition totemsrp.c:2
const char * gather_state_from_desc[]
Definition totemsrp.c:562
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
Definition totemsrp.c:5212
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition totemsrp.c:2497
memb_state
Definition totemsrp.c:277
@ MEMB_STATE_GATHER
Definition totemsrp.c:279
@ MEMB_STATE_RECOVERY
Definition totemsrp.c:281
@ MEMB_STATE_COMMIT
Definition totemsrp.c:280
@ MEMB_STATE_OPERATIONAL
Definition totemsrp.c:278
Totem Single Ring Protocol.
#define TOTEMPG_STATS_CLEAR_TRANSPORT
Definition totemstats.h:116
#define TOTEM_TOKEN_STATS_MAX
Definition totemstats.h:89