Commit a2c6b266 authored by Felix Hamann's avatar Felix Hamann

Merge branch 'master' of github:dreadworks/samwise into ruby-c-client

parents 1082e0dc 3ea6b7d1
...@@ -64,6 +64,8 @@ typedef struct state_t { ...@@ -64,6 +64,8 @@ typedef struct state_t {
zsock_t *frontend_rpc; ///< reply socket for rpc requests zsock_t *frontend_rpc; ///< reply socket for rpc requests
zsock_t *frontend_pub; ///< pull socket for publishing requests zsock_t *frontend_pub; ///< pull socket for publishing requests
zlist_t *backends; ///< maintains backend handles zlist_t *backends; ///< maintains backend handles
sam_stat_handle_t *stat;
} state_t; } state_t;
...@@ -85,8 +87,6 @@ struct sam_t { ...@@ -85,8 +87,6 @@ struct sam_t {
sam_stat_handle_t *stat; ///< handle to send metrics sam_stat_handle_t *stat; ///< handle to send metrics
zactor_t *actor; ///< thread maintaining broker connections zactor_t *actor; ///< thread maintaining broker connections
int TMP_COUNTER;
}; };
...@@ -177,6 +177,7 @@ handle_frontend_pub ( ...@@ -177,6 +177,7 @@ handle_frontend_pub (
void *args) void *args)
{ {
state_t *state = args; state_t *state = args;
sam_stat (state->stat, "sam.publishing requests (total)", 1);
int key, n; int key, n;
sam_msg_t *msg; // only use thread safe methods! sam_msg_t *msg; // only use thread safe methods!
...@@ -192,6 +193,7 @@ handle_frontend_pub ( ...@@ -192,6 +193,7 @@ handle_frontend_pub (
int backend_c = zlist_size (state->backends); int backend_c = zlist_size (state->backends);
if (!backend_c) { if (!backend_c) {
sam_log_trace ("discarding message, no backends available"); sam_log_trace ("discarding message, no backends available");
sam_stat (state->stat, "sam.publishing requests (discarded)", 1);
sam_msg_destroy (&msg); sam_msg_destroy (&msg);
return 0; return 0;
} }
...@@ -201,10 +203,7 @@ handle_frontend_pub ( ...@@ -201,10 +203,7 @@ handle_frontend_pub (
n, backend_c, be_acks); n, backend_c, be_acks);
while (0 < n && 0 < backend_c) { while (n && backend_c) {
n -= 1;
backend_c -= 1;
sam_backend_t *backend = zlist_next (state->backends); sam_backend_t *backend = zlist_next (state->backends);
if (backend == NULL) { if (backend == NULL) {
...@@ -221,12 +220,19 @@ handle_frontend_pub ( ...@@ -221,12 +220,19 @@ handle_frontend_pub (
key, backend->name); key, backend->name);
zsock_send (backend->sock_pub, "ip", key, msg); zsock_send (backend->sock_pub, "ip", key, msg);
n -= 1;
sam_stat (state->stat, "sam.publishing requests (distributed)", 1);
} }
backend_c -= 1;
if (n && !backend_c) { if (n && !backend_c) {
sam_log_info ( sam_log_trace (
"discarding redundant msg, not enough backends available"); "discarding redundant msg, not enough backends available");
sam_stat (state->stat, "sam.publishing requests (discarded)", n);
} }
} }
sam_msg_destroy (&msg); sam_msg_destroy (&msg);
...@@ -382,6 +388,8 @@ actor ( ...@@ -382,6 +388,8 @@ actor (
sam_log_trace ("destroying loop"); sam_log_trace ("destroying loop");
zloop_destroy (&loop); zloop_destroy (&loop);
sam_stat_handle_destroy (&state->stat);
zsock_destroy (&state->frontend_pub); zsock_destroy (&state->frontend_pub);
zsock_destroy (&state->frontend_rpc); zsock_destroy (&state->frontend_rpc);
...@@ -427,7 +435,7 @@ sam_new ( ...@@ -427,7 +435,7 @@ sam_new (
self->stat_actor = sam_stat_new (); self->stat_actor = sam_stat_new ();
self->stat = sam_stat_handle_new (); self->stat = sam_stat_handle_new ();
state->stat = sam_stat_handle_new ();
// publishing requests // publishing requests
self->frontend_pub_endpoint = "inproc://sam-pub"; self->frontend_pub_endpoint = "inproc://sam-pub";
...@@ -475,7 +483,6 @@ sam_new ( ...@@ -475,7 +483,6 @@ sam_new (
state->backends = zlist_new (); state->backends = zlist_new ();
self->TMP_COUNTER = 0;
return self; return self;
} }
...@@ -620,11 +627,16 @@ init_backends ( ...@@ -620,11 +627,16 @@ init_backends (
names_ptr = names; names_ptr = names;
opts_ptr = opts; opts_ptr = opts;
if (rc || !count) { if (rc) {
sam_log_error ("there are no backends to initialize"); sam_log_error ("backends could not be loaded, "
"check the configuration for errors");
return -1; return -1;
} }
if (!count) {
return 0;
}
while (count) { while (count) {
const char *name = *names_ptr; const char *name = *names_ptr;
sam_backend_t *be = sam_be_create (self, name, opts_ptr); sam_backend_t *be = sam_be_create (self, name, opts_ptr);
...@@ -633,7 +645,9 @@ init_backends ( ...@@ -633,7 +645,9 @@ init_backends (
zsock_send (self->ctl_req, "sp", "be.add", be); zsock_send (self->ctl_req, "sp", "be.add", be);
rc = -1; rc = -1;
sam_log_tracef ("recv () for return code of 'be.add' for '%s'", name); sam_log_tracef (
"recv () for return code of 'be.add' for '%s'", name);
zsock_recv (self->ctl_req, "i", &rc); zsock_recv (self->ctl_req, "i", &rc);
if (rc) { if (rc) {
sam_log_errorf ( sam_log_errorf (
...@@ -860,7 +874,7 @@ aggregate_backend_info (sam_t *self) ...@@ -860,7 +874,7 @@ aggregate_backend_info (sam_t *self)
if (!backend_c) { if (!backend_c) {
buf = malloc (buf_size); buf = malloc (buf_size);
snprintf (buf, buf_size, "No backends connected"); snprintf (buf, buf_size, "No backends registered");
return buf; return buf;
} }
...@@ -883,7 +897,7 @@ aggregate_backend_info (sam_t *self) ...@@ -883,7 +897,7 @@ aggregate_backend_info (sam_t *self)
snprintf ( snprintf (
head, head,
buf_size, buf_size,
"%d backend(s) connected:", "%d backend(s) registered:",
backend_c); backend_c);
size_t size_t
...@@ -962,7 +976,7 @@ sam_eval ( ...@@ -962,7 +976,7 @@ sam_eval (
return error (msg, "malformed publishing request"); return error (msg, "malformed publishing request");
} }
sam_stat (self->stat, "sam.publishing requests", 1); sam_stat (self->stat, "sam.publishing requests (clients)", 1);
// analyze distribution method and count // analyze distribution method and count
......
...@@ -73,6 +73,7 @@ typedef struct state_t { ...@@ -73,6 +73,7 @@ typedef struct state_t {
// data to be restored after restart // data to be restored after restart
int seq; ///< used to assign unique message id's int seq; ///< used to assign unique message id's
int last_stored; ///< used to decide if it's a premature ack int last_stored; ///< used to decide if it's a premature ack
int tombstone_zone; ///< to skip tombstones upon re-send
sam_db_t *db; ///< storage engine sam_db_t *db; ///< storage engine
...@@ -83,6 +84,8 @@ typedef struct state_t { ...@@ -83,6 +84,8 @@ typedef struct state_t {
int tries; ///< maximum number of retries for a message int tries; ///< maximum number of retries for a message
uint64_t interval; ///< how often messages are being tried again uint64_t interval; ///< how often messages are being tried again
uint64_t threshold; ///< at which point messages are tried again uint64_t threshold; ///< at which point messages are tried again
sam_stat_handle_t *stat;
} state_t; } state_t;
...@@ -157,8 +160,6 @@ del ( ...@@ -157,8 +160,6 @@ del (
sam_db_t *db = state->db; sam_db_t *db = state->db;
do { do {
sam_db_del (db);
// determine previous tombstone - if any // determine previous tombstone - if any
record_t *header; record_t *header;
sam_db_get_val (db, NULL, (void **) &header); sam_db_get_val (db, NULL, (void **) &header);
...@@ -177,6 +178,9 @@ del ( ...@@ -177,6 +178,9 @@ del (
assert (false); assert (false);
} }
sam_db_del (db);
sam_log_errorf ("deleting %d", curr_key);
// prepare next round // prepare next round
if (prev_key) { if (prev_key) {
curr_key = prev_key; curr_key = prev_key;
...@@ -220,6 +224,7 @@ insert_tombstone ( ...@@ -220,6 +224,7 @@ insert_tombstone (
// write new key // write new key
sam_db_set_key (db, position); sam_db_set_key (db, position);
state->tombstone_zone = *position;
// write new data // write new data
return sam_db_put (db, size, (void *) &tombstone); return sam_db_put (db, size, (void *) &tombstone);
...@@ -237,10 +242,12 @@ update_record_tries ( ...@@ -237,10 +242,12 @@ update_record_tries (
header->c.record.tries -= 1; header->c.record.tries -= 1;
if (!header->c.record.tries) { if (!header->c.record.tries) {
sam_log_infof ( sam_log_tracef (
"discarding message '%d'", sam_db_get_key (state->db)); "discarding message '%d'", sam_db_get_key (state->db));
del (state); del (state);
sam_stat (state->stat, "buf.discarded messages", 1);
return -1; return -1;
} }
...@@ -249,7 +256,7 @@ update_record_tries ( ...@@ -249,7 +256,7 @@ update_record_tries (
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
/// Checks if the record is inside the bounds of messages to be resent. /// Checks if the record is inside the bounds of messages to be re-sent.
static int static int
resend_condition ( resend_condition (
state_t *state, state_t *state,
...@@ -258,8 +265,8 @@ resend_condition ( ...@@ -258,8 +265,8 @@ resend_condition (
assert (header); assert (header);
if (header->type == RECORD) { if (header->type == RECORD) {
uint64_t eps = zclock_mono () - header->c.record.ts; int64_t eps = zclock_mono () - header->c.record.ts;
return (state->threshold < eps)? 0: -1; return ((int64_t) state->threshold < eps)? 0: -1;
} }
return 0; return 0;
...@@ -295,7 +302,7 @@ resend_message ( ...@@ -295,7 +302,7 @@ resend_message (
int msg_id = sam_db_get_key (db); int msg_id = sam_db_get_key (db);
int count = header->c.record.acks_remaining; int count = header->c.record.acks_remaining;
sam_log_tracef ("resending msg '%d'", msg_id); sam_log_tracef ("re-sending msg '%d'", msg_id);
zsock_send (state->out, "ifip", msg_id, id_frame, count, msg); zsock_send (state->out, "ifip", msg_id, id_frame, count, msg);
zframe_destroy (&id_frame); zframe_destroy (&id_frame);
...@@ -303,7 +310,6 @@ resend_message ( ...@@ -303,7 +310,6 @@ resend_message (
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
/// Determines how much space is needed to store a record in a /// Determines how much space is needed to store a record in a
/// continuous block of memory. If the sam_msg is null, just the space /// continuous block of memory. If the sam_msg is null, just the space
...@@ -610,10 +616,12 @@ handle_storage_req ( ...@@ -610,10 +616,12 @@ handle_storage_req (
else if (ret == SAM_DB_NOTFOUND) { else if (ret == SAM_DB_NOTFOUND) {
// key was already set by get () // key was already set by get ()
rc = create_record_store (state, msg, count); rc = create_record_store (state, msg, count);
sam_stat (state->stat, "buf.created records", 1);
} }
sam_db_end (db, (rc)? true: false); sam_db_end (db, (rc)? true: false);
sam_msg_destroy (&msg); sam_msg_destroy (&msg);
return rc; return rc;
} }
...@@ -650,35 +658,46 @@ handle_backend_req ( ...@@ -650,35 +658,46 @@ handle_backend_req (
be_id, msg_id); be_id, msg_id);
rc = handle_ack (state, be_id, msg_id); rc = handle_ack (state, be_id, msg_id);
sam_stat (state->stat, "buf.acknowledgments", 1);
return rc; return rc;
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
/// Checks in a fixed interval if messages need to be resent. /// Checks in a fixed interval if messages need to be re-sent.
static int static int
handle_resend ( handle_resend (
zloop_t *loop UU, zloop_t *loop UU,
int timer_id UU, int timer_id UU,
void *args) void *args)
{ {
sam_log_trace ("resend cycle triggered"); sam_log_trace ("re-send cycle triggered");
state_t *state = args; state_t *state = args;
sam_db_t *db = state->db; sam_db_t *db = state->db;
sam_db_begin (db); sam_db_begin (db);
sam_log_trace ("beginning transaction");
int first_requeued_key = 0; // can never be zero int first_requeued_key = 0; // can never be zero
int rc = sam_db_sibling (db, SAM_DB_NEXT); sam_db_ret_t rc = SAM_DB_NOTFOUND;
// skip tombstones if there are any
if (state->tombstone_zone) {
rc = sam_db_get (db, &state->tombstone_zone);
} else {
rc = sam_db_sibling (db, SAM_DB_NEXT);
}
record_t *header; record_t *header;
if (!rc) { if (!rc) {
sam_db_get_val (db, NULL, (void **) &header); sam_db_get_val (db, NULL, (void **) &header);
} }
while ( while (
!rc && // there's another item !rc && // there's another item
!resend_condition (state, header) && // check threshold !resend_condition (state, header) && // check threshold
...@@ -736,7 +755,6 @@ handle_resend ( ...@@ -736,7 +755,6 @@ handle_resend (
break; break;
} }
// create tombstone // create tombstone
// resets cursor position // resets cursor position
if (insert_tombstone (state, prev_id, &cur_id)) { if (insert_tombstone (state, prev_id, &cur_id)) {
...@@ -744,6 +762,7 @@ handle_resend ( ...@@ -744,6 +762,7 @@ handle_resend (
break; break;
} }
sam_stat (state->stat, "buf.re-sent messages", 1);
rc = sam_db_sibling (db, SAM_DB_NEXT); rc = sam_db_sibling (db, SAM_DB_NEXT);
} }
...@@ -792,6 +811,8 @@ actor ( ...@@ -792,6 +811,8 @@ actor (
zsock_destroy (&state->out); zsock_destroy (&state->out);
zsock_destroy (&state->store_sock); zsock_destroy (&state->store_sock);
sam_stat_handle_destroy (&state->stat);
free (state); free (state);
} }
...@@ -806,6 +827,7 @@ sam_db_restore ( ...@@ -806,6 +827,7 @@ sam_db_restore (
{ {
state->seq = 0; state->seq = 0;
state->last_stored = 0; state->last_stored = 0;
state->tombstone_zone = 0;
sam_db_t *db = state->db; sam_db_t *db = state->db;
if (sam_db_begin (db)) { if (sam_db_begin (db)) {
...@@ -922,6 +944,8 @@ sam_buf_new ( ...@@ -922,6 +944,8 @@ sam_buf_new (
goto abort; goto abort;
} }
state->stat = sam_stat_handle_new ();
// spawn actor // spawn actor
self->actor = zactor_new (actor, state); self->actor = zactor_new (actor, state);
sam_log_info ("created buffer instance"); sam_log_info ("created buffer instance");
......
...@@ -485,6 +485,7 @@ read_backends_rmq ( ...@@ -485,6 +485,7 @@ read_backends_rmq (
"interval", &interval_str); "interval", &interval_str);
if (rc) { if (rc) {
free (names);
free (opts); free (opts);
return rc; return rc;
} }
......
...@@ -83,6 +83,21 @@ PRINT_DB_INFO (sam_db_t *self) ...@@ -83,6 +83,21 @@ PRINT_DB_INFO (sam_db_t *self)
*/ */
static int
bt_compare_int (
DB *dbp UU,
const DBT *a,
const DBT *b,
size_t *size UU)
{
int
ai = *(int *) a->data,
bi = *(int *) b->data;
return ai - bi;
}
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
/// Generic error handler invoked by DB. /// Generic error handler invoked by DB.
static void static void
...@@ -184,7 +199,9 @@ sam_db_new ( ...@@ -184,7 +199,9 @@ sam_db_new (
return NULL; return NULL;
} }
rc = self->dbp->open ( self->dbp->set_bt_compare (self->dbp, bt_compare_int);
rc = self->dbp->open (
self->dbp, self->dbp,
NULL, // transaction pointer NULL, // transaction pointer
fname, // on disk file fname, // on disk file
...@@ -199,7 +216,6 @@ sam_db_new ( ...@@ -199,7 +216,6 @@ sam_db_new (
return NULL; return NULL;
} }
stat_db_size (self); stat_db_size (self);
self->dbp->set_errcall (self->dbp, db_error_handler); self->dbp->set_errcall (self->dbp, db_error_handler);
return self; return self;
......
...@@ -46,6 +46,7 @@ typedef struct state_t { ...@@ -46,6 +46,7 @@ typedef struct state_t {
struct { struct {
zhash_t *sam; zhash_t *sam;
zhash_t *samd; zhash_t *samd;
zhash_t *buf;
} metrics; } metrics;
} state_t; } state_t;
...@@ -91,7 +92,7 @@ resolve ( ...@@ -91,7 +92,7 @@ resolve (
if (tok) { if (tok) {
// retrieve map // retrieve map
zhash_t *map; zhash_t *map = NULL;
if (!strcmp (tok, "sam")) { if (!strcmp (tok, "sam")) {
map = state->metrics.sam; map = state->metrics.sam;
...@@ -99,10 +100,11 @@ resolve ( ...@@ -99,10 +100,11 @@ resolve (
else if (!strcmp (tok, "samd")) { else if (!strcmp (tok, "samd")) {
map = state->metrics.samd; map = state->metrics.samd;
} }
else { else if (!strcmp (tok, "buf")) {
assert (false); map = state->metrics.buf;
} }
assert (map);
// insert/update item // insert/update item
char *key = strtok (NULL, delim); char *key = strtok (NULL, delim);
...@@ -169,12 +171,14 @@ handle_digest ( ...@@ -169,12 +171,14 @@ handle_digest (
zhash_t *map_refs [] = { zhash_t *map_refs [] = {
state->metrics.samd, state->metrics.samd,
state->metrics.sam, state->metrics.sam,
state->metrics.buf,
NULL NULL
}; };
const char *map_names [] = { const char *map_names [] = {
"samd", "samd",
"sam", "sam",
"buffer",
NULL NULL
}; };
...@@ -241,6 +245,9 @@ actor ( ...@@ -241,6 +245,9 @@ actor (
state.metrics.samd = zhash_new (); state.metrics.samd = zhash_new ();
zhash_set_destructor (state.metrics.samd, free_metric); zhash_set_destructor (state.metrics.samd, free_metric);
state.metrics.buf = zhash_new ();
zhash_set_destructor (state.metrics.buf, free_metric);
// initialize reactor // initialize reactor
zloop_reader (loop, pipe, sam_gen_handle_pipe, NULL); zloop_reader (loop, pipe, sam_gen_handle_pipe, NULL);
...@@ -263,6 +270,7 @@ actor ( ...@@ -263,6 +270,7 @@ actor (
zhash_destroy (&state.metrics.sam); zhash_destroy (&state.metrics.sam);
zhash_destroy (&state.metrics.samd); zhash_destroy (&state.metrics.samd);
zhash_destroy (&state.metrics.buf);
} }
......
...@@ -90,15 +90,17 @@ handle_req ( ...@@ -90,15 +90,17 @@ handle_req (
} }
sam_log_tracef ("sending reply to client (%d)", ret->rc); sam_log_tracef ("sending reply to client (%d)", ret->rc);
zsock_send (client_rep, "is", ret->rc, ret->msg);
int rc = (ret->rc == SAM_RET_RESTART)? 0: ret->rc;
zsock_send (client_rep, "is", rc, ret->msg);
if (ret->allocated) { if (ret->allocated) {
free (ret->msg); free (ret->msg);
} }
int rc = ret->rc; rc = (ret->rc == SAM_RET_RESTART)? -1: 0;
free (ret); free (ret);
return (rc == SAM_RET_RESTART)? -1: 0; return rc;
} }
...@@ -160,7 +162,6 @@ samd_new ( ...@@ -160,7 +162,6 @@ samd_new (
goto abort; goto abort;
} }
sam_log_info ("created samd"); sam_log_info ("created samd");
return self; return self;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment