47#include <sys/socket.h>
66#define SAM_CMAP_S_FAILED "failed"
67#define SAM_CMAP_S_REGISTERED "stopped"
68#define SAM_CMAP_S_STARTED "running"
69#define SAM_CMAP_S_Q_WAIT "waiting for quorum"
/*
 * Helpers that clear modifier flags from a recovery policy value.
 *
 * The macro argument is wrapped in parentheses so that an expression
 * argument (for example "a | b") is masked as a whole; without them the
 * '&' would bind only to the last operand of the expression.
 */

/* Clear the SAM_RECOVERY_POLICY_QUORUM flag from pol. */
#define SAM_RP_MASK_Q(pol) ((pol) & (~SAM_RECOVERY_POLICY_QUORUM))
/* Clear the SAM_RECOVERY_POLICY_CMAP flag from pol. */
#define SAM_RP_MASK_C(pol) ((pol) & (~SAM_RECOVERY_POLICY_CMAP))
/* Clear both the QUORUM and CMAP flags from pol. */
#define SAM_RP_MASK(pol) ((pol) & (~(SAM_RECOVERY_POLICY_QUORUM | SAM_RECOVERY_POLICY_CMAP)))
139} sam_internal_data = {
140 .lock = PTHREAD_MUTEX_INITIALIZER
147 uint64_t hc_period, last_hc;
154 svalue = ssvalue[
SAM_RP_MASK (sam_internal_data.recovery_policy)];
168 hc_period = sam_internal_data.time_interval;
182 last_hc = cs_timestamp_get();
215static cs_error_t sam_cmap_destroy_pid_path (
void)
221 err =
cmap_iter_init(sam_internal_data.cmap_handle, sam_internal_data.cmap_pid_path, &iter);
226 while ((err =
cmap_iter_next(sam_internal_data.cmap_handle, iter, key_name, NULL, NULL)) ==
CS_OK) {
227 cmap_delete(sam_internal_data.cmap_handle, key_name);
245 snprintf(sam_internal_data.cmap_pid_path,
CMAP_KEYNAME_MAXLEN,
"resources.process.%d.", getpid());
250 goto destroy_finalize_error;
254 goto destroy_finalize_error;
259destroy_finalize_error:
260 sam_cmap_destroy_pid_path ();
265static void quorum_notification_fn (
269 uint32_t view_list_entries,
273 sam_internal_data.quorate =
quorate;
281 uint32_t quorum_type;
291 pthread_mutex_lock (&sam_internal_data.lock);
295 goto exit_mutex_unlock;
303 if ((ret =
quorum_initialize (&sam_internal_data.quorum_handle, &quorum_callbacks, &quorum_type)) !=
CS_OK) {
304 goto exit_mutex_unlock;
308 goto exit_error_quorum;
311 if ((ret =
quorum_fd_get (sam_internal_data.quorum_handle, &sam_internal_data.quorum_fd)) !=
CS_OK) {
312 goto exit_error_quorum;
319 goto exit_error_quorum;
328 sam_internal_data.warn_signal = SIGTERM;
330 sam_internal_data.am_i_child = 0;
332 sam_internal_data.user_data = NULL;
333 sam_internal_data.user_data_size = 0;
334 sam_internal_data.user_data_allocated = 0;
336 pthread_mutex_unlock (&sam_internal_data.lock);
344 pthread_mutex_unlock (&sam_internal_data.lock);
352static size_t sam_safe_write (
358 ssize_t tmp_bytes_write;
363 tmp_bytes_write = write (d, (
const char *)buf + bytes_write,
364 (nbyte - bytes_write > SSIZE_MAX) ? SSIZE_MAX : nbyte - bytes_write);
366 if (tmp_bytes_write == -1) {
367 if (!(errno == EAGAIN || errno == EINTR))
370 bytes_write += tmp_bytes_write;
372 }
while (bytes_write != nbyte);
374 return (bytes_write);
380static size_t sam_safe_read (
386 ssize_t tmp_bytes_read;
391 tmp_bytes_read = read (d, (
char *)buf + bytes_read,
392 (nbyte - bytes_read > SSIZE_MAX) ? SSIZE_MAX : nbyte - bytes_read);
394 if (tmp_bytes_read == -1) {
395 if (!(errno == EAGAIN || errno == EINTR))
398 bytes_read += tmp_bytes_read;
401 }
while (bytes_read != nbyte && tmp_bytes_read != 0);
412 if (sam_safe_read (sam_internal_data.child_fd_in, &reply, sizeof (reply)) !=
sizeof (reply)) {
421 if (sam_safe_read (sam_internal_data.child_fd_in, &err, sizeof (err)) !=
sizeof (err)) {
450 pthread_mutex_lock (&sam_internal_data.lock);
457 goto exit_mutex_unlock;
461 *size = sam_internal_data.user_data_size;
464 pthread_mutex_unlock (&sam_internal_data.lock);
481 pthread_mutex_lock (&sam_internal_data.lock);
488 goto exit_mutex_unlock;
492 if (sam_internal_data.user_data_size == 0) {
495 goto exit_mutex_unlock;
498 if (size < sam_internal_data.user_data_size) {
501 goto exit_mutex_unlock;
504 memcpy (data, sam_internal_data.user_data, sam_internal_data.user_data_size);
507 pthread_mutex_unlock (&sam_internal_data.lock);
520 if (size >= SSIZE_MAX) {
534 if (sam_internal_data.am_i_child) {
539 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) !=
sizeof (command)) {
543 if (sam_safe_write (sam_internal_data.child_fd_out, &size, sizeof (size)) !=
sizeof (size)) {
547 if (data != NULL && sam_safe_write (sam_internal_data.child_fd_out, data, size) != size) {
554 if ((err = sam_read_reply (sam_internal_data.child_fd_in)) !=
CS_OK) {
563 free (sam_internal_data.user_data);
564 sam_internal_data.user_data = NULL;
565 sam_internal_data.user_data_allocated = 0;
566 sam_internal_data.user_data_size = 0;
568 if (sam_internal_data.user_data_allocated < size) {
569 if ((new_data = realloc (sam_internal_data.user_data, size)) == NULL) {
573 sam_internal_data.user_data_allocated = size;
575 new_data = sam_internal_data.user_data;
577 sam_internal_data.user_data = new_data;
578 sam_internal_data.user_data_size = size;
580 memcpy (sam_internal_data.user_data, data, size);
592 pthread_mutex_lock (&sam_internal_data.lock);
594 ret = sam_data_store_nolock (data, size);
596 pthread_mutex_unlock (&sam_internal_data.lock);
609 pthread_mutex_lock (&sam_internal_data.lock);
613 goto exit_mutex_unlock;
616 recpol = sam_internal_data.recovery_policy;
620 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) !=
sizeof (command)) {
622 goto exit_mutex_unlock;
629 if ((ret = sam_read_reply (sam_internal_data.child_fd_in)) !=
CS_OK) {
630 goto exit_mutex_unlock;
634 if (sam_internal_data.hc_callback) {
635 if (sam_safe_write (sam_internal_data.cb_wpipe_fd, &command, sizeof (command)) !=
sizeof (command)) {
637 goto exit_mutex_unlock;
644 pthread_mutex_unlock (&sam_internal_data.lock);
660 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) !=
sizeof (command)) {
668 if ((cs_err = sam_read_reply (sam_internal_data.child_fd_in)) !=
CS_OK) {
673 if (sam_internal_data.hc_callback) {
674 if (sam_safe_write (sam_internal_data.cb_wpipe_fd, &command, sizeof (command)) !=
sizeof (command)) {
688 pthread_mutex_lock (&sam_internal_data.lock);
690 ret = sam_stop_nolock ();
692 pthread_mutex_unlock (&sam_internal_data.lock);
704 pthread_mutex_lock (&sam_internal_data.lock);
708 goto exit_mutex_unlock;
713 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) !=
sizeof (command)) {
715 goto exit_mutex_unlock;
719 pthread_mutex_unlock (&sam_internal_data.lock);
730 pthread_mutex_lock (&sam_internal_data.lock);
736 goto exit_mutex_unlock;
740 ret = sam_stop_nolock ();
742 goto exit_mutex_unlock;
748 free (sam_internal_data.user_data);
749 sam_internal_data.user_data = NULL;
750 sam_internal_data.user_data_allocated = 0;
751 sam_internal_data.user_data_size = 0;
754 pthread_mutex_unlock (&sam_internal_data.lock);
766 pthread_mutex_lock (&sam_internal_data.lock);
771 goto exit_mutex_unlock;
776 goto exit_mutex_unlock;
781 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) !=
sizeof (command)) {
783 goto exit_mutex_unlock;
787 pthread_mutex_unlock (&sam_internal_data.lock);
803 if (sam_internal_data.am_i_child) {
808 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) !=
sizeof (command)) {
820 if ((err = sam_read_reply (sam_internal_data.child_fd_in)) !=
CS_OK) {
837 pthread_mutex_lock (&sam_internal_data.lock);
841 pthread_mutex_unlock (&sam_internal_data.lock);
856 if (sam_safe_write (parent_fd_out, &reply,
sizeof (reply)) !=
sizeof (reply)) {
866 if (sam_safe_write (parent_fd_out, &reply,
sizeof (reply)) !=
sizeof (reply)) {
869 if (sam_safe_write (parent_fd_out, &err,
sizeof (err)) !=
sizeof (err)) {
897 return (sam_parent_reply_send (
CS_OK, parent_fd_in, parent_fd_out));
900 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
908 struct pollfd pfds[2];
927 while (!sam_internal_data.quorate) {
928 pfds[0].fd = parent_fd_in;
932 pfds[1].fd = sam_internal_data.quorum_fd;
933 pfds[1].events = POLLIN;
936 poll_err = poll (pfds, 2, -1);
938 if (poll_err == -1) {
943 if (errno != EINTR) {
949 if (pfds[0].revents != 0) {
950 if (pfds[0].revents == POLLERR || pfds[0].revents == POLLHUP ||pfds[0].revents == POLLNVAL) {
958 if (pfds[1].revents != 0) {
971 return (sam_parent_reply_send (
CS_OK, parent_fd_in, parent_fd_out));
978 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
999 return (sam_parent_reply_send (
CS_OK, parent_fd_in, parent_fd_out));
1002 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
1012 if (!sam_internal_data.term_send) {
1016 kill (child_pid, sam_internal_data.warn_signal);
1018 sam_internal_data.term_send = 1;
1023 kill (child_pid, SIGKILL);
1030static cs_error_t sam_parent_mark_child_failed (
1036 recpol = sam_internal_data.recovery_policy;
1038 sam_internal_data.term_send = 1;
1043 return (sam_parent_kill_child (action, child_pid));
1057 if (sam_safe_read (parent_fd_in, &size,
sizeof (size)) !=
sizeof (size)) {
1062 if (size >= SSIZE_MAX) {
1074 if (sam_safe_read (parent_fd_in,
user_data, size) != size) {
1076 goto free_error_reply;
1080 err = sam_data_store_nolock (
user_data, size);
1082 goto free_error_reply;
1087 return (sam_parent_reply_send (
CS_OK, parent_fd_in, parent_fd_out));
1092 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
1106 struct pollfd pfds[2];
1114 recpol = sam_internal_data.recovery_policy;
1117 pfds[0].fd = parent_fd_in;
1118 pfds[0].events = POLLIN;
1119 pfds[0].revents = 0;
1122 if (status == 1 && sam_internal_data.time_interval != 0) {
1129 pfds[nfds].fd = sam_internal_data.quorum_fd;
1130 pfds[nfds].events = POLLIN;
1131 pfds[nfds].revents = 0;
1137 if (poll_error == -1) {
1142 if (errno != EINTR) {
1147 if (poll_error == 0) {
1154 sam_parent_kill_child (&action, child_pid);
1158 if (poll_error > 0) {
1159 if (pfds[0].revents != 0) {
1163 bytes_read = sam_safe_read (parent_fd_in, &command, 1);
1165 if (bytes_read == 0) {
1177 if (bytes_read == -1) {
1196 if (sam_parent_wait_for_quorum (parent_fd_in,
1197 parent_fd_out) !=
CS_OK) {
1203 if (sam_parent_cmap_state_set (parent_fd_in,
1204 parent_fd_out, 1) !=
CS_OK) {
1218 if (sam_parent_cmap_state_set (parent_fd_in,
1219 parent_fd_out, 0) !=
CS_OK) {
1228 sam_parent_data_store (parent_fd_in, parent_fd_out);
1231 sam_parent_warn_signal_set (parent_fd_in, parent_fd_out);
1235 sam_parent_mark_child_failed (&action, child_pid);
1241 pfds[1].revents != 0) {
1249 sam_parent_kill_child (&action, child_pid);
1264 int pipe_fd_out[2], pipe_fd_in[2];
1277 recpol = sam_internal_data.recovery_policy;
1283 if ((ret = sam_cmap_register ()) !=
CS_OK) {
1290 if ((pipe_error = pipe (pipe_fd_out)) != 0) {
1295 if ((pipe_error = pipe (pipe_fd_in)) != 0) {
1296 close (pipe_fd_out[0]);
1297 close (pipe_fd_out[1]);
1309 sam_internal_data.instance_id++;
1311 sam_internal_data.term_send = 0;
1319 sam_internal_data.instance_id--;
1329 close (pipe_fd_out[0]);
1330 close (pipe_fd_in[1]);
1332 sam_internal_data.child_fd_out = pipe_fd_out[1];
1333 sam_internal_data.child_fd_in = pipe_fd_in[0];
1338 sam_internal_data.am_i_child = 1;
1346 close (pipe_fd_out[1]);
1347 close (pipe_fd_in[0]);
1349 action = sam_parent_handler (pipe_fd_out[0], pipe_fd_in[1], pid);
1351 close (pipe_fd_out[0]);
1352 close (pipe_fd_in[1]);
1362 while (waitpid (pid, &child_status, 0) == -1 && errno == EINTR)
1365 old_action = action;
1385 sam_cmap_destroy_pid_path ();
1389 exit (WEXITSTATUS (child_status));
1400static void *hc_callback_thread (
void *unused_param)
1404 ssize_t bytes_readed;
1416 pfds.fd = sam_internal_data.cb_rpipe_fd;
1417 pfds.events = POLLIN;
1423 tmp_time_interval = -1;
1426 poll_error = poll (&pfds, 1, tmp_time_interval);
1428 if (poll_error == 0) {
1434 if (sam_internal_data.hc_callback () != 0) {
1442 if (poll_error > 0) {
1443 bytes_readed = sam_safe_read (sam_internal_data.cb_rpipe_fd, &command, 1);
1445 if (bytes_readed > 0) {
1459 return (unused_param);
1465 pthread_attr_t thread_attr;
1473 if (sam_internal_data.time_interval == 0) {
1477 if (sam_internal_data.cb_registered) {
1478 sam_internal_data.hc_callback = cb;
1491 pipe_error = pipe (pipe_fd);
1493 if (pipe_error != 0) {
1501 sam_internal_data.cb_rpipe_fd = pipe_fd[0];
1502 sam_internal_data.cb_wpipe_fd = pipe_fd[1];
1507 error = pthread_attr_init (&thread_attr);
1510 goto error_close_fd_exit;
1514 pthread_attr_setdetachstate (&thread_attr, PTHREAD_CREATE_DETACHED);
1515 pthread_attr_setstacksize (&thread_attr, 32768);
1520 error = pthread_create (&sam_internal_data.cb_thread, &thread_attr, hc_callback_thread, NULL);
1524 goto error_attr_destroy_exit;
1530 pthread_attr_destroy(&thread_attr);
1532 sam_internal_data.cb_registered = 1;
1533 sam_internal_data.hc_callback = cb;
1537error_attr_destroy_exit:
1538 pthread_attr_destroy(&thread_attr);
1540 sam_internal_data.cb_rpipe_fd = sam_internal_data.cb_wpipe_fd = 0;
cs_error_t
The cs_error_t enum.
uint64_t cmap_iter_handle_t
cs_error_t cmap_finalize(cmap_handle_t handle)
Close the cmap handle.
#define CMAP_KEYNAME_MAXLEN
cs_error_t cmap_iter_next(cmap_handle_t handle, cmap_iter_handle_t iter_handle, char key_name[], size_t *value_len, cmap_value_types_t *type)
Return next item in iterator iter.
cs_error_t cmap_delete(cmap_handle_t handle, const char *key_name)
Deletes key from cmap database.
cs_error_t cmap_initialize(cmap_handle_t *handle)
Create a new cmap connection.
cs_error_t cmap_iter_finalize(cmap_handle_t handle, cmap_iter_handle_t iter_handle)
Finalize iterator.
cs_error_t cmap_iter_init(cmap_handle_t handle, const char *prefix, cmap_iter_handle_t *cmap_iter_handle)
Initialize iterator with given prefix.
cs_error_t cmap_set_string(cmap_handle_t handle, const char *key_name, const char *value)
cs_error_t cmap_set_uint64(cmap_handle_t handle, const char *key_name, uint64_t value)
uint64_t quorum_handle_t
quorum_handle_t
cs_error_t quorum_initialize(quorum_handle_t *handle, quorum_callbacks_t *callbacks, uint32_t *quorum_type)
Create a new quorum connection.
cs_error_t quorum_fd_get(quorum_handle_t handle, int *fd)
Get a file descriptor on which to poll.
cs_error_t quorum_trackstart(quorum_handle_t handle, unsigned int flags)
Track node and quorum changes.
cs_error_t quorum_finalize(quorum_handle_t handle)
Close the quorum handle.
cs_error_t quorum_dispatch(quorum_handle_t handle, cs_dispatch_flags_t dispatch_types)
Dispatch messages and configuration changes.
cs_error_t sam_warn_signal_set(int warn_signal)
Set warning signal to be sent.
#define SAM_CMAP_S_FAILED
cs_error_t sam_finalize(void)
Close the SAM handle.
cs_error_t sam_hc_callback_register(sam_hc_callback_t cb)
Register healthcheck callback.
#define SAM_CMAP_S_STARTED
#define SAM_CMAP_S_Q_WAIT
cs_error_t sam_data_store(const void *data, size_t size)
Store user data.
@ SAM_COMMAND_MARK_FAILED
@ SAM_COMMAND_WARN_SIGNAL_SET
sam_hc_callback_t hc_callback
quorum_handle_t quorum_handle
#define SAM_CMAP_S_REGISTERED
cs_error_t sam_mark_failed(void)
Marks child as failed.
#define SAM_RP_MASK_C(pol)
#define SAM_RP_MASK_Q(pol)
cs_error_t sam_data_restore(void *data, size_t size)
Return stored data.
sam_recovery_policy_t recovery_policy
enum sam_internal_status_t internal_status
@ SAM_INTERNAL_STATUS_STARTED
@ SAM_INTERNAL_STATUS_NOT_INITIALIZED
@ SAM_INTERNAL_STATUS_FINALIZED
@ SAM_INTERNAL_STATUS_REGISTERED
@ SAM_INTERNAL_STATUS_INITIALIZED
cs_error_t sam_register(unsigned int *instance_id)
Register application.
cs_error_t sam_data_getsize(size_t *size)
Return size of stored data.
cs_error_t sam_stop(void)
Stop healthchecking.
cs_error_t sam_initialize(int time_interval, sam_recovery_policy_t recovery_policy)
Create a new SAM connection.
cs_error_t sam_hc_send(void)
Send healthcheck confirmation.
char cmap_pid_path[CMAP_KEYNAME_MAXLEN]
@ SAM_PARENT_ACTION_ERROR
@ SAM_PARENT_ACTION_CONTINUE
@ SAM_PARENT_ACTION_RECOVERY
cs_error_t sam_start(void)
Start healthchecking.
cmap_handle_t cmap_handle
size_t user_data_allocated
sam_recovery_policy_t
sam_recovery_policy_t enum
@ SAM_RECOVERY_POLICY_CMAP
@ SAM_RECOVERY_POLICY_QUORUM
@ SAM_RECOVERY_POLICY_QUIT
@ SAM_RECOVERY_POLICY_RESTART
int(* sam_hc_callback_t)(void)
Callback definition for event driven checking.
The quorum_callbacks_t struct.
quorum_notification_fn_t quorum_notify_fn
struct memb_ring_id ring_id