pacemaker  2.0.1-9e909a5bdd
Scalable High-Availability cluster resource manager
cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2018 Andrew Beekhof <andrew@beekhof.net>
3  *
4  * This source code is licensed under the GNU Lesser General Public License
5  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
6  */
7 
8 #include <crm_internal.h>
9 #include <bzlib.h>
10 #include <sys/socket.h>
11 #include <netinet/in.h>
12 #include <arpa/inet.h>
13 #include <netdb.h>
14 
15 #include <crm/common/ipc.h>
16 #include <crm/cluster/internal.h>
17 #include <crm/common/mainloop.h>
18 #include <sys/utsname.h>
19 
20 #include <qb/qbipcc.h>
21 #include <qb/qbutil.h>
22 
23 #include <corosync/corodefs.h>
24 #include <corosync/corotypes.h>
25 #include <corosync/hdb.h>
26 #include <corosync/cpg.h>
27 
28 #include <crm/msg_xml.h>
29 
30 cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */
31 
32 static bool cpg_evicted = FALSE;
33 gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
34 
/*
 * Run 'code' (which is expected to assign the caller's local variable 'rc')
 * and repeat it for as long as rc is CS_ERR_TRY_AGAIN or CS_ERR_QUEUE_FULL.
 * After the Nth such failure the macro sleeps N seconds; it gives up once
 * 'counter' reaches 'max'. Any other rc ends the loop immediately.
 * 'counter' must be a modifiable integer lvalue in the caller's scope.
 */
#define cs_repeat(counter, max, code) do {                              \
        code;                                                           \
        if ((rc != CS_ERR_TRY_AGAIN) && (rc != CS_ERR_QUEUE_FULL)) {    \
            break;                                                      \
        }                                                               \
        counter++;                                                      \
        crm_debug("Retrying operation after %ds", counter);             \
        sleep(counter);                                                 \
    } while(counter < max)
45 
46 void
48 {
49  pcmk_cpg_handle = 0;
50  if (cluster->cpg_handle) {
51  crm_trace("Disconnecting CPG");
52  cpg_leave(cluster->cpg_handle, &cluster->group);
53  cpg_finalize(cluster->cpg_handle);
54  cluster->cpg_handle = 0;
55 
56  } else {
57  crm_info("No CPG connection");
58  }
59 }
60 
61 uint32_t get_local_nodeid(cpg_handle_t handle)
62 {
63  int rc = CS_OK;
64  int retries = 0;
65  static uint32_t local_nodeid = 0;
66  cpg_handle_t local_handle = handle;
67  cpg_callbacks_t cb = { };
68 
69  if(local_nodeid != 0) {
70  return local_nodeid;
71  }
72 
73  if(handle == 0) {
74  crm_trace("Creating connection");
75  cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
76  }
77 
78  if (rc == CS_OK) {
79  retries = 0;
80  crm_trace("Performing lookup");
81  cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
82  }
83 
84  if (rc != CS_OK) {
85  crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
86  }
87  if(handle == 0) {
88  crm_trace("Closing connection");
89  cpg_finalize(local_handle);
90  }
91  crm_debug("Local nodeid is %u", local_nodeid);
92  return local_nodeid;
93 }
94 
95 
98 
99 static ssize_t crm_cs_flush(gpointer data);
100 
101 static gboolean
102 crm_cs_flush_cb(gpointer data)
103 {
104  cs_message_timer = 0;
105  crm_cs_flush(data);
106  return FALSE;
107 }
108 
109 #define CS_SEND_MAX 200
110 static ssize_t
111 crm_cs_flush(gpointer data)
112 {
113  int sent = 0;
114  ssize_t rc = 0;
115  int queue_len = 0;
116  static unsigned int last_sent = 0;
117  cpg_handle_t *handle = (cpg_handle_t *)data;
118 
119  if (*handle == 0) {
120  crm_trace("Connection is dead");
121  return pcmk_ok;
122  }
123 
124  queue_len = g_list_length(cs_message_queue);
125  if ((queue_len % 1000) == 0 && queue_len > 1) {
126  crm_err("CPG queue has grown to %d", queue_len);
127 
128  } else if (queue_len == CS_SEND_MAX) {
129  crm_warn("CPG queue has grown to %d", queue_len);
130  }
131 
132  if (cs_message_timer) {
133  /* There is already a timer, wait until it goes off */
134  crm_trace("Timer active %d", cs_message_timer);
135  return pcmk_ok;
136  }
137 
138  while (cs_message_queue && sent < CS_SEND_MAX) {
139  struct iovec *iov = cs_message_queue->data;
140 
141  errno = 0;
142  rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
143 
144  if (rc != CS_OK) {
145  break;
146  }
147 
148  sent++;
149  last_sent++;
150  crm_trace("CPG message sent, size=%llu",
151  (unsigned long long) iov->iov_len);
152 
153  cs_message_queue = g_list_remove(cs_message_queue, iov);
154  free(iov->iov_base);
155  free(iov);
156  }
157 
158  queue_len -= sent;
159  if (sent > 1 || cs_message_queue) {
160  crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
161  sent, queue_len, last_sent, ais_error2text(rc),
162  (long long) rc);
163  } else {
164  crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
165  sent, queue_len, last_sent, ais_error2text(rc),
166  (long long) rc);
167  }
168 
169  if (cs_message_queue) {
170  uint32_t delay_ms = 100;
171  if(rc != CS_OK) {
172  /* Proportionally more if sending failed but cap at 1s */
173  delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
174  }
175  cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
176  }
177 
178  return rc;
179 }
180 
181 gboolean
182 send_cpg_iov(struct iovec * iov)
183 {
184  static unsigned int queued = 0;
185 
186  queued++;
187  crm_trace("Queueing CPG message %u (%llu bytes)",
188  queued, (unsigned long long) iov->iov_len);
189  cs_message_queue = g_list_append(cs_message_queue, iov);
190  crm_cs_flush(&pcmk_cpg_handle);
191  return TRUE;
192 }
193 
194 static int
195 pcmk_cpg_dispatch(gpointer user_data)
196 {
197  int rc = 0;
198  crm_cluster_t *cluster = (crm_cluster_t*) user_data;
199 
200  rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
201  if (rc != CS_OK) {
202  crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
203  cluster->cpg_handle = 0;
204  return -1;
205 
206  } else if(cpg_evicted) {
207  crm_err("Evicted from CPG membership");
208  return -1;
209  }
210  return 0;
211 }
212 
213 char *
214 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
215  uint32_t *kind, const char **from)
216 {
217  char *data = NULL;
218  AIS_Message *msg = (AIS_Message *) content;
219 
220  if(handle) {
221  // Do filtering and field massaging
222  uint32_t local_nodeid = get_local_nodeid(handle);
223  const char *local_name = get_local_node_name();
224 
225  if (msg->sender.id > 0 && msg->sender.id != nodeid) {
226  crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
227  return NULL;
228 
229  } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
230  /* Not for us */
231  crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
232  return NULL;
233  } else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) {
234  /* Not for us */
235  crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
236  return NULL;
237  }
238 
239  msg->sender.id = nodeid;
240  if (msg->sender.size == 0) {
241  crm_node_t *peer = crm_get_peer(nodeid, NULL);
242 
243  if (peer == NULL) {
244  crm_err("Peer with nodeid=%u is unknown", nodeid);
245 
246  } else if (peer->uname == NULL) {
247  crm_err("No uname for peer with nodeid=%u", nodeid);
248 
249  } else {
250  crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
251  msg->sender.size = strlen(peer->uname);
252  memset(msg->sender.uname, 0, MAX_NAME);
253  memcpy(msg->sender.uname, peer->uname, msg->sender.size);
254  }
255  }
256  }
257 
258  crm_trace("Got new%s message (size=%d, %d, %d)",
259  msg->is_compressed ? " compressed" : "",
260  ais_data_len(msg), msg->size, msg->compressed_size);
261 
262  if (kind != NULL) {
263  *kind = msg->header.id;
264  }
265  if (from != NULL) {
266  *from = msg->sender.uname;
267  }
268 
269  if (msg->is_compressed && msg->size > 0) {
270  int rc = BZ_OK;
271  char *uncompressed = NULL;
272  unsigned int new_size = msg->size + 1;
273 
274  if (check_message_sanity(msg, NULL) == FALSE) {
275  goto badmsg;
276  }
277 
278  crm_trace("Decompressing message data");
279  uncompressed = calloc(1, new_size);
280  rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
281 
282  if (rc != BZ_OK) {
283  crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
284  bz2_strerror(rc), rc);
285  free(uncompressed);
286  goto badmsg;
287  }
288 
289  CRM_ASSERT(rc == BZ_OK);
290  CRM_ASSERT(new_size == msg->size);
291 
292  data = uncompressed;
293 
294  } else if (check_message_sanity(msg, data) == FALSE) {
295  goto badmsg;
296 
297  } else if (safe_str_eq("identify", data)) {
298  char *pid_s = crm_getpid_s();
299 
300  send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
301  free(pid_s);
302  return NULL;
303 
304  } else {
305  data = strdup(msg->data);
306  }
307 
308  // Is this necessary?
309  crm_get_peer(msg->sender.id, msg->sender.uname);
310 
311  crm_trace("Payload: %.200s", data);
312  return data;
313 
314  badmsg:
315  crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
316  " min=%d, total=%d, size=%d, bz2_size=%d",
317  msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
318  ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
319  msg->sender.pid, (int)sizeof(AIS_Message),
320  msg->header.size, msg->size, msg->compressed_size);
321 
322  free(data);
323  return NULL;
324 }
325 
/* Printable name for a possibly-NULL peer: "<none>" when there is no peer,
 * "<unknown>" when the peer has no uname, otherwise the peer's uname.
 */
#define PEER_NAME(peer) (((peer) == NULL)? "<none>" : (((peer)->uname == NULL)? "<unknown>" : (peer)->uname))
327 
328 void
329 pcmk_cpg_membership(cpg_handle_t handle,
330  const struct cpg_name *groupName,
331  const struct cpg_address *member_list, size_t member_list_entries,
332  const struct cpg_address *left_list, size_t left_list_entries,
333  const struct cpg_address *joined_list, size_t joined_list_entries)
334 {
335  int i;
336  gboolean found = FALSE;
337  static int counter = 0;
338  uint32_t local_nodeid = get_local_nodeid(handle);
339 
340  for (i = 0; i < left_list_entries; i++) {
341  crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL);
342 
343  crm_info("Group event %s.%d: node %u (%s) left",
344  groupName->value, counter, left_list[i].nodeid,
345  PEER_NAME(peer));
346  if (peer) {
347  crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS);
348  }
349  }
350 
351  for (i = 0; i < joined_list_entries; i++) {
352  crm_info("Group event %s.%d: node %u joined",
353  groupName->value, counter, joined_list[i].nodeid);
354  }
355 
356  for (i = 0; i < member_list_entries; i++) {
357  crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
358 
359  crm_info("Group event %s.%d: node %u (%s) is member",
360  groupName->value, counter, member_list[i].nodeid,
361  PEER_NAME(peer));
362 
363  /* If the caller left auto-reaping enabled, this will also update the
364  * state to member.
365  */
366  peer = crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
367 
368  if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
369  /* The node is a CPG member, but we currently think it's not a
370  * cluster member. This is possible only if auto-reaping was
371  * disabled. The node may be joining, and we happened to get the CPG
372  * notification before the quorum notification; or the node may have
373  * just died, and we are processing its final messages; or a bug
374  * has affected the peer cache.
375  */
376  time_t now = time(NULL);
377 
378  if (peer->when_lost == 0) {
379  // Track when we first got into this contradictory state
380  peer->when_lost = now;
381 
382  } else if (now > (peer->when_lost + 60)) {
383  // If it persists for more than a minute, update the state
384  crm_warn("Node %u member of group %s but believed offline",
385  member_list[i].nodeid, groupName->value);
386  crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0);
387  }
388  }
389 
390  if (local_nodeid == member_list[i].nodeid) {
391  found = TRUE;
392  }
393  }
394 
395  if (!found) {
396  crm_err("We're not part of CPG group '%s' anymore!", groupName->value);
397  cpg_evicted = TRUE;
398  }
399 
400  counter++;
401 }
402 
403 gboolean
405 {
406  int rc = -1;
407  int fd = 0;
408  int retries = 0;
409  uint32_t id = 0;
410  crm_node_t *peer = NULL;
411  cpg_handle_t handle = 0;
412  const char *message_name = pcmk_message_name(crm_system_name);
413 
414  struct mainloop_fd_callbacks cpg_fd_callbacks = {
415  .dispatch = pcmk_cpg_dispatch,
416  .destroy = cluster->destroy,
417  };
418 
419  cpg_callbacks_t cpg_callbacks = {
420  .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
421  .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
422  /* .cpg_deliver_fn = pcmk_cpg_deliver, */
423  /* .cpg_confchg_fn = pcmk_cpg_membership, */
424  };
425 
426  cpg_evicted = FALSE;
427  cluster->group.length = 0;
428  cluster->group.value[0] = 0;
429 
430  /* group.value is char[128] */
431  strncpy(cluster->group.value, message_name, 127);
432  cluster->group.value[127] = 0;
433  cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
434 
435  cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
436  if (rc != CS_OK) {
437  crm_err("Could not connect to the Cluster Process Group API: %d", rc);
438  goto bail;
439  }
440 
441  id = get_local_nodeid(handle);
442  if (id == 0) {
443  crm_err("Could not get local node id from the CPG API");
444  goto bail;
445 
446  }
447  cluster->nodeid = id;
448 
449  retries = 0;
450  cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
451  if (rc != CS_OK) {
452  crm_err("Could not join the CPG group '%s': %d", message_name, rc);
453  goto bail;
454  }
455 
456  rc = cpg_fd_get(handle, &fd);
457  if (rc != CS_OK) {
458  crm_err("Could not obtain the CPG API connection: %d", rc);
459  goto bail;
460  }
461 
462  pcmk_cpg_handle = handle;
463  cluster->cpg_handle = handle;
464  mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
465 
466  bail:
467  if (rc != CS_OK) {
468  cpg_finalize(handle);
469  return FALSE;
470  }
471 
472  peer = crm_get_peer(id, NULL);
473  crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
474  return TRUE;
475 }
476 
477 gboolean
478 send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
479 {
480  gboolean rc = TRUE;
481  char *data = NULL;
482 
483  data = dump_xml_unformatted(msg);
484  rc = send_cluster_text(crm_class_cluster, data, local, node, dest);
485  free(data);
486  return rc;
487 }
488 
489 gboolean
490 send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
491  gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
492 {
493  static int msg_id = 0;
494  static int local_pid = 0;
495  static int local_name_len = 0;
496  static const char *local_name = NULL;
497 
498  char *target = NULL;
499  struct iovec *iov;
500  AIS_Message *msg = NULL;
502 
503  switch (msg_class) {
504  case crm_class_cluster:
505  break;
506  default:
507  crm_err("Invalid message class: %d", msg_class);
508  return FALSE;
509  }
510 
511  CRM_CHECK(dest != crm_msg_ais, return FALSE);
512 
513  if(local_name == NULL) {
514  local_name = get_local_node_name();
515  }
516  if(local_name_len == 0 && local_name) {
517  local_name_len = strlen(local_name);
518  }
519 
520  if (data == NULL) {
521  data = "";
522  }
523 
524  if (local_pid == 0) {
525  local_pid = getpid();
526  }
527 
528  if (sender == crm_msg_none) {
529  sender = local_pid;
530  }
531 
532  msg = calloc(1, sizeof(AIS_Message));
533 
534  msg_id++;
535  msg->id = msg_id;
536  msg->header.id = msg_class;
537  msg->header.error = CS_OK;
538 
539  msg->host.type = dest;
540  msg->host.local = local;
541 
542  if (node) {
543  if (node->uname) {
544  target = strdup(node->uname);
545  msg->host.size = strlen(node->uname);
546  memset(msg->host.uname, 0, MAX_NAME);
547  memcpy(msg->host.uname, node->uname, msg->host.size);
548  } else {
549  target = crm_strdup_printf("%u", node->id);
550  }
551  msg->host.id = node->id;
552  } else {
553  target = strdup("all");
554  }
555 
556  msg->sender.id = 0;
557  msg->sender.type = sender;
558  msg->sender.pid = local_pid;
559  msg->sender.size = local_name_len;
560  memset(msg->sender.uname, 0, MAX_NAME);
561  if(local_name && msg->sender.size) {
562  memcpy(msg->sender.uname, local_name, msg->sender.size);
563  }
564 
565  msg->size = 1 + strlen(data);
566  msg->header.size = sizeof(AIS_Message) + msg->size;
567 
568  if (msg->size < CRM_BZ2_THRESHOLD) {
569  msg = realloc_safe(msg, msg->header.size);
570  memcpy(msg->data, data, msg->size);
571 
572  } else {
573  char *compressed = NULL;
574  unsigned int new_size = 0;
575  char *uncompressed = strdup(data);
576 
577  if (crm_compress_string(uncompressed, msg->size, 0, &compressed, &new_size)) {
578 
579  msg->header.size = sizeof(AIS_Message) + new_size;
580  msg = realloc_safe(msg, msg->header.size);
581  memcpy(msg->data, compressed, new_size);
582 
583  msg->is_compressed = TRUE;
584  msg->compressed_size = new_size;
585 
586  } else {
587  msg = realloc_safe(msg, msg->header.size);
588  memcpy(msg->data, data, msg->size);
589  }
590 
591  free(uncompressed);
592  free(compressed);
593  }
594 
595  iov = calloc(1, sizeof(struct iovec));
596  iov->iov_base = msg;
597  iov->iov_len = msg->header.size;
598 
599  if (msg->compressed_size) {
600  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
601  msg->id, target, (unsigned long long) iov->iov_len,
602  msg->compressed_size, data);
603  } else {
604  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
605  msg->id, target, (unsigned long long) iov->iov_len,
606  msg->size, data);
607  }
608  free(target);
609 
610  send_cpg_iov(iov);
611 
612  return TRUE;
613 }
614 
616 text2msg_type(const char *text)
617 {
618  int type = crm_msg_none;
619 
620  CRM_CHECK(text != NULL, return type);
621  text = pcmk_message_name(text);
622  if (safe_str_eq(text, "ais")) {
623  type = crm_msg_ais;
624  } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
625  type = crm_msg_cib;
626  } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)
627  || safe_str_eq(text, CRM_SYSTEM_DC)) {
628  type = crm_msg_crmd;
629  } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
630  type = crm_msg_te;
631  } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
632  type = crm_msg_pe;
633  } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
634  type = crm_msg_lrmd;
635  } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
636  type = crm_msg_stonithd;
637  } else if (safe_str_eq(text, "stonith-ng")) {
638  type = crm_msg_stonith_ng;
639  } else if (safe_str_eq(text, "attrd")) {
640  type = crm_msg_attrd;
641 
642  } else {
643  /* This will normally be a transient client rather than
644  * a cluster daemon. Set the type to the pid of the client
645  */
646  int scan_rc = sscanf(text, "%d", &type);
647 
648  if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
649  /* Ensure it's sane */
650  type = crm_msg_none;
651  }
652  }
653  return type;
654 }
enum crm_ais_msg_types type
Definition: internal.h:20
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:165
char data[0]
Definition: internal.h:37
gboolean send_cpg_iov(struct iovec *iov)
Definition: cpg.c:182
#define crm_notice(fmt, args...)
Definition: logging.h:251
gboolean is_compressed
Definition: internal.h:29
const char * bz2_strerror(int rc)
Definition: results.c:425
uint32_t size
Definition: internal.h:34
gboolean safe_str_neq(const char *a, const char *b)
Definition: strings.c:141
crm_ais_msg_types
Definition: cluster.h:94
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:804
uint32_t nodeid
Definition: cluster.h:74
uint32_t id
Definition: cluster.h:60
const char * get_local_node_name(void)
Definition: cluster.c:118
void(* destroy)(gpointer)
Definition: cluster.h:76
uint32_t id
Definition: internal.h:17
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Definition: membership.c:653
char * crm_system_name
Definition: utils.c:59
#define CS_SEND_MAX
Definition: cpg.c:109
uint32_t pid
Definition: internal.h:81
AIS_Host sender
Definition: internal.h:85
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:490
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Definition: cpg.c:214
Wrappers for and extensions to glib mainloop.
#define CRM_SYSTEM_DC
Definition: crm.h:74
uint32_t id
Definition: internal.h:80
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:47
int(* dispatch)(gpointer userdata)
Definition: mainloop.h:84
int cs_message_timer
Definition: cpg.c:97
#define crm_warn(fmt, args...)
Definition: logging.h:250
#define crm_debug(fmt, args...)
Definition: logging.h:254
const char * pcmk_message_name(const char *name)
Get name to be used as identifier for cluster messages.
Definition: utils.c:1097
time_t when_lost
Definition: cluster.h:61
GListPtr cs_message_queue
Definition: cpg.c:96
#define crm_trace(fmt, args...)
Definition: logging.h:255
gboolean local
Definition: internal.h:19
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
Definition: membership.c:785
#define CRM_SYSTEM_PENGINE
Definition: crm.h:80
AIS_Host sender
Definition: internal.h:32
uint32_t id
Definition: internal.h:28
gboolean check_message_sanity(const AIS_Message *msg, const char *data)
Definition: corosync.c:379
struct crm_ais_msg_s AIS_Message
Definition: internal.h:14
cpg_handle_t pcmk_cpg_handle
Definition: cpg.c:30
#define ais_data_len(msg)
Definition: internal.h:123
uint32_t size
Definition: internal.h:21
#define CRM_NODE_MEMBER
Definition: cluster.h:29
#define PEER_NAME(peer)
Definition: cpg.c:326
uint32_t compressed_size
Definition: internal.h:35
#define MAX_NAME
Definition: crm.h:36
#define CRM_SYSTEM_CRMD
Definition: crm.h:78
crm_ais_msg_class
Definition: cluster.h:90
#define CRM_XS
Definition: logging.h:43
bool crm_compress_string(const char *data, int length, int max, char **result, unsigned int *result_len)
Definition: strings.c:411
#define CRM_SYSTEM_STONITHD
Definition: crm.h:82
crm_node_t * crm_update_peer_state(const char *source, crm_node_t *node, const char *state, int membership)
Update a node's state and membership information.
Definition: membership.c:965
#define CRM_SYSTEM_CIB
Definition: crm.h:77
#define CRM_SYSTEM_TENGINE
Definition: crm.h:81
uint32_t get_local_nodeid(cpg_handle_t handle)
Definition: cpg.c:61
gboolean(* pcmk_cpg_dispatch_fn)(int kind, const char *from, const char *data)
Definition: cpg.c:33
#define crm_err(fmt, args...)
Definition: logging.h:249
#define G_PRIORITY_MEDIUM
Definition: mainloop.h:121
#define CRM_ASSERT(expr)
Definition: results.h:20
char uname[MAX_NAME]
Definition: internal.h:22
#define OFFLINESTATUS
Definition: util.h:35
enum crm_ais_msg_types text2msg_type(const char *text)
Definition: cpg.c:616
#define CRM_BZ2_THRESHOLD
Definition: xml.h:45
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3164
#define CRM_SYSTEM_LRMD
Definition: crm.h:79
gboolean send_cluster_message_cs(xmlNode *msg, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:478
#define uint32_t
Definition: stdint.in.h:158
char data[0]
Definition: internal.h:90
char * state
Definition: cluster.h:54
#define pcmk_ok
Definition: results.h:35
Wrappers for and extensions to libqb IPC.
uint32_t pid
Definition: internal.h:18
char * uname
Definition: cluster.h:52
#define cs_repeat(counter, max, code)
Definition: cpg.c:35
AIS_Host host
Definition: internal.h:31
#define safe_str_eq(a, b)
Definition: util.h:54
#define ONLINESTATUS
Definition: util.h:34
crm_node_t * crm_find_peer(unsigned int id, const char *uname)
Definition: membership.c:522
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition: cpg.c:329
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
GList * GListPtr
Definition: crm.h:190
#define crm_info(fmt, args...)
Definition: logging.h:252
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:404
enum crm_ais_msg_types type
Definition: internal.h:83
gboolean local
Definition: internal.h:82