48#include <sys/socket.h>
69#define MAP_ANONYMOUS MAP_ANON
76#define MAX_RETRIES 100
81#define CPG_MEMORY_MAP_UMASK 077
93 qb_ipcc_connection_t *
c;
104static void cpg_inst_free (
void *inst);
123coroipcc_msg_send_reply_receive (
124 qb_ipcc_connection_t *c,
125 const struct iovec *iov,
126 unsigned int iov_len,
130 return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
140static void cpg_inst_free (
void *inst)
148 struct qb_list_head *iter, *tmp_iter;
159 hdb_handle_destroy (&cpg_handle_t_db, handle);
196 goto error_no_destroy;
200 if (error !=
CS_OK) {
201 goto error_no_destroy;
205 if (error !=
CS_OK) {
212 goto error_put_destroy;
237 hdb_handle_put (&cpg_handle_t_db, *handle);
242 hdb_handle_put (&cpg_handle_t_db, *handle);
244 hdb_handle_destroy (&cpg_handle_t_db, *handle);
259 if (error !=
CS_OK) {
267 hdb_handle_put (&cpg_handle_t_db, handle);
282 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c,
288 cpg_inst_finalize (
cpg_inst, handle);
289 hdb_handle_put (&cpg_handle_t_db, handle);
302 if (error !=
CS_OK) {
308 hdb_handle_put (&cpg_handle_t_db, handle);
321 if (error !=
CS_OK) {
327 hdb_handle_put (&cpg_handle_t_db, handle);
340 if (error !=
CS_OK) {
346 hdb_handle_put (&cpg_handle_t_db, handle);
359 if (error !=
CS_OK) {
365 hdb_handle_put (&cpg_handle_t_db, handle);
383 struct qb_ipc_response_header *dispatch_data;
389 struct qb_list_head *iter, *tmp_iter;
399 if (error !=
CS_OK) {
411 dispatch_data = (
struct qb_ipc_response_header *)dispatch_buf;
413 errno_res = qb_ipcc_event_recv (
437 if (error !=
CS_OK) {
452 switch (dispatch_data->id) {
460 marshall_from_mar_cpg_name_t (
462 &res_cpg_deliver_callback->group_name);
466 res_cpg_deliver_callback->nodeid,
467 res_cpg_deliver_callback->pid,
468 &res_cpg_deliver_callback->message,
469 res_cpg_deliver_callback->msglen);
475 marshall_from_mar_cpg_name_t (
477 &res_cpg_partial_deliver_callback->group_name);
482 assembly_data = NULL;
485 if (current_assembly_data->
nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->
pid == res_cpg_partial_deliver_callback->pid) {
486 assembly_data = current_assembly_data;
499 qb_list_del (&assembly_data->
list);
503 assembly_data = NULL;
507 if (!assembly_data) {
512 assembly_data->
nodeid = res_cpg_partial_deliver_callback->nodeid;
513 assembly_data->
pid = res_cpg_partial_deliver_callback->pid;
514 assembly_data->
assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
521 qb_list_init (&assembly_data->
list);
527 res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
528 assembly_data->
assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
534 res_cpg_partial_deliver_callback->nodeid,
535 res_cpg_partial_deliver_callback->pid,
537 res_cpg_partial_deliver_callback->msglen);
540 qb_list_del (&assembly_data->
list);
554 for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
558 left_list_start = res_cpg_confchg_callback->
member_list +
559 res_cpg_confchg_callback->member_list_entries;
560 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
561 marshall_from_mar_cpg_address_t (&left_list[i],
562 &left_list_start[i]);
564 joined_list_start = res_cpg_confchg_callback->
member_list +
565 res_cpg_confchg_callback->member_list_entries +
566 res_cpg_confchg_callback->left_list_entries;
567 for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
568 marshall_from_mar_cpg_address_t (&joined_list[i],
569 &joined_list_start[i]);
571 marshall_from_mar_cpg_name_t (
573 &res_cpg_confchg_callback->group_name);
578 res_cpg_confchg_callback->member_list_entries,
580 res_cpg_confchg_callback->left_list_entries,
582 res_cpg_confchg_callback->joined_list_entries);
587 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
590 if (current_assembly_data->
nodeid != left_list[i].
nodeid || current_assembly_data->
pid != left_list[i].
pid)
593 qb_list_del (¤t_assembly_data->
list);
595 free(current_assembly_data);
607 marshall_from_mar_cpg_ring_id_t (&
ring_id, &res_cpg_totem_confchg_callback->ring_id);
608 for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
609 totem_member_list[i] = res_cpg_totem_confchg_callback->
member_list[i];
614 res_cpg_totem_confchg_callback->member_list_entries,
643 hdb_handle_put (&cpg_handle_t_db, handle);
662 if (error !=
CS_OK) {
685 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, iov, 1,
688 if (error !=
CS_OK) {
693 error = response.header.error;
696 hdb_handle_put (&cpg_handle_t_db, handle);
716 if (error !=
CS_OK) {
730 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, iov, 1,
733 if (error !=
CS_OK) {
741 hdb_handle_put (&cpg_handle_t_db, handle);
750 int *member_list_entries)
765 if (member_list_entries == NULL) {
770 if (error !=
CS_OK) {
783 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, &iov, 1,
786 if (error !=
CS_OK) {
798 marshall_from_mar_cpg_address_t (&member_list[i],
804 hdb_handle_put (&cpg_handle_t_db, handle);
811 unsigned int *local_nodeid)
820 if (error !=
CS_OK) {
830 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, &iov, 1,
833 if (error !=
CS_OK) {
842 hdb_handle_put (&cpg_handle_t_db, handle);
855 if (error !=
CS_OK) {
861 hdb_handle_put (&cpg_handle_t_db, handle);
867memory_map (
char *path,
const char *file,
void **buf,
size_t bytes)
876 long int sysconf_page_size;
879 snprintf (path, PATH_MAX,
"/dev/shm/%s", file);
883 (void)umask(old_umask);
888 (void)umask(old_umask);
894 res = ftruncate (fd, bytes);
896 goto error_close_unlink;
898 sysconf_page_size = sysconf(_SC_PAGESIZE);
899 if (sysconf_page_size <= 0) {
900 goto error_close_unlink;
902 page_size = sysconf_page_size;
903 buffer = malloc (page_size);
904 if (buffer == NULL) {
905 goto error_close_unlink;
907 memset (buffer, 0, page_size);
908 for (i = 0; i < (bytes / page_size); i++) {
910 written = write (fd, buffer, page_size);
911 if (written == -1 && errno == EINTR) {
914 if (written != page_size) {
916 goto error_close_unlink;
921 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
924 if (
addr == MAP_FAILED) {
925 goto error_close_unlink;
928 madvise(
addr, bytes, MADV_NOSYNC);
955 struct qb_ipc_response_header res_coroipcs_zc_alloc;
963 if (error !=
CS_OK) {
968 assert(memory_map (path,
"corosync_zerocopy-XXXXXX", &buf, map_size) != -1);
972 munmap (buf, map_size);
978 req_coroipcc_zc_alloc.map_size = map_size;
979 strcpy (req_coroipcc_zc_alloc.path_to_file, path);
981 iovec.iov_base = (
void *)&req_coroipcc_zc_alloc;
984 error = coroipcc_msg_send_reply_receive (
988 &res_coroipcs_zc_alloc,
989 sizeof (
struct qb_ipc_response_header));
991 if (error !=
CS_OK) {
1000 hdb_handle_put (&cpg_handle_t_db, handle);
1021 struct qb_ipc_response_header res_coroipcs_zc_free;
1026 if (error !=
CS_OK) {
1032 req_coroipcc_zc_free.map_size =
header->map_size;
1033 req_coroipcc_zc_free.server_address =
header->server_address;
1035 iovec.iov_base = (
void *)&req_coroipcc_zc_free;
1038 error = coroipcc_msg_send_reply_receive (
1042 &res_coroipcs_zc_free,
1043 sizeof (
struct qb_ipc_response_header));
1045 if (error !=
CS_OK) {
1057 hdb_handle_put (&cpg_handle_t_db, handle);
1077 if (error !=
CS_OK) {
1100 iovec.iov_base = (
void *)&req_coroipcc_zc_execute;
1103 error = coroipcc_msg_send_reply_receive (
1110 if (error !=
CS_OK) {
1117 hdb_handle_put (&cpg_handle_t_db, handle);
1126 const struct iovec *iovec,
1127 unsigned int iov_len)
1131 struct iovec iov[2];
1135 size_t iov_sent = 0;
1147 qb_ipcc_fc_enable_max_set(
cpg_inst->
c, 2);
1149 while (error ==
CS_OK && sent < msg_len) {
1156 iov[1].iov_len = iovec[i].iov_len - iov_sent;
1162 else if ((sent + iov[1].iov_len) == msg_len) {
1171 iov[1].iov_base = (
char *)iovec[i].iov_base + iov_sent;
1174 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c, iov, 2,
1179 fprintf(stderr,
"sleep. counter=%d\n", retry_count);
1187 iov_sent += iov[1].iov_len;
1188 sent += iov[1].iov_len;
1191 if (iov_sent >= iovec[i].iov_len) {
1198 qb_ipcc_fc_enable_max_set(
cpg_inst->
c, 1);
1207 const struct iovec *iovec,
1208 unsigned int iov_len)
1213 struct iovec iov[64];
1218 if (error !=
CS_OK) {
1222 for (i = 0; i < iov_len; i++ ) {
1223 msg_len += iovec[i].iov_len;
1240 memcpy (&iov[1], iovec, iov_len *
sizeof (
struct iovec));
1242 qb_ipcc_fc_enable_max_set(
cpg_inst->
c, 2);
1244 qb_ipcc_fc_enable_max_set(
cpg_inst->
c, 1);
1247 hdb_handle_put (&cpg_handle_t_db, handle);
1268 if (cpg_iteration_handle == NULL) {
1284 if (error !=
CS_OK) {
1288 error =
hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
1290 if (error !=
CS_OK) {
1291 goto error_put_cpg_db;
1294 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
1296 if (error !=
CS_OK) {
1314 error = coroipcc_msg_send_reply_receive (
cpg_inst->
c,
1320 if (error !=
CS_OK) {
1321 goto error_put_destroy;
1330 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1331 hdb_handle_put (&cpg_handle_t_db, handle);
1336 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1338 hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1340 hdb_handle_put (&cpg_handle_t_db, handle);
1354 if (description == NULL) {
1358 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1360 if (error !=
CS_OK) {
1371 if (error !=
CS_OK) {
1378 if (error !=
CS_OK) {
1382 marshall_from_mar_cpg_iteration_description_t(
1389 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1404 error =
hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1406 if (error !=
CS_OK) {
1423 if (error !=
CS_OK) {
1433 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
unsigned char addr[TOTEMIP_ADDRLEN]
cs_dispatch_flags_t
The cs_dispatch_flags_t enum.
@ CS_DISPATCH_ONE_NONBLOCKING
cs_error_t qb_to_cs_error(int result)
qb_to_cs_error
cs_error_t
The cs_error_t enum.
#define CS_IPC_TIMEOUT_MS
cs_error_t cpg_flow_control_state_get(cpg_handle_t handle, cpg_flow_control_state_t *flow_control_state)
cpg_flow_control_state_get
cs_error_t cpg_iteration_next(cpg_iteration_handle_t handle, struct cpg_iteration_description_t *description)
cpg_iteration_next
cs_error_t cpg_model_initialize(cpg_handle_t *handle, cpg_model_t model, cpg_model_data_t *model_data, void *context)
Create a new cpg connection, initialize with model.
cs_error_t cpg_max_atomic_msgsize_get(cpg_handle_t handle, uint32_t *size)
Get maximum size of a message that will not be fragmented.
cs_error_t cpg_mcast_joined(cpg_handle_t handle, cpg_guarantee_t guarantee, const struct iovec *iovec, unsigned int iov_len)
Multicast to groups joined with cpg_join.
cs_error_t cpg_leave(cpg_handle_t handle, const struct cpg_name *group)
Leave one or more groups.
cs_error_t cpg_context_get(cpg_handle_t handle, void **context)
Get contexts for a CPG handle.
cs_error_t cpg_fd_get(cpg_handle_t handle, int *fd)
Get a file descriptor on which to poll.
cs_error_t cpg_join(cpg_handle_t handle, const struct cpg_name *group)
Join one or more groups.
cs_error_t cpg_zcb_free(cpg_handle_t handle, void *buffer)
cpg_zcb_free
cs_error_t cpg_iteration_finalize(cpg_iteration_handle_t handle)
cpg_iteration_finalize
cs_error_t cpg_zcb_alloc(cpg_handle_t handle, size_t size, void **buffer)
cpg_zcb_alloc
cs_error_t cpg_membership_get(cpg_handle_t handle, struct cpg_name *group_name, struct cpg_address *member_list, int *member_list_entries)
Get membership information from cpg.
cs_error_t cpg_initialize(cpg_handle_t *handle, cpg_callbacks_t *callbacks)
Create a new cpg connection.
cs_error_t cpg_local_get(cpg_handle_t handle, unsigned int *local_nodeid)
cpg_local_get
cs_error_t cpg_iteration_initialize(cpg_handle_t handle, cpg_iteration_type_t iteration_type, const struct cpg_name *group, cpg_iteration_handle_t *cpg_iteration_handle)
cpg_iteration_initialize
cs_error_t cpg_context_set(cpg_handle_t handle, void *context)
Set contexts for a CPG handle.
cs_error_t cpg_finalize(cpg_handle_t handle)
Close the cpg handle.
cs_error_t cpg_dispatch(cpg_handle_t handle, cs_dispatch_flags_t dispatch_types)
Dispatch messages and configuration changes.
cs_error_t cpg_zcb_mcast_joined(cpg_handle_t handle, cpg_guarantee_t guarantee, void *msg, size_t msg_len)
cpg_zcb_mcast_joined
cpg_flow_control_state_t
The cpg_flow_control_state_t enum.
#define CPG_MAX_NAME_LENGTH
cpg_guarantee_t
The cpg_guarantee_t enum.
cpg_iteration_type_t
The cpg_iteration_type_t enum.
uint64_t cpg_handle_t
cpg_handle_t
uint64_t cpg_iteration_handle_t
cpg_iteration_handle_t
cpg_model_t
The cpg_model_t enum.
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
@ CPG_FLOW_CONTROL_DISABLED
flow control is disabled - new messages may be sent
@ CPG_ITERATION_ONE_GROUP
@ CPG_ITERATION_NAME_ONLY
#define DECLARE_HDB_DATABASE
@ LIBCPG_PARTIAL_CONTINUED
@ MESSAGE_REQ_CPG_ZC_EXECUTE
@ MESSAGE_REQ_CPG_LOCAL_GET
@ MESSAGE_REQ_CPG_ITERATIONFINALIZE
@ MESSAGE_REQ_CPG_PARTIAL_MCAST
@ MESSAGE_REQ_CPG_ZC_ALLOC
@ MESSAGE_REQ_CPG_ZC_FREE
@ MESSAGE_REQ_CPG_ITERATIONINITIALIZE
@ MESSAGE_REQ_CPG_FINALIZE
@ MESSAGE_REQ_CPG_MEMBERSHIP
@ MESSAGE_REQ_CPG_ITERATIONNEXT
@ MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK
@ MESSAGE_RES_CPG_DELIVER_CALLBACK
@ MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK
@ MESSAGE_RES_CPG_CONFCHG_CALLBACK
#define CPG_MEMORY_MAP_UMASK
cs_error_t hdb_error_to_cs(int res)
#define IPC_DISPATCH_SIZE
uint32_t assembly_buf_ptr
The cpg_callbacks_t struct.
cpg_deliver_fn_t cpg_deliver_fn
cpg_confchg_fn_t cpg_confchg_fn
cpg_model_data_t model_data
struct qb_list_head assembly_list_head
struct qb_list_head iteration_list_head
cpg_model_v1_data_t model_v1_data
The cpg_iteration_description_t struct.
qb_ipcc_connection_t * conn
cpg_iteration_handle_t cpg_iteration_handle
hdb_handle_t executive_iteration_handle
The cpg_model_data_t struct.
The cpg_model_v1_data_t struct.
cpg_totem_confchg_fn_t cpg_totem_confchg_fn
cpg_confchg_fn_t cpg_confchg_fn
cpg_deliver_fn_t cpg_deliver_fn
mar_req_coroipcc_zc_alloc_t struct
mar_req_coroipcc_zc_execute_t struct
mar_req_coroipcc_zc_free_t struct
The req_lib_cpg_finalize struct.
The req_lib_cpg_iterationfinalize struct.
The req_lib_cpg_iterationinitialize struct.
The req_lib_cpg_iterationnext struct.
The req_lib_cpg_join struct.
The req_lib_cpg_leave struct.
The req_lib_cpg_local_get struct.
The req_lib_cpg_mcast struct.
The req_lib_cpg_membership_get struct.
The req_lib_cpg_partial_mcast struct.
The res_lib_cpg_confchg_callback struct.
mar_cpg_address_t member_list[]
Message from another node.
The res_lib_cpg_finalize struct.
The res_lib_cpg_iterationfinalize struct.
The res_lib_cpg_iterationinitialize struct.
The res_lib_cpg_iterationnext struct.
The res_lib_cpg_join struct.
The res_lib_cpg_leave struct.
The res_lib_cpg_local_get struct.
The res_lib_cpg_mcast struct.
The res_lib_cpg_membership_get struct.
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
The res_lib_cpg_partial_deliver_callback struct.
The res_lib_cpg_partial_send struct.
The res_lib_cpg_totem_confchg_callback struct.
mar_uint32_t member_list[]
struct memb_ring_id ring_id
struct totem_message_header header