pacemaker  2.0.1-9e909a5bdd
Scalable High-Availability cluster resource manager
mainloop.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2018 Andrew Beekhof <andrew@beekhof.net>
3  *
4  * This source code is licensed under the GNU Lesser General Public License
5  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
6  */
7 
8 #include <crm_internal.h>
9 
10 #ifndef _GNU_SOURCE
11 # define _GNU_SOURCE
12 #endif
13 
14 #include <stdlib.h>
15 #include <signal.h>
16 #include <errno.h>
17 
18 #include <sys/wait.h>
19 
20 #include <crm/crm.h>
21 #include <crm/common/xml.h>
22 #include <crm/common/mainloop.h>
23 #include <crm/common/ipcs.h>
24 
25 #include <qb/qbarray.h>
26 
27 struct mainloop_child_s {
28  pid_t pid;
29  char *desc;
30  unsigned timerid;
31  gboolean timeout;
32  void *privatedata;
33 
35 
36  /* Called when a process dies */
37  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode);
38 };
39 
40 struct trigger_s {
41  GSource source;
42  gboolean running;
43  gboolean trigger;
44  void *user_data;
45  guint id;
46 
47 };
48 
49 static gboolean
50 crm_trigger_prepare(GSource * source, gint * timeout)
51 {
52  crm_trigger_t *trig = (crm_trigger_t *) source;
53 
54  /* cluster-glue's FD and IPC related sources make use of
55  * g_source_add_poll() but do not set a timeout in their prepare
56  * functions
57  *
58  * This means mainloop's poll() will block until an event for one
59  * of these sources occurs - any /other/ type of source, such as
60  * this one or g_idle_*, that doesn't use g_source_add_poll() is
61  * S-O-L and won't be processed until there is something fd-based
62  * happens.
63  *
64  * Luckily the timeout we can set here affects all sources and
65  * puts an upper limit on how long poll() can take.
66  *
67  * So unconditionally set a small-ish timeout, not too small that
68  * we're in constant motion, which will act as an upper bound on
69  * how long the signal handling might be delayed for.
70  */
71  *timeout = 500; /* Timeout in ms */
72 
73  return trig->trigger;
74 }
75 
76 static gboolean
77 crm_trigger_check(GSource * source)
78 {
79  crm_trigger_t *trig = (crm_trigger_t *) source;
80 
81  return trig->trigger;
82 }
83 
84 static gboolean
85 crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
86 {
87  int rc = TRUE;
88  crm_trigger_t *trig = (crm_trigger_t *) source;
89 
90  if (trig->running) {
91  /* Wait until the existing job is complete before starting the next one */
92  return TRUE;
93  }
94  trig->trigger = FALSE;
95 
96  if (callback) {
97  rc = callback(trig->user_data);
98  if (rc < 0) {
99  crm_trace("Trigger handler %p not yet complete", trig);
100  trig->running = TRUE;
101  rc = TRUE;
102  }
103  }
104  return rc;
105 }
106 
107 static void
108 crm_trigger_finalize(GSource * source)
109 {
110  crm_trace("Trigger %p destroyed", source);
111 }
112 
113 #if 0
114 struct _GSourceCopy
115 {
116  gpointer callback_data;
117  GSourceCallbackFuncs *callback_funcs;
118 
119  const GSourceFuncs *source_funcs;
120  guint ref_count;
121 
122  GMainContext *context;
123 
124  gint priority;
125  guint flags;
126  guint source_id;
127 
128  GSList *poll_fds;
129 
130  GSource *prev;
131  GSource *next;
132 
133  char *name;
134 
135  void *priv;
136 };
137 
138 static int
139 g_source_refcount(GSource * source)
140 {
141  /* Duplicating the contents of private header files is a necessary evil */
142  if (source) {
143  struct _GSourceCopy *evil = (struct _GSourceCopy*)source;
144  return evil->ref_count;
145  }
146  return 0;
147 }
148 #else
149 static int g_source_refcount(GSource * source)
150 {
151  return 0;
152 }
153 #endif
154 
155 static GSourceFuncs crm_trigger_funcs = {
156  crm_trigger_prepare,
157  crm_trigger_check,
158  crm_trigger_dispatch,
159  crm_trigger_finalize,
160 };
161 
162 static crm_trigger_t *
163 mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
164  gpointer userdata)
165 {
166  crm_trigger_t *trigger = NULL;
167 
168  trigger = (crm_trigger_t *) source;
169 
170  trigger->id = 0;
171  trigger->trigger = FALSE;
172  trigger->user_data = userdata;
173 
174  if (dispatch) {
175  g_source_set_callback(source, dispatch, trigger, NULL);
176  }
177 
178  g_source_set_priority(source, priority);
179  g_source_set_can_recurse(source, FALSE);
180 
181  crm_trace("Setup %p with ref-count=%u", source, g_source_refcount(source));
182  trigger->id = g_source_attach(source, NULL);
183  crm_trace("Attached %p with ref-count=%u", source, g_source_refcount(source));
184 
185  return trigger;
186 }
187 
188 void
190 {
191  crm_trace("Trigger handler %p complete", trig);
192  trig->running = FALSE;
193 }
194 
195 /* If dispatch returns:
196  * -1: Job running but not complete
197  * 0: Remove the trigger from mainloop
198  * 1: Leave the trigger in mainloop
199  */
201 mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data), gpointer userdata)
202 {
203  GSource *source = NULL;
204 
205  CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource));
206  source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
207  CRM_ASSERT(source != NULL);
208 
209  return mainloop_setup_trigger(source, priority, dispatch, userdata);
210 }
211 
212 void
214 {
215  if(source) {
216  source->trigger = TRUE;
217  }
218 }
219 
/*
 * Detach a trigger source from the mainloop and drop the caller's reference
 * to it. A NULL trigger is accepted. The function always returns TRUE.
 *
 * NOTE(review): the declarator line (original line 221) is missing from this
 * listing. The body expects a pointer parameter named `source` (presumably
 * mainloop_destroy_trigger(crm_trigger_t *source)). TODO: restore it from
 * mainloop.h before building.
 */
220 gboolean
222 {
223  GSource *gs = NULL;
224 
225  if(source == NULL) {
226  return TRUE;
227  }
228 
229  gs = (GSource *)source;
230 
/* g_source_refcount() is a stub returning 0 in this file, so this
 * diagnostic currently never fires */
231  if(g_source_refcount(gs) > 2) {
232  crm_info("Trigger %p is still referenced %u times", gs, g_source_refcount(gs));
233  }
234 
235  g_source_destroy(gs); /* Remove from mainloop, ref_count-- */
236  g_source_unref(gs); /* The caller no longer carries a reference to source
237  *
238  * At this point the source should be free'd,
239  * unless we're currently processing said
240  * source, in which case mainloop holds an
241  * additional reference and it will be free'd
242  * once our processing completes
243  */
244  return TRUE;
245 }
246 
247 typedef struct signal_s {
248  crm_trigger_t trigger; /* must be first */
249  void (*handler) (int sig);
250  int signal;
251 
252 } crm_signal_t;
253 
254 static crm_signal_t *crm_signals[NSIG];
255 
256 static gboolean
257 crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
258 {
259  crm_signal_t *sig = (crm_signal_t *) source;
260 
261  if(sig->signal != SIGCHLD) {
262  crm_notice("Caught '%s' signal "CRM_XS" %d (%s handler)",
263  strsignal(sig->signal), sig->signal,
264  (sig->handler? "invoking" : "no"));
265  }
266 
267  sig->trigger.trigger = FALSE;
268  if (sig->handler) {
269  sig->handler(sig->signal);
270  }
271  return TRUE;
272 }
273 
274 static void
275 mainloop_signal_handler(int sig)
276 {
277  if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
278  mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
279  }
280 }
281 
282 static GSourceFuncs crm_signal_funcs = {
283  crm_trigger_prepare,
284  crm_trigger_check,
285  crm_signal_dispatch,
286  crm_trigger_finalize,
287 };
288 
289 gboolean
290 crm_signal(int sig, void (*dispatch) (int sig))
291 {
292  sigset_t mask;
293  struct sigaction sa;
294  struct sigaction old;
295 
296  if (sigemptyset(&mask) < 0) {
297  crm_perror(LOG_ERR, "Call to sigemptyset failed");
298  return FALSE;
299  }
300 
301  memset(&sa, 0, sizeof(struct sigaction));
302  sa.sa_handler = dispatch;
303  sa.sa_flags = SA_RESTART;
304  sa.sa_mask = mask;
305 
306  if (sigaction(sig, &sa, &old) < 0) {
307  crm_perror(LOG_ERR, "Could not install signal handler for signal %d", sig);
308  return FALSE;
309  }
310 
311  return TRUE;
312 }
313 
314 static void
315 mainloop_destroy_signal_entry(int sig)
316 {
317  crm_signal_t *tmp = crm_signals[sig];
318 
319  crm_signals[sig] = NULL;
320 
321  crm_trace("Destroying signal %d", sig);
323 }
324 
325 gboolean
326 mainloop_add_signal(int sig, void (*dispatch) (int sig))
327 {
328  GSource *source = NULL;
329  int priority = G_PRIORITY_HIGH - 1;
330 
331  if (sig == SIGTERM) {
332  /* TERM is higher priority than other signals,
333  * signals are higher priority than other ipc.
334  * Yes, minus: smaller is "higher"
335  */
336  priority--;
337  }
338 
339  if (sig >= NSIG || sig < 0) {
340  crm_err("Signal %d is out of range", sig);
341  return FALSE;
342 
343  } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
344  crm_trace("Signal handler for %d is already installed", sig);
345  return TRUE;
346 
347  } else if (crm_signals[sig] != NULL) {
348  crm_err("Different signal handler for %d is already installed", sig);
349  return FALSE;
350  }
351 
352  CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource));
353  source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
354 
355  crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
356  CRM_ASSERT(crm_signals[sig] != NULL);
357 
358  crm_signals[sig]->handler = dispatch;
359  crm_signals[sig]->signal = sig;
360 
361  if (crm_signal(sig, mainloop_signal_handler) == FALSE) {
362  mainloop_destroy_signal_entry(sig);
363  return FALSE;
364  }
365 #if 0
366  /* If we want signals to interrupt mainloop's poll(), instead of waiting for
367  * the timeout, then we should call siginterrupt() below
368  *
369  * For now, just enforce a low timeout
370  */
371  if (siginterrupt(sig, 1) < 0) {
372  crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig);
373  }
374 #endif
375 
376  return TRUE;
377 }
378 
379 gboolean
381 {
382  if (sig >= NSIG || sig < 0) {
383  crm_err("Signal %d is out of range", sig);
384  return FALSE;
385 
386  } else if (crm_signal(sig, NULL) == FALSE) {
387  crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig);
388  return FALSE;
389 
390  } else if (crm_signals[sig] == NULL) {
391  return TRUE;
392  }
393  mainloop_destroy_signal_entry(sig);
394  return TRUE;
395 }
396 
/* Descriptor-indexed table of struct gio_to_qb_poll adaptors used by the
 * libqb IPC glue below. It is created lazily by mainloop_add_ipc_server(). */
397 static qb_array_t *gio_map = NULL;
398 
/*
 * Release module-global state: the gio_map adaptor table and every signal
 * registry entry.
 *
 * NOTE(review): the declarator line (original line 400) is missing from this
 * listing (presumably a no-argument cleanup function). TODO: restore it from
 * mainloop.h.
 * NOTE(review): gio_map is freed but not reset to NULL. A later
 * mainloop_add_ipc_server() would then skip re-creating it and keep using
 * the freed table. Consider setting gio_map = NULL after qb_array_free().
 */
399 void
401 {
402  if (gio_map) {
403  qb_array_free(gio_map);
404  }
405 
406  for (int sig = 0; sig < NSIG; ++sig) {
407  mainloop_destroy_signal_entry(sig);
408  }
409 }
410 
411 /*
412  * libqb...
413  */
414 struct gio_to_qb_poll {
415  int32_t is_used;
416  guint source;
417  int32_t events;
418  void *data;
419  qb_ipcs_dispatch_fn_t fn;
420  enum qb_loop_priority p;
421 };
422 
423 static gboolean
424 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
425 {
426  struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
427  gint fd = g_io_channel_unix_get_fd(gio);
428 
429  crm_trace("%p.%d %d", data, fd, condition);
430 
431  /* if this assert get's hit, then there is a race condition between
432  * when we destroy a fd and when mainloop actually gives it up */
433  CRM_ASSERT(adaptor->is_used > 0);
434 
435  return (adaptor->fn(fd, condition, adaptor->data) == 0);
436 }
437 
438 static void
439 gio_poll_destroy(gpointer data)
440 {
441  struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
442 
443  adaptor->is_used--;
444  CRM_ASSERT(adaptor->is_used >= 0);
445 
446  if (adaptor->is_used == 0) {
447  crm_trace("Marking adaptor %p unused", adaptor);
448  adaptor->source = 0;
449  }
450 }
451 
452 static int32_t
453 gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
454  void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
455 {
456  struct gio_to_qb_poll *adaptor;
457  GIOChannel *channel;
458  int32_t res = 0;
459 
460  res = qb_array_index(gio_map, fd, (void **)&adaptor);
461  if (res < 0) {
462  crm_err("Array lookup failed for fd=%d: %d", fd, res);
463  return res;
464  }
465 
466  crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);
467 
468  if (add && adaptor->source) {
469  crm_err("Adaptor for descriptor %d is still in-use", fd);
470  return -EEXIST;
471  }
472  if (!add && !adaptor->is_used) {
473  crm_err("Adaptor for descriptor %d is not in-use", fd);
474  return -ENOENT;
475  }
476 
477  /* channel is created with ref_count = 1 */
478  channel = g_io_channel_unix_new(fd);
479  if (!channel) {
480  crm_err("No memory left to add fd=%d", fd);
481  return -ENOMEM;
482  }
483 
484  if (adaptor->source) {
485  g_source_remove(adaptor->source);
486  adaptor->source = 0;
487  }
488 
489  /* Because unlike the poll() API, glib doesn't tell us about HUPs by default */
490  evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
491 
492  adaptor->fn = fn;
493  adaptor->events = evts;
494  adaptor->data = data;
495  adaptor->p = p;
496  adaptor->is_used++;
497  adaptor->source =
498  g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor,
499  gio_poll_destroy);
500 
501  /* Now that mainloop now holds a reference to channel,
502  * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
503  *
504  * This means that channel will be free'd by:
505  * g_main_context_dispatch()
506  * -> g_source_destroy_internal()
507  * -> g_source_callback_unref()
508  * shortly after gio_poll_destroy() completes
509  */
510  g_io_channel_unref(channel);
511 
512  crm_trace("Added to mainloop with gsource id=%d", adaptor->source);
513  if (adaptor->source > 0) {
514  return 0;
515  }
516 
517  return -EINVAL;
518 }
519 
520 static int32_t
521 gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
522  void *data, qb_ipcs_dispatch_fn_t fn)
523 {
524  return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
525 }
526 
527 static int32_t
528 gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
529  void *data, qb_ipcs_dispatch_fn_t fn)
530 {
531  return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
532 }
533 
534 static int32_t
535 gio_poll_dispatch_del(int32_t fd)
536 {
537  struct gio_to_qb_poll *adaptor;
538 
539  crm_trace("Looking for fd=%d", fd);
540  if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
541  if (adaptor->source) {
542  g_source_remove(adaptor->source);
543  adaptor->source = 0;
544  }
545  }
546  return 0;
547 }
548 
549 struct qb_ipcs_poll_handlers gio_poll_funcs = {
550  .job_add = NULL,
551  .dispatch_add = gio_poll_dispatch_add,
552  .dispatch_mod = gio_poll_dispatch_mod,
553  .dispatch_del = gio_poll_dispatch_del,
554 };
555 
556 static enum qb_ipc_type
557 pick_ipc_type(enum qb_ipc_type requested)
558 {
559  const char *env = getenv("PCMK_ipc_type");
560 
561  if (env && strcmp("shared-mem", env) == 0) {
562  return QB_IPC_SHM;
563  } else if (env && strcmp("socket", env) == 0) {
564  return QB_IPC_SOCKET;
565  } else if (env && strcmp("posix", env) == 0) {
566  return QB_IPC_POSIX_MQ;
567  } else if (env && strcmp("sysv", env) == 0) {
568  return QB_IPC_SYSV_MQ;
569  } else if (requested == QB_IPC_NATIVE) {
570  /* We prefer shared memory because the server never blocks on
571  * send. If part of a message fits into the socket, libqb
572  * needs to block until the remainder can be sent also.
573  * Otherwise the client will wait forever for the remaining
574  * bytes.
575  */
576  return QB_IPC_SHM;
577  }
578  return requested;
579 }
580 
581 qb_ipcs_service_t *
582 mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
583  struct qb_ipcs_service_handlers * callbacks)
584 {
585  int rc = 0;
586  qb_ipcs_service_t *server = NULL;
587 
588  if (gio_map == NULL) {
589  gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
590  }
591 
592  crm_client_init();
593  server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
594 
595 #ifdef HAVE_IPCS_GET_BUFFER_SIZE
596  /* All clients should use at least ipc_buffer_max as their buffer size */
597  qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
598 #endif
599 
600  qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
601 
602  rc = qb_ipcs_run(server);
603  if (rc < 0) {
604  crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
605  return NULL;
606  }
607 
608  return server;
609 }
610 
611 void
612 mainloop_del_ipc_server(qb_ipcs_service_t * server)
613 {
614  if (server) {
615  qb_ipcs_destroy(server);
616  }
617 }
618 
619 struct mainloop_io_s {
620  char *name;
621  void *userdata;
622 
623  int fd;
624  guint source;
625  crm_ipc_t *ipc;
626  GIOChannel *channel;
627 
628  int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
629  int (*dispatch_fn_io) (gpointer userdata);
630  void (*destroy_fn) (gpointer userdata);
631 
632 };
633 
634 static gboolean
635 mainloop_gio_callback(GIOChannel * gio, GIOCondition condition, gpointer data)
636 {
637  gboolean keep = TRUE;
638  mainloop_io_t *client = data;
639 
640  CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio));
641 
642  if (condition & G_IO_IN) {
643  if (client->ipc) {
644  long rc = 0;
645  int max = 10;
646 
647  do {
648  rc = crm_ipc_read(client->ipc);
649  if (rc <= 0) {
650  crm_trace("Message acquisition from %s[%p] failed: %s (%ld)",
651  client->name, client, pcmk_strerror(rc), rc);
652 
653  } else if (client->dispatch_fn_ipc) {
654  const char *buffer = crm_ipc_buffer(client->ipc);
655 
656  crm_trace("New message from %s[%p] = %ld (I/O condition=%d)", client->name, client, rc, condition);
657  if (client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) {
658  crm_trace("Connection to %s no longer required", client->name);
659  keep = FALSE;
660  }
661  }
662 
663  } while (keep && rc > 0 && --max > 0);
664 
665  } else {
666  crm_trace("New message from %s[%p] %u", client->name, client, condition);
667  if (client->dispatch_fn_io) {
668  if (client->dispatch_fn_io(client->userdata) < 0) {
669  crm_trace("Connection to %s no longer required", client->name);
670  keep = FALSE;
671  }
672  }
673  }
674  }
675 
676  if (client->ipc && crm_ipc_connected(client->ipc) == FALSE) {
677  crm_err("Connection to %s closed " CRM_XS "client=%p condition=%d",
678  client->name, client, condition);
679  keep = FALSE;
680 
681  } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
682  crm_trace("The connection %s[%p] has been closed (I/O condition=%d)",
683  client->name, client, condition);
684  keep = FALSE;
685 
686  } else if ((condition & G_IO_IN) == 0) {
687  /*
688  #define GLIB_SYSDEF_POLLIN =1
689  #define GLIB_SYSDEF_POLLPRI =2
690  #define GLIB_SYSDEF_POLLOUT =4
691  #define GLIB_SYSDEF_POLLERR =8
692  #define GLIB_SYSDEF_POLLHUP =16
693  #define GLIB_SYSDEF_POLLNVAL =32
694 
695  typedef enum
696  {
697  G_IO_IN GLIB_SYSDEF_POLLIN,
698  G_IO_OUT GLIB_SYSDEF_POLLOUT,
699  G_IO_PRI GLIB_SYSDEF_POLLPRI,
700  G_IO_ERR GLIB_SYSDEF_POLLERR,
701  G_IO_HUP GLIB_SYSDEF_POLLHUP,
702  G_IO_NVAL GLIB_SYSDEF_POLLNVAL
703  } GIOCondition;
704 
705  A bitwise combination representing a condition to watch for on an event source.
706 
707  G_IO_IN There is data to read.
708  G_IO_OUT Data can be written (without blocking).
709  G_IO_PRI There is urgent data to read.
710  G_IO_ERR Error condition.
711  G_IO_HUP Hung up (the connection has been broken, usually for pipes and sockets).
712  G_IO_NVAL Invalid request. The file descriptor is not open.
713  */
714  crm_err("Strange condition: %d", condition);
715  }
716 
717  /* keep == FALSE results in mainloop_gio_destroy() being called
718  * just before the source is removed from mainloop
719  */
720  return keep;
721 }
722 
723 static void
724 mainloop_gio_destroy(gpointer c)
725 {
726  mainloop_io_t *client = c;
727  char *c_name = strdup(client->name);
728 
729  /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
730  * client->channel will still have ref_count > 0... should be == 1
731  */
732  crm_trace("Destroying client %s[%p]", c_name, c);
733 
734  if (client->ipc) {
735  crm_ipc_close(client->ipc);
736  }
737 
738  if (client->destroy_fn) {
739  void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
740 
741  client->destroy_fn = NULL;
742  destroy_fn(client->userdata);
743  }
744 
745  if (client->ipc) {
746  crm_ipc_t *ipc = client->ipc;
747 
748  client->ipc = NULL;
749  crm_ipc_destroy(ipc);
750  }
751 
752  crm_trace("Destroyed client %s[%p]", c_name, c);
753 
754  free(client->name); client->name = NULL;
755  free(client);
756 
757  free(c_name);
758 }
759 
761 mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata,
762  struct ipc_client_callbacks *callbacks)
763 {
764  mainloop_io_t *client = NULL;
765  crm_ipc_t *conn = crm_ipc_new(name, max_size);
766 
767  if (conn && crm_ipc_connect(conn)) {
768  int32_t fd = crm_ipc_get_fd(conn);
769 
770  client = mainloop_add_fd(name, priority, fd, userdata, NULL);
771  }
772 
773  if (client == NULL) {
774  crm_perror(LOG_TRACE, "Connection to %s failed", name);
775  if (conn) {
776  crm_ipc_close(conn);
777  crm_ipc_destroy(conn);
778  }
779  return NULL;
780  }
781 
782  client->ipc = conn;
783  client->destroy_fn = callbacks->destroy;
784  client->dispatch_fn_ipc = callbacks->dispatch;
785  return client;
786 }
787 
/*
 * Remove an IPC client's watch. All work is delegated to mainloop_del_fd(),
 * whose source removal leads to mainloop_gio_destroy() freeing the client.
 *
 * NOTE(review): the declarator line (original line 789) is missing from this
 * listing. The body expects a mainloop_io_t * named `client`. TODO: restore
 * the signature from mainloop.h.
 */
788 void
790 {
791  mainloop_del_fd(client);
792 }
793 
/*
 * Return the IPC connection of a mainloop client. Returns NULL for a NULL
 * client, or for a client whose ipc member is NULL (a plain descriptor).
 *
 * NOTE(review): the declarator line (original line 795) is missing from this
 * listing. The body expects a mainloop_io_t * named `client`. TODO: restore
 * the signature from mainloop.h.
 */
794 crm_ipc_t *
796 {
797  if (client) {
798  return client->ipc;
799  }
800  return NULL;
801 }
802 
804 mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
805  struct mainloop_fd_callbacks * callbacks)
806 {
807  mainloop_io_t *client = NULL;
808 
809  if (fd >= 0) {
810  client = calloc(1, sizeof(mainloop_io_t));
811  if (client == NULL) {
812  return NULL;
813  }
814  client->name = strdup(name);
815  client->userdata = userdata;
816 
817  if (callbacks) {
818  client->destroy_fn = callbacks->destroy;
819  client->dispatch_fn_io = callbacks->dispatch;
820  }
821 
822  client->fd = fd;
823  client->channel = g_io_channel_unix_new(fd);
824  client->source =
825  g_io_add_watch_full(client->channel, priority,
826  (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
827  client, mainloop_gio_destroy);
828 
829  /* Now that mainloop now holds a reference to channel,
830  * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
831  *
832  * This means that channel will be free'd by:
833  * g_main_context_dispatch() or g_source_remove()
834  * -> g_source_destroy_internal()
835  * -> g_source_callback_unref()
836  * shortly after mainloop_gio_destroy() completes
837  */
838  g_io_channel_unref(client->channel);
839  crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
840  } else {
841  errno = EINVAL;
842  }
843 
844  return client;
845 }
846 
847 void
849 {
850  if (client != NULL) {
851  crm_trace("Removing client %s[%p]", client->name, client);
852  if (client->source) {
853  /* Results in mainloop_gio_destroy() being called just
854  * before the source is removed from mainloop
855  */
856  g_source_remove(client->source);
857  }
858  }
859 }
860 
861 static GListPtr child_list = NULL;
862 
863 pid_t
865 {
866  return child->pid;
867 }
868 
869 const char *
871 {
872  return child->desc;
873 }
874 
/*
 * Return child->timeout. It is TRUE once child_timeout_callback() has
 * SIGKILLed the child for exceeding its timeout, and FALSE otherwise.
 *
 * NOTE(review): the declarator line (original line 876) is missing from this
 * listing. The body expects a mainloop_child_t * named `child`. TODO:
 * restore the signature from mainloop.h.
 */
875 int
877 {
878  return child->timeout;
879 }
880 
881 void *
883 {
884  return child->privatedata;
885 }
886 
/*
 * Detach the caller's private data from a tracked child by setting
 * privatedata to NULL. The data itself is not freed here.
 *
 * NOTE(review): the declarator line (original line 888) is missing from this
 * listing. The body expects a mainloop_child_t * named `child`. TODO:
 * restore the signature from mainloop.h.
 */
887 void
889 {
890  child->privatedata = NULL;
891 }
892 
893 /* good function name */
894 static void
895 child_free(mainloop_child_t *child)
896 {
897  if (child->timerid != 0) {
898  crm_trace("Removing timer %d", child->timerid);
899  g_source_remove(child->timerid);
900  child->timerid = 0;
901  }
902  free(child->desc);
903  free(child);
904 }
905 
906 /* terrible function name */
907 static int
908 child_kill_helper(mainloop_child_t *child)
909 {
910  int rc;
911  if (child->flags & mainloop_leave_pid_group) {
912  crm_debug("Kill pid %d only. leave group intact.", child->pid);
913  rc = kill(child->pid, SIGKILL);
914  } else {
915  crm_debug("Kill pid %d's group", child->pid);
916  rc = kill(-child->pid, SIGKILL);
917  }
918 
919  if (rc < 0) {
920  if (errno != ESRCH) {
921  crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid);
922  }
923  return -errno;
924  }
925  return 0;
926 }
927 
928 static gboolean
929 child_timeout_callback(gpointer p)
930 {
931  mainloop_child_t *child = p;
932  int rc = 0;
933 
934  child->timerid = 0;
935  if (child->timeout) {
936  crm_crit("%s process (PID %d) will not die!", child->desc, (int)child->pid);
937  return FALSE;
938  }
939 
940  rc = child_kill_helper(child);
941  if (rc == ESRCH) {
942  /* Nothing left to do. pid doesn't exist */
943  return FALSE;
944  }
945 
946  child->timeout = TRUE;
947  crm_warn("%s process (PID %d) timed out", child->desc, (int)child->pid);
948 
949  child->timerid = g_timeout_add(5000, child_timeout_callback, child);
950  return FALSE;
951 }
952 
953 static gboolean
954 child_waitpid(mainloop_child_t *child, int flags)
955 {
956  int rc = 0;
957  int core = 0;
958  int signo = 0;
959  int status = 0;
960  int exitcode = 0;
961 
962  rc = waitpid(child->pid, &status, flags);
963  if(rc == 0) {
964  crm_perror(LOG_DEBUG, "wait(%d) = %d", child->pid, rc);
965  return FALSE;
966 
967  } else if(rc != child->pid) {
968  signo = SIGCHLD;
969  exitcode = 1;
970  status = 1;
971  crm_perror(LOG_ERR, "Call to waitpid(%d) failed", child->pid);
972 
973  } else {
974  crm_trace("Managed process %d exited: %p", child->pid, child);
975 
976  if (WIFEXITED(status)) {
977  exitcode = WEXITSTATUS(status);
978  crm_trace("Managed process %d (%s) exited with rc=%d", child->pid, child->desc, exitcode);
979 
980  } else if (WIFSIGNALED(status)) {
981  signo = WTERMSIG(status);
982  crm_trace("Managed process %d (%s) exited with signal=%d", child->pid, child->desc, signo);
983  }
984 #ifdef WCOREDUMP
985  if (WCOREDUMP(status)) {
986  core = 1;
987  crm_err("Managed process %d (%s) dumped core", child->pid, child->desc);
988  }
989 #endif
990  }
991 
992  if (child->callback) {
993  child->callback(child, child->pid, core, signo, exitcode);
994  }
995  return TRUE;
996 }
997 
998 static void
999 child_death_dispatch(int signal)
1000 {
1001  GListPtr iter = child_list;
1002  gboolean exited;
1003 
1004  while(iter) {
1005  GListPtr saved = NULL;
1006  mainloop_child_t *child = iter->data;
1007  exited = child_waitpid(child, WNOHANG);
1008 
1009  saved = iter;
1010  iter = iter->next;
1011 
1012  if (exited == FALSE) {
1013  continue;
1014  }
1015  crm_trace("Removing process entry %p for %d", child, child->pid);
1016 
1017  child_list = g_list_remove_link(child_list, saved);
1018  g_list_free(saved);
1019  child_free(child);
1020  }
1021 }
1022 
1023 static gboolean
1024 child_signal_init(gpointer p)
1025 {
1026  crm_trace("Installed SIGCHLD handler");
1027  /* Do NOT use g_child_watch_add() and friends, they rely on pthreads */
1028  mainloop_add_signal(SIGCHLD, child_death_dispatch);
1029 
1030  /* In case they terminated before the signal handler was installed */
1031  child_death_dispatch(SIGCHLD);
1032  return FALSE;
1033 }
1034 
/*
 * SIGKILL a tracked child, found by PID, and reap it.
 *
 * Returns FALSE (0):
 *   - if no tracked child has that PID;
 *   - if waitpid() reports the child still running.
 * Returns TRUE (1):
 *   - when the child was reaped and its record freed;
 *   - when kill() reported ESRCH, in which case reaping is left to
 *     child_death_dispatch().
 *
 * NOTE(review): the declarator line (original line 1036) is missing from this
 * listing. The body expects a pid_t parameter named `pid` and returns
 * TRUE/FALSE through int. TODO: restore the signature from mainloop.h.
 */
1035 int
1037 {
1038  GListPtr iter;
1039  mainloop_child_t *child = NULL;
1040  mainloop_child_t *match = NULL;
1041  /* It is impossible to block SIGKILL, this allows us to
1042  * call waitpid without WNOHANG flag.*/
1043  int waitflags = 0, rc = 0;
1044 
1045  for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1046  child = iter->data;
1047  if (pid == child->pid) {
1048  match = child;
1049  }
1050  }
1051 
1052  if (match == NULL) {
1053  return FALSE;
1054  }
1055 
1056  rc = child_kill_helper(match);
1057  if(rc == -ESRCH) {
1058  /* It's gone, but hasn't shown up in waitpid() yet
1059  *
1060  * Wait until we get SIGCHLD and let child_death_dispatch()
1061  * clean it up as normal (so we get the correct return
1062  * code/status)
1063  *
1064  * The blocking alternative would be to call:
1065  * child_waitpid(match, 0);
1066  */
1067  crm_trace("Waiting for child %d to be reaped by child_death_dispatch()", match->pid);
1068  return TRUE;
1069 
1070  } else if(rc != 0) {
1071  /* If KILL failed for some other reason, set the WNOHANG flag since we
1072  * can't be certain what happened.
1073  */
1074  waitflags = WNOHANG;
1075  }
1076 
1077  if (child_waitpid(match, waitflags) == FALSE) {
1078  /* not much we can do if this occurs */
1079  return FALSE;
1080  }
1081 
1082  child_list = g_list_remove(child_list, match);
1083  child_free(match);
1084  return TRUE;
1085 }
1086 
1087 /* Create/Log a new tracked process
1088  * To track a process group, use -pid
1089  */
1090 void
1091 mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags,
1092  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1093 {
1094  static bool need_init = TRUE;
1095  mainloop_child_t *child = g_new(mainloop_child_t, 1);
1096 
1097  child->pid = pid;
1098  child->timerid = 0;
1099  child->timeout = FALSE;
1100  child->privatedata = privatedata;
1101  child->callback = callback;
1102  child->flags = flags;
1103 
1104  if(desc) {
1105  child->desc = strdup(desc);
1106  }
1107 
1108  if (timeout) {
1109  child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
1110  }
1111 
1112  child_list = g_list_append(child_list, child);
1113 
1114  if(need_init) {
1115  need_init = FALSE;
1116  /* SIGCHLD processing has to be invoked from mainloop.
1117  * We do not want it to be possible to both add a child pid
1118  * to mainloop, and have the pid's exit callback invoked within
1119  * the same callstack. */
1120  g_timeout_add(1, child_signal_init, NULL);
1121  }
1122 }
1123 
1124 void
1125 mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
1126  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1127 {
1128  mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, callback);
1129 }
1130 
1131 struct mainloop_timer_s {
1132  guint id;
1133  guint period_ms;
1134  bool repeat;
1135  char *name;
1136  GSourceFunc cb;
1137  void *userdata;
1138 };
1139 
1140 static gboolean mainloop_timer_cb(gpointer user_data)
1141 {
1142  int id = 0;
1143  bool repeat = FALSE;
1144  struct mainloop_timer_s *t = user_data;
1145 
1146  CRM_ASSERT(t != NULL);
1147 
1148  id = t->id;
1149  t->id = 0; /* Ensure it's unset during callbacks so that
1150  * mainloop_timer_running() works as expected
1151  */
1152 
1153  if(t->cb) {
1154  crm_trace("Invoking callbacks for timer %s", t->name);
1155  repeat = t->repeat;
1156  if(t->cb(t->userdata) == FALSE) {
1157  crm_trace("Timer %s complete", t->name);
1158  repeat = FALSE;
1159  }
1160  }
1161 
1162  if(repeat) {
1163  /* Restore if repeating */
1164  t->id = id;
1165  }
1166 
1167  return repeat;
1168 }
1169 
1171 {
1172  if(t && t->id != 0) {
1173  return TRUE;
1174  }
1175  return FALSE;
1176 }
1177 
/*
 * Start a timer created by mainloop_timer_add(), if it has a nonzero period,
 * by adding a GLib timeout that calls mainloop_timer_cb().
 *
 * NOTE(review): the declarator line (original line 1178) and the first body
 * line (original line 1180) are missing from this listing. The body expects a
 * mainloop_timer_t * named `t`. Line 1180 presumably stops an
 * already-running instance first; TODO confirm. As listed, restarting a
 * running timer would overwrite t->id and leave the previous source active.
 */
1179 {
1181  if(t && t->period_ms > 0) {
1182  crm_trace("Starting timer %s", t->name);
1183  t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t);
1184  }
1185 }
1186 
/*
 * Stop a running timer: remove its GLib source and clear t->id. NULL or
 * non-running timers are ignored.
 *
 * NOTE(review): the declarator line (original line 1187) is missing from this
 * listing. The body expects a mainloop_timer_t * named `t`. TODO: restore the
 * signature from mainloop.h.
 */
1188 {
1189  if(t && t->id != 0) {
1190  crm_trace("Stopping timer %s", t->name);
1191  g_source_remove(t->id);
1192  t->id = 0;
1193  }
1194 }
1195 
/*
 * Change a timer's period and return the previous period (0 for a NULL
 * timer).
 *
 * NOTE(review): the declarator line (original line 1196) and the body of the
 * final `if` (original line 1206) are missing from this listing. The body
 * expects a mainloop_timer_t * `t` and a guint `period_ms`. Line 1206
 * presumably restarts a running timer so the new period takes effect; TODO
 * confirm. As listed, the `if` body is empty and a running timer keeps its
 * old period.
 */
1197 {
1198  guint last = 0;
1199 
1200  if(t) {
1201  last = t->period_ms;
1202  t->period_ms = period_ms;
1203  }
1204 
1205  if(t && t->id != 0 && last != t->period_ms) {
1207  }
1208  return last;
1209 }
1210 
1211 
1213 mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
1214 {
1215  mainloop_timer_t *t = calloc(1, sizeof(mainloop_timer_t));
1216 
1217  if(t) {
1218  if(name) {
1219  t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat);
1220  } else {
1221  t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat);
1222  }
1223  t->id = 0;
1224  t->period_ms = period_ms;
1225  t->repeat = repeat;
1226  t->cb = cb;
1227  t->userdata = userdata;
1228  crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
1229  }
1230  return t;
1231 }
1232 
/*
 * Free a timer created by mainloop_timer_add(), including its name. NULL is
 * ignored.
 *
 * NOTE(review): the declarator line (original line 1234) and original line
 * 1238 are missing from this listing. The body expects a mainloop_timer_t *
 * named `t`. Line 1238 presumably stops the timer before freeing; TODO
 * confirm. As listed, freeing a running timer would leave a GLib source whose
 * callback data points to freed memory.
 */
1233 void
1235 {
1236  if(t) {
1237  crm_trace("Destroying timer %s", t->name);
1239  free(t->name);
1240  free(t);
1241  }
1242 }
1243 
1244 /*
1245  * Helpers to make sure certain events aren't lost at shutdown
1246  */
1247 
1248 static gboolean
1249 drain_timeout_cb(gpointer user_data)
1250 {
1251  bool *timeout_popped = (bool*) user_data;
1252 
1253  *timeout_popped = TRUE;
1254  return FALSE;
1255 }
1256 
1269 void
1270 pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint))
1271 {
1272  bool timeout_popped = FALSE;
1273  guint timer = 0;
1274  GMainContext *ctx = NULL;
1275 
1276  CRM_CHECK(mloop && check, return);
1277 
1278  ctx = g_main_loop_get_context(mloop);
1279  if (ctx) {
1280  time_t start_time = time(NULL);
1281 
1282  timer = g_timeout_add(timer_ms, drain_timeout_cb, &timeout_popped);
1283  while (!timeout_popped
1284  && check(timer_ms - (time(NULL) - start_time) * 1000)) {
1285  g_main_context_iteration(ctx, TRUE);
1286  }
1287  }
1288  if (!timeout_popped && (timer > 0)) {
1289  g_source_remove(timer);
1290  }
1291 }
void * mainloop_child_userdata(mainloop_child_t *child)
Definition: mainloop.c:882
#define LOG_TRACE
Definition: logging.h:35
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:165
pid_t mainloop_child_pid(mainloop_child_t *child)
Definition: mainloop.c:864
bool mainloop_timer_running(mainloop_timer_t *t)
Definition: mainloop.c:1170
bool crm_ipc_connect(crm_ipc_t *client)
Establish an IPC connection to a Pacemaker component.
Definition: ipc.c:938
A dumping ground.
void(* destroy)(gpointer)
Definition: mainloop.h:68
#define crm_notice(fmt, args...)
Definition: logging.h:251
const char * pcmk_strerror(int rc)
Definition: results.c:184
struct signal_s crm_signal_t
#define crm_crit(fmt, args...)
Definition: logging.h:248
void mainloop_del_fd(mainloop_io_t *client)
Definition: mainloop.c:848
void mainloop_del_ipc_server(qb_ipcs_service_t *server)
Definition: mainloop.c:612
mainloop_child_flags
Definition: mainloop.h:23
gboolean mainloop_destroy_signal(int sig)
Definition: mainloop.c:380
crm_trigger_t * mainloop_add_trigger(int priority, int(*dispatch)(gpointer user_data), gpointer userdata)
Definition: mainloop.c:201
void(* destroy)(gpointer userdata)
Definition: mainloop.h:85
int crm_ipc_get_fd(crm_ipc_t *client)
Definition: ipc.c:1007
void mainloop_trigger_complete(crm_trigger_t *trig)
Definition: mainloop.c:189
const char * mainloop_child_name(mainloop_child_t *child)
Definition: mainloop.c:870
struct mainloop_timer_s mainloop_timer_t
Definition: mainloop.h:31
struct mainloop_io_s mainloop_io_t
Definition: mainloop.h:29
struct mainloop_child_s mainloop_child_t
Definition: mainloop.h:30
gboolean crm_signal(int sig, void(*dispatch)(int sig))
Definition: mainloop.c:290
long crm_ipc_read(crm_ipc_t *client)
Definition: ipc.c:1116
mainloop_timer_t * mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
Definition: mainloop.c:1213
uint32_t pid
Definition: internal.h:81
mainloop_io_t * mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks)
Definition: mainloop.c:761
struct qb_ipcs_poll_handlers gio_poll_funcs
Definition: mainloop.c:549
guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
Definition: mainloop.c:1196
Wrappers for and extensions to glib mainloop.
void crm_client_init(void)
Definition: ipc.c:237
void mainloop_timer_del(mainloop_timer_t *t)
Definition: mainloop.c:1234
crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t *client)
Definition: mainloop.c:795
const char * crm_ipc_buffer(crm_ipc_t *client)
Definition: ipc.c:1163
void mainloop_del_ipc_client(mainloop_io_t *client)
Definition: mainloop.c:789
struct trigger_s crm_trigger_t
Definition: mainloop.h:28
uint32_t id
Definition: internal.h:80
int(* dispatch)(gpointer userdata)
Definition: mainloop.h:84
#define crm_warn(fmt, args...)
Definition: logging.h:250
int mainloop_child_timeout(mainloop_child_t *child)
Definition: mainloop.c:876
#define crm_debug(fmt, args...)
Definition: logging.h:254
struct crm_ipc_s crm_ipc_t
Definition: ipc.h:56
void mainloop_set_trigger(crm_trigger_t *source)
Definition: mainloop.c:213
#define crm_trace(fmt, args...)
Definition: logging.h:255
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:804
void mainloop_timer_start(mainloop_timer_t *t)
Definition: mainloop.c:1178
qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
Definition: mainloop.c:582
Wrappers for and extensions to libxml2.
void mainloop_clear_child_userdata(mainloop_child_t *child)
Definition: mainloop.c:888
void mainloop_timer_stop(mainloop_timer_t *t)
Definition: mainloop.c:1187
void mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
Definition: mainloop.c:1091
unsigned int crm_ipc_default_buffer_size(void)
Definition: ipc.c:56
void crm_ipc_destroy(crm_ipc_t *client)
Definition: ipc.c:984
void mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
Definition: mainloop.c:1125
bool crm_ipc_connected(crm_ipc_t *client)
Definition: ipc.c:1021
#define CRM_XS
Definition: logging.h:43
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:227
#define crm_err(fmt, args...)
Definition: logging.h:249
#define CRM_ASSERT(expr)
Definition: results.h:20
crm_ipc_t * crm_ipc_new(const char *name, size_t max_size)
Definition: ipc.c:910
char data[0]
Definition: internal.h:90
void mainloop_cleanup(void)
Definition: mainloop.c:400
gboolean mainloop_add_signal(int sig, void(*dispatch)(int sig))
Definition: mainloop.c:326
int mainloop_child_kill(pid_t pid)
Definition: mainloop.c:1036
gboolean mainloop_destroy_trigger(crm_trigger_t *source)
Definition: mainloop.c:221
void pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool(*check)(guint))
Process main loop events while a certain condition is met.
Definition: mainloop.c:1270
void crm_ipc_close(crm_ipc_t *client)
Definition: ipc.c:969
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
GList * GListPtr
Definition: crm.h:190
#define crm_info(fmt, args...)
Definition: logging.h:252
uint64_t flags
Definition: remote.c:148
int(* dispatch)(const char *buffer, ssize_t length, gpointer userdata)
Definition: mainloop.h:67
enum crm_ais_msg_types type
Definition: internal.h:83
#define int32_t
Definition: stdint.in.h:157