Commit 1082e0dc authored by Felix Hamann's avatar Felix Hamann

added rabbitmq and amqp options

parent 3a51e1d5
......@@ -53,6 +53,29 @@ typedef struct samwise_pub_t {
char *exchange;
char *routing_key;
int mandatory;
int immediate;
struct {
char *content_type;
char *content_encoding;
char *delivery_mode;
char *priority;
char *correlation_id;
char *reply_to;
char *expiration;
char *message_id;
char *type;
char *user_id;
char *app_id;
char *cluster_id;
} options;
struct {
char **keys;
char **values;
size_t amount;
} headers;
char *msg;
size_t size;
......
......@@ -115,16 +115,15 @@ publish (
char buf [64];
snprintf (buf, 64, "message no %d", count);
samwise_pub_t pub = {
.disttype = args->disttype,
.distcount = args->d,
samwise_pub_t pub;
memset (&pub, 0, sizeof (samwise_pub_t));
.exchange = "amq.direct",
.routing_key = "",
pub.disttype = args->disttype;
pub.distcount = args->d;
.size = strlen (buf),
.msg = buf
};
pub.exchange = "amq.direct";
pub.size = strlen (buf);
pub.msg = buf;
sprintf (buf, "publishing message %d", count);
out (VERBOSE, state->args, buf);
......
......@@ -67,6 +67,9 @@ samwise_publish (
zmsg_t *msg = create_msg ();
zmsg_addstr (msg, "publish");
size_t buf_size = 256;
char buf [buf_size];
// distribution type
if (pub->disttype == SAMWISE_ROUNDROBIN) {
......@@ -74,9 +77,7 @@ samwise_publish (
}
else if (pub->disttype == SAMWISE_REDUNDANT) {
zmsg_addstr (msg, "redundant");
char buf [128];
snprintf (buf, 128, "%d", pub->distcount);
snprintf (buf, buf_size, "%d", pub->distcount);
zmsg_addstr (msg, buf);
}
......@@ -86,33 +87,56 @@ samwise_publish (
// amqp options
assert (pub->exchange);
zmsg_addstr (msg, pub->exchange);
zmsg_addstr (msg, pub->routing_key);
zmsg_addstr (msg, "0"); // mandatory
zmsg_addstr (msg, "0"); // immediate
zmsg_addstr (msg, (pub->routing_key)? pub->routing_key: "");
snprintf (buf, buf_size, "%d", pub->mandatory);
zmsg_addstr (msg, buf);
snprintf (buf, buf_size, "%d", pub->immediate);
zmsg_addstr (msg, buf);
// rabbitmq options
zmsg_addstr (msg, "12");
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
size_t amount = 12;
while (amount > 0) {
char **opt = &pub->options.content_type;
zmsg_addstr (msg, *opt? *opt: "");
amount -= 1;
}
// header
zmsg_addstr (msg, "0");
if (!pub->headers.amount) {
zmsg_addstr (msg, "0");
}
else {
snprintf (buf, buf_size, "%zu", pub->headers.amount);
zmsg_addstr (msg, buf);
char
**keys = pub->headers.keys,
**values = pub->headers.values;
size_t amount = pub->headers.amount;
while (amount) {
zmsg_addstr (msg, *keys);
zmsg_addstr (msg, *values);
amount -= 1;
keys += 1;
values += 1;
}
}
// payload
assert (pub->msg);
assert (pub->size);
zmsg_addmem (msg, pub->msg, pub->size);
zmsg_send (&msg, self->req);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment