Commit c1cf1fa7 authored by Felix Hamann's avatar Felix Hamann

added option: distribution type

parent 680daf04
......@@ -39,9 +39,18 @@ extern "C" {
typedef struct samwise_t samwise_t;
typedef enum {
SAMWISE_ROUNDROBIN,
SAMWISE_REDUNDANT
} samwise_disttype_t;
/// publishing request options
typedef struct samwise_pub_t {
samwise_disttype_t disttype; ///< either round robin or redundant
int distcount; ///< for disttype=SAMWISE_REDUNDANT
char *exchange;
char *routing_key;
......@@ -73,9 +82,11 @@ samwise_ping (
// --------------------------------------------------------------------------
/// @brief Create a new samwise instance
/// @param endpoint Public endpoint of a samd instance
/// @return The newly created instance
samwise_t *
samwise_new ();
samwise_new (
const char *endpoint);
// --------------------------------------------------------------------------
......
......@@ -65,16 +65,34 @@ samwise_publish (
samwise_pub_t *pub)
{
zmsg_t *msg = create_msg ();
zmsg_addstr (msg, "publish");
zmsg_addstr (msg, "round robin");
// distribution type
if (pub->disttype == SAMWISE_ROUNDROBIN) {
zmsg_addstr (msg, "round robin");
}
else if (pub->disttype == SAMWISE_REDUNDANT) {
zmsg_addstr (msg, "redundant");
char buf [128];
snprintf (buf, 128, "%d", pub->distcount);
zmsg_addstr (msg, buf);
}
else {
assert (false);
}
// amqp options
zmsg_addstr (msg, pub->exchange);
zmsg_addstr (msg, pub->routing_key);
zmsg_addstr (msg, "0"); // mandatory
zmsg_addstr (msg, "0"); // immediate
// options
// rabbitmq options
zmsg_addstr (msg, "12");
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
......@@ -89,13 +107,14 @@ samwise_publish (
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
// header
zmsg_addstr (msg, "0");
// payload
zmsg_addmem (msg, pub->msg, pub->size);
printf ("publishing message of size %zu\n", pub->size);
zmsg_send (&msg, self->req);
return handle_response (self);
}
......@@ -107,8 +126,6 @@ int
samwise_ping (
samwise_t *self)
{
printf ("send ping request\n");
zmsg_t *msg = create_msg ();
zmsg_addstr (msg, "ping");
......@@ -120,12 +137,15 @@ samwise_ping (
// --------------------------------------------------------------------------
/// Create a new samwise instance and connect to samd's endpoint.
samwise_t *
samwise_new ()
samwise_new (
const char *endpoint)
{
assert (endpoint);
samwise_t *self = malloc (sizeof (samwise_t));
assert (self);
self->req = zsock_new_req ("ipc://../../sam_ipc");
self->req = zsock_new_req (endpoint);
if (!self->req) {
fprintf (stderr, "could not connect to endpoint\n");
return NULL;
......@@ -136,7 +156,7 @@ samwise_new ()
// --------------------------------------------------------------------------
/// Destroy a samwise instance.
/// destroy a samwise instance.
void
samwise_destroy (
samwise_t **self)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment