corosync 3.1.10
totempg.c
Go to the documentation of this file.
1/*
2 * Copyright (c) 2003-2005 MontaVista Software, Inc.
3 * Copyright (c) 2005 OSDL.
4 * Copyright (c) 2006-2012 Red Hat, Inc.
5 *
6 * All rights reserved.
7 *
8 * Author: Steven Dake (sdake@redhat.com)
9 * Author: Mark Haverkamp (markh@osdl.org)
10 *
11 * This software licensed under BSD license, the text of which follows:
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
15 *
16 * - Redistributions of source code must retain the above copyright notice,
17 * this list of conditions and the following disclaimer.
18 * - Redistributions in binary form must reproduce the above copyright notice,
19 * this list of conditions and the following disclaimer in the documentation
20 * and/or other materials provided with the distribution.
21 * - Neither the name of the MontaVista Software, Inc. nor the names of its
22 * contributors may be used to endorse or promote products derived from this
23 * software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35 * THE POSSIBILITY OF SUCH DAMAGE.
36 */
37
38/*
39 * FRAGMENTATION AND PACKING ALGORITHM:
40 *
41 * Assemble the entire message into one buffer
42 * if full fragment
43 * store fragment into lengths list
44 * for each full fragment
45 * multicast fragment
46 * set length and fragment fields of pg message
47 * store remaining multicast into head of fragmentation data and set lens field
48 *
49 * If a message exceeds the maximum packet size allowed by the totem
50 * single ring protocol, the protocol could lose forward progress.
51 * Statically calculating the allowed data amount doesn't work because
52 * the amount of data allowed depends on the number of fragments in
53 * each message. In this implementation, the maximum fragment size
54 * is dynamically calculated for each fragment added to the message.
55
56 * It is possible for a message to be two bytes short of the maximum
57 * packet size. This occurs when a message or collection of
58 * messages + the mcast header + the lens are two bytes short of the
59 * end of the packet. Since another len field consumes two bytes, the
60 * len field would consume the rest of the packet without room for data.
61 *
62 * One optimization would be to forgo the final len field and determine
63 * it from the size of the udp datagram. Then this condition would no
64 * longer occur.
65 */
66
67/*
68 * ASSEMBLY AND UNPACKING ALGORITHM:
69 *
70 * copy incoming packet into assembly data buffer indexed by current
71 * location of end of fragment
72 *
73 * if not fragmented
74 * deliver all messages in assembly data buffer
75 * else
76 * if msg_count > 1 and fragmented
77 * deliver all messages except last message in assembly data buffer
78 * copy last fragmented section to start of assembly data buffer
79 * else
80 * if msg_count = 1 and fragmented
81 * do nothing
82 *
83 */
84
85#include <config.h>
86
87#ifdef HAVE_ALLOCA_H
88#include <alloca.h>
89#endif
90#include <sys/types.h>
91#include <sys/socket.h>
92#include <netinet/in.h>
93#include <arpa/inet.h>
94#include <sys/uio.h>
95#include <stdio.h>
96#include <stdlib.h>
97#include <string.h>
98#include <assert.h>
99#include <pthread.h>
100#include <errno.h>
101#include <limits.h>
102
103#include <corosync/swab.h>
104#include <qb/qblist.h>
105#include <qb/qbloop.h>
106#include <qb/qbipcs.h>
108#define LOGSYS_UTILS_ONLY 1
109#include <corosync/logsys.h>
110
111#include "util.h"
112#include "totemsrp.h"
113
115 short version;
116 short type;
117};
118
119#if !(defined(__i386__) || defined(__x86_64__))
120/*
121 * Need align on architectures different then i386 or x86_64
122 */
123#define TOTEMPG_NEED_ALIGN 1
124#endif
125
126/*
127 * totempg_mcast structure
128 *
129 * header: Identify the mcast.
130 * fragmented: Set if this message continues into next message
131 * continuation: Set if this message is a continuation from last message
132 * msg_count: Indicates how many packed messages are contained
133 * in the mcast.
134 * Also, the size of each packed message and the messages themselves are
135 * appended to the end of this structure when sent.
136 */
139 unsigned char fragmented;
140 unsigned char continuation;
141 unsigned short msg_count;
142 /*
143 * short msg_len[msg_count];
144 */
145 /*
146 * data for messages
147 */
148};
149
/*
 * Usable bytes per totem pg packet: the configured network MTU minus the
 * fixed struct totempg_mcast header that is sent in front of the payload.
 */
#define TOTEMPG_PACKET_SIZE \
	(totempg_totem_config->net_mtu - sizeof (struct totempg_mcast))
155
156/*
157 * Local variables used for packing small messages
158 */
159static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];
160
161static int mcast_packed_msg_count = 0;
162
163static int totempg_reserved = 1;
164
165static unsigned int totempg_size_limit;
166
167static totem_queue_level_changed_fn totem_queue_level_changed = NULL;
168
169static uint32_t totempg_threaded_mode = 0;
170
171static void *totemsrp_context;
172
173/*
174 * Function and data used to log messages
175 */
176static int totempg_log_level_security;
177static int totempg_log_level_error;
178static int totempg_log_level_warning;
179static int totempg_log_level_notice;
180static int totempg_log_level_debug;
181static int totempg_subsys_id;
182static void (*totempg_log_printf) (
183 int level,
184 int subsys,
185 const char *function,
186 const char *file,
187 int line,
188 const char *format, ...) __attribute__((format(printf, 6, 7)));
189
191
192static totempg_stats_t totempg_stats;
193
198
/*
 * Per-node reassembly buffer for fragmented totempg messages.
 * NOTE(review): source line 204 is missing from this listing, so one member
 * of this struct is not visible here.
 */
199struct assembly {
/* Node whose packets are being collected in this buffer. */
200 unsigned int nodeid;
/* Reassembled payload bytes. */
201 unsigned char data[MESSAGE_SIZE_MAX+KNET_MAX_PACKET_SIZE];
/* Offset in data[] where the next received payload is appended. */
202 int index;
/* fragmented value of the last fragmented packet seen from this node. */
203 unsigned char last_frag_num;
/* Link on assembly_list_inuse, assembly_list_inuse_trans or assembly_list_free. */
205 struct qb_list_head list;
206};
207
static void assembly_deref (struct assembly *assembly);

static int callback_token_received_fn (enum totem_callback_token_type type,
	const void *data);

/* In-use assemblies while totempg_waiting_transack is clear. */
QB_LIST_DECLARE(assembly_list_inuse);

/*
 * Free list shared by transitional and operational assemblies.
 */
QB_LIST_DECLARE(assembly_list_free);

/* In-use assemblies while totempg_waiting_transack is set. */
QB_LIST_DECLARE(assembly_list_inuse_trans);

/* Every group instance registered with totempg. */
QB_LIST_DECLARE(totempg_groups_list);
223
/*
 * Staging buffer for packed messages. Messages are staged in this buffer
 * before sending, so several small messages can share one mcast. If a
 * message doesn't completely fit, the mcast header carries a non-zero
 * fragmented field saying that more data follows.
 *
 * fragment_size:         bytes of message data currently staged; also the
 *                        offset where the next message data is written.
 * fragment_continuation: non-zero when the first packed message in the
 *                        buffer continues a previously sent fragment; it
 *                        then holds that fragment's number.
 */
static unsigned char *fragmentation_data;
static int fragment_size = 0;
static int fragment_continuation = 0;

/* Last waiting_trans_ack value reported by totemsrp. */
static int totempg_waiting_transack = 0;
241
243 void (*deliver_fn) (
244 unsigned int nodeid,
245 const void *msg,
246 unsigned int msg_len,
247 int endian_conversion_required);
248
249 void (*confchg_fn) (
250 enum totem_configuration_type configuration_type,
251 const unsigned int *member_list, size_t member_list_entries,
252 const unsigned int *left_list, size_t left_list_entries,
253 const unsigned int *joined_list, size_t joined_list_entries,
254 const struct memb_ring_id *ring_id);
255
257
259 int32_t q_level;
260
261 struct qb_list_head list;
262};
263
/*
 * Number given to the next fragmented mcast sent by mcast_msg(). It is an
 * unsigned char, so it wraps; 0 is skipped because a fragmented field of 0
 * means "not fragmented". It is reset to 1 whenever data fits unfragmented.
 */
static unsigned char next_fragment = 1;

/* Guards the group-instance list and totemsrp shutdown in threaded mode. */
static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Serializes totemsrp token-callback creation/destruction in threaded mode. */
static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Protects the packing state shared by mcast_msg() and the token callback. */
static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;
271
/*
 * Log through the configured totempg_log_printf sink, tagging the record
 * with totempg's subsystem id and the caller's function/file/line.
 *
 * There is deliberately no semicolon after while (0): the caller supplies
 * it, so the macro acts as a single statement, e.g. in an unbraced if/else.
 */
#define log_printf(level, format, args...) \
do { \
	totempg_log_printf(level, \
		totempg_subsys_id, \
		__FUNCTION__, __FILE__, __LINE__, \
		format, ##args); \
} while (0)
279
/* Queue-space checks; both are defined after mcast_msg(). */
static int msg_count_send_ok (int msg_count);
static int byte_count_send_ok (int byte_count);
283
284static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
285{
286 log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
287 totempg_waiting_transack = waiting_trans_ack;
288}
289
/*
 * Find or create the reassembly buffer for nodeid.
 *
 * Searches the in-use list selected by totempg_waiting_transack
 * (assembly_list_inuse_trans while it is set, assembly_list_inuse
 * otherwise). If nothing is found, it recycles the first entry of
 * assembly_list_free, and failing that it mallocs a new one. Recycled and
 * new entries are put on the selected in-use list with index reset to 0.
 *
 * Allocation failure is only caught by assert(), so with NDEBUG this could
 * return NULL.
 *
 * NOTE(review): this listing omits source lines 320, 322-323, 335 and
 * 338-339; any other fields they initialize (e.g. nodeid for a recycled
 * entry) are not visible here.
 */
290static struct assembly *assembly_ref (unsigned int nodeid)
291{
292 struct assembly *assembly;
293 struct qb_list_head *list;
294 struct qb_list_head *active_assembly_list_inuse;
295
296 if (totempg_waiting_transack) {
297 active_assembly_list_inuse = &assembly_list_inuse_trans;
298 } else {
299 active_assembly_list_inuse = &assembly_list_inuse;
300 }
301
302 /*
303 * Search the in-use list for the node id and return its assembly buffer if found
304 */
305 qb_list_for_each(list, active_assembly_list_inuse) {
306 assembly = qb_list_entry (list, struct assembly, list);
307
308 if (nodeid == assembly->nodeid) {
309 return (assembly);
310 }
311 }
312
313 /*
314 * Nothing found in the in-use list; take one from the free list if available
315 */
316 if (qb_list_empty (&assembly_list_free) == 0) {
317 assembly = qb_list_first_entry (&assembly_list_free, struct assembly, list);
318 qb_list_del (&assembly->list);
319 qb_list_add (&assembly->list, active_assembly_list_inuse);
/* Recycled buffer: start collecting from the beginning of data[]. */
321 assembly->index = 0;
324 return (assembly);
325 }
326
327 /*
328 * Nothing available in inuse or free list, so allocate a new one
329 */
330 assembly = malloc (sizeof (struct assembly));
331 /*
332 * TODO handle memory allocation failure here
333 */
334 assert (assembly);
336 assembly->data[0] = 0;
337 assembly->index = 0;
340 qb_list_init (&assembly->list);
341 qb_list_add (&assembly->list, active_assembly_list_inuse);
342
343 return (assembly);
344}
345
346static void assembly_deref (struct assembly *assembly)
347{
348 qb_list_del (&assembly->list);
349 qb_list_add (&assembly->list, &assembly_list_free);
350}
351
352static void assembly_deref_from_normal_and_trans (int nodeid)
353{
354 int j;
355 struct qb_list_head *list, *tmp_iter;
356 struct qb_list_head *active_assembly_list_inuse;
357 struct assembly *assembly;
358
359 for (j = 0; j < 2; j++) {
360 if (j == 0) {
361 active_assembly_list_inuse = &assembly_list_inuse;
362 } else {
363 active_assembly_list_inuse = &assembly_list_inuse_trans;
364 }
365
366 qb_list_for_each_safe(list, tmp_iter, active_assembly_list_inuse) {
367 assembly = qb_list_entry (list, struct assembly, list);
368
369 if (nodeid == assembly->nodeid) {
370 qb_list_del (&assembly->list);
371 qb_list_add (&assembly->list, &assembly_list_free);
372 }
373 }
374 }
375
376}
377
378static inline void app_confchg_fn (
379 enum totem_configuration_type configuration_type,
380 const unsigned int *member_list, size_t member_list_entries,
381 const unsigned int *left_list, size_t left_list_entries,
382 const unsigned int *joined_list, size_t joined_list_entries,
383 const struct memb_ring_id *ring_id)
384{
385 int i;
386 struct totempg_group_instance *instance;
387 struct qb_list_head *list;
388
389 /*
390 * For every leaving processor, add to free list
391 * This also has the side effect of clearing out the dataset
392 * In the leaving processor's assembly buffer.
393 */
394 for (i = 0; i < left_list_entries; i++) {
395 assembly_deref_from_normal_and_trans (left_list[i]);
396 }
397
398 qb_list_for_each(list, &totempg_groups_list) {
399 instance = qb_list_entry (list, struct totempg_group_instance, list);
400
401 if (instance->confchg_fn) {
402 instance->confchg_fn (
403 configuration_type,
404 member_list,
405 member_list_entries,
406 left_list,
407 left_list_entries,
408 joined_list,
409 joined_list_entries,
410 ring_id);
411 }
412 }
413}
414
/*
 * Byte-swap, in place, the group header at the front of a delivered message:
 * group_len[0] is the number of groups and group_len[1..n] their name
 * lengths, all unsigned shorts.
 *
 * The header arrives from the network, so the group count is not trusted.
 * Only entries that lie inside the msg_len bytes of msg are swapped, and a
 * message too short to hold the count is left untouched. group_matches()
 * rejects headers that do not fit.
 *
 * msg:     start of the message (may be unaligned).
 * msg_len: size of the message in bytes.
 */
static inline void group_endian_convert (
	void *msg,
	int msg_len)
{
	unsigned short *group_len;
	size_t max_entries;
	size_t entry_count;
	size_t i;
	char *aligned_msg;

	if (msg_len < (int)sizeof (unsigned short)) {
		return;
	}
	max_entries = (size_t)msg_len / sizeof (unsigned short);

#ifdef TOTEMPG_NEED_ALIGN
	/*
	 * Align data structure for architectures other than i386 or x86_64
	 */
	if ((size_t)msg % sizeof(char *) != 0) {
		aligned_msg = alloca(msg_len);
		memcpy(aligned_msg, msg, msg_len);
	} else {
		aligned_msg = msg;
	}
#else
	aligned_msg = msg;
#endif

	group_len = (unsigned short *)aligned_msg;
	group_len[0] = swab16(group_len[0]);

	/* group_len[0] is now in host order: count + the count slot itself */
	entry_count = (size_t)group_len[0] + 1;
	if (entry_count > max_entries) {
		entry_count = max_entries;
	}
	for (i = 1; i < entry_count; i++) {
		group_len[i] = swab16(group_len[i]);
	}

	if (aligned_msg != msg) {
		memcpy(msg, aligned_msg, msg_len);
	}
}
447
448static inline int group_matches (
449 struct iovec *iovec,
450 unsigned int iov_len,
451 struct totempg_group *groups_b,
452 unsigned int group_b_cnt,
453 unsigned int *adjust_iovec)
454{
455 unsigned short *group_len;
456 char *group_name;
457 int i;
458 int j;
459#ifdef TOTEMPG_NEED_ALIGN
460 struct iovec iovec_aligned = { NULL, 0 };
461#endif
462
463 assert (iov_len == 1);
464
465#ifdef TOTEMPG_NEED_ALIGN
466 /*
467 * Align data structure for not i386 or x86_64
468 */
469 if ((size_t)iovec->iov_base % sizeof(char *) != 0) {
470 iovec_aligned.iov_base = alloca(iovec->iov_len);
471 memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
472 iovec_aligned.iov_len = iovec->iov_len;
473 iovec = &iovec_aligned;
474 }
475#endif
476
477 group_len = (unsigned short *)iovec->iov_base;
478 group_name = ((char *)iovec->iov_base) +
479 sizeof (unsigned short) * (group_len[0] + 1);
480
481
482 /*
483 * Calculate amount to adjust the iovec by before delivering to app
484 */
485 *adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
486 for (i = 1; i < group_len[0] + 1; i++) {
487 *adjust_iovec += group_len[i];
488 }
489
490 /*
491 * Determine if this message should be delivered to this instance
492 */
493 for (i = 1; i < group_len[0] + 1; i++) {
494 for (j = 0; j < group_b_cnt; j++) {
495 if ((group_len[i] == groups_b[j].group_len) &&
496 (memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
497 return (1);
498 }
499 }
500 group_name += group_len[i];
501 }
502 return (0);
503}
504
505
506static inline void app_deliver_fn (
507 unsigned int nodeid,
508 void *msg,
509 unsigned int msg_len,
510 int endian_conversion_required)
511{
512 struct totempg_group_instance *instance;
513 struct iovec stripped_iovec;
514 unsigned int adjust_iovec;
515 struct iovec *iovec;
516 struct qb_list_head *list;
517
518 struct iovec aligned_iovec = { NULL, 0 };
519
520 if (endian_conversion_required) {
521 group_endian_convert (msg, msg_len);
522 }
523
524 /*
525 * TODO: segmentation/assembly need to be redesigned to provide aligned access
526 * in all cases to avoid memory copies on non386 archs. Probably broke backwars
527 * compatibility
528 */
529
530#ifdef TOTEMPG_NEED_ALIGN
531 /*
532 * Align data structure for not i386 or x86_64
533 */
534 aligned_iovec.iov_base = alloca(msg_len);
535 aligned_iovec.iov_len = msg_len;
536 memcpy(aligned_iovec.iov_base, msg, msg_len);
537#else
538 aligned_iovec.iov_base = msg;
539 aligned_iovec.iov_len = msg_len;
540#endif
541
542 iovec = &aligned_iovec;
543
544 qb_list_for_each(list, &totempg_groups_list) {
545 instance = qb_list_entry (list, struct totempg_group_instance, list);
546 if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
547 stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
548 stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;
549
550#ifdef TOTEMPG_NEED_ALIGN
551 /*
552 * Align data structure for not i386 or x86_64
553 */
554 if ((uintptr_t)((char *)iovec->iov_base + adjust_iovec) % (sizeof(char *)) != 0) {
555 /*
556 * Deal with misalignment
557 */
558 stripped_iovec.iov_base =
559 alloca (stripped_iovec.iov_len);
560 memcpy (stripped_iovec.iov_base,
561 (char *)iovec->iov_base + adjust_iovec,
562 stripped_iovec.iov_len);
563 }
564#endif
565 instance->deliver_fn (
566 nodeid,
567 stripped_iovec.iov_base,
568 stripped_iovec.iov_len,
569 endian_conversion_required);
570 }
571 }
572}
573
574static void totempg_confchg_fn (
575 enum totem_configuration_type configuration_type,
576 const unsigned int *member_list, size_t member_list_entries,
577 const unsigned int *left_list, size_t left_list_entries,
578 const unsigned int *joined_list, size_t joined_list_entries,
579 const struct memb_ring_id *ring_id)
580{
581// TODO optimize this
582 app_confchg_fn (configuration_type,
583 member_list, member_list_entries,
584 left_list, left_list_entries,
585 joined_list, joined_list_entries,
586 ring_id);
587}
588
/*
 * Delivery callback handed to totemsrp_initialize(): unpack one totempg
 * mcast received from nodeid.
 *
 * Wire layout: struct totempg_mcast, then msg_count unsigned short lengths,
 * then the packed message data. The packet is logged and dropped if it is
 * shorter than the header, shorter than header + length table, or if its
 * size differs from the sum of the lengths. Otherwise the payload is
 * appended to the node's assembly buffer and complete messages are handed
 * to app_deliver_fn(). When the packet is fragmented, the trailing partial
 * message stays in the assembly buffer for the next packet.
 *
 * NOTE(review): this listing omits several source lines (599, 689, 692,
 * 700-701, 714-715, 723 and 732). For example, "header" is used below but
 * its declaration is not visible. Comments here describe only visible code.
 */
589static void totempg_deliver_fn (
590 unsigned int nodeid,
591 const void *msg,
592 unsigned int msg_len,
593 int endian_conversion_required)
594{
595 struct totempg_mcast *mcast;
596 unsigned short *msg_lens;
597 int i;
598 struct assembly *assembly;
600 int msg_count;
601 int continuation;
602 int start;
603 const char *data;
604 int datasize;
605 struct iovec iov_delv;
606 size_t expected_msg_len;
607
/*
 * NOTE(review): the assembly is looked up (and possibly allocated) before
 * the length checks below. A dropped undersized packet therefore still
 * leaves an assembly for nodeid on an in-use list.
 */
608 assembly = assembly_ref (nodeid);
609 assert (assembly);
610
611 if (msg_len < sizeof(struct totempg_mcast)) {
612 log_printf(LOG_WARNING,
613 "Message (totempg_mcast) received from node " CS_PRI_NODE_ID " is too short... Ignoring.", nodeid);
614
615 return ;
616 }
617
618 /*
619 * Assemble the header into one block of data and
620 * assemble the packet contents into one block of data to simplify delivery
621 */
622
/* msg_count is byte-swapped in place in the received buffer. */
623 mcast = (struct totempg_mcast *)msg;
624 if (endian_conversion_required) {
625 mcast->msg_count = swab16 (mcast->msg_count);
626 }
627
628 msg_count = mcast->msg_count;
629 datasize = sizeof (struct totempg_mcast) +
630 msg_count * sizeof (unsigned short);
631
632 if (msg_len < datasize) {
633 log_printf(LOG_WARNING,
634 "Message (totempg_mcast datasize) received from node " CS_PRI_NODE_ID
635 " is too short... Ignoring.", nodeid);
636
637 return ;
638 }
639
/*
 * Copy header + length table into "header"; msg_lens points into that
 * copy and the lengths are byte-swapped there.
 */
640 memcpy (header, msg, datasize);
641 data = msg;
642
643 msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
644 expected_msg_len = datasize;
645 for (i = 0; i < mcast->msg_count; i++) {
646 if (endian_conversion_required) {
647 msg_lens[i] = swab16 (msg_lens[i]);
648 }
649
650 expected_msg_len += msg_lens[i];
651 }
652
653 if (msg_len != expected_msg_len) {
654 log_printf(LOG_WARNING,
655 "Message (totempg_mcast) received from node " CS_PRI_NODE_ID
656 " doesn't have expected length of %zu (has %u) bytes... Ignoring.",
657 nodeid, expected_msg_len, msg_len);
658
659 return ;
660 }
661
/*
 * Append the packet payload (everything after the length table) at the
 * current end of the node's assembly buffer.
 * NOTE(review): assembly->index grows with every fragmented packet (see the
 * += at the end of this function). Confirm that a peer cannot reach this
 * assert by sending an unbounded chain of fragments.
 */
662 assert((assembly->index+msg_len) < sizeof(assembly->data));
663 memcpy (&assembly->data[assembly->index], &data[datasize],
664 msg_len - datasize);
665
666 /*
667 * If the last message in the buffer is a fragment, then we
668 * can't deliver it. We'll first deliver the full messages
669 * then adjust the assembly buffer so we can add the rest of the
670 * fragment when it arrives.
671 */
672 msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
673 continuation = mcast->continuation;
674 iov_delv.iov_base = (void *)&assembly->data[0];
675 iov_delv.iov_len = assembly->index + msg_lens[0];
676
677 /*
678 * Make sure that if this message is a continuation, that it
679 * matches the sequence number of the previous fragment.
680 * Also, if the first packed message is a continuation
681 * of a previous message, but the assembly buffer
682 * is empty, then we need to discard it since we can't
683 * assemble a complete message. Likewise, if this message isn't a
684 * continuation and the assembly buffer is empty, we have to discard
685 * the continued message.
686 */
687 start = 0;
688
690 /* Throw away the first msg block */
691 if (mcast->fragmented == 0 || mcast->fragmented == 1) {
693
694 assembly->index += msg_lens[0];
695 iov_delv.iov_base = (void *)&assembly->data[assembly->index];
696 iov_delv.iov_len = msg_lens[1];
697 start = 1;
698 }
699 } else
702 assembly->last_frag_num = mcast->fragmented;
/*
 * Deliver each complete message, advancing index past it; iov_delv is
 * pointed at the next message for the following iteration.
 */
703 for (i = start; i < msg_count; i++) {
704 app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
705 endian_conversion_required);
706 assembly->index += msg_lens[i];
707 iov_delv.iov_base = (void *)&assembly->data[assembly->index];
708 if (i < (msg_count - 1)) {
709 iov_delv.iov_len = msg_lens[i + 1];
710 }
711 }
712 } else {
713 log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u",
716 }
717 }
718
/*
 * Unfragmented packet: the message chain is finished, so reset and free
 * the assembly. Fragmented packet: keep only the trailing partial message,
 * moved to the start of data[] when earlier messages were delivered.
 */
719 if (mcast->fragmented == 0) {
720 /*
721 * End of messages, dereference assembly struct
722 */
724 assembly->index = 0;
725 assembly_deref (assembly);
726 } else {
727 /*
728 * Message is fragmented, keep around assembly list
729 */
730 if (mcast->msg_count > 1) {
731 memmove (&assembly->data[0],
733 msg_lens[msg_count]);
734
735 assembly->index = 0;
736 }
737 assembly->index += msg_lens[msg_count];
738 }
739}
740
741/*
742 * Totem Process Group Abstraction
743 * depends on poll abstraction, POSIX, IPV4
744 */
745
747
748int callback_token_received_fn (enum totem_callback_token_type type,
749 const void *data)
750{
751 struct totempg_mcast mcast;
752 struct iovec iovecs[3];
753
754 if (totempg_threaded_mode == 1) {
755 pthread_mutex_lock (&mcast_msg_mutex);
756 }
757 if (mcast_packed_msg_count == 0) {
758 if (totempg_threaded_mode == 1) {
759 pthread_mutex_unlock (&mcast_msg_mutex);
760 }
761 return (0);
762 }
763 if (totemsrp_avail(totemsrp_context) == 0) {
764 if (totempg_threaded_mode == 1) {
765 pthread_mutex_unlock (&mcast_msg_mutex);
766 }
767 return (0);
768 }
769 mcast.header.version = 0;
770 mcast.header.type = 0;
771 mcast.fragmented = 0;
772
773 /*
774 * Was the first message in this buffer a continuation of a
775 * fragmented message?
776 */
777 mcast.continuation = fragment_continuation;
778 fragment_continuation = 0;
779
780 mcast.msg_count = mcast_packed_msg_count;
781
782 iovecs[0].iov_base = (void *)&mcast;
783 iovecs[0].iov_len = sizeof (struct totempg_mcast);
784 iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
785 iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
786 iovecs[2].iov_base = (void *)&fragmentation_data[0];
787 iovecs[2].iov_len = fragment_size;
788 (void)totemsrp_mcast (totemsrp_context, iovecs, 3, 0);
789
790 mcast_packed_msg_count = 0;
791 fragment_size = 0;
792
793 if (totempg_threaded_mode == 1) {
794 pthread_mutex_unlock (&mcast_msg_mutex);
795 }
796 return (0);
797}
798
799/*
800 * Initialize the totem process group abstraction
801 */
803 qb_loop_t *poll_handle,
805{
806 int res;
807
809 totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security;
816
817 fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
818 if (fragmentation_data == 0) {
819 return (-1);
820 }
821
823
824 res = totemsrp_initialize (
825 poll_handle,
826 &totemsrp_context,
828 &totempg_stats,
829 totempg_deliver_fn,
830 totempg_confchg_fn,
831 totempg_waiting_trans_ack_cb);
832
833 if (res == -1) {
834 goto error_exit;
835 }
836
838 totemsrp_context,
841 0,
842 callback_token_received_fn,
843 0);
844
845 totempg_size_limit = (totemsrp_avail(totemsrp_context) - 1) *
846 (totempg_totem_config->net_mtu -
847 sizeof (struct totempg_mcast) - 16);
848
849 qb_list_init (&totempg_groups_list);
850
851error_exit:
852 return (res);
853}
854
856{
857 if (totempg_threaded_mode == 1) {
858 pthread_mutex_lock (&totempg_mutex);
859 }
860 // coverity[SLEEP:SUPPRESS] sleep is not a problem because it is shutdown
861 totemsrp_finalize (totemsrp_context);
862 if (totempg_threaded_mode == 1) {
863 pthread_mutex_unlock (&totempg_mutex);
864 }
865}
866
867/*
868 * Multicast a message
 *
 * Packs the caller's iovecs (zero-length entries are dropped) into the
 * shared staging buffer fragmentation_data. Data that fits with room to
 * spare stays staged; callback_token_received_fn() later sends it. Whenever
 * the staging buffer fills, a totempg_mcast is sent immediately through
 * totemsrp_mcast(). Its fragmented field and fragment_continuation link the
 * pieces of a message that spans several packets.
 *
 * Returns -1 if byte_count_send_ok() reports too little queue space
 * (nothing is staged in that case) or if totemsrp_mcast() returns -1.
 * Otherwise it returns the result of the last totemsrp_mcast() call, or 0
 * if everything was only staged.
 *
 * iov_len must be below 64 (asserted). Takes mcast_msg_mutex when
 * totempg_threaded_mode == 1.
 *
 * NOTE(review): on a totemsrp_mcast() failure, the goto error_exit skips the
 * reset of mcast_packed_msg_count / fragment_size, although
 * mcast_packed_msg_count has already been incremented for the failed packet.
869 */
870static int mcast_msg (
871 struct iovec *iovec_in,
872 unsigned int iov_len,
873 int guarantee)
874{
875 int res = 0;
876 struct totempg_mcast mcast;
877 struct iovec iovecs[3];
878 struct iovec iovec[64];
879 int i;
880 int dest, src;
881 int max_packet_size = 0;
882 int copy_len = 0;
883 int copy_base = 0;
884 int total_size = 0;
885
886 if (totempg_threaded_mode == 1) {
887 pthread_mutex_lock (&mcast_msg_mutex);
888 }
889 totemsrp_event_signal (totemsrp_context, TOTEM_EVENT_NEW_MSG, 1);
890
891 /*
892 * Remove zero length iovectors from the list
893 */
894 assert (iov_len < 64);
895 for (dest = 0, src = 0; src < iov_len; src++) {
896 if (iovec_in[src].iov_len) {
897 memcpy (&iovec[dest++], &iovec_in[src],
898 sizeof (struct iovec));
899 }
900 }
901 iov_len = dest;
902
/*
 * Room left in this packet for data: the packet size minus one length slot
 * per message already staged plus one for the message being added.
 */
903 max_packet_size = TOTEMPG_PACKET_SIZE -
904 (sizeof (unsigned short) * (mcast_packed_msg_count + 1));
905
906 mcast_packed_msg_lens[mcast_packed_msg_count] = 0;
907
908 /*
909 * Check if we would overwrite new message queue
910 */
911 for (i = 0; i < iov_len; i++) {
912 total_size += iovec[i].iov_len;
913 }
914
915 if (byte_count_send_ok (total_size + sizeof(unsigned short) *
916 (mcast_packed_msg_count)) == 0) {
917
918 if (totempg_threaded_mode == 1) {
919 pthread_mutex_unlock (&mcast_msg_mutex);
920 }
921 return(-1);
922 }
923
924 memset(&mcast, 0, sizeof(mcast));
925
926 mcast.header.version = 0;
927 for (i = 0; i < iov_len; ) {
928 mcast.fragmented = 0;
929 mcast.continuation = fragment_continuation;
930 copy_len = iovec[i].iov_len - copy_base;
931
932 /*
933 * If it all fits with room left over, copy it in.
934 * We need to leave at least sizeof(short) + 1 bytes in the
935 * fragment_buffer on exit so that max_packet_size + fragment_size
936 * doesn't exceed the size of the fragment_buffer on the next call.
 *
 * NOTE(review): the test uses the whole iovec length rather than copy_len,
 * so the rest of a partially sent iovec is judged by its full size.
 * This is conservative (it may cost an extra packet) but never overruns.
937 */
938 if ((iovec[i].iov_len + fragment_size) <
939 (max_packet_size - sizeof (unsigned short))) {
940
941 memcpy (&fragmentation_data[fragment_size],
942 (char *)iovec[i].iov_base + copy_base, copy_len);
943 fragment_size += copy_len;
944 mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
945 next_fragment = 1;
946 // coverity[UNUSED_VALUE:SUPPRESS] defensive programming
947 copy_len = 0;
948 copy_base = 0;
949 i++;
950 continue;
951
952 /*
953 * If it just fits or is too big, then send out what fits.
954 */
955 } else {
956 unsigned char *data_ptr;
957
/*
 * copy_len can only equal max_packet_size when fragment_size is 0. The
 * packet is then sent straight from the caller's buffer, although the
 * memcpy below still fills fragmentation_data.
 */
958 copy_len = min(copy_len, max_packet_size - fragment_size);
959 if( copy_len == max_packet_size )
960 data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
961 else {
962 data_ptr = fragmentation_data;
963 }
964
965 memcpy (&fragmentation_data[fragment_size],
966 (unsigned char *)iovec[i].iov_base + copy_base, copy_len);
967 mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
968
969 /*
970 * if we're not on the last iovec or the iovec is too large to
971 * fit, then indicate a fragment. This also means that the next
972 * message will have the continuation of this one.
 * Fragment numbers come from next_fragment and skip 0, because 0 means
 * "not fragmented".
973 */
974 if ((i < (iov_len - 1)) ||
975 ((copy_base + copy_len) < iovec[i].iov_len)) {
976 if (!next_fragment) {
977 next_fragment++;
978 }
979 fragment_continuation = next_fragment;
980 mcast.fragmented = next_fragment++;
981 assert(fragment_continuation != 0);
982 assert(mcast.fragmented != 0);
983 } else {
984 fragment_continuation = 0;
985 }
986
987 /*
988 * assemble the message and send it
989 */
990 mcast.msg_count = ++mcast_packed_msg_count;
991 iovecs[0].iov_base = (void *)&mcast;
992 iovecs[0].iov_len = sizeof(struct totempg_mcast);
993 iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
994 iovecs[1].iov_len = mcast_packed_msg_count *
995 sizeof(unsigned short);
996 iovecs[2].iov_base = (void *)data_ptr;
997 iovecs[2].iov_len = fragment_size + copy_len;
998 assert (totemsrp_avail(totemsrp_context) > 0);
999 res = totemsrp_mcast (totemsrp_context, iovecs, 3, guarantee);
1000 if (res == -1) {
1001 goto error_exit;
1002 }
1003
1004 /*
1005 * Recalculate counts and indexes for the next.
 * The next packet starts empty, so only one length slot is reserved.
1006 */
1007 mcast_packed_msg_lens[0] = 0;
1008 mcast_packed_msg_count = 0;
1009 fragment_size = 0;
1010 max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));
1011
1012 /*
1013 * If the iovec all fit, go to the next iovec
1014 */
1015 if ((copy_base + copy_len) == iovec[i].iov_len) {
1016 // coverity[UNUSED_VALUE:SUPPRESS] defensive programming
1017 copy_len = 0;
1018 copy_base = 0;
1019 i++;
1020
1021 /*
1022 * Continue with the rest of the current iovec.
1023 */
1024 } else {
1025 copy_base += copy_len;
1026 }
1027 }
1028 }
1029
1030 /*
1031 * Bump only if we added message data. This may be zero if
1032 * the last buffer just fit into the fragmentation_data buffer
1033 * and we were at the last iovec.
1034 */
1035 if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
1036 mcast_packed_msg_count++;
1037 }
1038
1039error_exit:
1040 if (totempg_threaded_mode == 1) {
1041 pthread_mutex_unlock (&mcast_msg_mutex);
1042 }
1043 return (res);
1044}
1045
1046/*
1047 * Determine if a message of msg_size could be queued
1048 */
1049static int msg_count_send_ok (
1050 int msg_count)
1051{
1052 int avail = 0;
1053
1054 avail = totemsrp_avail (totemsrp_context);
1055 totempg_stats.msg_queue_avail = avail;
1056
1057 return ((avail - totempg_reserved) > msg_count);
1058}
1059
1060static int byte_count_send_ok (
1061 int byte_count)
1062{
1063 unsigned int msg_count = 0;
1064 int avail = 0;
1065
1066 avail = totemsrp_avail (totemsrp_context);
1067
1068 msg_count = (byte_count / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
1069
1070 return (avail >= msg_count);
1071}
1072
1073static int send_reserve (
1074 int msg_size)
1075{
1076 unsigned int msg_count = 0;
1077
1078 msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
1079 totempg_reserved += msg_count;
1080 totempg_stats.msg_reserved = totempg_reserved;
1081
1082 return (msg_count);
1083}
1084
1085static void send_release (
1086 int msg_count)
1087{
1088 totempg_reserved -= msg_count;
1089 totempg_stats.msg_reserved = totempg_reserved;
1090}
1091
#ifndef HAVE_SMALL_MEMORY_FOOTPRINT
/*
 * Queue capacity in entries: enough MTU-sized packets to hold four
 * maximum-size messages. This replaces any earlier definition.
 */
#undef MESSAGE_QUEUE_MAX
#define MESSAGE_QUEUE_MAX \
	((4 * MESSAGE_SIZE_MAX) / totempg_totem_config->net_mtu)
#endif /* HAVE_SMALL_MEMORY_FOOTPRINT */
1096
1097static uint32_t q_level_precent_used(void)
1098{
1099 return (100 - (((totemsrp_avail(totemsrp_context) - totempg_reserved) * 100) / MESSAGE_QUEUE_MAX));
1100}
1101
1103 void **handle_out,
1105 int delete,
1106 int (*callback_fn) (enum totem_callback_token_type type, const void *),
1107 const void *data)
1108{
1109 int res;
1110 if (totempg_threaded_mode == 1) {
1111 pthread_mutex_lock (&callback_token_mutex);
1112 }
1113 res = totemsrp_callback_token_create (totemsrp_context, handle_out, type, delete,
1114 callback_fn, data);
1115 if (totempg_threaded_mode == 1) {
1116 pthread_mutex_unlock (&callback_token_mutex);
1117 }
1118 return (res);
1119}
1120
1122 void *handle_out)
1123{
1124 if (totempg_threaded_mode == 1) {
1125 pthread_mutex_lock (&callback_token_mutex);
1126 }
1127 totemsrp_callback_token_destroy (totemsrp_context, handle_out);
1128 if (totempg_threaded_mode == 1) {
1129 pthread_mutex_unlock (&callback_token_mutex);
1130 }
1131}
1132
1133/*
1134 * vi: set autoindent tabstop=4 shiftwidth=4 :
1135 */
1136
1138 void **totempg_groups_instance,
1139
1140 void (*deliver_fn) (
1141 unsigned int nodeid,
1142 const void *msg,
1143 unsigned int msg_len,
1144 int endian_conversion_required),
1145
1146 void (*confchg_fn) (
1147 enum totem_configuration_type configuration_type,
1148 const unsigned int *member_list, size_t member_list_entries,
1149 const unsigned int *left_list, size_t left_list_entries,
1150 const unsigned int *joined_list, size_t joined_list_entries,
1151 const struct memb_ring_id *ring_id))
1152{
1153 struct totempg_group_instance *instance;
1154
1155 if (totempg_threaded_mode == 1) {
1156 pthread_mutex_lock (&totempg_mutex);
1157 }
1158
1159 instance = malloc (sizeof (struct totempg_group_instance));
1160 if (instance == NULL) {
1161 goto error_exit;
1162 }
1163
1164 instance->deliver_fn = deliver_fn;
1165 instance->confchg_fn = confchg_fn;
1166 instance->groups = 0;
1167 instance->groups_cnt = 0;
1168 instance->q_level = QB_LOOP_MED;
1169 qb_list_init (&instance->list);
1170 qb_list_add (&instance->list, &totempg_groups_list);
1171
1172 if (totempg_threaded_mode == 1) {
1173 pthread_mutex_unlock (&totempg_mutex);
1174 }
1175 *totempg_groups_instance = instance;
1176 return (0);
1177
1178error_exit:
1179 if (totempg_threaded_mode == 1) {
1180 pthread_mutex_unlock (&totempg_mutex);
1181 }
1182 return (-1);
1183}
1184
1186 void *totempg_groups_instance,
1187 const struct totempg_group *groups,
1188 size_t group_cnt)
1189{
1190 struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1191 struct totempg_group *new_groups;
1192 int res = 0;
1193
1194 if (totempg_threaded_mode == 1) {
1195 pthread_mutex_lock (&totempg_mutex);
1196 }
1197
1198 new_groups = realloc (instance->groups,
1199 sizeof (struct totempg_group) *
1200 (instance->groups_cnt + group_cnt));
1201 if (new_groups == 0) {
1202 res = -1;
1203 goto error_exit;
1204 }
1205 memcpy (&new_groups[instance->groups_cnt],
1206 groups, group_cnt * sizeof (struct totempg_group));
1207 instance->groups = new_groups;
1208 instance->groups_cnt += group_cnt;
1209
1210error_exit:
1211 if (totempg_threaded_mode == 1) {
1212 pthread_mutex_unlock (&totempg_mutex);
1213 }
1214 return (res);
1215}
1216
1218 void *totempg_groups_instance,
1219 const struct totempg_group *groups,
1220 size_t group_cnt)
1221{
1222 if (totempg_threaded_mode == 1) {
1223 pthread_mutex_lock (&totempg_mutex);
1224 }
1225
1226 if (totempg_threaded_mode == 1) {
1227 pthread_mutex_unlock (&totempg_mutex);
1228 }
1229 return (0);
1230}
1231
1232#define MAX_IOVECS_FROM_APP 32
1233#define MAX_GROUPS_PER_MSG 32
1234
1236 void *totempg_groups_instance,
1237 const struct iovec *iovec,
1238 unsigned int iov_len,
1239 int guarantee)
1240{
1241 struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1242 unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
1243 struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
1244 int i;
1245 int res;
1246
1247 if (totempg_threaded_mode == 1) {
1248 pthread_mutex_lock (&totempg_mutex);
1249 }
1250
1251 /*
1252 * Build group_len structure and the iovec_mcast structure
1253 */
1254 group_len[0] = instance->groups_cnt;
1255 for (i = 0; i < instance->groups_cnt; i++) {
1256 group_len[i + 1] = instance->groups[i].group_len;
1257 iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
1258 iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;
1259 }
1260 iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
1261 iovec_mcast[0].iov_base = (char *)group_len;
1262 for (i = 0; i < iov_len; i++) {
1263 iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
1264 iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
1265 }
1266
1267 res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);
1268
1269 if (totempg_threaded_mode == 1) {
1270 pthread_mutex_unlock (&totempg_mutex);
1271 }
1272
1273 return (res);
1274}
1275
1276static void check_q_level(
1277 void *totempg_groups_instance)
1278{
1279 struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1280 int32_t old_level = instance->q_level;
1281 int32_t percent_used = q_level_precent_used();
1282
1283 if (percent_used >= 75 && instance->q_level != TOTEM_Q_LEVEL_CRITICAL) {
1284 instance->q_level = TOTEM_Q_LEVEL_CRITICAL;
1285 } else if (percent_used < 30 && instance->q_level != TOTEM_Q_LEVEL_LOW) {
1286 instance->q_level = TOTEM_Q_LEVEL_LOW;
1287 } else if (percent_used > 40 && percent_used < 50 && instance->q_level != TOTEM_Q_LEVEL_GOOD) {
1288 instance->q_level = TOTEM_Q_LEVEL_GOOD;
1289 } else if (percent_used > 60 && percent_used < 70 && instance->q_level != TOTEM_Q_LEVEL_HIGH) {
1290 instance->q_level = TOTEM_Q_LEVEL_HIGH;
1291 }
1292 if (totem_queue_level_changed && old_level != instance->q_level) {
1293 totem_queue_level_changed(instance->q_level);
1294 }
1295}
1296
/* Public entry point: re-evaluate the send-queue level of a groups instance. */
void totempg_check_q_level(
	void *totempg_groups_instance)
{
	check_q_level(totempg_groups_instance);
}
1304
1306 void *totempg_groups_instance,
1307 const struct iovec *iovec,
1308 unsigned int iov_len)
1309{
1310 struct totempg_group_instance *instance = (struct totempg_group_instance *)totempg_groups_instance;
1311 unsigned int size = 0;
1312 unsigned int i;
1313 unsigned int reserved = 0;
1314
1315 if (totempg_threaded_mode == 1) {
1316 pthread_mutex_lock (&totempg_mutex);
1317 pthread_mutex_lock (&mcast_msg_mutex);
1318 }
1319
1320 for (i = 0; i < instance->groups_cnt; i++) {
1321 size += instance->groups[i].group_len;
1322 }
1323 for (i = 0; i < iov_len; i++) {
1324 size += iovec[i].iov_len;
1325 }
1326
1327 if (size >= totempg_size_limit) {
1328 reserved = -1;
1329 goto error_exit;
1330 }
1331
1332 if (byte_count_send_ok (size)) {
1333 reserved = send_reserve (size);
1334 } else {
1335 reserved = 0;
1336 }
1337
1338error_exit:
1339 check_q_level(instance);
1340
1341 if (totempg_threaded_mode == 1) {
1342 pthread_mutex_unlock (&mcast_msg_mutex);
1343 pthread_mutex_unlock (&totempg_mutex);
1344 }
1345 return (reserved);
1346}
1347
1348
1350{
1351 if (totempg_threaded_mode == 1) {
1352 pthread_mutex_lock (&totempg_mutex);
1353 pthread_mutex_lock (&mcast_msg_mutex);
1354 }
1355 send_release (msg_count);
1356 if (totempg_threaded_mode == 1) {
1357 pthread_mutex_unlock (&mcast_msg_mutex);
1358 pthread_mutex_unlock (&totempg_mutex);
1359 }
1360 return 0;
1361}
1362
1364 void *totempg_groups_instance,
1365 int guarantee,
1366 const struct totempg_group *groups,
1367 size_t groups_cnt,
1368 const struct iovec *iovec,
1369 unsigned int iov_len)
1370{
1371 unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
1372 struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
1373 int i;
1374 int res;
1375
1376 if (totempg_threaded_mode == 1) {
1377 pthread_mutex_lock (&totempg_mutex);
1378 }
1379
1380 /*
1381 * Build group_len structure and the iovec_mcast structure
1382 */
1383 group_len[0] = groups_cnt;
1384 for (i = 0; i < groups_cnt; i++) {
1385 group_len[i + 1] = groups[i].group_len;
1386 iovec_mcast[i + 1].iov_len = groups[i].group_len;
1387 iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
1388 }
1389 iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
1390 iovec_mcast[0].iov_base = (char *)group_len;
1391 for (i = 0; i < iov_len; i++) {
1392 iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
1393 iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
1394 }
1395
1396 res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);
1397
1398 if (totempg_threaded_mode == 1) {
1399 pthread_mutex_unlock (&totempg_mutex);
1400 }
1401 return (res);
1402}
1403
1404/*
1405 * Returns -1 if error, 0 if can't send, 1 if can send the message
1406 */
1408 void *totempg_groups_instance,
1409 const struct totempg_group *groups,
1410 size_t groups_cnt,
1411 const struct iovec *iovec,
1412 unsigned int iov_len)
1413{
1414 unsigned int size = 0;
1415 unsigned int i;
1416 unsigned int res;
1417
1418 if (totempg_threaded_mode == 1) {
1419 pthread_mutex_lock (&totempg_mutex);
1420 }
1421
1422 for (i = 0; i < groups_cnt; i++) {
1423 size += groups[i].group_len;
1424 }
1425 for (i = 0; i < iov_len; i++) {
1426 size += iovec[i].iov_len;
1427 }
1428
1429 res = msg_count_send_ok (size);
1430
1431 if (totempg_threaded_mode == 1) {
1432 pthread_mutex_unlock (&totempg_mutex);
1433 }
1434 return (res);
1435}
1436
1438 struct totem_ip_address *interface_addr,
1439 unsigned short ip_port,
1440 unsigned int iface_no)
1441{
1442 int res;
1443
1444 res = totemsrp_iface_set (
1445 totemsrp_context,
1446 interface_addr,
1447 ip_port,
1448 iface_no);
1449
1450 return (res);
1451}
1452
1454 struct totem_node_status *node_status)
1455{
1456 memset(node_status, 0, sizeof(struct totem_node_status));
1457 return totemsrp_nodestatus_get (totemsrp_context, nodeid, node_status);
1458}
1459
1461 unsigned int nodeid,
1462 unsigned int *interface_id,
1463 struct totem_ip_address *interfaces,
1464 unsigned int interfaces_size,
1465 char ***status,
1466 unsigned int *iface_count)
1467{
1468 int res;
1469
1470 res = totemsrp_ifaces_get (
1471 totemsrp_context,
1472 nodeid,
1473 interface_id,
1474 interfaces,
1475 interfaces_size,
1476 status,
1477 iface_count);
1478
1479 return (res);
1480}
1481
1483{
1484 totemsrp_event_signal (totemsrp_context, type, value);
1485}
1486
1488{
1489 return &totempg_stats;
1490}
1491
1493 const char *cipher_type,
1494 const char *hash_type)
1495{
1496 int res;
1497
1498 res = totemsrp_crypto_set (totemsrp_context, cipher_type, hash_type);
1499
1500 return (res);
1501}
1502
1503#define ONE_IFACE_LEN 63
1504const char *totempg_ifaces_print (unsigned int nodeid)
1505{
1506 static char iface_string[256 * INTERFACE_MAX];
1507 char one_iface[ONE_IFACE_LEN+1];
1508 struct totem_ip_address interfaces[INTERFACE_MAX];
1509 unsigned int iface_count;
1510 unsigned int iface_ids[INTERFACE_MAX];
1511 unsigned int i;
1512 int res;
1513
1514 iface_string[0] = '\0';
1515
1516 res = totempg_ifaces_get (nodeid, iface_ids, interfaces, INTERFACE_MAX, NULL, &iface_count);
1517 if (res == -1) {
1518 return ("no interface found for nodeid");
1519 }
1520
1521 res = totempg_ifaces_get (nodeid, iface_ids, interfaces, INTERFACE_MAX, NULL, &iface_count);
1522
1523 for (i = 0; i < iface_count; i++) {
1524 if (!interfaces[i].family) {
1525 continue;
1526 }
1527 snprintf (one_iface, ONE_IFACE_LEN,
1528 "r(%d) ip(%s) ",
1529 i, totemip_print (&interfaces[i]));
1530 strcat (iface_string, one_iface);
1531 }
1532 return (iface_string);
1533}
1534
1535unsigned int totempg_my_nodeid_get (void)
1536{
1537 return (totemsrp_my_nodeid_get(totemsrp_context));
1538}
1539
1541{
1542 return (totemsrp_my_family_get(totemsrp_context));
1543}
1545 void (*totem_service_ready) (void))
1546{
1547 totemsrp_service_ready_register (totemsrp_context, totem_service_ready);
1548}
1549
1551{
1552 totem_queue_level_changed = fn;
1553}
1554
1556 const struct totem_ip_address *member,
1557 int ring_no)
1558{
1559 return totemsrp_member_add (totemsrp_context, member, ring_no);
1560}
1561
1563 const struct totem_ip_address *member,
1564 int ring_no)
1565{
1566 return totemsrp_member_remove (totemsrp_context, member, ring_no);
1567}
1568
1569extern int totempg_reconfigure (void)
1570{
1571 return totemsrp_reconfigure (totemsrp_context, totempg_totem_config);
1572}
1573
1578
1580{
1582 totempg_stats.msg_reserved = 0;
1583 totempg_stats.msg_queue_avail = 0;
1584 }
1585 return totemsrp_stats_clear (totemsrp_context, flags);
1586}
1587
1589{
1590 totempg_threaded_mode = 1;
1591 totemsrp_threaded_mode_enable (totemsrp_context);
1592}
1593
1595{
1596 totemsrp_trans_ack (totemsrp_context);
1597}
1598
1600{
1601 totemsrp_force_gather(totemsrp_context);
1602}
1603
1604/* Assumes ->orig_interfaces is already allocated */
1606{
1607 struct totem_interface *temp_if = config->orig_interfaces;
1608
1609 memcpy(config, totempg_totem_config, sizeof(struct totem_config));
1610 config->orig_interfaces = temp_if;
1611 memcpy(config->orig_interfaces, totempg_totem_config->interfaces, sizeof(struct totem_interface) * INTERFACE_MAX);
1612 config->interfaces = NULL;
1613}
1614
1616{
1617 struct totem_interface *temp_if = totempg_totem_config->interfaces;
1618
1619 /* Preseve the existing interfaces[] array as transports might have pointers saved */
1620 memcpy(totempg_totem_config->interfaces, config->interfaces, sizeof(struct totem_interface) * INTERFACE_MAX);
1621 memcpy(totempg_totem_config, config, sizeof(struct totem_config));
1622 totempg_totem_config->interfaces = temp_if;
1623}
totem_configuration_type
The totem_configuration_type enum.
Definition coroapi.h:132
#define INTERFACE_MAX
Definition coroapi.h:88
unsigned short family
Definition coroapi.h:1
#define MESSAGE_SIZE_MAX
Definition coroapi.h:97
#define MESSAGE_QUEUE_MAX
Definition coroapi.h:98
unsigned int nodeid
Definition coroapi.h:0
totem_callback_token_type
The totem_callback_token_type enum.
Definition coroapi.h:142
@ TOTEM_CALLBACK_TOKEN_RECEIVED
Definition coroapi.h:143
#define CS_PRI_NODE_ID
Definition corotypes.h:59
#define min(a, b)
Definition exec/util.h:66
uint32_t flags
uint32_t value
int index
Definition totempg.c:202
struct qb_list_head list
Definition totempg.c:205
unsigned int nodeid
Definition totempg.c:200
unsigned char last_frag_num
Definition totempg.c:203
enum throw_away_mode throw_away_mode
Definition totempg.c:204
unsigned char data[MESSAGE_SIZE_MAX+KNET_MAX_PACKET_SIZE]
Definition totempg.c:201
struct totem_message_header header
Definition totemsrp.c:184
The memb_ring_id struct.
Definition coroapi.h:122
struct totem_logging_configuration totem_logging_configuration
Definition totem.h:208
struct totem_interface * interfaces
Definition totem.h:165
struct totem_interface * orig_interfaces
Definition totem.h:166
The totem_ip_address struct.
Definition coroapi.h:111
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition totem.h:101
struct qb_list_head list
Definition totempg.c:261
void(* confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition totempg.c:249
struct totempg_group * groups
Definition totempg.c:256
void(* deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition totempg.c:243
const void * group
Definition totempg.h:56
size_t group_len
Definition totempg.h:57
unsigned short msg_count
Definition totempg.c:141
unsigned char fragmented
Definition totempg.c:139
struct totempg_mcast_header header
Definition totempg.c:138
unsigned char continuation
Definition totempg.c:140
#define swab16(x)
The swab16 macro.
Definition swab.h:39
typedef __attribute__
totem_event_type
Definition totem.h:292
@ TOTEM_EVENT_NEW_MSG
Definition totem.h:294
#define FRAME_SIZE_MAX
Definition totem.h:52
cfg_message_crypto_reconfig_phase_t
Definition totem.h:154
char type
Definition totem.h:2
const char * totemip_print(const struct totem_ip_address *addr)
Definition totemip.c:256
int totempg_my_family_get(void)
Definition totempg.c:1540
unsigned int totempg_my_nodeid_get(void)
Definition totempg.c:1535
int totempg_groups_leave(void *totempg_groups_instance, const struct totempg_group *groups, size_t group_cnt)
Definition totempg.c:1217
void totempg_check_q_level(void *totempg_groups_instance)
Definition totempg.c:1297
int totempg_callback_token_create(void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition totempg.c:1102
#define log_printf(level, format, args...)
Definition totempg.c:272
throw_away_mode
Definition totempg.c:194
@ THROW_AWAY_INACTIVE
Definition totempg.c:195
@ THROW_AWAY_ACTIVE
Definition totempg.c:196
const char * totempg_ifaces_print(unsigned int nodeid)
Definition totempg.c:1504
#define ONE_IFACE_LEN
Definition totempg.c:1503
int totempg_groups_joined_reserve(void *totempg_groups_instance, const struct iovec *iovec, unsigned int iov_len)
Definition totempg.c:1305
QB_LIST_DECLARE(assembly_list_inuse)
int totempg_groups_initialize(void **totempg_groups_instance, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id))
Initialize a groups instance.
Definition totempg.c:1137
#define MAX_IOVECS_FROM_APP
Definition totempg.c:1232
int totempg_crypto_set(const char *cipher_type, const char *hash_type)
Definition totempg.c:1492
void totempg_get_config(struct totem_config *config)
Definition totempg.c:1605
void * callback_token_received_handle
Definition totempg.c:746
int totempg_iface_set(struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition totempg.c:1437
int totempg_nodestatus_get(unsigned int nodeid, struct totem_node_status *node_status)
Definition totempg.c:1453
void totempg_put_config(struct totem_config *config)
Definition totempg.c:1615
int totempg_reconfigure(void)
Definition totempg.c:1569
#define TOTEMPG_PACKET_SIZE
Definition totempg.c:153
int totempg_crypto_reconfigure_phase(cfg_message_crypto_reconfig_phase_t phase)
Definition totempg.c:1574
void totempg_trans_ack(void)
Definition totempg.c:1594
void totempg_force_gather(void)
Definition totempg.c:1599
int totempg_member_remove(const struct totem_ip_address *member, int ring_no)
Definition totempg.c:1562
void totempg_queue_level_register_callback(totem_queue_level_changed_fn fn)
Definition totempg.c:1550
int totempg_initialize(qb_loop_t *poll_handle, struct totem_config *totem_config)
Initialize the totem process groups abstraction.
Definition totempg.c:802
int totempg_groups_joined_release(int msg_count)
Definition totempg.c:1349
#define MAX_GROUPS_PER_MSG
Definition totempg.c:1233
void totempg_service_ready_register(void(*totem_service_ready)(void))
Definition totempg.c:1544
int totempg_ifaces_get(unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition totempg.c:1460
void totempg_callback_token_destroy(void *handle_out)
Definition totempg.c:1121
void totempg_event_signal(enum totem_event_type type, int value)
Definition totempg.c:1482
void totempg_stats_clear(int flags)
Definition totempg.c:1579
void totempg_finalize(void)
Definition totempg.c:855
void * totempg_get_stats(void)
Definition totempg.c:1487
static void struct totem_config * totempg_totem_config
Definition totempg.c:190
int totempg_groups_mcast_joined(void *totempg_groups_instance, const struct iovec *iovec, unsigned int iov_len, int guarantee)
Definition totempg.c:1235
void totempg_threaded_mode_enable(void)
Definition totempg.c:1588
int totempg_groups_send_ok_groups(void *totempg_groups_instance, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len)
Definition totempg.c:1407
int totempg_groups_mcast_groups(void *totempg_groups_instance, int guarantee, const struct totempg_group *groups, size_t groups_cnt, const struct iovec *iovec, unsigned int iov_len)
Definition totempg.c:1363
int totempg_member_add(const struct totem_ip_address *member, int ring_no)
Definition totempg.c:1555
int totempg_groups_join(void *totempg_groups_instance, const struct totempg_group *groups, size_t group_cnt)
Definition totempg.c:1185
Totem Single Ring Protocol.
void(* totem_queue_level_changed_fn)(enum totem_q_level level)
Definition totempg.h:189
@ TOTEM_Q_LEVEL_GOOD
Definition totempg.h:182
@ TOTEM_Q_LEVEL_HIGH
Definition totempg.h:183
@ TOTEM_Q_LEVEL_LOW
Definition totempg.h:181
@ TOTEM_Q_LEVEL_CRITICAL
Definition totempg.h:184
int totemsrp_my_family_get(void *srp_context)
Definition totemsrp.c:1133
void totemsrp_force_gather(void *context)
Definition totemsrp.c:5269
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition totemsrp.c:5190
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition totemsrp.c:818
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition totemsrp.c:3498
void totemsrp_threaded_mode_enable(void *context)
Definition totemsrp.c:5225
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition totemsrp.c:5186
int totemsrp_nodestatus_get(void *srp_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition totemsrp.c:1041
int guarantee
Definition totemsrp.c:6
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition totemsrp.c:1108
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition totemsrp.c:2568
void totemsrp_stats_clear(void *context, int flags)
Definition totemsrp.c:5259
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition totemsrp.c:5106
void totemsrp_finalize(void *srp_context)
Definition totemsrp.c:1026
struct memb_ring_id ring_id
Definition totemsrp.c:4
void totemsrp_trans_ack(void *context)
Definition totemsrp.c:5232
struct totem_message_header header
Definition totemsrp.c:0
int totemsrp_crypto_reconfigure_phase(void *context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition totemsrp.c:5250
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition totemsrp.c:1122
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition totemsrp.c:2488
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition totemsrp.c:3533
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
Definition totemsrp.c:5199
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition totemsrp.c:1070
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
Definition totemsrp.c:5241
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
Definition totemsrp.c:5212
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition totemsrp.c:2497
Totem Single Ring Protocol.
#define TOTEMPG_STATS_CLEAR_TOTEM
Definition totemstats.h:115