Commit 773afd74 authored by Felix Hamann's avatar Felix Hamann

wiring and drastic logger changes

parent 13fbd014
......@@ -140,7 +140,7 @@ rabbit_declare_exchange (
self->chan, // virtual connection
amqp_cstring_bytes (exchange), // exchange name
amqp_cstring_bytes (type), // type
0, // passive
0, // passive
0, // durable
amqp_empty_table); // arguments
......
......@@ -13,8 +13,6 @@ src_libsam_la_SOURCES = \
include/sam_gen.h \
src/sam_gen.c \
\
include/sam_logger.h \
src/sam_logger.c \
include/sam_log.h \
src/sam_log.c \
\
......
......@@ -53,14 +53,11 @@
#include "sam_prelude.h"
/// Type for sam instances
typedef struct sam_t {
sam_log_t *log;
sam_logger_t *logger;
sam_msg_t *backends;
} sam_t;
// --------------------------------------------------------------------------
/// @brief Create a new instance of samwise
/// @return A freshly allocated sam instance
......
......@@ -36,113 +36,39 @@ typedef enum {
#define SAM_LOG_LVL_ERROR_REPR "error"
/// The different handler options
typedef enum {
SAM_LOG_HANDLER_STD ///< stdout and stderr handler
} sam_log_handler_t;
/// State for sam_log instances
typedef struct sam_log_t {
char *endpoint; ///< pull socket endpoint
zactor_t *actor; ///< logging thread
} sam_log_t;
/// The state of a log thread
typedef struct sam_log_inner_t {
zsock_t *pll; ///< socket accepting log requests
const char *line_fmt; ///< the output format for messages
const char *date_fmt; ///< format for timestamps
struct handler_ { ///< references to handlers per level
zlist_t *trace;
zlist_t *info;
zlist_t *error;
} handler;
} sam_log_inner_t;
// Everything above gets cut. Prevents
// buffer overflows, since no heap is used.
#define SAM_LOG_LINE_MAXSIZE 256
#define SAM_LOG_DATE_MAXSIZE 16
// --------------------------------------------------------------------------
/// @brief Create a new logger
/// @param endpoint Optional endpoint name
/// @return Pointer to the logger state
CZMQ_EXPORT sam_log_t *
sam_log_new (char *endpoint);
// --------------------------------------------------------------------------
/// @brief Free's all memory of the log's state
CZMQ_EXPORT void
sam_log_destroy (sam_log_t **self);
// --------------------------------------------------------------------------
/// @brief Add a function posing as a callback for a severity
/// @param self Log facility
/// @param lvl Call handler for all severities up to <lvl>
/// @param handler The callback function
CZMQ_EXPORT void
sam_log_add_handler (
sam_log_t *self,
void
sam_log_ (
sam_log_lvl_t lvl,
sam_log_handler_t handler);
const char *fac,
const char *msg);
// --------------------------------------------------------------------------
/// @brief Remove a function registered for the lvl and lower
/// @param self Log facility
/// @param lvl Call handler for all severities down to <lvl>
/// @param handler The callback function
CZMQ_EXPORT void
sam_log_remove_handler (
sam_log_t *self,
void
sam_logf_ (
sam_log_lvl_t lvl,
sam_log_handler_t handler);
// --------------------------------------------------------------------------
/// @brief Logs to stdout and stderr
/// @param lvl Severity level
/// @param line The log line
CZMQ_EXPORT void
sam_log_handler_std (sam_log_lvl_t lvl, const char *line);
// --------------------------------------------------------------------------
/// @brief Retrieves the logging endpoint
/// @param log Log facility
/// @param line The log line
CZMQ_EXPORT char *
sam_log_endpoint (sam_log_t *self);
// --------------------------------------------------------------------------
/// @brief short description of the functions purpose
/// @param args some arguments
/// @param argc argc size of the argument vector
CZMQ_EXPORT void
sam_log_test ();
const char *fac,
const char *msg,
...);
//
// TRACE LOGGING
//
#if defined(LOG_THRESHOLD_TRACE) || defined(LOG_THRESHOLD_INFO) || defined(LOG_THRESHOLD_ERROR)
#define sam_log_trace(logger, msg)
#define sam_log_tracef(logger, msg, ...)
#define sam_log_trace(msg)
#define sam_log_tracef(msg, __FILE__, ...)
#else
#define sam_log_trace(logger, msg) \
sam_logger_send (logger, SAM_LOG_LVL_TRACE, msg);
#define sam_log_tracef(logger, msg, ...) \
sam_logger_sendf (logger, SAM_LOG_LVL_TRACE, msg, __VA_ARGS__);
#define sam_log_trace(msg) \
sam_log_ (SAM_LOG_LVL_TRACE, msg, __FILE__);
#define sam_log_tracef(msg, ...) \
sam_logf_ (SAM_LOG_LVL_TRACE, msg, __FILE__, __VA_ARGS__);
#endif
......@@ -151,14 +77,14 @@ sam_log_test ();
// INFO LOGGING
//
#if defined(LOG_THRESHOLD_INFO) || defined(LOG_THRESHOLD_TRACE)
#define sam_log_info(logger, msg)
#define sam_log_infof(logger, msg, ...)
#define sam_log_info(msg)
#define sam_log_infof(msg, __FILE__, ...)
#else
#define sam_log_info(logger, msg) \
sam_logger_send (logger, SAM_LOG_LVL_INFO, msg);
#define sam_log_infof(logger, msg, ...) \
sam_logger_sendf (logger, SAM_LOG_LVL_INFO, msg, __VA_ARGS__);
#define sam_log_info(msg) \
sam_log_ (SAM_LOG_LVL_INFO, msg, __FILE__);
#define sam_log_infof(msg, ...) \
sam_logf_ (SAM_LOG_LVL_INFO, msg, __FILE__, __VA_ARGS__);
#endif
......@@ -167,14 +93,14 @@ sam_log_test ();
// ERROR LOGGING
//
#if defined(LOG_THRESHOLD_ERROR)
#define sam_log_error(logger, msg)
#define sam_log_errorf(logger, msg, ...)
#define sam_log_error(msg)
#define sam_log_errorf(msg, __FILE__, ...)
#else
#define sam_log_error(logger, msg) \
sam_logger_send (logger, SAM_LOG_LVL_ERROR, msg);
#define sam_log_errorf(logger, msg, ...) \
sam_logger_sendf (logger, SAM_LOG_LVL_ERROR, msg, __VA_ARGS__);
#define sam_log_error(msg) \
sam_log_ (SAM_LOG_LVL_ERROR, msg, __FILE__);
#define sam_log_errorf(msg, ...) \
sam_logf_ (SAM_LOG_LVL_ERROR, msg, __FILE__, __VA_ARGS__);
#endif
......
/* =========================================================================
sam_logger - logger clients
This Source Code Form is subject to the terms of the MIT
License. If a copy of the MIT License was not distributed with
this file, You can obtain one at http://opensource.org/licenses/MIT
=========================================================================
*/
/**
\brief sam_logger - logger clients
Instances of this class may be used to send
the central log facility messages.
*/
#ifndef _SAM_LOGGER_H_
#define _SAM_LOGGER_H_
/// The state of a logger
typedef struct sam_logger_t {
zsock_t *psh; ///< socket pushing log requests
char *name; ///< identifier for the logger
} sam_logger_t;
// --------------------------------------------------------------------------
/// @brief Create a new logger instance
/// @param endpoint The log facilities endpoint
/// @return The new logger instance
CZMQ_EXPORT sam_logger_t *
sam_logger_new (char *name, char *endpoint);
// --------------------------------------------------------------------------
/// @brief Destroy the logger
/// @param logger Logger instance
CZMQ_EXPORT void
sam_logger_destroy (sam_logger_t **logger);
// --------------------------------------------------------------------------
/// @brief Send a log line
/// @param logger Logger instance
/// @param lvl Severity of the log message
/// @param fmt A zero-terminated string
CZMQ_EXPORT void
sam_logger_send (
sam_logger_t *logger,
sam_log_lvl_t lvl,
const char *line);
// --------------------------------------------------------------------------
/// @brief Send a formatted log line
/// @param logger Logger instance
/// @param lvl Severity of the log message
/// @param fmt A zero-terminated format string
/// @param ... Format parameters
CZMQ_EXPORT void
sam_logger_sendf (
sam_logger_t *logger,
sam_log_lvl_t lvl,
const char *fmt,
...);
#endif
......@@ -28,20 +28,9 @@ typedef enum {
} sam_msg_res_t;
typedef struct sam_msg_state_t {
unsigned int backend_c;
sam_logger_t *logger;
zsock_t *rep;
} sam_msg_state_t;
typedef struct sam_msg_t {
sam_logger_t *logger;
zsock_t *req;
zactor_t *actor;
sam_msg_state_t *state;
} sam_msg_t;
typedef enum {
SAM_MSG_BE_RABBITMQ
} sam_msg_be_t;
/// return type for "start" functions
......@@ -53,6 +42,22 @@ typedef struct sam_msg_backend_t {
} sam_msg_backend_t;
typedef struct sam_msg_state_t {
unsigned int backend_c;
zsock_t *actor_rep;
zsock_t *backend_pull;
sam_msg_backend_t *backend;
} sam_msg_state_t;
typedef struct sam_msg_t {
zsock_t *actor_req;
char *backend_pull_endpoint;
zactor_t *actor;
sam_msg_state_t *state;
} sam_msg_t;
// --------------------------------------------------------------------------
/// @brief Creates a new sam_msg instance
/// @param name Messaging name, max. ~50 chars
......@@ -77,7 +82,7 @@ sam_msg_destroy (
int
sam_msg_create_backend (
sam_msg_t *self,
char *backend,
sam_msg_be_t be_type,
void *opts);
......
......@@ -27,8 +27,6 @@
typedef struct sam_msg_rabbitmq_t {
unsigned int id;
sam_logger_t *logger; ///< logger instance
struct { ///< amqp connection
amqp_connection_state_t connection;
amqp_socket_t *socket;
......
......@@ -31,7 +31,6 @@
#include "sam_gen.h"
#include "sam_log.h"
#include "sam_logger.h"
#include "sam_msg.h"
#include "sam_msg_rabbitmq.h"
#include "sam.h"
......
......@@ -26,7 +26,6 @@
typedef struct samd_t {
sam_t *sam;
sam_logger_t *logger;
zsock_t *client_rep;
} samd_t;
......
......@@ -28,7 +28,6 @@
typedef struct state_t {
sam_logger_t *logger;
sam_msg_backend_t *backend;
zactor_t *burster;
} state_t;
......@@ -42,9 +41,8 @@ publishing_burst (zsock_t *pipe, void *args)
{
zsock_signal (pipe, 0);
zsock_t *req = args;
sam_logger_t *logger = sam_logger_new ("burst", SAM_LOG_ENDPOINT);
sam_log_info (logger, "starting publishing burst");
sam_log_info ("starting publishing burst");
int64_t start_time = zclock_mono ();
int p_c = 0;
for (; p_c < BURST; p_c++) {
......@@ -57,18 +55,16 @@ publishing_burst (zsock_t *pipe, void *args)
int seq;
zsock_recv (req, "i", &seq);
sam_log_tracef (logger, "received seq %d", seq);
sam_log_tracef ("received seq %d", seq);
}
sam_log_infof (
logger,
"publish",
"finished publishing %d messages in %dms",
BURST,
zclock_mono () - start_time);
printf ("publishing took %zdms\n", zclock_mono () - start_time);
sam_logger_destroy (&logger);
}
......@@ -95,10 +91,8 @@ handle_stdin (zloop_t *loop UU, zmq_pollitem_t *poll_stdin UU, void *args)
// --------------------------------------------------------------------------
/// Callback for msg_rabbitmq answers arriving on the pull socket.
static int
handle_pll (zloop_t *loop UU, zsock_t *pll, void *args)
handle_pll (zloop_t *loop UU, zsock_t *pll, void *args UU)
{
state_t *state = args;
sam_msg_res_t res;
zmsg_t *msg = zmsg_new ();
......@@ -110,18 +104,18 @@ handle_pll (zloop_t *loop UU, zsock_t *pll, void *args)
case SAM_MSG_RES_ACK:
seq = zmsg_popstr (msg);
sam_log_tracef (state->logger, "received ACK %s", seq);
sam_log_tracef ("received ACK %s", seq);
free (seq);
break;
case SAM_MSG_RES_CONNECTION_LOSS:
sam_log_error (state->logger, "received CONNECTION LOSS");
sam_log_error ("received CONNECTION LOSS");
break;
default:
sam_log_errorf (state->logger, "received unknown response id: %d", res);
sam_log_errorf ("received unknown response id: %d", res);
}
zmsg_destroy (&msg);
......@@ -136,14 +130,9 @@ void
playground_publish_loop ()
{
state_t state = {
.logger = sam_logger_new ("publish loop", SAM_LOG_ENDPOINT),
.backend = NULL
};
sam_log_trace (state.logger, "logging trace");
sam_log_info (state.logger, "logging info");
sam_log_error (state.logger, "logging error");
sam_msg_rabbitmq_t *rabbit = sam_msg_rabbitmq_new ();
sam_msg_rabbitmq_opts_t opts = {
.host = "localhost",
......@@ -174,5 +163,4 @@ playground_publish_loop ()
zsock_destroy (&pll);
sam_msg_rabbitmq_destroy (&rabbit);
sam_logger_destroy (&state.logger);
}
......@@ -20,32 +20,19 @@
#include "../include/sam_prelude.h"
// to be removed before shipping. for debugging
static void
sigabrt (int signo UU)
{
sam_logger_t *logger = sam_logger_new ("sigabrt", SAM_LOG_ENDPOINT);
sam_log_error (logger, "catched SIGABRT");
sam_logger_destroy (&logger);
zclock_sleep (10);
}
// --------------------------------------------------------------------------
sam_t *
sam_new ()
{
signal (SIGABRT, sigabrt); // for debug purposes
sam_t *self = malloc (sizeof (sam_t));
assert (self);
// logging
self->log = sam_log_new (SAM_LOG_ENDPOINT);
self->logger = sam_logger_new ("sam", SAM_LOG_ENDPOINT);
sam_log_add_handler (
self->log, SAM_LOG_LVL_TRACE, SAM_LOG_HANDLER_STD);
// request/response multiplexing
sam_log_infof (
"bound public endpoint: %s",
SAM_PUBLIC_ENDPOINT);
sam_log_info ("created sam");
return self;
}
......@@ -56,8 +43,9 @@ sam_destroy (sam_t **self)
{
assert (*self);
sam_logger_destroy (&(*self)->logger);
sam_log_destroy (&(*self)->log);
if ((*self)->backends) {
sam_msg_destroy (&(*self)->backends);
}
free (*self);
*self = NULL;
......@@ -66,27 +54,47 @@ sam_destroy (sam_t **self)
// --------------------------------------------------------------------------
int
sam_config (sam_t *self, const char *conf UU)
sam_init (sam_t *self, const char *conf UU)
{
sam_log_error (self->logger, "sam_config is not yet implemented");
// TODO create shared config (#32)
// for now, create one backend pool
// containing one rabbitmq backend instance
self->backends = sam_msg_new ("rabbitmq");
assert (self->backends);
sam_msg_rabbitmq_opts_t rabbitmq_opts = {
.host = "localhost",
.port = 5672,
.user = "guest",
.pass = "guest",
.heartbeat = 3
};
int rc = sam_msg_create_backend (
self->backends, SAM_MSG_BE_RABBITMQ, &rabbitmq_opts);
if (rc != 0) {
sam_log_error ("could not create rabbitmq backend");
}
return 0;
}
// --------------------------------------------------------------------------
int
sam_publish (sam_t *self, zmsg_t *msg UU)
sam_publish (sam_t *self, zmsg_t *msg)
{
sam_log_trace (self->logger, "got publishing request");
return 0;
sam_log_trace ("got publishing request");
return sam_msg_publish (self->backends, msg);
}
// --------------------------------------------------------------------------
int
sam_stats (sam_t *self)
sam_stats (sam_t *self UU)
{
sam_log_error (self->logger, "sam_stats is not yet implemented!");
sam_log_error ("sam_stats is not yet implemented!");
return 0;
}
......@@ -96,7 +104,6 @@ sam_stats (sam_t *self)
void
sam_test ()
{
sam_log_test ();
sam_msg_rabbitmq_test ();
sam_msg_test ();
......
This diff is collapsed.
/* =========================================================================
sam_logger - logger classes to communicate with sam_log instances
This Source Code Form is subject to the terms of the MIT
License. If a copy of the MIT License was not distributed with
this file, You can obtain one at http://opensource.org/licenses/MIT
=========================================================================
*/
/**
@brief distributed logger instances
@file sam_logger.c
Instances of this class communicate with a specific log
facility. They must not be shared between threads, but
arbitrary many instances of this class (one per thread)
may communicate with one log facility.
connections:
"[<>^v]" : connect
"o" : bind
PUSH PULL
sam_logger[0] >-----o sam_log
/
sam_logger[i] >/
PUSH
*/
#include "../include/sam_prelude.h"
// --------------------------------------------------------------------------
/// Create a new logger instance.
/// Most of the time you need to provide the return value of
/// sam_log_endpoint (...)
sam_logger_t *
sam_logger_new (char *name, char *endpoint)
{
sam_logger_t *logger = malloc (sizeof (sam_logger_t));
logger->psh = zsock_new_push (endpoint);
logger->name = strdup (name);
return logger;
}
// --------------------------------------------------------------------------
/// Destroy a logger instance.
void
sam_logger_destroy (sam_logger_t **logger)
{
assert (logger);
zsock_destroy (&(*logger)->psh);
free ((*logger)->name);
free (*logger);
logger = NULL;
}
// --------------------------------------------------------------------------
/// Send a log line to the log facility.
/// The timestamp appearing in the log is created in this function.
void
sam_logger_send (
sam_logger_t *logger,
sam_log_lvl_t lvl,
const char *line)
{
assert (logger);
time_t time_curr = time (NULL);
zsock_send (
logger->psh,
"sbssb",
"log",
&lvl, sizeof (sam_log_lvl_t),
logger->name,
line,
&time_curr, sizeof (time_t));
}
// --------------------------------------------------------------------------
/// Send a log line to the log facility.
/// The timestamp appearing in the log is created in this function.
void
sam_logger_sendf (
sam_logger_t *logger,
sam_log_lvl_t lvl,
const char *fmt,
...)
{
va_list argp;
char line[256];
va_start (argp, fmt);
vsnprintf (line, SAM_LOG_LINE_MAXSIZE - 1, fmt, argp);
line[SAM_LOG_LINE_MAXSIZE - 1] = 0;
va_end (argp);
sam_logger_send (logger, lvl, line);
}
......@@ -22,14 +22,28 @@
static int
handle_req (zloop_t *loop UU, zsock_t *rep, void *args)
handle_actor_req (zloop_t *loop UU, zsock_t *rep, void *args)
{
sam_msg_state_t *state = args;
zmsg_t *msg = zmsg_new ();
char *distribution;
zframe_t *payload;
char
*distribution,
*exchange,
*routing_key;
zsock_recv (rep, "sm", &distribution, &msg);
sam_log_tracef (state->logger, "handle req called (%s)", distribution);
zsock_recv (
rep, "sssf", &distribution, &exchange, &routing_key, &payload);
sam_log_tracef (
"handle req called, payload size: %d",
zframe_size (payload));
zsock_send (
state->backend->req, "issf",
SAM_MSG_REQ_PUBLISH,
exchange, routing_key,
payload);
zsock_send (rep, "i", 0);
......@@ -39,20 +53,32 @@ handle_req (zloop_t *loop UU, zsock_t *rep, void *args)
}
static int
handle_backend_req (zloop_t *loop UU, zsock_t *pull, void *args UU)
{
sam_log_trace ("handle backend request");
zmsg_t *msg = zmsg_recv (pull);
zmsg_destroy (&msg);
return 0;
}
static void
actor (zsock_t *pipe, void *args)
{
sam_msg_state_t *state = args;
zloop_t *loop = zloop_new ();
zloop_reader (loop, state->rep, handle_req, state);
zloop_reader (loop, state->actor_rep, handle_actor_req, state);
zloop_reader (loop, state->backend_pull, handle_backend_req, state);
zloop_reader (loop, pipe, sam_gen_handle_pipe, NULL);
sam_log_info (state->logger, "starting poll loop");
sam_log_info ("starting poll loop");
zsock_signal (pipe, 0);
zloop_start (loop);
printf ("EXIT LOOP SAM_MSG\n");
sam_log_trace (state->logger, "destroying loop");
sam_log_trace ("destroying loop");
zloop_destroy (&loop);
}
......@@ -67,23 +93,26 @@ sam_msg_new (const char *name)
sam_msg_state_t *state = malloc (sizeof (sam_msg_state_t));
assert (self);
snprintf (buf, buf_s, "msg_%s", name);
self->logger = sam_logger_new (buf, SAM_LOG_ENDPOINT);
snprintf (buf, buf_s, "msg_actor_%s", name);
state->logger = sam_logger_new (buf, SAM_LOG_ENDPOINT);
// actor req/rep
snprintf (