Commit 9a39dac0 authored by Felix Hamann's avatar Felix Hamann

bulk commit, see long description

- updated redundancy count is considered upon resend, closes #19
- sam_be_rmq: fixed bug where the connection retry count is reset
- fixed bug where trying to delete backends loops indefinitely
parent 1fdce68e
......@@ -49,11 +49,13 @@ sam_buf_destroy (
/// @brief Hand message over to store, get a key as the receipt
/// @param self A buf instance
/// @param msg A publishing request wrapped by sam_msg_t
/// @param count How many backends must acknowledge the message
/// @return A unique id used to identify the message
int
sam_buf_save (
sam_buf_t *self,
sam_msg_t *msg);
sam_msg_t *msg,
int count);
// --------------------------------------------------------------------------
......
......@@ -85,6 +85,8 @@ struct sam_t {
sam_stat_handle_t *stat; ///< handle to send metrics
zactor_t *actor; ///< thread maintaining broker connections
int TMP_COUNTER;
};
......@@ -126,6 +128,8 @@ remove_backend (
sam_be_rmq_destroy (&rabbit);
rc = 0;
}
be = zlist_next (state->backends);
}
return rc;
......@@ -153,7 +157,7 @@ handle_sig (
return rc;
}
sam_log_errorf ("got signal %d from '%s'!", code, be_name);
sam_log_errorf ("got signal 0x%x from '%s'!", code, be_name);
if (code == SAM_BE_SIG_KILL) {
rc = remove_backend (state, loop, be_name);
......@@ -174,27 +178,17 @@ handle_frontend_pub (
{
state_t *state = args;
int key;
sam_msg_t *msg;
int key, n;
sam_msg_t *msg; // only use thread safe methods!
zframe_t *id_frame;
sam_log_trace ("recv () frontend pub");
zsock_recv (pll, "ifp", &key, &id_frame, &msg);
zsock_recv (pll, "ifip", &key, &id_frame, &n, &msg);
// get mask containing already ack'd backends
uint64_t be_acks = *(uint64_t *) zframe_data (id_frame);
zframe_destroy (&id_frame);
char *distribution;
int rc = sam_msg_pop (msg, "s", &distribution);
assert (!rc);
int n = 1;
if (!strcmp (distribution, "redundant")) {
rc = sam_msg_pop (msg, "i", &n);
assert (!rc);
}
int backend_c = zlist_size (state->backends);
if (!backend_c) {
sam_log_trace ("discarding message, no backends available");
......@@ -202,10 +196,9 @@ handle_frontend_pub (
return 0;
}
sam_log_tracef (
"publish %s(%d), %d broker(s) available; 0x%" PRIx64 " ack'd",
distribution, n, backend_c, be_acks);
"publish to %d brokers, %d broker(s) available; 0x%" PRIx64 " ack'd",
n, backend_c, be_acks);
while (0 < n && 0 < backend_c) {
......@@ -237,7 +230,7 @@ handle_frontend_pub (
}
sam_msg_destroy (&msg);
return rc;
return 0;
}
......@@ -481,6 +474,8 @@ sam_new (
self->be_type = be_type;
state->backends = zlist_new ();
self->TMP_COUNTER = 0;
return self;
}
......@@ -705,6 +700,7 @@ error (
sam_msg_destroy (&msg);
sam_ret_t *ret = new_ret ();
ret->rc = -1;
ret->msg = error_msg;
......@@ -855,30 +851,28 @@ aggregate_backend_info (sam_t *self)
sam_log_trace ("recv () ctl internally (be.active)");
zsock_recv (self->ctl_req, "im", &backend_c, &backends);
// aggregate backend information
size_t buf_size = 512;
char *buf;
if (backend_c) {
buf = malloc (buf_size * backend_c * sizeof (char));
char *buf_ptr = buf;
if (!backend_c) {
buf = malloc (buf_size);
snprintf (buf, buf_size, "No backends connected");
return buf;
}
while (zmsg_size (backends)) {
char *str = zmsg_popstr (backends);
size_t str_len = strlen (str);
snprintf (buf_ptr, buf_size, "\n%s", str);
// aggregate backend information
buf = malloc (buf_size * backend_c * sizeof (char));
char *buf_ptr = buf;
buf_ptr += str_len;
free (str);
}
}
while (zmsg_size (backends)) {
char *str = zmsg_popstr (backends);
size_t str_len = strlen (str);
else {
char *str = "No backends connected.";
buf = malloc (strlen (str) * sizeof (char));
memcpy (buf, str, strlen (str));
snprintf (buf_ptr, buf_size, "\n%s", str);
buf_ptr += str_len;
free (str);
}
......@@ -894,7 +888,7 @@ aggregate_backend_info (sam_t *self)
head_len = strlen (head),
buf_len = strlen (buf);
size_t str_size = (head_len + buf_len + 1) * sizeof (char);
size_t str_size = (head_len + buf_len + buf_size) * sizeof (char);
char *str = malloc (str_size);
assert (str);
......@@ -968,21 +962,34 @@ sam_eval (
sam_stat (self->stat, "sam.publishing requests", 1);
// Create a copy of the message and pass that over to the
// store. Crafting a copy is necessary because the actor
// starts chewing up the message which is not thread safe. If
// the store is going to act completely synchronously, this
// operation may become obsolete...
sam_msg_t *dup = sam_msg_dup (msg);
int key = sam_buf_save (self->buf, dup);
// pass the message on for distribution, 0 backends ack'd already
// analyze distribution method and count
char *distribution;
int rc = sam_msg_pop (msg, "s", &distribution);
assert (!rc);
int n = 1;
if (!strcmp (distribution, "redundant")) {
rc = sam_msg_pop (msg, "i", &n);
assert (!rc);
}
// save to buffer
sam_msg_own (msg);
int key = sam_buf_save (self->buf, msg, n);
// pass the message on for distribution
// (0 backends ack'd already)
uint64_t be_acks = 0;
zframe_t *id_frame = zframe_new (&be_acks, sizeof (be_acks));
sam_log_tracef ("send () message '%d' internally", key);
zsock_send (self->frontend_pub, "ifp", key, id_frame, msg);
zsock_send (self->frontend_pub, "ifip", key, id_frame, n, msg);
// clean up
zframe_destroy (&id_frame);
return new_ret ();
}
......
......@@ -127,7 +127,7 @@ to_string (
self->connection.tries, opts->tries, opts->interval,
opts->heartbeat,
self->amqp.seq,
zlist_size (self->store));
(self->store)? zlist_size (self->store): 0);
return str;
}
......@@ -327,12 +327,14 @@ handle_reconnect (
// reconnect failed, set timer for next round
if (rc) {
uint64_t iv = self->connection.opts.interval;
sam_log_infof (
"reconnecting '%s' failed, next try in %u",
self->name, iv);
zloop_timer (loop, iv, 1, handle_reconnect, self);
if (self->connection.tries) {
uint64_t iv = self->connection.opts.interval;
sam_log_infof (
"reconnecting '%s' failed, next try in %u",
self->name, iv);
zloop_timer (loop, iv, 1, handle_reconnect, self);
}
}
......@@ -345,7 +347,7 @@ handle_reconnect (
}
// no tries left, shut down backend
else {
if (!self->connection.tries) {
zsock_send (
self->sock.sig, "is",
SAM_BE_SIG_KILL, self->name);
......@@ -687,6 +689,7 @@ sam_be_rmq_new (
"creating rabbitmq message backend (%s:%d)", name, id);
sam_be_rmq_t *self = malloc (sizeof (sam_be_rmq_t));
memset (self, 0, sizeof (sam_be_rmq_t));
assert (self);
self->name = malloc (strlen (name) * sizeof (char) + 1);
......@@ -702,6 +705,7 @@ sam_be_rmq_new (
self->amqp.method_channel = 2;
self->connection.established = false;
self->connection.tries = -2;
return self;
}
......@@ -768,8 +772,10 @@ sam_be_rmq_connect (
// save options for reconnects
memcpy (&self->connection.opts, opts, sizeof (sam_be_rmq_opts_t));
self->connection.tries = opts->tries;
if (self->connection.tries == -2) {
self->connection.tries = opts->tries;
}
// for re-initialize rabbitmq-c
if (self->amqp.connection) {
......
......@@ -169,7 +169,11 @@ del (
else if (header->type == RECORD_TOMBSTONE) {
prev_key = header->c.tombstone.prev;
}
else if (header->type == RECORD_ACK) {
prev_key = 0;
}
else {
sam_log_errorf ("unexpected record type: 0x%x", header->type);
assert (false);
}
......@@ -251,6 +255,8 @@ resend_condition (
state_t *state,
record_t *header)
{
assert (header);
if (header->type == RECORD) {
uint64_t eps = zclock_mono () - header->c.record.ts;
return (state->threshold < eps)? 0: -1;
......@@ -287,9 +293,10 @@ resend_message (
uint64_t be_acks = header->c.record.be_acks;
zframe_t *id_frame = zframe_new (&be_acks, sizeof (be_acks));
int msg_id = sam_db_get_key (db);
int count = header->c.record.acks_remaining;
sam_log_tracef ("resending msg '%d'", msg_id);
zsock_send (state->out, "ifp", msg_id, id_frame, msg);
zsock_send (state->out, "ifip", msg_id, id_frame, count, msg);
zframe_destroy (&id_frame);
return 0;
......@@ -322,42 +329,14 @@ record_size (
}
// --------------------------------------------------------------------------
/// Analyzes the publishing request and returns the number of
/// acknowledgements needed to consider the distribution guarantee
/// fulfilled.
static int
acks_remaining (
sam_msg_t *msg)
{
char *distribution;
int rc = sam_msg_get (msg, "s", &distribution);
assert (!rc);
if (!strcmp (distribution, "round robin")) {
rc = 1;
}
else if (!strcmp (distribution, "redundant")) {
int ack_c = -1;
sam_msg_get (msg, "?i", &ack_c);
rc = ack_c;
}
else {
assert (false);
}
free (distribution);
return rc;
}
// --------------------------------------------------------------------------
/// Create a fresh database record based on a sam_msg enclosed
/// publishing request.
static int
create_record_store (
state_t *state,
sam_msg_t *msg)
sam_msg_t *msg,
int count)
{
size_t size, header_size;
record_size (&size, &header_size, msg);
......@@ -377,7 +356,7 @@ create_record_store (
header->type = RECORD;
header->c.record.prev = 0;
header->c.record.acks_remaining = acks_remaining (msg);
header->c.record.acks_remaining = count;
header->c.record.be_acks = 0;
header->c.record.ts = zclock_mono ();
header->c.record.tries = state->tries;
......@@ -402,7 +381,8 @@ create_record_store (
static int
update_record_store (
state_t *state,
sam_msg_t *msg)
sam_msg_t *msg,
int count)
{
int rc = 0;
size_t total_size, header_size;
......@@ -417,7 +397,7 @@ update_record_store (
sam_log_tracef (
"ack already there, %d arrived already",
header->c.record.acks_remaining * -1);
header->c.record.acks_remaining += acks_remaining (msg);
header->c.record.acks_remaining += count;
// remove if there are no outstanding acks
if (!header->c.record.acks_remaining) {
......@@ -599,9 +579,10 @@ handle_storage_req (
state_t *state = args;
sam_db_t *db = state->db;
int count;
sam_msg_t *msg;
sam_log_trace ("recv () storage request");
zsock_recv (store_sock, "p", &msg);
zsock_recv (store_sock, "ip", &count, &msg);
int msg_id = create_msg_id (state);
assert (msg_id >= 0);
......@@ -619,16 +600,16 @@ handle_storage_req (
int rc = -1;
// record already there (ack arrived early), update data (this is
// not possible with the current implementation, but I leave it if
// for the future if the whole system acts more asynchronously)
// not possible with the current implementation, but I leave it
// for the future where the whole system may act more asynchronously)
if (ret == SAM_DB_OK) {
rc = update_record_store (state, msg);
rc = update_record_store (state, msg, count);
}
// record not yet there, create db entry
else if (ret == SAM_DB_NOTFOUND) {
// key was already set by get ()
rc = create_record_store (state, msg);
rc = create_record_store (state, msg, count);
}
sam_db_end (db, (rc)? true: false);
......@@ -980,10 +961,11 @@ sam_buf_destroy (
int
sam_buf_save (
sam_buf_t *self,
sam_msg_t *msg)
sam_msg_t *msg,
int count)
{
assert (self);
zsock_send (self->store_sock, "p", msg);
zsock_send (self->store_sock, "ip", count, msg);
int msg_id;
zsock_recv (self->store_sock, "i", &msg_id);
......
......@@ -95,14 +95,16 @@ send_ack (uint64_t be_id, int key)
// --------------------------------------------------------------------------
/// Finish composing the message and hand it over to sam_buf.
static int
save (zmsg_t *zmsg, const char *payload)
save (const char *payload, int count)
{
zmsg_t *zmsg = zmsg_new ();
zmsg_addstr (zmsg, "test-x");
zmsg_addstr (zmsg, "");
zmsg_addstr (zmsg, payload);
sam_msg_t *msg = sam_msg_new (&zmsg);
return sam_buf_save (buf, msg);
return sam_buf_save (buf, msg, count);
}
......@@ -137,21 +139,16 @@ eat ()
static int
save_roundrobin (const char *payload)
{
zmsg_t *zmsg = zmsg_new ();
zmsg_addstr (zmsg, "round robin");
return save (zmsg, payload);
return save (payload, 1);
}
// --------------------------------------------------------------------------
/// Save a sam_msg_t to sam_buf with redundant distribution type.
static int
save_redundant (const char *n, const char *payload)
save_redundant (const char *payload, int count)
{
zmsg_t *zmsg = zmsg_new ();
zmsg_addstr (zmsg, "redundant");
zmsg_addstr (zmsg, n);
return save (zmsg, payload);
return save (payload, count);
}
......@@ -225,7 +222,7 @@ START_TEST(test_buf_save_redundant)
sam_selftest_introduce ("test_buf_save_redundant");
// save
int key = save_redundant ("3", "redundant");
int key = save_redundant ("redundant", 3);
zclock_sleep (10);
// ack1, ack2, ack4
......@@ -253,7 +250,7 @@ START_TEST(test_buf_save_redundant_race)
zclock_sleep (10);
// save
int key = save_redundant ("2", "redundant race");
int key = save_redundant ("redundant race", 2);
ck_assert_int_eq (key, 1);
zclock_sleep (10);
......@@ -272,7 +269,7 @@ START_TEST(test_buf_save_redundant_idempotency)
sam_selftest_introduce ("test_buf_save_redundant_idempotency");
// save
save_redundant ("2", "redundant idempotency");
save_redundant ("redundant idempotency", 2);
zclock_sleep (10);
// ack1
......@@ -306,7 +303,7 @@ START_TEST(test_buf_save_redundant_race_idempotency)
zclock_sleep (10);
// save
save_redundant ("2", "redundant race idempotency");
save_redundant ("redundant race idempotency", 2);
zclock_sleep (10);
// ack 2
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment