Commit 3ea6b7d1 authored by Felix Hamann's avatar Felix Hamann

skip previous tombstones

closes #28
parent 76d3f5af
......@@ -73,6 +73,7 @@ typedef struct state_t {
// data to be restored after restart
int seq; ///< used to assign unique message id's
int last_stored; ///< used to decide if it's a premature ack
int tombstone_zone; ///< to skip tombstones upon re-send
sam_db_t *db; ///< storage engine
......@@ -159,8 +160,6 @@ del (
sam_db_t *db = state->db;
do {
sam_db_del (db);
// determine previous tombstone - if any
record_t *header;
sam_db_get_val (db, NULL, (void **) &header);
......@@ -179,6 +178,9 @@ del (
assert (false);
}
sam_db_del (db);
sam_log_errorf ("deleting %d", curr_key);
// prepare next round
if (prev_key) {
curr_key = prev_key;
......@@ -222,6 +224,7 @@ insert_tombstone (
// write new key
sam_db_set_key (db, position);
state->tombstone_zone = *position;
// write new data
return sam_db_put (db, size, (void *) &tombstone);
......@@ -262,8 +265,8 @@ resend_condition (
assert (header);
if (header->type == RECORD) {
uint64_t eps = zclock_mono () - header->c.record.ts;
return (state->threshold < eps)? 0: -1;
int64_t eps = zclock_mono () - header->c.record.ts;
return ((int64_t) state->threshold < eps)? 0: -1;
}
return 0;
......@@ -307,7 +310,6 @@ resend_message (
}
// --------------------------------------------------------------------------
/// Determines how much space is needed to store a record in a
/// continuous block of memory. If the sam_msg is null, just the space
......@@ -676,16 +678,26 @@ handle_resend (
sam_db_t *db = state->db;
sam_db_begin (db);
sam_log_trace ("beginning transaction");
int first_requeued_key = 0; // can never be zero
int rc = sam_db_sibling (db, SAM_DB_NEXT);
sam_db_ret_t rc = SAM_DB_NOTFOUND;
// skip tombstones if there are any
if (state->tombstone_zone) {
rc = sam_db_get (db, &state->tombstone_zone);
} else {
rc = sam_db_sibling (db, SAM_DB_NEXT);
}
record_t *header;
if (!rc) {
sam_db_get_val (db, NULL, (void **) &header);
}
while (
!rc && // there's another item
!resend_condition (state, header) && // check threshold
......@@ -815,6 +827,7 @@ sam_db_restore (
{
state->seq = 0;
state->last_stored = 0;
state->tombstone_zone = 0;
sam_db_t *db = state->db;
if (sam_db_begin (db)) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment