Thursday, 26 June 2014

How to transform an ordinary source into a threaded source?

Why should we use a different thread for a source module?

We can imagine situations in which creating a message in a source module is a costly operation. If such a module runs in the same thread as the main event handler, the whole of syslog-ng becomes unresponsive, which degrades performance and may cause events to be missed.

In these situations it is advised to create another thread for the source module.
Although one might think that this introduces some of the typical problems of multi-threaded programming, it turned out to be less painful than we anticipated.

Example (original file):

/*
 * Trigger callback with an artificial delay in front of it.
 *
 * After the delay it asserts that it runs on the main thread, queues one
 * internal message built from the configured text if the source is free to
 * send, and finally updates the source's watches in every case.
 *
 * s: the TriggerSource, passed as an untyped pointer.
 */
static void
trigger_source_triggered (gpointer s)
{
  TriggerSource *source = (TriggerSource *) s;

  /* An operation that needs a lot of time */
  sleep (2);
  main_loop_assert_main_thread ();

  if (log_source_free_to_send (&source->super))
    {
      LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
      LogMessage *msg = log_msg_new_internal (LOG_INFO, source->options->message);

      path_options.ack_needed = FALSE;
      log_pipe_queue (&source->super.super, msg, &path_options);
    }

  trigger_source_update_watches (source);
}
With this modification you can check whether syslog-ng becomes blocked.

Transformation example:

First of all, we have to get rid of the iv_timer part (in this example we do not need it):

/*
* Copyright (c) 2013 BalaBit IT Ltd, Budapest, Hungary
* Copyright (c) 2013 Gergely Nagy 
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
* by the Free Software Foundation, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* As an additional exemption you are allowed to compile & link against the
* OpenSSL libraries as published by the OpenSSL project. See the file
* COPYING for details.
*
*/

#include "trigger-source.h"

#include "driver.h"
#include "logsource.h"
#include "mainloop.h"

/* Settings of the trigger source, filled in by the trigger_sd_set_* setters. */
typedef struct
{
  /* Trigger frequency; trigger_sd_init() substitutes 10 when this is <= 0.
     Presumably expressed in seconds -- TODO confirm. */
  gint trigger_freq;
  /* Text of the generated message, stored as a g_strdup()'d copy. */
  gchar *message;
} TriggerOptions;

/* The source driver: owns the options and the TriggerSource built from them. */
typedef struct _TriggerSourceDriver
{
  /* Base source-driver object; kept as the first member because the code
     casts LogPipe and LogDriver pointers to TriggerSourceDriver pointers. */
  LogSrcDriver super;
  /* Source created in trigger_sd_init() and released in trigger_sd_deinit(). */
  LogSource *source;
  /* Source options handed to the source when it is created. */
  LogSourceOptions source_options;

  /* Trigger settings; the created source keeps a pointer to this member. */
  TriggerOptions options;
} TriggerSourceDriver;

/* The log source that emits the trigger messages. */
typedef struct
{
  /* Base source object; kept as the first member because the code casts
     LogPipe pointers to TriggerSource pointers. */
  LogSource super;

  /* Points at the owning driver's options; set in trigger_source_new(). */
  TriggerOptions *options;
} TriggerSource;

/* Fallback: define SCS_TRIGGER as 0 when no definition is already in scope.
   The value is passed to log_source_set_options() in trigger_source_new(). */
#ifndef SCS_TRIGGER
#define SCS_TRIGGER 0
#endif

/*
* TriggerSource
*/

/*
 * Queue one internal message built from the configured text on the
 * source's pipe, unless the source reports that it is not free to send.
 * Asserts that it is running on the main thread.
 *
 * s: the TriggerSource, passed as an untyped pointer.
 */
static void
trigger_source_triggered (gpointer s)
{
  TriggerSource *self = (TriggerSource *) s;
  LogMessage *msg;
  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;

  main_loop_assert_main_thread ();

  /* Plain early return: with the watch update gone there is nothing left
     to do at the end, and a label placed directly before the closing
     brace is not valid C before C23. */
  if (!log_source_free_to_send (&self->super))
    return;

  msg = log_msg_new_internal (LOG_INFO, self->options->message);
  path_options.ack_needed = FALSE;

  log_pipe_queue (&self->super.super, msg, &path_options);
}

/*
 * init method of the TriggerSource pipe.
 *
 * Runs the base log_source_init() and returns FALSE if that fails;
 * otherwise calls iv_validate_now() and returns TRUE.
 */
static gboolean
trigger_source_init (LogPipe *s)
{
  if (!log_source_init (s))
    return FALSE;

  /* NOTE(review): presumably refreshes ivykis' cached current time; it was
     needed by the removed timer code -- confirm whether it is still required. */
  iv_validate_now ();

  return TRUE;
}

/*
 * deinit method of the TriggerSource pipe: there is no state of its own to
 * tear down, so it simply forwards to log_source_deinit() and returns its
 * result.
 */
static gboolean
trigger_source_deinit (LogPipe *s)
{
  return log_source_deinit (s);
}


/*
 * Allocate a zero-filled TriggerSource, initialise its LogSource base with
 * the given options, point it at the owner's trigger options and install
 * the trigger-specific init/deinit methods.
 *
 * owner:   driver that supplies the id and the TriggerOptions.
 * options: source options forwarded to log_source_set_options().
 *
 * Returns the new object as a LogSource pointer.
 */
static LogSource *
trigger_source_new (TriggerSourceDriver *owner, LogSourceOptions *options)
{
  TriggerSource *source = g_new0 (TriggerSource, 1);
  LogSource *base = &source->super;

  log_source_init_instance (base);
  log_source_set_options (base, options, 0, SCS_TRIGGER,
                          owner->super.super.id, NULL, FALSE);

  source->options = &owner->options;

  base->super.init = trigger_source_init;
  base->super.deinit = trigger_source_deinit;

  return base;
}

/*
* TriggerSourceDriver
*/

/*
 * init method of the driver: runs the base source-driver init, fills in
 * defaults for unset options, then creates, attaches and initialises the
 * TriggerSource.
 *
 * Returns TRUE on success, FALSE if the base init or the source's own
 * init fails.
 */
static gboolean
trigger_sd_init (LogPipe *s)
{
  TriggerSourceDriver *self = (TriggerSourceDriver *)s;
  GlobalConfig *cfg = log_pipe_get_config (s);

  if (!log_src_driver_init_method (s))
    return FALSE;

  /* Defaults for options the configuration did not set. */
  if (self->options.trigger_freq <= 0)
    self->options.trigger_freq = 10;

  if (!self->options.message)
    self->options.message = g_strdup ("Trigger source is trigger happy.");

  log_source_options_init (&self->source_options, cfg, self->super.super.group);
  self->source = trigger_source_new (self, &self->source_options);

  log_pipe_append (&self->source->super, s);

  /* Do not report success with a source that failed to initialise; drop
     our reference so that no half-initialised source is kept around. */
  if (!log_pipe_init (&self->source->super, cfg))
    {
      log_pipe_unref (&self->source->super);
      self->source = NULL;
      return FALSE;
    }

  return TRUE;
}

/*
 * deinit method of the driver: deinitialises and releases the source (if
 * there is one), frees the message text and then runs the base
 * source-driver deinit, whose result is returned.
 */
static gboolean
trigger_sd_deinit (LogPipe *s)
{
  TriggerSourceDriver *self = (TriggerSourceDriver *)s;

  if (self->source)
    {
      log_pipe_deinit (&self->source->super);
      log_pipe_unref (&self->source->super);
      self->source = NULL;
    }

  /* Clear the pointer after freeing it: trigger_sd_init() tests it for
     NULL and trigger_sd_set_trigger_message() frees it again, so a stale
     value would be used or double-freed if the driver is touched after
     deinit.  A later init then falls back to the default message. */
  g_free (self->options.message);
  self->options.message = NULL;

  return log_src_driver_deinit_method (s);
}

/*
 * Accessor: returns the address of the LogSourceOptions embedded in the
 * trigger driver behind `s`.
 */
LogSourceOptions *
trigger_sd_get_source_options (LogDriver *s)
{
  return &((TriggerSourceDriver *) s)->source_options;
}

/*
 * Setter: stores `freq` as the trigger frequency of the trigger driver
 * behind `s`.  The value is stored as given; no validation happens here.
 */
void
trigger_sd_set_trigger_freq (LogDriver *s, gint freq)
{
  TriggerOptions *opts = &((TriggerSourceDriver *) s)->options;

  opts->trigger_freq = freq;
}

/*
 * Setter: replaces the driver's message text with a private copy of
 * `message` and frees the previously stored text.
 *
 * s:       the trigger driver.
 * message: new text; it is copied, so the caller keeps ownership.
 */
void
trigger_sd_set_trigger_message (LogDriver *s, const gchar *message)
{
  TriggerSourceDriver *self = (TriggerSourceDriver *)s;
  gchar *copy;

  /* Duplicate before freeing so that passing the currently stored string
     back in does not read memory that has already been released. */
  copy = g_strdup (message);
  g_free (self->options.message);
  self->options.message = copy;
}

LogDriver *
trigger_sd_new (void)
{
  TriggerSourceDriver *self = g_new0 (TriggerSourceDriver, 1);

  log_src_driver_init_instance ((LogSrcDriver *)&self->super);

  self->super.super.super.init = trigger_sd_init;
  self->super.super.super.deinit = trigger_sd_deinit;

  log_source_options_defaults (&self->source_options);

  return (LogDriver *)self;
}

After that we add a thread pointer to the TriggerSource struct:
/* The log source that emits the trigger messages, now with its own thread. */
typedef struct
{
  /* Base source object; kept as the first member because the code casts
     LogPipe pointers to TriggerSource pointers. */
  LogSource super;

  /* Points at the owning driver's options; set in trigger_source_new(). */
  TriggerOptions *options;
  /* Handle of the worker thread; assigned from create_worker_thread()
     in trigger_source_new(). */
  GThread * workerThread;
} TriggerSource;

Then we rewrite the method which puts the messages in the queue:

/*
 * Body of the source's worker thread.  In an endless loop it queues one
 * internal message built from the configured text whenever the source is
 * free to send, then sleeps for trigger_freq seconds.  Never returns.
 *
 * s: the TriggerSource, passed as an untyped pointer.
 */
static void
trigger_source_triggered (gpointer s)
{
  TriggerSource *self = (TriggerSource *) s;
  LogMessage *msg;
  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;

  /* There were weird segmentation violations without this line. */
  iv_init ();

  while (1)
    {
      if (log_source_free_to_send (&self->super))
        {
          msg = log_msg_new_internal (LOG_INFO, self->options->message);
          path_options.ack_needed = FALSE;

          /* The refcache start/stop pair has to surround the queueing of
             a message created on this thread. */
          log_msg_refcache_start_producer (msg);
          log_pipe_queue (&self->super.super, msg, &path_options);
          log_msg_refcache_stop ();
        }

      /* Sleep on every iteration, not only after a message was sent;
         otherwise the loop would spin at full speed for as long as the
         source is not free to send. */
      sleep (self->options->trigger_freq);
    }
}

The log_msg_refcache_start_producer and log_msg_refcache_stop calls may look like magic for now; they are needed whenever we create a message.

Now we only need to create and start the thread:
/*
 * Allocate a zero-filled TriggerSource, initialise its LogSource base with
 * the given options, point it at the owner's trigger options, install the
 * trigger-specific init/deinit methods and start the worker thread that
 * runs trigger_source_triggered() with the new source as its argument.
 *
 * owner:   driver that supplies the id and the TriggerOptions.
 * options: source options forwarded to log_source_set_options().
 *
 * Returns the new object as a LogSource pointer.
 */
static LogSource *
trigger_source_new (TriggerSourceDriver *owner, LogSourceOptions *options)
{
  TriggerSource *source = g_new0 (TriggerSource, 1);
  LogSource *base = &source->super;

  log_source_init_instance (base);
  log_source_set_options (base, options, 0, SCS_TRIGGER,
                          owner->super.super.id, NULL, FALSE);

  source->options = &owner->options;

  base->super.init = trigger_source_init;
  base->super.deinit = trigger_source_deinit;

  /* The thread is started only after the object is fully set up. */
  source->workerThread = create_worker_thread (&trigger_source_triggered,
                                               (void *) source, FALSE, NULL);

  return base;
}

The trigger-source now runs in a different thread.
Please note that there is a problem with the timestamp of the message.
The bug is under investigation.