Commit 3a51e1d5 authored by Felix Hamann's avatar Felix Hamann

Merge branch 'master' into ruby-c-client

parents e85cb9db 3b775cad
......@@ -502,7 +502,10 @@ sam_destroy (
sam_stat_handle_destroy (&(*self)->stat);
sam_stat_destroy (&(*self)->stat_actor);
sam_cfg_destroy (&(*self)->cfg);
if ((*self)->cfg) {
sam_cfg_destroy (&(*self)->cfg);
}
free (*self);
*self = NULL;
......@@ -669,6 +672,7 @@ sam_init (
assert (*cfg);
if (self->cfg) {
sam_log_error ("DESTROY");
sam_cfg_destroy (&self->cfg);
}
......@@ -860,7 +864,6 @@ aggregate_backend_info (sam_t *self)
return buf;
}
// aggregate backend information
buf = malloc (buf_size * backend_c * sizeof (char));
char *buf_ptr = buf;
......@@ -875,7 +878,6 @@ aggregate_backend_info (sam_t *self)
free (str);
}
// compose final string
char head [buf_size];
snprintf (
......
......@@ -106,7 +106,7 @@ struct sam_be_rmq_t {
/// Return a string representation of the current backend
/// state. Memory must be free'd by the caller.
static char *
to_string (
be_to_string (
sam_backend_t *be)
{
size_t buf_size = 512;
......@@ -127,7 +127,7 @@ to_string (
self->connection.tries, opts->tries, opts->interval,
opts->heartbeat,
self->amqp.seq,
(self->store)? zlist_size (self->store): 0);
(self->store)? zlist_size (self->store): (size_t) 0);
return str;
}
......@@ -276,9 +276,9 @@ handle_amqp (
// this must be handled
if (frame.payload.method.id != AMQP_BASIC_ACK_METHOD) {
sam_log_errorf (
"got something different than an ack: %d",
"got something different than an ack: 0x%x",
frame.payload.method.id);
assert (false);
return connection_loss (self, loop);
}
// handle acknowledgement
......@@ -311,6 +311,8 @@ handle_reconnect (
{
sam_be_rmq_t *self = args;
sam_log_errorf ("handle reconnect for '%s', %d", self->name, self->connection.tries);
// invoke callback immediately
if (self->connection.tries) {
if (self->connection.tries > 0) {
......@@ -703,9 +705,11 @@ sam_be_rmq_new (
memset (&self->amqp.connection, 0, sizeof (amqp_connection_state_t));
self->amqp.message_channel = 1;
self->amqp.method_channel = 2;
self->amqp.seq = 0;
self->connection.established = false;
self->connection.tries = -2;
return self;
}
......@@ -873,6 +877,7 @@ sam_be_rmq_connect (
self->store = zlist_new ();
zlist_set_destructor (self->store, free_store_item);
self->connection.tries = opts->tries;
return 0;
}
......@@ -1089,7 +1094,7 @@ sam_be_rmq_start (
assert (backend);
backend->name = (*self)->name;
backend->id = (*self)->id;
backend->str = to_string;
backend->str = be_to_string;
// signals
......
......@@ -83,6 +83,8 @@ handle_req (
}
else {
sam_stat (self->stat, "samd.valid requests", 1);
sam_msg_t *msg = sam_msg_new (&zmsg);
ret = sam_eval (self->sam, msg);
}
......@@ -126,40 +128,51 @@ samd_new (
samd_t *self = malloc (sizeof (samd_t));
assert (self);
self->stat = sam_stat_handle_new ();
sam_cfg_t *cfg = sam_cfg_new (cfg_file);
if (!cfg) {
return NULL;
goto abort;
}
char *endpoint;
int rc = sam_cfg_endpoint (cfg, &endpoint);
if (rc) {
goto abort;
}
sam_be_t be_type;
int rc = sam_cfg_be_type (cfg, &be_type);
rc = sam_cfg_be_type (cfg, &be_type);
self->sam = sam_new (be_type);
assert (self->sam);
char *endpoint;
rc = sam_cfg_endpoint (cfg, &endpoint);
if (rc) {
return NULL;
self->client_rep = zsock_new_rep (endpoint);
if (!self->client_rep) {
sam_log_errorf ("could not bind endpoint '%s'", endpoint);
goto abort;
}
self->client_rep = zsock_new_rep (endpoint);
assert (self->client_rep);
sam_log_tracef ("bound public endpoint '%s'", endpoint);
rc = sam_init (self->sam, &cfg);
if (rc) {
if (cfg) {
sam_cfg_destroy (&cfg);
}
samd_destroy (&self);
return NULL;
goto abort;
}
self->stat = sam_stat_handle_new ();
sam_log_info ("created samd");
return self;
abort:
samd_destroy (&self);
if (cfg) {
sam_cfg_destroy (&cfg);
}
return NULL;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment