corosync 3.1.10
lib/cpg.c
Go to the documentation of this file.
1/*
2 * vi: set autoindent tabstop=4 shiftwidth=4 :
3 *
4 * Copyright (c) 2006-2015 Red Hat, Inc.
5 *
6 * All rights reserved.
7 *
8 * Author: Christine Caulfield (ccaulfi@redhat.com)
9 * Author: Jan Friesse (jfriesse@redhat.com)
10 *
11 * This software licensed under BSD license, the text of which follows:
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
15 *
16 * - Redistributions of source code must retain the above copyright notice,
17 * this list of conditions and the following disclaimer.
18 * - Redistributions in binary form must reproduce the above copyright notice,
19 * this list of conditions and the following disclaimer in the documentation
20 * and/or other materials provided with the distribution.
21 * - Neither the name of the MontaVista Software, Inc. nor the names of its
22 * contributors may be used to endorse or promote products derived from this
23 * software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35 * THE POSSIBILITY OF SUCH DAMAGE.
36 */
37/*
38 * Provides a closed process group API using the coroipcc executive
39 */
40
41#include <config.h>
42
43#include <stdlib.h>
44#include <stdio.h>
45#include <string.h>
46#include <unistd.h>
47#include <sys/types.h>
48#include <sys/socket.h>
49#include <sys/mman.h>
50#include <sys/uio.h>
51#include <sys/stat.h>
52#include <errno.h>
53#include <limits.h>
54
55#include <qb/qblist.h>
56#include <qb/qbdefs.h>
57#include <qb/qbipcc.h>
58#include <qb/qblog.h>
59
60#include <corosync/hdb.h>
61#include <corosync/corotypes.h>
62#include <corosync/corodefs.h>
63#include <corosync/cpg.h>
64#include <corosync/ipc_cpg.h>
65
66#include "util.h"
67
68#ifndef MAP_ANONYMOUS
69#define MAP_ANONYMOUS MAP_ANON
70#endif
71
72/*
73 * Maximum number of times to retry a send when transmitting
74 * a large message fragment
75 */
76#define MAX_RETRIES 100
77
78/*
79 * ZCB files have following umask (umask is same as used in libqb)
80 */
81#define CPG_MEMORY_MAP_UMASK 077
82
84{
85 struct qb_list_head list;
86 uint32_t nodeid;
87 uint32_t pid;
90};
91
92struct cpg_inst {
93 qb_ipcc_connection_t *c;
95 void *context;
96 union {
99 };
100 struct qb_list_head iteration_list_head;
101 uint32_t max_msg_size;
102 struct qb_list_head assembly_list_head;
103};
104static void cpg_inst_free (void *inst);
105
106DECLARE_HDB_DATABASE(cpg_handle_t_db, cpg_inst_free);
107
114
115DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
116
117
118/*
119 * Internal (not visible by API) functions
120 */
121
122static cs_error_t
123coroipcc_msg_send_reply_receive (
124 qb_ipcc_connection_t *c,
125 const struct iovec *iov,
126 unsigned int iov_len,
127 void *res_msg,
128 size_t res_len)
129{
130 return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
132}
133
134static void cpg_iteration_instance_finalize (struct cpg_iteration_instance_t *cpg_iteration_instance)
135{
136 qb_list_del (&cpg_iteration_instance->list);
137 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
138}
139
140static void cpg_inst_free (void *inst)
141{
142 struct cpg_inst *cpg_inst = (struct cpg_inst *)inst;
143 qb_ipcc_disconnect(cpg_inst->c);
144}
145
146static void cpg_inst_finalize (struct cpg_inst *cpg_inst, hdb_handle_t handle)
147{
148 struct qb_list_head *iter, *tmp_iter;
150
151 /*
152 * Traverse thru iteration instances and delete them
153 */
154 qb_list_for_each_safe(iter, tmp_iter, &(cpg_inst->iteration_list_head)) {
155 cpg_iteration_instance = qb_list_entry (iter, struct cpg_iteration_instance_t, list);
156
157 cpg_iteration_instance_finalize (cpg_iteration_instance);
158 }
159 hdb_handle_destroy (&cpg_handle_t_db, handle);
160}
161
168
170 cpg_handle_t *handle,
171 cpg_callbacks_t *callbacks)
172{
173 cpg_model_v1_data_t model_v1_data;
174
175 memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t));
176
177 if (callbacks) {
178 model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn;
179 model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn;
180 }
181
182 return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL));
183}
184
186 cpg_handle_t *handle,
187 cpg_model_t model,
188 cpg_model_data_t *model_data,
189 void *context)
190{
191 cs_error_t error;
192 struct cpg_inst *cpg_inst;
193
194 if (model != CPG_MODEL_V1) {
195 error = CS_ERR_INVALID_PARAM;
196 goto error_no_destroy;
197 }
198
199 error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
200 if (error != CS_OK) {
201 goto error_no_destroy;
202 }
203
204 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, *handle, (void *)&cpg_inst));
205 if (error != CS_OK) {
206 goto error_destroy;
207 }
208
209 cpg_inst->c = qb_ipcc_connect ("cpg", IPC_REQUEST_SIZE);
210 if (cpg_inst->c == NULL) {
211 error = qb_to_cs_error(-errno);
212 goto error_put_destroy;
213 }
214
215 if (model_data != NULL) {
216 switch (model) {
217 case CPG_MODEL_V1:
220 error = CS_ERR_INVALID_PARAM;
221
222 goto error_destroy;
223 }
224 break;
225 }
226 }
227
228 /* Allow space for corosync internal headers */
230 cpg_inst->model_data.model = model;
232
233 qb_list_init(&cpg_inst->iteration_list_head);
234
235 qb_list_init(&cpg_inst->assembly_list_head);
236
237 hdb_handle_put (&cpg_handle_t_db, *handle);
238
239 return (CS_OK);
240
241error_put_destroy:
242 hdb_handle_put (&cpg_handle_t_db, *handle);
243error_destroy:
244 hdb_handle_destroy (&cpg_handle_t_db, *handle);
245error_no_destroy:
246 return (error);
247}
248
250 cpg_handle_t handle)
251{
252 struct cpg_inst *cpg_inst;
253 struct iovec iov;
256 cs_error_t error;
257
258 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
259 if (error != CS_OK) {
260 return (error);
261 }
262
263 /*
264 * Another thread has already started finalizing
265 */
266 if (cpg_inst->finalize) {
267 hdb_handle_put (&cpg_handle_t_db, handle);
268 return (CS_ERR_BAD_HANDLE);
269 }
270
271 cpg_inst->finalize = 1;
272
273 /*
274 * Send service request
275 */
276 req_lib_cpg_finalize.header.size = sizeof (struct req_lib_cpg_finalize);
278
279 iov.iov_base = (void *)&req_lib_cpg_finalize;
280 iov.iov_len = sizeof (struct req_lib_cpg_finalize);
281
282 error = coroipcc_msg_send_reply_receive (cpg_inst->c,
283 &iov,
284 1,
286 sizeof (struct res_lib_cpg_finalize));
287
288 cpg_inst_finalize (cpg_inst, handle);
289 hdb_handle_put (&cpg_handle_t_db, handle);
290
291 return (error);
292}
293
295 cpg_handle_t handle,
296 int *fd)
297{
298 cs_error_t error;
299 struct cpg_inst *cpg_inst;
300
301 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
302 if (error != CS_OK) {
303 return (error);
304 }
305
306 error = qb_to_cs_error (qb_ipcc_fd_get (cpg_inst->c, fd));
307
308 hdb_handle_put (&cpg_handle_t_db, handle);
309
310 return (error);
311}
312
314 cpg_handle_t handle,
315 uint32_t *size)
316{
317 cs_error_t error;
318 struct cpg_inst *cpg_inst;
319
320 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
321 if (error != CS_OK) {
322 return (error);
323 }
324
325 *size = cpg_inst->max_msg_size;
326
327 hdb_handle_put (&cpg_handle_t_db, handle);
328
329 return (error);
330}
331
333 cpg_handle_t handle,
334 void **context)
335{
336 cs_error_t error;
337 struct cpg_inst *cpg_inst;
338
339 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
340 if (error != CS_OK) {
341 return (error);
342 }
343
345
346 hdb_handle_put (&cpg_handle_t_db, handle);
347
348 return (CS_OK);
349}
350
352 cpg_handle_t handle,
353 void *context)
354{
355 cs_error_t error;
356 struct cpg_inst *cpg_inst;
357
358 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
359 if (error != CS_OK) {
360 return (error);
361 }
362
364
365 hdb_handle_put (&cpg_handle_t_db, handle);
366
367 return (CS_OK);
368}
369
371 cpg_handle_t handle,
372 cs_dispatch_flags_t dispatch_types)
373{
374 int timeout = -1;
375 cs_error_t error;
376 int cont = 1; /* always continue do loop except when set to 0 */
377 struct cpg_inst *cpg_inst;
378 struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
379 struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
380 struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
381 struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
382 struct cpg_inst cpg_inst_copy;
383 struct qb_ipc_response_header *dispatch_data;
384 struct cpg_address member_list[CPG_MEMBERS_MAX];
385 struct cpg_address left_list[CPG_MEMBERS_MAX];
386 struct cpg_address joined_list[CPG_MEMBERS_MAX];
387 struct cpg_name group_name;
388 struct cpg_assembly_data *assembly_data;
389 struct qb_list_head *iter, *tmp_iter;
390 mar_cpg_address_t *left_list_start;
391 mar_cpg_address_t *joined_list_start;
392 unsigned int i;
393 struct cpg_ring_id ring_id;
394 uint32_t totem_member_list[CPG_MEMBERS_MAX];
395 int32_t errno_res;
396 char dispatch_buf[IPC_DISPATCH_SIZE];
397
398 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
399 if (error != CS_OK) {
400 return (error);
401 }
402
403 /*
404 * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
405 * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
406 */
407 if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
408 timeout = 0;
409 }
410
411 dispatch_data = (struct qb_ipc_response_header *)dispatch_buf;
412 do {
413 errno_res = qb_ipcc_event_recv (
414 cpg_inst->c,
415 dispatch_buf,
417 timeout);
418 error = qb_to_cs_error (errno_res);
419 if (error == CS_ERR_BAD_HANDLE) {
420 error = CS_OK;
421 goto error_put;
422 }
423 if (error == CS_ERR_TRY_AGAIN) {
424 if (dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
425 /*
426 * Don't mask error
427 */
428 goto error_put;
429 }
430 error = CS_OK;
431 if (dispatch_types == CS_DISPATCH_ALL) {
432 break; /* exit do while cont is 1 loop */
433 } else {
434 continue; /* next poll */
435 }
436 }
437 if (error != CS_OK) {
438 goto error_put;
439 }
440
441 /*
442 * Make copy of callbacks, message data, unlock instance, and call callback
443 * A risk of this dispatch method is that the callback routines may
444 * operate at the same time that cpgFinalize has been called.
445 */
446 memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst));
447 switch (cpg_inst_copy.model_data.model) {
448 case CPG_MODEL_V1:
449 /*
450 * Dispatch incoming message
451 */
452 switch (dispatch_data->id) {
454 if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) {
455 break;
456 }
457
458 res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
459
460 marshall_from_mar_cpg_name_t (
461 &group_name,
462 &res_cpg_deliver_callback->group_name);
463
464 cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
465 &group_name,
466 res_cpg_deliver_callback->nodeid,
467 res_cpg_deliver_callback->pid,
468 &res_cpg_deliver_callback->message,
469 res_cpg_deliver_callback->msglen);
470 break;
471
473 res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
474
475 marshall_from_mar_cpg_name_t (
476 &group_name,
477 &res_cpg_partial_deliver_callback->group_name);
478
479 /*
480 * Search for assembly data for current messages (nodeid, pid) pair in list of assemblies
481 */
482 assembly_data = NULL;
483 qb_list_for_each(iter, &(cpg_inst->assembly_list_head)) {
484 struct cpg_assembly_data *current_assembly_data = qb_list_entry (iter, struct cpg_assembly_data, list);
485 if (current_assembly_data->nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->pid == res_cpg_partial_deliver_callback->pid) {
486 assembly_data = current_assembly_data;
487 break;
488 }
489 }
490
491 if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
492
493 /*
494 * As this is LIBCPG_PARTIAL_FIRST packet, check that there is no ongoing assembly.
495 * Otherwise the sending of packet must have been interrupted and error should have
496 * been reported to sending client. Therefore here last assembly will be dropped.
497 */
498 if (assembly_data) {
499 qb_list_del (&assembly_data->list);
500 free(assembly_data->assembly_buf);
501 free(assembly_data);
502 // coverity[UNUSED_VALUE:SUPPRESS] defensive programming
503 assembly_data = NULL;
504 }
505
506 assembly_data = malloc(sizeof(struct cpg_assembly_data));
507 if (!assembly_data) {
508 error = CS_ERR_NO_MEMORY;
509 goto error_put;
510 }
511
512 assembly_data->nodeid = res_cpg_partial_deliver_callback->nodeid;
513 assembly_data->pid = res_cpg_partial_deliver_callback->pid;
514 assembly_data->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
515 if (!assembly_data->assembly_buf) {
516 free(assembly_data);
517 error = CS_ERR_NO_MEMORY;
518 goto error_put;
519 }
520 assembly_data->assembly_buf_ptr = 0;
521 qb_list_init (&assembly_data->list);
522
523 qb_list_add (&assembly_data->list, &cpg_inst->assembly_list_head);
524 }
525 if (assembly_data) {
526 memcpy(assembly_data->assembly_buf + assembly_data->assembly_buf_ptr,
527 res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
528 assembly_data->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
529
530 if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
531 if (cpg_inst_copy.model_v1_data.cpg_deliver_fn != NULL) {
532 cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
533 &group_name,
534 res_cpg_partial_deliver_callback->nodeid,
535 res_cpg_partial_deliver_callback->pid,
536 assembly_data->assembly_buf,
537 res_cpg_partial_deliver_callback->msglen);
538 }
539
540 qb_list_del (&assembly_data->list);
541 free(assembly_data->assembly_buf);
542 free(assembly_data);
543 }
544 }
545 break;
546
548 if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
549 break;
550 }
551
552 res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
553
554 for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
555 marshall_from_mar_cpg_address_t (&member_list[i],
556 &res_cpg_confchg_callback->member_list[i]);
557 }
558 left_list_start = res_cpg_confchg_callback->member_list +
559 res_cpg_confchg_callback->member_list_entries;
560 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
561 marshall_from_mar_cpg_address_t (&left_list[i],
562 &left_list_start[i]);
563 }
564 joined_list_start = res_cpg_confchg_callback->member_list +
565 res_cpg_confchg_callback->member_list_entries +
566 res_cpg_confchg_callback->left_list_entries;
567 for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
568 marshall_from_mar_cpg_address_t (&joined_list[i],
569 &joined_list_start[i]);
570 }
571 marshall_from_mar_cpg_name_t (
572 &group_name,
573 &res_cpg_confchg_callback->group_name);
574
575 cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle,
576 &group_name,
578 res_cpg_confchg_callback->member_list_entries,
579 left_list,
580 res_cpg_confchg_callback->left_list_entries,
581 joined_list,
582 res_cpg_confchg_callback->joined_list_entries);
583
584 /*
585 * If member left while his partial packet was being assembled, assembly data must be removed from list
586 */
587 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
588 qb_list_for_each_safe(iter, tmp_iter, &(cpg_inst->assembly_list_head)) {
589 struct cpg_assembly_data *current_assembly_data = qb_list_entry (iter, struct cpg_assembly_data, list);
590 if (current_assembly_data->nodeid != left_list[i].nodeid || current_assembly_data->pid != left_list[i].pid)
591 continue;
592
593 qb_list_del (&current_assembly_data->list);
594 free(current_assembly_data->assembly_buf);
595 free(current_assembly_data);
596 }
597 }
598
599 break;
601 if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
602 break;
603 }
604
605 res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data;
606
607 marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
608 for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
609 totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i];
610 }
611
612 cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle,
613 ring_id,
614 res_cpg_totem_confchg_callback->member_list_entries,
615 totem_member_list);
616 break;
617 default:
618 error = CS_ERR_LIBRARY;
619 goto error_put;
620 break;
621 } /* - switch (dispatch_data->id) */
622 break; /* case CPG_MODEL_V1 */
623 } /* - switch (cpg_inst_copy.model_data.model) */
624
625 if (cpg_inst_copy.finalize || cpg_inst->finalize) {
626 /*
627 * If the finalize has been called then get out of the dispatch.
628 */
629 cpg_inst->finalize = 1;
630 error = CS_ERR_BAD_HANDLE;
631 goto error_put;
632 }
633
634 /*
635 * Determine if more messages should be processed
636 */
637 if (dispatch_types == CS_DISPATCH_ONE || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
638 cont = 0;
639 }
640 } while (cont);
641
642error_put:
643 hdb_handle_put (&cpg_handle_t_db, handle);
644 return (error);
645}
646
648 cpg_handle_t handle,
649 const struct cpg_name *group)
650{
651 cs_error_t error;
652 struct cpg_inst *cpg_inst;
653 struct iovec iov[2];
655 struct res_lib_cpg_join response;
656
657 if (group->length > CPG_MAX_NAME_LENGTH) {
658 return (CS_ERR_NAME_TOO_LONG);
659 }
660
661 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
662 if (error != CS_OK) {
663 return (error);
664 }
665
666 /* Now join */
667 req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
669 req_lib_cpg_join.pid = getpid();
670 req_lib_cpg_join.flags = 0;
671
672 switch (cpg_inst->model_data.model) {
673 case CPG_MODEL_V1:
675 break;
676 }
677
678 marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
679 group);
680
681 iov[0].iov_base = (void *)&req_lib_cpg_join;
682 iov[0].iov_len = sizeof (struct req_lib_cpg_join);
683
684 do {
685 error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
686 &response, sizeof (struct res_lib_cpg_join));
687
688 if (error != CS_OK) {
689 goto error_exit;
690 }
691 } while (response.header.error == CS_ERR_BUSY);
692
693 error = response.header.error;
694
695error_exit:
696 hdb_handle_put (&cpg_handle_t_db, handle);
697
698 return (error);
699}
700
702 cpg_handle_t handle,
703 const struct cpg_name *group)
704{
705 cs_error_t error;
706 struct cpg_inst *cpg_inst;
707 struct iovec iov[2];
710
711 if (group->length > CPG_MAX_NAME_LENGTH) {
712 return (CS_ERR_NAME_TOO_LONG);
713 }
714
715 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
716 if (error != CS_OK) {
717 return (error);
718 }
719
720 req_lib_cpg_leave.header.size = sizeof (struct req_lib_cpg_leave);
722 req_lib_cpg_leave.pid = getpid();
723 marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
724 group);
725
726 iov[0].iov_base = (void *)&req_lib_cpg_leave;
727 iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
728
729 do {
730 error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
731 &res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave));
732
733 if (error != CS_OK) {
734 goto error_exit;
735 }
736 } while (res_lib_cpg_leave.header.error == CS_ERR_BUSY);
737
738 error = res_lib_cpg_leave.header.error;
739
740error_exit:
741 hdb_handle_put (&cpg_handle_t_db, handle);
742
743 return (error);
744}
745
747 cpg_handle_t handle,
748 struct cpg_name *group_name,
749 struct cpg_address *member_list,
750 int *member_list_entries)
751{
752 cs_error_t error;
753 struct cpg_inst *cpg_inst;
754 struct iovec iov;
757 unsigned int i;
758
759 if (group_name->length > CPG_MAX_NAME_LENGTH) {
760 return (CS_ERR_NAME_TOO_LONG);
761 }
762 if (member_list == NULL) {
763 return (CS_ERR_INVALID_PARAM);
764 }
765 if (member_list_entries == NULL) {
766 return (CS_ERR_INVALID_PARAM);
767 }
768
769 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
770 if (error != CS_OK) {
771 return (error);
772 }
773
774 req_lib_cpg_membership_get.header.size = sizeof (struct req_lib_cpg_membership_get);
776
777 marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
778 group_name);
779
780 iov.iov_base = (void *)&req_lib_cpg_membership_get;
781 iov.iov_len = sizeof (struct req_lib_cpg_membership_get);
782
783 error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
785
786 if (error != CS_OK) {
787 goto error_exit;
788 }
789
790 error = res_lib_cpg_membership_get.header.error;
791
792 /*
793 * Copy results to caller
794 */
795 *member_list_entries = res_lib_cpg_membership_get.member_count;
796 if (member_list) {
797 for (i = 0; i < res_lib_cpg_membership_get.member_count; i++) {
798 marshall_from_mar_cpg_address_t (&member_list[i],
800 }
801 }
802
803error_exit:
804 hdb_handle_put (&cpg_handle_t_db, handle);
805
806 return (error);
807}
808
810 cpg_handle_t handle,
811 unsigned int *local_nodeid)
812{
813 cs_error_t error;
814 struct cpg_inst *cpg_inst;
815 struct iovec iov;
818
819 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
820 if (error != CS_OK) {
821 return (error);
822 }
823
824 req_lib_cpg_local_get.header.size = sizeof (struct qb_ipc_request_header);
826
827 iov.iov_base = (void *)&req_lib_cpg_local_get;
828 iov.iov_len = sizeof (struct req_lib_cpg_local_get);
829
830 error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
832
833 if (error != CS_OK) {
834 goto error_exit;
835 }
836
837 error = res_lib_cpg_local_get.header.error;
838
839 *local_nodeid = res_lib_cpg_local_get.local_nodeid;
840
841error_exit:
842 hdb_handle_put (&cpg_handle_t_db, handle);
843
844 return (error);
845}
846
848 cpg_handle_t handle,
849 cpg_flow_control_state_t *flow_control_state)
850{
851 cs_error_t error;
852 struct cpg_inst *cpg_inst;
853
854 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
855 if (error != CS_OK) {
856 return (error);
857 }
858 *flow_control_state = CPG_FLOW_CONTROL_DISABLED;
859 error = CS_OK;
860
861 hdb_handle_put (&cpg_handle_t_db, handle);
862
863 return (error);
864}
865
866static int
867memory_map (char *path, const char *file, void **buf, size_t bytes)
868{
869 int32_t fd;
870 void *addr;
871 int32_t res;
872 char *buffer;
873 int32_t i;
874 size_t written;
875 size_t page_size;
876 long int sysconf_page_size;
877 mode_t old_umask;
878
879 snprintf (path, PATH_MAX, "/dev/shm/%s", file);
880
881 old_umask = umask(CPG_MEMORY_MAP_UMASK);
882 fd = mkstemp (path);
883 (void)umask(old_umask);
884 if (fd == -1) {
885 snprintf (path, PATH_MAX, LOCALSTATEDIR "/run/%s", file);
886 old_umask = umask(CPG_MEMORY_MAP_UMASK);
887 fd = mkstemp (path);
888 (void)umask(old_umask);
889 if (fd == -1) {
890 return (-1);
891 }
892 }
893
894 res = ftruncate (fd, bytes);
895 if (res == -1) {
896 goto error_close_unlink;
897 }
898 sysconf_page_size = sysconf(_SC_PAGESIZE);
899 if (sysconf_page_size <= 0) {
900 goto error_close_unlink;
901 }
902 page_size = sysconf_page_size;
903 buffer = malloc (page_size);
904 if (buffer == NULL) {
905 goto error_close_unlink;
906 }
907 memset (buffer, 0, page_size);
908 for (i = 0; i < (bytes / page_size); i++) {
909retry_write:
910 written = write (fd, buffer, page_size);
911 if (written == -1 && errno == EINTR) {
912 goto retry_write;
913 }
914 if (written != page_size) {
915 free (buffer);
916 goto error_close_unlink;
917 }
918 }
919 free (buffer);
920
921 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
922 MAP_SHARED, fd, 0);
923
924 if (addr == MAP_FAILED) {
925 goto error_close_unlink;
926 }
927#ifdef MADV_NOSYNC
928 madvise(addr, bytes, MADV_NOSYNC);
929#endif
930
931 res = close (fd);
932 if (res) {
933 munmap(addr, bytes);
934
935 return (-1);
936 }
937 *buf = addr;
938
939 return 0;
940
941error_close_unlink:
942 close (fd);
943 unlink(path);
944 return -1;
945}
946
948 cpg_handle_t handle,
949 size_t size,
950 void **buffer)
951{
952 void *buf = NULL;
953 char path[PATH_MAX];
954 mar_req_coroipcc_zc_alloc_t req_coroipcc_zc_alloc;
955 struct qb_ipc_response_header res_coroipcs_zc_alloc;
956 size_t map_size;
957 struct iovec iovec;
958 struct coroipcs_zc_header *hdr;
959 cs_error_t error;
960 struct cpg_inst *cpg_inst;
961
962 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
963 if (error != CS_OK) {
964 return (error);
965 }
966
967 map_size = size + sizeof (struct req_lib_cpg_mcast) + sizeof (struct coroipcs_zc_header);
968 assert(memory_map (path, "corosync_zerocopy-XXXXXX", &buf, map_size) != -1);
969
970 if (strlen(path) >= CPG_ZC_PATH_LEN) {
971 unlink(path);
972 munmap (buf, map_size);
973 return (CS_ERR_NAME_TOO_LONG);
974 }
975
976 req_coroipcc_zc_alloc.header.size = sizeof (mar_req_coroipcc_zc_alloc_t);
977 req_coroipcc_zc_alloc.header.id = MESSAGE_REQ_CPG_ZC_ALLOC;
978 req_coroipcc_zc_alloc.map_size = map_size;
979 strcpy (req_coroipcc_zc_alloc.path_to_file, path);
980
981 iovec.iov_base = (void *)&req_coroipcc_zc_alloc;
982 iovec.iov_len = sizeof (mar_req_coroipcc_zc_alloc_t);
983
984 error = coroipcc_msg_send_reply_receive (
985 cpg_inst->c,
986 &iovec,
987 1,
988 &res_coroipcs_zc_alloc,
989 sizeof (struct qb_ipc_response_header));
990
991 if (error != CS_OK) {
992 goto error_exit;
993 }
994
995 hdr = (struct coroipcs_zc_header *)buf;
996 hdr->map_size = map_size;
997 *buffer = ((char *)buf) + sizeof (struct coroipcs_zc_header) + sizeof (struct req_lib_cpg_mcast);
998
999error_exit:
1000 hdb_handle_put (&cpg_handle_t_db, handle);
1001 /*
1002 * Coverity correctly reports an error here. We cannot safely munmap and unlink the file, because
1003 * the timing of the failure is the key issue: if a failure occurs before the IPC reply,
1004 * the file should be deleted.
1005 * However, if the failure happens during the IPC reply, Corosync has already deleted the file.
1006 * This means the cpg library could attempt to delete a non-existing file (not a problem) or,
1007 * in a theoretical race condition, delete a new file created by another application.
1008 * There are multiple possible solutions, but none of them are ready to be implemented yet.
1009 */
1010 return (error);
1011}
1012
1014 cpg_handle_t handle,
1015 void *buffer)
1016{
1017 cs_error_t error;
1018 unsigned int res;
1019 struct cpg_inst *cpg_inst;
1020 mar_req_coroipcc_zc_free_t req_coroipcc_zc_free;
1021 struct qb_ipc_response_header res_coroipcs_zc_free;
1022 struct iovec iovec;
1023 struct coroipcs_zc_header *header = (struct coroipcs_zc_header *)((char *)buffer - sizeof (struct coroipcs_zc_header) - sizeof (struct req_lib_cpg_mcast));
1024
1025 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1026 if (error != CS_OK) {
1027 return (error);
1028 }
1029
1030 req_coroipcc_zc_free.header.size = sizeof (mar_req_coroipcc_zc_free_t);
1031 req_coroipcc_zc_free.header.id = MESSAGE_REQ_CPG_ZC_FREE;
1032 req_coroipcc_zc_free.map_size = header->map_size;
1033 req_coroipcc_zc_free.server_address = header->server_address;
1034
1035 iovec.iov_base = (void *)&req_coroipcc_zc_free;
1036 iovec.iov_len = sizeof (mar_req_coroipcc_zc_free_t);
1037
1038 error = coroipcc_msg_send_reply_receive (
1039 cpg_inst->c,
1040 &iovec,
1041 1,
1042 &res_coroipcs_zc_free,
1043 sizeof (struct qb_ipc_response_header));
1044
1045 if (error != CS_OK) {
1046 goto error_exit;
1047 }
1048
1049 res = munmap ((void *)header, header->map_size);
1050 if (res == -1) {
1051 error = qb_to_cs_error(-errno);
1052
1053 goto error_exit;
1054 }
1055
1056error_exit:
1057 hdb_handle_put (&cpg_handle_t_db, handle);
1058
1059 return (error);
1060}
1061
1063 cpg_handle_t handle,
1065 void *msg,
1066 size_t msg_len)
1067{
1068 cs_error_t error;
1069 struct cpg_inst *cpg_inst;
1072 mar_req_coroipcc_zc_execute_t req_coroipcc_zc_execute;
1073 struct coroipcs_zc_header *hdr;
1074 struct iovec iovec;
1075
1076 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1077 if (error != CS_OK) {
1078 return (error);
1079 }
1080
1081 if (msg_len > IPC_REQUEST_SIZE) {
1082 error = CS_ERR_TOO_BIG;
1083 goto error_exit;
1084 }
1085
1086 req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
1087 req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
1088 msg_len;
1089
1091 req_lib_cpg_mcast->guarantee = guarantee;
1092 req_lib_cpg_mcast->msglen = msg_len;
1093
1094 hdr = (struct coroipcs_zc_header *)(((char *)req_lib_cpg_mcast) - sizeof (struct coroipcs_zc_header));
1095
1096 req_coroipcc_zc_execute.header.size = sizeof (mar_req_coroipcc_zc_execute_t);
1097 req_coroipcc_zc_execute.header.id = MESSAGE_REQ_CPG_ZC_EXECUTE;
1098 req_coroipcc_zc_execute.server_address = hdr->server_address;
1099
1100 iovec.iov_base = (void *)&req_coroipcc_zc_execute;
1101 iovec.iov_len = sizeof (mar_req_coroipcc_zc_execute_t);
1102
1103 error = coroipcc_msg_send_reply_receive (
1104 cpg_inst->c,
1105 &iovec,
1106 1,
1108 sizeof(res_lib_cpg_mcast));
1109
1110 if (error != CS_OK) {
1111 goto error_exit;
1112 }
1113
1114 error = res_lib_cpg_mcast.header.error;
1115
1116error_exit:
1117 hdb_handle_put (&cpg_handle_t_db, handle);
1118
1119 return (error);
1120}
1121
1122static cs_error_t send_fragments (
1123 struct cpg_inst *cpg_inst,
1125 size_t msg_len,
1126 const struct iovec *iovec,
1127 unsigned int iov_len)
1128{
1129 int i;
1130 cs_error_t error = CS_OK;
1131 struct iovec iov[2];
1134 size_t sent = 0;
1135 size_t iov_sent = 0;
1136 int retry_count;
1137
1139 req_lib_cpg_mcast.guarantee = guarantee;
1140 req_lib_cpg_mcast.msglen = msg_len;
1141
1142 iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1143 iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
1144
1145 i=0;
1146 iov_sent = 0 ;
1147 qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
1148
1149 while (error == CS_OK && sent < msg_len) {
1150
1151 retry_count = 0;
1152 if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
1153 iov[1].iov_len = cpg_inst->max_msg_size;
1154 }
1155 else {
1156 iov[1].iov_len = iovec[i].iov_len - iov_sent;
1157 }
1158
1159 if (sent == 0) {
1161 }
1162 else if ((sent + iov[1].iov_len) == msg_len) {
1164 }
1165 else {
1167 }
1168
1169 req_lib_cpg_mcast.fraglen = iov[1].iov_len;
1170 req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
1171 iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
1172
1173 resend:
1174 error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
1176 sizeof (res_lib_cpg_partial_send));
1177
1178 if (error == CS_ERR_TRY_AGAIN) {
1179 fprintf(stderr, "sleep. counter=%d\n", retry_count);
1180 if (++retry_count > MAX_RETRIES) {
1181 goto error_exit;
1182 }
1183 usleep(10000);
1184 goto resend;
1185 }
1186
1187 iov_sent += iov[1].iov_len;
1188 sent += iov[1].iov_len;
1189
1190 /* Next iovec */
1191 if (iov_sent >= iovec[i].iov_len) {
1192 i++;
1193 iov_sent = 0;
1194 }
1195 error = res_lib_cpg_partial_send.header.error;
1196 }
1197error_exit:
1198 qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
1199
1200 return error;
1201}
1202
1203
1205 cpg_handle_t handle,
1207 const struct iovec *iovec,
1208 unsigned int iov_len)
1209{
1210 int i;
1211 cs_error_t error;
1212 struct cpg_inst *cpg_inst;
1213 struct iovec iov[64];
1215 size_t msg_len = 0;
1216
1217 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1218 if (error != CS_OK) {
1219 return (error);
1220 }
1221
1222 for (i = 0; i < iov_len; i++ ) {
1223 msg_len += iovec[i].iov_len;
1224 }
1225
1226 if (msg_len > cpg_inst->max_msg_size) {
1227 error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
1228 goto error_exit;
1229 }
1230
1231 req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
1232 msg_len;
1233
1235 req_lib_cpg_mcast.guarantee = guarantee;
1236 req_lib_cpg_mcast.msglen = msg_len;
1237
1238 iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1239 iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
1240 memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
1241
1242 qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
1243 error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
1244 qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
1245
1246error_exit:
1247 hdb_handle_put (&cpg_handle_t_db, handle);
1248
1249 return (error);
1250}
1251
1253 cpg_handle_t handle,
1254 cpg_iteration_type_t iteration_type,
1255 const struct cpg_name *group,
1256 cpg_iteration_handle_t *cpg_iteration_handle)
1257{
1258 cs_error_t error;
1259 struct iovec iov;
1260 struct cpg_inst *cpg_inst;
1264
1265 if (group && group->length > CPG_MAX_NAME_LENGTH) {
1266 return (CS_ERR_NAME_TOO_LONG);
1267 }
1268 if (cpg_iteration_handle == NULL) {
1269 return (CS_ERR_INVALID_PARAM);
1270 }
1271
1272 if ((iteration_type == CPG_ITERATION_ONE_GROUP && group == NULL) ||
1273 (iteration_type != CPG_ITERATION_ONE_GROUP && group != NULL)) {
1274 return (CS_ERR_INVALID_PARAM);
1275 }
1276
1277 if (iteration_type != CPG_ITERATION_NAME_ONLY && iteration_type != CPG_ITERATION_ONE_GROUP &&
1278 iteration_type != CPG_ITERATION_ALL) {
1279
1280 return (CS_ERR_INVALID_PARAM);
1281 }
1282
1283 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1284 if (error != CS_OK) {
1285 return (error);
1286 }
1287
1288 error = hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
1289 sizeof (struct cpg_iteration_instance_t), cpg_iteration_handle));
1290 if (error != CS_OK) {
1291 goto error_put_cpg_db;
1292 }
1293
1294 error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
1295 (void *)&cpg_iteration_instance));
1296 if (error != CS_OK) {
1297 goto error_destroy;
1298 }
1299
1301
1302 qb_list_init (&cpg_iteration_instance->list);
1303
1306 req_lib_cpg_iterationinitialize.iteration_type = iteration_type;
1307 if (group) {
1308 marshall_to_mar_cpg_name_t (&req_lib_cpg_iterationinitialize.group_name, group);
1309 }
1310
1311 iov.iov_base = (void *)&req_lib_cpg_iterationinitialize;
1312 iov.iov_len = sizeof (struct req_lib_cpg_iterationinitialize);
1313
1314 error = coroipcc_msg_send_reply_receive (cpg_inst->c,
1315 &iov,
1316 1,
1318 sizeof (struct res_lib_cpg_iterationinitialize));
1319
1320 if (error != CS_OK) {
1321 goto error_put_destroy;
1322 }
1323
1324 cpg_iteration_instance->executive_iteration_handle =
1325 res_lib_cpg_iterationinitialize.iteration_handle;
1326 cpg_iteration_instance->cpg_iteration_handle = *cpg_iteration_handle;
1327
1329
1330 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1331 hdb_handle_put (&cpg_handle_t_db, handle);
1332
1333 return (res_lib_cpg_iterationinitialize.header.error);
1334
1335error_put_destroy:
1336 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1337error_destroy:
1338 hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1339error_put_cpg_db:
1340 hdb_handle_put (&cpg_handle_t_db, handle);
1341
1342 return (error);
1343}
1344
1347 struct cpg_iteration_description_t *description)
1348{
1349 cs_error_t error;
1353
1354 if (description == NULL) {
1355 return CS_ERR_INVALID_PARAM;
1356 }
1357
1358 error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1359 (void *)&cpg_iteration_instance));
1360 if (error != CS_OK) {
1361 goto error_exit;
1362 }
1363
1364 req_lib_cpg_iterationnext.header.size = sizeof (struct req_lib_cpg_iterationnext);
1366 req_lib_cpg_iterationnext.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1367
1368 error = qb_to_cs_error (qb_ipcc_send (cpg_iteration_instance->conn,
1370 req_lib_cpg_iterationnext.header.size));
1371 if (error != CS_OK) {
1372 goto error_put;
1373 }
1374
1375 error = qb_to_cs_error (qb_ipcc_recv (cpg_iteration_instance->conn,
1377 sizeof(struct res_lib_cpg_iterationnext), -1));
1378 if (error != CS_OK) {
1379 goto error_put;
1380 }
1381
1382 marshall_from_mar_cpg_iteration_description_t(
1383 description,
1384 &res_lib_cpg_iterationnext.description);
1385
1386 error = res_lib_cpg_iterationnext.header.error;
1387
1388error_put:
1389 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1390
1391error_exit:
1392 return (error);
1393}
1394
1397{
1398 cs_error_t error;
1399 struct iovec iov;
1403
1404 error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1405 (void *)&cpg_iteration_instance));
1406 if (error != CS_OK) {
1407 goto error_exit;
1408 }
1409
1412 req_lib_cpg_iterationfinalize.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1413
1414 iov.iov_base = (void *)&req_lib_cpg_iterationfinalize;
1415 iov.iov_len = sizeof (struct req_lib_cpg_iterationfinalize);
1416
1417 error = coroipcc_msg_send_reply_receive (cpg_iteration_instance->conn,
1418 &iov,
1419 1,
1421 sizeof (struct req_lib_cpg_iterationfinalize));
1422
1423 if (error != CS_OK) {
1424 goto error_put;
1425 }
1426
1427 cpg_iteration_instance_finalize (cpg_iteration_instance);
1428 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
1429
1430 return (res_lib_cpg_iterationfinalize.header.error);
1431
1432error_put:
1433 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1434error_exit:
1435 return (error);
1436}
1437
#define LOCALSTATEDIR
Definition config.h:361
unsigned char addr[TOTEMIP_ADDRLEN]
Definition coroapi.h:2
cs_dispatch_flags_t
The cs_dispatch_flags_t enum.
Definition corotypes.h:84
@ CS_DISPATCH_ONE
Definition corotypes.h:85
@ CS_DISPATCH_ONE_NONBLOCKING
Definition corotypes.h:88
@ CS_DISPATCH_ALL
Definition corotypes.h:86
cs_error_t qb_to_cs_error(int result)
qb_to_cs_error
cs_error_t
The cs_error_t enum.
Definition corotypes.h:98
@ CS_ERR_NAME_TOO_LONG
Definition corotypes.h:111
@ CS_ERR_NO_MEMORY
Definition corotypes.h:106
@ CS_ERR_BUSY
Definition corotypes.h:108
@ CS_ERR_BAD_HANDLE
Definition corotypes.h:107
@ CS_ERR_TRY_AGAIN
Definition corotypes.h:104
@ CS_OK
Definition corotypes.h:99
@ CS_ERR_INVALID_PARAM
Definition corotypes.h:105
@ CS_ERR_LIBRARY
Definition corotypes.h:100
@ CS_ERR_TOO_BIG
Definition corotypes.h:124
#define CS_IPC_TIMEOUT_MS
Definition corotypes.h:131
cs_error_t cpg_flow_control_state_get(cpg_handle_t handle, cpg_flow_control_state_t *flow_control_state)
cpg_flow_control_state_get
Definition lib/cpg.c:847
cs_error_t cpg_iteration_next(cpg_iteration_handle_t handle, struct cpg_iteration_description_t *description)
cpg_iteration_next
Definition lib/cpg.c:1345
cs_error_t cpg_model_initialize(cpg_handle_t *handle, cpg_model_t model, cpg_model_data_t *model_data, void *context)
Create a new cpg connection, initialize with model.
Definition lib/cpg.c:185
cs_error_t cpg_max_atomic_msgsize_get(cpg_handle_t handle, uint32_t *size)
Get maximum size of a message that will not be fragmented.
Definition lib/cpg.c:313
cs_error_t cpg_mcast_joined(cpg_handle_t handle, cpg_guarantee_t guarantee, const struct iovec *iovec, unsigned int iov_len)
Multicast to groups joined with cpg_join.
Definition lib/cpg.c:1204
cs_error_t cpg_leave(cpg_handle_t handle, const struct cpg_name *group)
Leave one or more groups.
Definition lib/cpg.c:701
cs_error_t cpg_context_get(cpg_handle_t handle, void **context)
Get contexts for a CPG handle.
Definition lib/cpg.c:332
cs_error_t cpg_fd_get(cpg_handle_t handle, int *fd)
Get a file descriptor on which to poll.
Definition lib/cpg.c:294
cs_error_t cpg_join(cpg_handle_t handle, const struct cpg_name *group)
Join one or more groups.
Definition lib/cpg.c:647
cs_error_t cpg_zcb_free(cpg_handle_t handle, void *buffer)
cpg_zcb_free
Definition lib/cpg.c:1013
cs_error_t cpg_iteration_finalize(cpg_iteration_handle_t handle)
cpg_iteration_finalize
Definition lib/cpg.c:1395
cs_error_t cpg_zcb_alloc(cpg_handle_t handle, size_t size, void **buffer)
cpg_zcb_alloc
Definition lib/cpg.c:947
cs_error_t cpg_membership_get(cpg_handle_t handle, struct cpg_name *group_name, struct cpg_address *member_list, int *member_list_entries)
Get membership information from cpg.
Definition lib/cpg.c:746
cs_error_t cpg_initialize(cpg_handle_t *handle, cpg_callbacks_t *callbacks)
Create a new cpg connection.
Definition lib/cpg.c:169
cs_error_t cpg_local_get(cpg_handle_t handle, unsigned int *local_nodeid)
cpg_local_get
Definition lib/cpg.c:809
cs_error_t cpg_iteration_initialize(cpg_handle_t handle, cpg_iteration_type_t iteration_type, const struct cpg_name *group, cpg_iteration_handle_t *cpg_iteration_handle)
cpg_iteration_initialize
Definition lib/cpg.c:1252
cs_error_t cpg_context_set(cpg_handle_t handle, void *context)
Set contexts for a CPG handle.
Definition lib/cpg.c:351
cs_error_t cpg_finalize(cpg_handle_t handle)
Close the cpg handle.
Definition lib/cpg.c:249
cs_error_t cpg_dispatch(cpg_handle_t handle, cs_dispatch_flags_t dispatch_types)
Dispatch messages and configuration changes.
Definition lib/cpg.c:370
cs_error_t cpg_zcb_mcast_joined(cpg_handle_t handle, cpg_guarantee_t guarantee, void *msg, size_t msg_len)
cpg_zcb_mcast_joined
Definition lib/cpg.c:1062
cpg_flow_control_state_t
The cpg_flow_control_state_t enum.
Definition cpg.h:74
#define CPG_MAX_NAME_LENGTH
Definition cpg.h:117
cpg_guarantee_t
The cpg_guarantee_t enum.
Definition cpg.h:64
cpg_iteration_type_t
The cpg_iteration_type_t enum.
Definition cpg.h:95
uint64_t cpg_handle_t
cpg_handle_t
Definition cpg.h:54
uint64_t cpg_iteration_handle_t
cpg_iteration_handle_t
Definition cpg.h:59
cpg_model_t
The cpg_model_t enum.
Definition cpg.h:104
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
Definition cpg.h:194
#define CPG_MEMBERS_MAX
Definition cpg.h:126
@ CPG_FLOW_CONTROL_DISABLED
flow control is disabled - new messages may be sent
Definition cpg.h:75
@ CPG_ITERATION_ONE_GROUP
Definition cpg.h:97
@ CPG_ITERATION_ALL
Definition cpg.h:98
@ CPG_ITERATION_NAME_ONLY
Definition cpg.h:96
@ CPG_MODEL_V1
Definition cpg.h:105
#define DECLARE_HDB_DATABASE
Definition hdb.h:98
qb_handle_t hdb_handle_t
Definition hdb.h:52
@ LIBCPG_PARTIAL_FIRST
Definition ipc_cpg.h:104
@ LIBCPG_PARTIAL_LAST
Definition ipc_cpg.h:106
@ LIBCPG_PARTIAL_CONTINUED
Definition ipc_cpg.h:105
#define CPG_ZC_PATH_LEN
Definition ipc_cpg.h:43
@ MESSAGE_REQ_CPG_ZC_EXECUTE
Definition ipc_cpg.h:60
@ MESSAGE_REQ_CPG_LOCAL_GET
Definition ipc_cpg.h:53
@ MESSAGE_REQ_CPG_ITERATIONFINALIZE
Definition ipc_cpg.h:56
@ MESSAGE_REQ_CPG_PARTIAL_MCAST
Definition ipc_cpg.h:61
@ MESSAGE_REQ_CPG_ZC_ALLOC
Definition ipc_cpg.h:58
@ MESSAGE_REQ_CPG_JOIN
Definition ipc_cpg.h:49
@ MESSAGE_REQ_CPG_ZC_FREE
Definition ipc_cpg.h:59
@ MESSAGE_REQ_CPG_ITERATIONINITIALIZE
Definition ipc_cpg.h:54
@ MESSAGE_REQ_CPG_FINALIZE
Definition ipc_cpg.h:57
@ MESSAGE_REQ_CPG_MEMBERSHIP
Definition ipc_cpg.h:52
@ MESSAGE_REQ_CPG_LEAVE
Definition ipc_cpg.h:50
@ MESSAGE_REQ_CPG_ITERATIONNEXT
Definition ipc_cpg.h:55
@ MESSAGE_REQ_CPG_MCAST
Definition ipc_cpg.h:51
@ MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK
Definition ipc_cpg.h:85
@ MESSAGE_RES_CPG_DELIVER_CALLBACK
Definition ipc_cpg.h:73
@ MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK
Definition ipc_cpg.h:81
@ MESSAGE_RES_CPG_CONFCHG_CALLBACK
Definition ipc_cpg.h:72
#define CPG_MEMORY_MAP_UMASK
Definition lib/cpg.c:81
#define MAX_RETRIES
Definition lib/cpg.c:76
#define IPC_REQUEST_SIZE
Definition lib/util.h:49
cs_error_t hdb_error_to_cs(int res)
#define IPC_DISPATCH_SIZE
Definition lib/util.h:51
coroipcs_zc_header struct
Definition ipc_cpg.h:498
uint64_t server_address
Definition ipc_cpg.h:500
The cpg_address struct.
Definition cpg.h:111
uint32_t pid
Definition cpg.h:113
uint32_t nodeid
Definition cpg.h:112
uint32_t nodeid
Definition lib/cpg.c:86
uint32_t assembly_buf_ptr
Definition lib/cpg.c:89
struct qb_list_head list
Definition lib/cpg.c:85
char * assembly_buf
Definition lib/cpg.c:88
uint32_t pid
Definition lib/cpg.c:87
The cpg_callbacks_t struct.
Definition cpg.h:182
cpg_deliver_fn_t cpg_deliver_fn
Definition cpg.h:183
cpg_confchg_fn_t cpg_confchg_fn
Definition cpg.h:184
cpg_model_data_t model_data
Definition lib/cpg.c:97
int finalize
Definition lib/cpg.c:94
void * context
Definition lib/cpg.c:95
uint32_t max_msg_size
Definition lib/cpg.c:101
qb_ipcc_connection_t * c
Definition lib/cpg.c:93
struct qb_list_head assembly_list_head
Definition lib/cpg.c:102
struct qb_list_head iteration_list_head
Definition lib/cpg.c:100
cpg_model_v1_data_t model_v1_data
Definition lib/cpg.c:98
The cpg_iteration_description_t struct.
Definition cpg.h:131
qb_ipcc_connection_t * conn
Definition lib/cpg.c:110
struct qb_list_head list
Definition lib/cpg.c:112
cpg_iteration_handle_t cpg_iteration_handle
Definition lib/cpg.c:109
hdb_handle_t executive_iteration_handle
Definition lib/cpg.c:111
struct qb_list_head list
Definition exec/cpg.c:161
The cpg_model_data_t struct.
Definition cpg.h:190
cpg_model_t model
Definition cpg.h:191
The cpg_model_v1_data_t struct.
Definition cpg.h:199
cpg_totem_confchg_fn_t cpg_totem_confchg_fn
Definition cpg.h:203
unsigned int flags
Definition cpg.h:204
cpg_confchg_fn_t cpg_confchg_fn
Definition cpg.h:202
cpg_deliver_fn_t cpg_deliver_fn
Definition cpg.h:201
The cpg_name struct.
Definition cpg.h:121
uint32_t length
Definition cpg.h:122
The cpg_ring_id struct.
Definition cpg.h:140
mar_cpg_address_t struct
Definition ipc_cpg.h:155
mar_req_coroipcc_zc_alloc_t struct
Definition ipc_cpg.h:472
mar_req_coroipcc_zc_execute_t struct
Definition ipc_cpg.h:490
mar_req_coroipcc_zc_free_t struct
Definition ipc_cpg.h:481
The req_lib_cpg_finalize struct.
Definition ipc_cpg.h:268
The req_lib_cpg_iterationfinalize struct.
Definition ipc_cpg.h:457
The req_lib_cpg_iterationinitialize struct.
Definition ipc_cpg.h:424
The req_lib_cpg_iterationnext struct.
Definition ipc_cpg.h:441
The req_lib_cpg_join struct.
Definition ipc_cpg.h:251
The req_lib_cpg_leave struct.
Definition ipc_cpg.h:408
The req_lib_cpg_local_get struct.
Definition ipc_cpg.h:282
The req_lib_cpg_mcast struct.
Definition ipc_cpg.h:304
The req_lib_cpg_membership_get struct.
Definition ipc_cpg.h:367
The req_lib_cpg_partial_mcast struct.
Definition ipc_cpg.h:314
The res_lib_cpg_confchg_callback struct.
Definition ipc_cpg.h:384
mar_cpg_address_t member_list[]
Definition ipc_cpg.h:390
Message from another node.
Definition ipc_cpg.h:333
The res_lib_cpg_finalize struct.
Definition ipc_cpg.h:275
The res_lib_cpg_iterationfinalize struct.
Definition ipc_cpg.h:465
The res_lib_cpg_iterationinitialize struct.
Definition ipc_cpg.h:433
The res_lib_cpg_iterationnext struct.
Definition ipc_cpg.h:449
The res_lib_cpg_join struct.
Definition ipc_cpg.h:261
The res_lib_cpg_leave struct.
Definition ipc_cpg.h:417
The res_lib_cpg_local_get struct.
Definition ipc_cpg.h:289
The res_lib_cpg_mcast struct.
Definition ipc_cpg.h:326
The res_lib_cpg_membership_get struct.
Definition ipc_cpg.h:375
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
Definition ipc_cpg.h:378
The res_lib_cpg_partial_deliver_callback struct.
Definition ipc_cpg.h:345
The res_lib_cpg_partial_send struct.
Definition ipc_cpg.h:297
The res_lib_cpg_totem_confchg_callback struct.
Definition ipc_cpg.h:398
int guarantee
Definition totemsrp.c:6
struct memb_ring_id ring_id
Definition totemsrp.c:4
struct totem_message_header header
Definition totemsrp.c:0