Commit 6726070a authored by Felix Hamann's avatar Felix Hamann

gathering more metrics (added buffer)

parent 940789ff
......@@ -193,6 +193,7 @@ handle_frontend_pub (
int backend_c = zlist_size (state->backends);
if (!backend_c) {
sam_log_trace ("discarding message, no backends available");
sam_stat (state->stat, "sam.publishing requests (discarded)", 1);
sam_msg_destroy (&msg);
return 0;
}
......@@ -228,6 +229,7 @@ handle_frontend_pub (
if (n && !backend_c) {
sam_log_info (
"discarding redundant msg, not enough backends available");
sam_stat (state->stat, "sam.publishing requests (discarded)", 1);
}
}
......
......@@ -83,6 +83,8 @@ typedef struct state_t {
int tries; ///< maximum number of retries for a message
uint64_t interval; ///< how often messages are being tried again
uint64_t threshold; ///< at which point messages are tried again
sam_stat_handle_t *stat;
} state_t;
......@@ -241,6 +243,8 @@ update_record_tries (
"discarding message '%d'", sam_db_get_key (state->db));
del (state);
sam_stat (state->stat, "buf.discarded messages", 1);
return -1;
}
......@@ -249,7 +253,7 @@ update_record_tries (
// --------------------------------------------------------------------------
/// Checks if the record is inside the bounds of messages to be resent.
/// Checks if the record is inside the bounds of messages to be re-sent.
static int
resend_condition (
state_t *state,
......@@ -295,7 +299,7 @@ resend_message (
int msg_id = sam_db_get_key (db);
int count = header->c.record.acks_remaining;
sam_log_tracef ("resending msg '%d'", msg_id);
sam_log_tracef ("re-sending msg '%d'", msg_id);
zsock_send (state->out, "ifip", msg_id, id_frame, count, msg);
zframe_destroy (&id_frame);
......@@ -610,10 +614,12 @@ handle_storage_req (
else if (ret == SAM_DB_NOTFOUND) {
// key was already set by get ()
rc = create_record_store (state, msg, count);
sam_stat (state->stat, "buf.created records", 1);
}
sam_db_end (db, (rc)? true: false);
sam_msg_destroy (&msg);
return rc;
}
......@@ -650,20 +656,21 @@ handle_backend_req (
be_id, msg_id);
rc = handle_ack (state, be_id, msg_id);
sam_stat (state->stat, "buf.acknowledgments", 1);
return rc;
}
// --------------------------------------------------------------------------
/// Checks in a fixed interval if messages need to be resent.
/// Checks in a fixed interval if messages need to be re-sent.
static int
handle_resend (
zloop_t *loop UU,
int timer_id UU,
void *args)
{
sam_log_trace ("resend cycle triggered");
sam_log_trace ("re-send cycle triggered");
state_t *state = args;
sam_db_t *db = state->db;
......@@ -736,7 +743,6 @@ handle_resend (
break;
}
// create tombstone
// resets cursor position
if (insert_tombstone (state, prev_id, &cur_id)) {
......@@ -744,6 +750,7 @@ handle_resend (
break;
}
sam_stat (state->stat, "buf.re-sent messages", 1);
rc = sam_db_sibling (db, SAM_DB_NEXT);
}
......@@ -792,6 +799,8 @@ actor (
zsock_destroy (&state->out);
zsock_destroy (&state->store_sock);
sam_stat_handle_destroy (&state->stat);
free (state);
}
......@@ -922,6 +931,8 @@ sam_buf_new (
goto abort;
}
state->stat = sam_stat_handle_new ();
// spawn actor
self->actor = zactor_new (actor, state);
sam_log_info ("created buffer instance");
......
......@@ -46,6 +46,7 @@ typedef struct state_t {
struct {
zhash_t *sam;
zhash_t *samd;
zhash_t *buf;
} metrics;
} state_t;
......@@ -91,7 +92,7 @@ resolve (
if (tok) {
// retrieve map
zhash_t *map;
zhash_t *map = NULL;
if (!strcmp (tok, "sam")) {
map = state->metrics.sam;
......@@ -99,10 +100,11 @@ resolve (
else if (!strcmp (tok, "samd")) {
map = state->metrics.samd;
}
else {
assert (false);
else if (!strcmp (tok, "buf")) {
map = state->metrics.buf;
}
assert (map);
// insert/update item
char *key = strtok (NULL, delim);
......@@ -169,12 +171,14 @@ handle_digest (
zhash_t *map_refs [] = {
state->metrics.samd,
state->metrics.sam,
state->metrics.buf,
NULL
};
const char *map_names [] = {
"samd",
"sam",
"buffer",
NULL
};
......@@ -241,6 +245,9 @@ actor (
state.metrics.samd = zhash_new ();
zhash_set_destructor (state.metrics.samd, free_metric);
state.metrics.buf = zhash_new ();
zhash_set_destructor (state.metrics.buf, free_metric);
// initialize reactor
zloop_reader (loop, pipe, sam_gen_handle_pipe, NULL);
......@@ -263,6 +270,7 @@ actor (
zhash_destroy (&state.metrics.sam);
zhash_destroy (&state.metrics.samd);
zhash_destroy (&state.metrics.buf);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment