Commit 3b775cad authored by Felix Hamann's avatar Felix Hamann

Merge branch 'master' of github:dreadworks/samwise

Conflicts:
	samwise/src/sam.c
	samwise/src/sam_be_rmq.c
parents 982141e2 8b75e2f7
aclocal.m4
autom4te.cache/
compile
config.h
config.h.in
config.log
config.status
config.guess
config.sub
configure
depcomp
install-sh
libtool
ltmain.sh
missing
m4/
stamp-h1
src/.deps/
src/.libs/
src/.dirstamp
Makefile
samcli
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
ACLOCAL_AMFLAGS = -I m4
AM_CFLAGS = -g -Werror -Wall -Wextra -pedantic -std=gnu99
lib_LTLIBRARIES = src/libsamwise.la
include_HEADERS = include/samwise.h
src_libsamwise_la_SOURCES = \
include/samwise.h \
src/samwise.c
src_libsamwise_la_LDFLAGS = \
-lzmq \
-lczmq
bin_PROGRAMS = samcli
samcli_SOURCES = src/samcli.c
samcli_LDADD = src/libsamwise.la
samcli_LDFLAGS = -static
#!/bin/sh -e
test -n "$srcdir" || srcdir=`dirname "$0"`
test -n "$srcdir" || srcdir=.
autoreconf --force --install --verbose "$srcdir"
test -n "$NOCONFIGURE" || "$srcdir/configure" "$@"
# -*- Autoconf -*-
# Process this file with autoconf to produce a configure script.
AC_PREREQ([2.69])
AC_INIT(samwise, 0.0.1, github.com/dreadworks/samwise/issues)
AC_CONFIG_SRCDIR([include/samwise.h])
AC_CONFIG_HEADERS([config.h])
# Checks for programs.
AM_INIT_AUTOMAKE([subdir-objects])
AC_CONFIG_MACRO_DIR([m4])
AM_PROG_CC_C_O
AC_PROG_INSTALL
LT_INIT
# Checks for libraries.
# FIXME: Replace `main' with a function in `-lczmq':
AC_CHECK_LIB([czmq], [main])
# FIXME: Replace `main' with a function in `-lzmq':
AC_CHECK_LIB([zmq], [main])
# Checks for header files.
# Checks for typedefs, structures, and compiler characteristics.
AC_FUNC_MALLOC
# Checks for library functions.
AC_CONFIG_FILES([Makefile])
AC_OUTPUT
/* =========================================================================
samwise - reliable message publishing
This Source Code Form is subject to the terms of the MIT
License. If a copy of the MIT License was not distributed with
this file, You can obtain one at http://opensource.org/licenses/MIT
=========================================================================
*/
/**
@brief Samwise API interface
Offers all public functions necessary to store and forward
messages.
*/
#ifndef __SAMWISE_H__
#define __SAMWISE_H__
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#ifdef __cplusplus
extern "C" {
#endif
#define UU __attribute__((unused))
typedef struct samwise_t samwise_t;
typedef enum {
SAMWISE_ROUNDROBIN,
SAMWISE_REDUNDANT
} samwise_disttype_t;
/// publishing request options
typedef struct samwise_pub_t {
samwise_disttype_t disttype; ///< either round robin or redundant
int distcount; ///< for disttype=SAMWISE_REDUNDANT
char *exchange;
char *routing_key;
char *msg;
size_t size;
} samwise_pub_t;
// --------------------------------------------------------------------------
/// @brief Publish a message to samd
/// @param self A samwise instance
/// @param pub Publishing options
/// @return 0 if success, -1 otherwise
int
samwise_publish (
samwise_t *self,
samwise_pub_t *pub);
// --------------------------------------------------------------------------
/// @brief Send a ping to samwise
/// @param self A samwise instance
/// @return 0 if success, -1 otherwise
int
samwise_ping (
samwise_t *self);
// --------------------------------------------------------------------------
/// @brief Create a new samwise instance
/// @param endpoint Public endpoint of a samd instance
/// @return The newly created instance
samwise_t *
samwise_new (
const char *endpoint);
// --------------------------------------------------------------------------
/// @brief Destroy a samwise instance
/// @param self Pointer to be free'd and nullified
void
samwise_destroy (
samwise_t **self);
#ifdef __cplusplus
}
#endif
#endif
/* =========================================================================
samwise - best effort store and forward message publishing
This Source Code Form is subject to the terms of the MIT
License. If a copy of the MIT License was not distributed with
this file, You can obtain one at http://opensource.org/licenses/MIT
=========================================================================
*/
/**
@brief samwise c client
@file samwise.c
*/
#include <time.h>
#include <stdbool.h>
#include <argp.h>
#include <inttypes.h>
#include "../include/samwise.h"
/// arguments provided by the user
typedef struct args_t {
bool verbose; ///< print additional information
bool quiet; ///< suppress all output
const char *endpoint; ///< public endpoint to connect to
const char *action; ///< positional action string
// publishing options
int n; ///< number of messages to be published
samwise_disttype_t disttype; ///< distribution type (roundrobin|redundant)
int d; ///< count for distribution=redundant
} args_t;
/// state maintained by the client
typedef struct state_t {
samwise_t *sam; ///< samwise client instance
args_t *args; ///< arguments provided by the user
} state_t;
/*
* ---- HELPER ----
*
*/
/// output levels
typedef enum out_t {
NORMAL, ///< print to stdout unless -q
ERROR, ///< print to stderr unless -q
VERBOSE ///< print to stdout if -v
} out_t;
// --------------------------------------------------------------------------
/// Print a message to std* based on the level and either -q or -v.
static void
out (
out_t lvl,
args_t *args,
const char *line)
{
if (
(lvl == VERBOSE && args->verbose) ||
(lvl == NORMAL && !args->quiet)) {
fprintf (stdout, "%s\n", line);
}
if (lvl == ERROR && !args->quiet) {
fprintf (stderr, "error: %s\n", line);
}
}
// --------------------------------------------------------------------------
/// Returns current monotonic timestamp in ms.
static int64_t
clock_mono ()
{
struct timespec ts;
clock_gettime (CLOCK_MONOTONIC, &ts);
return (int64_t)
((int64_t) ts.tv_sec * 1000 + (int64_t) ts.tv_nsec / 1000000);
}
// --------------------------------------------------------------------------
/// Publish n messages to samd.
static void
publish (
state_t *state)
{
assert (state);
args_t *args = state->args;
int count = 1;
int64_t ts = clock_mono ();
char buf [256];
while (count <= args->n) {
char buf [64];
snprintf (buf, 64, "message no %d", count);
samwise_pub_t pub = {
.disttype = args->disttype,
.distcount = args->d,
.exchange = "amq.direct",
.routing_key = "",
.size = strlen (buf),
.msg = buf
};
sprintf (buf, "publishing message %d", count);
out (VERBOSE, state->args, buf);
int rc = samwise_publish (state->sam, &pub);
if (rc) {
fprintf (stderr, "publishing failed\n");
}
count += 1;
}
char *fmt = "publishing took %" PRId64 "ms";
sprintf (buf, fmt, clock_mono () - ts);
out (NORMAL, state->args, buf);
}
/*
* ---- ARGP ----
*
*/
const char
*argp_program_version = "0.1",
*argp_program_bug_adress = "http://github.com/dreadworks/samwise/issues";
static char doc [] =
" ___ __ _ _ ____ __ _(_)___ ___ __| (_)___ _ _| |_\n"
"(_-</ _` | ' \\ V V / (_-</ -_) / _| | / -_) ' \\ _|\n"
"/__/\\__,_|_|_|_\\_/\\_/|_/__/\\___| \\__|_|_\\___|_||_\\__|\n\n"
"Currently the following actions are supported:\n"
" ping Ping a samd instance\n"
" publish Publish some messages to samd\n"
"\nAdditionally, the following options can be provided:\n";
static char args_doc [] = "ACTION";
// possible options
static struct argp_option options [] = {
{ .name = "verbose", .key = 'v', .arg = 0,
.flags = OPTION_ARG_OPTIONAL,
.doc = "Verbose output" },
{ .name = "quiet", .key = 'q', .arg = 0,
.flags = OPTION_ARG_OPTIONAL,
.doc = "Suppress output" },
{ .name = "endpoint", .key = 'e', .arg = "ENDPOINT",
.doc = "Public endpoint of samd" },
// for ACTION = publish
{ .name = "number", .key = 'n', .arg = "N",
.doc = "Number of messages to be published (default: 1)" },
{ .name = "type", .key = 't', .arg = "TYPE",
.doc = "Distribution type (roundrobin|redundant) "
"(default: roundrobin)" },
{ .name = "distcount", .key = 'd', .arg = "D",
.doc = "Count for distribution=redundant (default: 2)" },
// excond
{ 0 }
};
// --------------------------------------------------------------------------
/// Parse the argument vector.
static error_t
parse_opt (
int key,
char *arg,
struct argp_state *a_state)
{
args_t *args = a_state->input;
switch (key) {
// verbose
case 'v':
if (args->quiet) {
out (ERROR, args, "-q and -v are mutually exclusive");
argp_usage (a_state);
return -1;
}
args->verbose = true;
out (VERBOSE, args, "setting output verbose");
break;
// quiet
case 'q':
if (args->verbose) {
out (ERROR, args, "-q and -v are mutually exclusive");
argp_usage (a_state);
return -1;
}
args->quiet = true;
break;
// endpoint
case 'e':
args->endpoint = arg;
break;
/*
* for ACTION = publish
*
*/
// number
case 'n':
args->n = atoi (arg);
break;
// distribution type
case 't':
if (!strcmp (arg, "roundrobin")) {
args->disttype = SAMWISE_ROUNDROBIN;
out (VERBOSE, args, "publishing in a round robin fashion");
}
else if (!strcmp (arg, "redundant")) {
args->disttype = SAMWISE_REDUNDANT;
out (VERBOSE, args, "publishing redundantly");
}
else {
out (ERROR, args, "unknown distribution type");
}
break;
// distribution count
case 'd':
args->d = atoi (arg);
break;
/*
* ACTION
*
*/
// key args (action)
case ARGP_KEY_ARG:
if (a_state->arg_num >= 1) {
out (ERROR, args, "too many arguments");
argp_usage (a_state);
return -1;
}
args->action = arg;
break;
default:
return ARGP_ERR_UNKNOWN;
}
return 0;
}
/// configures argp
static struct argp argp = {
.options = options,
.parser = parse_opt,
.args_doc = args_doc,
.doc = doc,
.children = 0
};
static int
eval (
state_t *state)
{
const char *action = state->args->action;
if (!strcmp (action, "ping")) {
samwise_ping (state->sam);
}
else if (!strcmp (action, "publish")) {
publish (state);
}
else {
return -1;
}
return 0;
}
// --------------------------------------------------------------------------
/// Entry point.
int
main (
int argc,
char **argv)
{
args_t args = {
.quiet = false,
.verbose = false,
.action = NULL,
.endpoint = "",
.n = 1,
.disttype = SAMWISE_ROUNDROBIN,
.d = 2
};
error_t rc = argp_parse (&argp, argc, argv, 0, 0, &args);
if (rc) {
out (ERROR, &args, "argument error");
return EXIT_FAILURE;
}
state_t state = {
.sam = samwise_new (args.endpoint),
.args = &args
};
if (!state.sam || eval (&state)) {
out (ERROR, &args, "an error occured, exiting\n");
samwise_destroy (&state.sam);
return EXIT_FAILURE;
}
samwise_destroy (&state.sam);
return rc;
}
/* =========================================================================
samwise - best effort store and forward message publishing
This Source Code Form is subject to the terms of the MIT
License. If a copy of the MIT License was not distributed with
this file, You can obtain one at http://opensource.org/licenses/MIT
=========================================================================
*/
/**
@brief samwise c client library
@file samwise.c
*/
#include "../include/samwise.h"
#include <czmq.h>
/// samwise state
struct samwise_t {
zsock_t *req; ///< request socket communicating with samd
};
// --------------------------------------------------------------------------
/// Read the response from samd and print any errors that may arise.
static int
handle_response (
samwise_t *self)
{
int code;
char *msg;
zsock_recv (self->req, "is", &code, &msg);
if (code) {
fprintf (stderr, "received error '%d': %s\n", code, msg);
}
free (msg);
return code;
}
// --------------------------------------------------------------------------
/// Creates an empty message and appends the protocol version.
static zmsg_t *
create_msg ()
{
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "100");
return msg;
}
// --------------------------------------------------------------------------
/// Publish a message to samd.
int
samwise_publish (
samwise_t *self,
samwise_pub_t *pub)
{
zmsg_t *msg = create_msg ();
zmsg_addstr (msg, "publish");
// distribution type
if (pub->disttype == SAMWISE_ROUNDROBIN) {
zmsg_addstr (msg, "round robin");
}
else if (pub->disttype == SAMWISE_REDUNDANT) {
zmsg_addstr (msg, "redundant");
char buf [128];
snprintf (buf, 128, "%d", pub->distcount);
zmsg_addstr (msg, buf);
}
else {
assert (false);
}
// amqp options
zmsg_addstr (msg, pub->exchange);
zmsg_addstr (msg, pub->routing_key);
zmsg_addstr (msg, "0"); // mandatory
zmsg_addstr (msg, "0"); // immediate
// rabbitmq options
zmsg_addstr (msg, "12");
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
zmsg_addmem (msg, NULL, 0);
// header
zmsg_addstr (msg, "0");
// payload
zmsg_addmem (msg, pub->msg, pub->size);
zmsg_send (&msg, self->req);
return handle_response (self);
}
// --------------------------------------------------------------------------
/// Ping samd.
int
samwise_ping (
samwise_t *self)
{
zmsg_t *msg = create_msg ();
zmsg_addstr (msg, "ping");
zmsg_send (&msg, self->req);
return handle_response (self);
}
// --------------------------------------------------------------------------
/// Create a new samwise instance and connect to samd's endpoint.
samwise_t *
samwise_new (
const char *endpoint)
{
assert (endpoint);
samwise_t *self = malloc (sizeof (samwise_t));
assert (self);
self->req = zsock_new_req (endpoint);
if (!self->req) {
fprintf (stderr, "could not connect to endpoint\n");
return NULL;
}
return self;
}
// --------------------------------------------------------------------------
/// destroy a samwise instance.
void
samwise_destroy (
samwise_t **self)
{
assert (*self);
zsock_destroy (&(*self)->req);
free (*self);
*self = NULL;
}
......@@ -91,20 +91,6 @@ if (condition) {
}