Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
openSUSE:Step:FrontRunner
rsyslog.12076
0006-Kubernetes-Metadata-plugin-mmkubernetes.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 0006-Kubernetes-Metadata-plugin-mmkubernetes.patch of Package rsyslog.12076
From a6264bf8f91975c9bc0fc602dcdc6881486f1579 Mon Sep 17 00:00:00 2001 From: Rich Megginson <rmeggins@redhat.com> Date: Tue, 13 Feb 2018 21:11:39 -0700 Subject: [PATCH] Kubernetes Metadata plugin - mmkubernetes This plugin is used to annotate records logged by Kubernetes containers. It will add the namespace uuid, pod uuid, pod and namespace labels and annotations, and other metadata associated with the pod and namespace. It will work with either log files in `/var/log/containers/*.log` or with journald entries with `CONTAINER_NAME` and `CONTAINER_ID_FULL`. For file logs, the filename must match this regex: /var/log/containers/([a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_([^_]+)_(.+)-([a-z0-9]{64})\\.log$ The first match is the pod name, the second is the container hash (not currently used), the third is the namespace name, the fourth is the container name, and the fifth is the container id. You can specify a different regex by using the module or action configuration parameter `filenameregex` but keep in mind that the field positions are hardcoded so the regex must have the same fields in the same order. For journald logs, there must be a field `CONTAINER_NAME` which matches this: ^[^_]+_([^\\._]+)(\\.([^_]+))?_([^_]+)_([^_]+)_[^_]+_[^_]+$ The first match is the container name, the second is the container hash (not currently used), the third is the pod name, and the fourth is the namespace name. The record must also have the field `CONTAINER_ID_FULL`. You can specify a different regex by using the module or action configuration parameter `containerregex` but keep in mind that the field positions are hardcoded so the regex must have the same fields in the same order. The Kubernetes metadata is added to the record in the top-level fields `kubernetes` and `docker`. See https://github.com/ViaQ/elasticsearch-templates/blob/master/namespaces/kubernetes.yml and https://github.com/ViaQ/elasticsearch-templates/blob/master/namespaces/docker.yml for more details. *Configuration* `kubernetesurl` - Required - URL of the Kubernetes API server e.g. `https://localhost:8443` `tls.cacert` - Required - full path and file name of file containing the CA cert of the Kubernetes API server cert issuer `tokenfile` - Required (or `token`) - the file containing the token to use to authenticate to the Kubernetes API server `token` - Required (or `tokenfile`) - the token to use to authenticate to the Kubernetes API server `annotation_match` - Optional - by default no pod or namespace annotations will be added to the records - this parameter is an array of patterns to match the keys of the `annotations` field to include in the `annotations` field or the `namespace_annotations` field. *Example* module(load="imfile" mode="inotify") module(load="mmkubernetes" kubernetesurl="https://localhost:8443" tls.cacert="/etc/rsyslog.d/mmk8s.ca.crt" tokenfile="/etc/rsyslog.d/mmk8s.token" annotation_match=["."]) template(name="tpl" type="list") { property(name="jsonmesg") constant(value="\n") } ruleset(name="k8s") { action(type="mmkubernetes") action(type="omfile" file="/var/log/k8s.log" template="tpl") } input(type="imfile" file="/var/log/containers/*.log" tag="kubernetes" addmetadata="on" ruleset="k8s") if ($!_SYSTEMD_UNIT == "docker.service") and (strlen($!CONTAINER_NAME) > 0) then { call k8s } *Notes* We use lognorm instead of regex to parse filenames and CONTAINER_NAME since it is faster than regex and this parsing is in the critical path. We cannot use ln_loadSamplesFromString with liblognorm 2.0.2, so disallow the filenamerules and containerrules parameters for older rsyslog versions with the older liblognorm. Due to a limitation in mmnormalize, we cannot directly parse a filename like this: something_something_this-is-a-container-name-092039840293.log That is, we cannot handle a container name with `-` in it. Instead, parse the entire name + id into container_name_and_id, then parse into separate container_name and container_id in the code. *Credits* This work is based on https://github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter and has many of the same features. --- contrib/mmkubernetes/Makefile.am | 4 +- contrib/mmkubernetes/k8s_container_name.rulebase | 2 + contrib/mmkubernetes/k8s_filename.rulebase | 2 + contrib/mmkubernetes/mmkubernetes.c | 1058 +++++++++++++++++++--- contrib/mmkubernetes/sample.conf | 21 +- 5 files changed, 927 insertions(+), 160 deletions(-) create mode 100644 contrib/mmkubernetes/k8s_container_name.rulebase create mode 100644 contrib/mmkubernetes/k8s_filename.rulebase diff --git a/contrib/mmkubernetes/Makefile.am b/contrib/mmkubernetes/Makefile.am index 02af334d4..3dcc235a6 100644 --- a/contrib/mmkubernetes/Makefile.am +++ b/contrib/mmkubernetes/Makefile.am @@ -1,6 +1,6 @@ pkglib_LTLIBRARIES = mmkubernetes.la mmkubernetes_la_SOURCES = mmkubernetes.c -mmkubernetes_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CURL_CFLAGS) +mmkubernetes_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CURL_CFLAGS) $(LIBLOGNORM_CFLAGS) mmkubernetes_la_LDFLAGS = -module -avoid-version -mmkubernetes_la_LIBADD = $(CURL_LIBS) +mmkubernetes_la_LIBADD = $(CURL_LIBS) $(LIBLOGNORM_LIBS) diff --git a/contrib/mmkubernetes/k8s_container_name.rulebase b/contrib/mmkubernetes/k8s_container_name.rulebase new file mode 100644 index 000000000..35fbb317c --- /dev/null +++ b/contrib/mmkubernetes/k8s_container_name.rulebase @@ -0,0 +1,2 @@ +rule=:%k8s_prefix:char-to:_%_%container_name:char-to:.%.%container_hash:char-to:_%_%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest% +rule=:%k8s_prefix:char-to:_%_%container_name:char-to:_%_%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest% diff --git a/contrib/mmkubernetes/k8s_filename.rulebase b/contrib/mmkubernetes/k8s_filename.rulebase new file mode 100644 index 000000000..24c0d9138 --- /dev/null +++ b/contrib/mmkubernetes/k8s_filename.rulebase @@ -0,0 +1,2 @@ +rule=:/var/log/containers/%pod_name:char-to:.%.%container_hash:char-to:_%_%namespace_name:char-to:_%_%container_name_and_id:char-to:.%.log +rule=:/var/log/containers/%pod_name:char-to:_%_%namespace_name:char-to:_%_%container_name_and_id:char-to:.%.log diff --git a/contrib/mmkubernetes/mmkubernetes.c b/contrib/mmkubernetes/mmkubernetes.c index 0d5502db6..5012c54f6 100644 --- a/contrib/mmkubernetes/mmkubernetes.c +++ b/contrib/mmkubernetes/mmkubernetes.c @@ -27,17 +27,6 @@ * limitations under the License. */ -/* todo: - * - cache cleanup - * - SIGHUP handling - * - authentication - * - statistics generation - * - other missing configuration options - * - documentation - * - more robust error-handling + debugging information - * - batching, parallel queries, failover, ... - */ - /* needed for asprintf */ #ifndef _GNU_SOURCE # define _GNU_SOURCE @@ -52,7 +41,9 @@ #include <assert.h> #include <errno.h> #include <unistd.h> +#include <sys/stat.h> #include <libestr.h> +#include <liblognorm.h> #include <json.h> #include <curl/curl.h> #include <curl/easy.h> @@ -63,6 +54,7 @@ #include "errmsg.h" #include "regexp.h" #include "hashtable.h" +#include "srUtils.h" /* static data */ MODULE_TYPE_OUTPUT /* this is technically an output plugin */ @@ -72,9 +64,40 @@ DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) DEFobjCurrIf(regexp) -#define DFLT_FILENAME_REGEX "var\\.log\\.containers\\.([a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_([^_]+)_(.+)-([a-z0-9]{64})\\.log$" +#define HAVE_LOADSAMPLESFROMSTRING 1 +#if defined(NO_LOADSAMPLESFROMSTRING) +#undef HAVE_LOADSAMPLESFROMSTRING +#endif +/* original from fluentd plugin: + * 'var\.log\.containers\.(?<pod_name>[a-z0-9]([-a-z0-9]*[a-z0-9])?\ + * (\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_\ + * (?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\.log$' + * this is for _tag_ match, not actual filename match - in_tail turns filename + * into a fluentd tag + */ +#define DFLT_FILENAME_LNRULES ":/var/log/containers/%pod_name:char-to:.%."\ + "%container_hash:char-to:_%_"\ + "%namespace_name:char-to:_%_%container_name:char-to:-%-%container_id:char-to:.%.log\n"\ + ":/var/log/containers/%pod_name:char-to:_%_"\ + "%namespace_name:char-to:_%_%container_name:char-to:-%-%container_id:char-to:.%.log" +#define DFLT_FILENAME_RULEBASE "/etc/rsyslog.d/k8s_filename.rulebase" +/* original from fluentd plugin: + * '^(?<name_prefix>[^_]+)_(?<container_name>[^\._]+)\ + * (\.(?<container_hash>[^_]+))?_(?<pod_name>[^_]+)_\ + * (?<namespace>[^_]+)_[^_]+_[^_]+$' + */ +#define DFLT_CONTAINER_LNRULES ":%k8s_prefix:char-to:_%_%container_name:char-to:.%."\ + "%container_hash:char-to:_%_%"\ + "%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest%\n"\ + ":%k8s_prefix:char-to:_%_%container_name:char-to:_%_"\ + "%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest%" +#define DFLT_CONTAINER_RULEBASE "/etc/rsyslog.d/k8s_container_name.rulebase" #define DFLT_SRCMD_PATH "$!metadata!filename" -#define DFLT_DSTMD_PATH "$!metadata" +#define DFLT_DSTMD_PATH "$!" +#define DFLT_DE_DOT 1 /* true */ +#define DFLT_DE_DOT_SEPARATOR "_" +#define DFLT_CONTAINER_NAME "$!CONTAINER_NAME" /* name of variable holding CONTAINER_NAME value */ +#define DFLT_CONTAINER_ID_FULL "$!CONTAINER_ID_FULL" /* name of variable holding CONTAINER_ID_FULL value */ static struct cache_s { const uchar *kbUrl; @@ -83,20 +106,53 @@ static struct cache_s { pthread_mutex_t *cacheMtx; } **caches; +typedef struct { + int nmemb; + uchar **patterns; + regex_t *regexps; +} annotation_match_t; + /* module configuration data */ struct modConfData_s { rsconf_t *pConf; /* our overall config object */ - uchar *kubernetesUrl; /**< where to place queries */ - uchar *srcMetadataPath; /**< where to get data for kubernetes queries */ - uchar *dstMetadataPath; /**< where to put metadata obtained from kubernetes */ + uchar *kubernetesUrl; /* scheme, host, port, and optional path prefix for Kubernetes API lookups */ + uchar *srcMetadataPath; /* where to get data for kubernetes queries */ + uchar *dstMetadataPath; /* where to put metadata obtained from kubernetes */ + uchar *caCertFile; /* File holding the CA cert (+optional chain) of CA that issued the Kubernetes server cert */ + sbool allowUnsignedCerts; /* For testing/debugging - do not check for CA certs (CURLOPT_SSL_VERIFYPEER FALSE) */ + uchar *token; /* The token value to use to authenticate to Kubernetes - takes precedence over tokenFile */ + uchar *tokenFile; /* The file whose contents is the token value to use to authenticate to Kubernetes */ + sbool de_dot; /* If true (default), convert '.' characters in labels & annotations to de_dot_separator */ + uchar *de_dot_separator; /* separator character (default '_') to use for de_dotting */ + size_t de_dot_separator_len; /* length of separator character */ + annotation_match_t annotation_match; /* annotation keys must match these to be included in record */ + char *fnRules; /* lognorm rules for container log filename match */ + uchar *fnRulebase; /* lognorm rulebase filename for container log filename match */ + char *contRules; /* lognorm rules for CONTAINER_NAME value match */ + uchar *contRulebase; /* lognorm rulebase filename for CONTAINER_NAME value match */ }; /* action (instance) configuration data */ typedef struct _instanceData { - uchar *kubernetesUrl; /**< where to place queries */ - uchar *srcMetadataPath; /**< where to get data for kubernetes queries */ - uchar *dstMetadataPath; /**< where to put metadata obtained from kubernetes */ - regex_t fnRegex; + uchar *kubernetesUrl; /* scheme, host, port, and optional path prefix for Kubernetes API lookups */ + msgPropDescr_t *srcMetadataDescr; /* where to get data for kubernetes queries */ + uchar *dstMetadataPath; /* where to put metadata obtained from kubernetes */ + uchar *caCertFile; /* File holding the CA cert (+optional chain) of CA that issued the Kubernetes server cert */ + sbool allowUnsignedCerts; /* For testing/debugging - do not check for CA certs (CURLOPT_SSL_VERIFYPEER FALSE) */ + uchar *token; /* The token value to use to authenticate to Kubernetes - takes precedence over tokenFile */ + uchar *tokenFile; /* The file whose contents is the token value to use to authenticate to Kubernetes */ + sbool de_dot; /* If true (default), convert '.' characters in labels & annotations to de_dot_separator */ + uchar *de_dot_separator; /* separator character (default '_') to use for de_dotting */ + size_t de_dot_separator_len; /* length of separator character */ + annotation_match_t annotation_match; /* annotation keys must match these to be included in record */ + char *fnRules; /* lognorm rules for container log filename match */ + uchar *fnRulebase; /* lognorm rulebase filename for container log filename match */ + ln_ctx fnCtxln; /**< context to be used for liblognorm */ + char *contRules; /* lognorm rules for CONTAINER_NAME value match */ + uchar *contRulebase; /* lognorm rulebase filename for CONTAINER_NAME value match */ + ln_ctx contCtxln; /**< context to be used for liblognorm */ + msgPropDescr_t *contNameDescr; /* CONTAINER_NAME field */ + msgPropDescr_t *contIdFullDescr; /* CONTAINER_ID_FULL field */ struct cache_s *cache; } instanceData; @@ -112,7 +168,21 @@ typedef struct wrkrInstanceData { static struct cnfparamdescr modpdescr[] = { { "kubernetesurl", eCmdHdlrString, 0 }, { "srcmetadatapath", eCmdHdlrString, 0 }, - { "dstmetadatapath", eCmdHdlrString, 0 } + { "dstmetadatapath", eCmdHdlrString, 0 }, + { "tls.cacert", eCmdHdlrString, 0 }, + { "allowunsignedcerts", eCmdHdlrBinary, 0 }, + { "token", eCmdHdlrString, 0 }, + { "tokenfile", eCmdHdlrString, 0 }, + { "annotation_match", eCmdHdlrArray, 0 }, + { "de_dot", eCmdHdlrBinary, 0 }, + { "de_dot_separator", eCmdHdlrString, 0 }, + { "filenamerulebase", eCmdHdlrString, 0 }, + { "containerrulebase", eCmdHdlrString, 0 } +#if HAVE_LOADSAMPLESFROMSTRING == 1 + , + { "filenamerules", eCmdHdlrArray, 0 }, + { "containerrules", eCmdHdlrArray, 0 } +#endif }; static struct cnfparamblk modpblk = { CNFPARAMBLK_VERSION, @@ -124,7 +194,21 @@ static struct cnfparamblk modpblk = { static struct cnfparamdescr actpdescr[] = { { "kubernetesurl", eCmdHdlrString, 0 }, { "srcmetadatapath", eCmdHdlrString, 0 }, - { "dstmetadatapath", eCmdHdlrString, 0 } + { "dstmetadatapath", eCmdHdlrString, 0 }, + { "tls.cacert", eCmdHdlrString, 0 }, + { "allowunsignedcerts", eCmdHdlrBinary, 0 }, + { "token", eCmdHdlrString, 0 }, + { "tokenfile", eCmdHdlrString, 0 }, + { "annotation_match", eCmdHdlrArray, 0 }, + { "de_dot", eCmdHdlrBinary, 0 }, + { "de_dot_separator", eCmdHdlrString, 0 }, + { "filenamerulebase", eCmdHdlrString, 0 }, + { "containerrulebase", eCmdHdlrString, 0 } +#if HAVE_LOADSAMPLESFROMSTRING == 1 + , + { "filenamerules", eCmdHdlrArray, 0 }, + { "containerrules", eCmdHdlrArray, 0 } +#endif }; static struct cnfparamblk actpblk = { CNFPARAMBLK_VERSION, @@ -135,6 +219,272 @@ static struct cnfparamblk actpblk = static modConfData_t *loadModConf = NULL; /* modConf ptr to use for the current load process */ static modConfData_t *runModConf = NULL; /* modConf ptr to use for the current exec process */ +static void free_annotationmatch(annotation_match_t *match) { + if (match) { + for(int ii = 0 ; ii < match->nmemb; ++ii) { + if (match->patterns) + free(match->patterns[ii]); + if (match->regexps) + regexp.regfree(&match->regexps[ii]); + } + free(match->patterns); + match->patterns = NULL; + free(match->regexps); + match->regexps = NULL; + match->nmemb = 0; + } +} + +static int init_annotationmatch(annotation_match_t *match, struct cnfarray *ar) { + DEFiRet; + + match->nmemb = ar->nmemb; + CHKmalloc(match->patterns = calloc(sizeof(uchar*), match->nmemb)); + CHKmalloc(match->regexps = calloc(sizeof(regex_t), match->nmemb)); + for(int jj = 0; jj < ar->nmemb; ++jj) { + int rexret = 0; + match->patterns[jj] = (uchar*)es_str2cstr(ar->arr[jj], NULL); + rexret = regexp.regcomp(&match->regexps[jj], + (char *)match->patterns[jj], REG_EXTENDED|REG_NOSUB); + if (0 != rexret) { + char errMsg[512]; + regexp.regerror(rexret, &match->regexps[jj], errMsg, sizeof(errMsg)); + iRet = RS_RET_CONFIG_ERROR; + errmsg.LogError(0, iRet, + "error: could not compile annotation_match string [%s]" + " into an extended regexp - %d: %s\n", + match->patterns[jj], rexret, errMsg); + break; + } + } +finalize_it: + if (iRet) + free_annotationmatch(match); + RETiRet; +} + +static int copy_annotationmatch(annotation_match_t *src, annotation_match_t *dest) { + DEFiRet; + + dest->nmemb = src->nmemb; + CHKmalloc(dest->patterns = malloc(sizeof(uchar*) * dest->nmemb)); + CHKmalloc(dest->regexps = calloc(sizeof(regex_t), dest->nmemb)); + for(int jj = 0 ; jj < src->nmemb ; ++jj) { + CHKmalloc(dest->patterns[jj] = (uchar*)strdup((char *)src->patterns[jj])); + /* assumes was already successfully compiled */ + regexp.regcomp(&dest->regexps[jj], (char *)dest->patterns[jj], REG_EXTENDED|REG_NOSUB); + } +finalize_it: + if (iRet) + free_annotationmatch(dest); + RETiRet; +} + +/* takes a hash of annotations and returns another json object hash containing only the + * keys that match - this logic is taken directly from fluent-plugin-kubernetes_metadata_filter + * except that we do not add the key multiple times to the object to be returned + */ +static struct json_object *match_annotations(annotation_match_t *match, + struct json_object *annotations) { + struct json_object *ret = NULL; + + for (int jj = 0; jj < match->nmemb; ++jj) { + struct json_object_iterator it = json_object_iter_begin(annotations); + struct json_object_iterator itEnd = json_object_iter_end(annotations); + for (;!json_object_iter_equal(&it, &itEnd); json_object_iter_next(&it)) { + const char *const key = json_object_iter_peek_name(&it); + if (!ret || !fjson_object_object_get_ex(ret, key, NULL)) { + if (!regexp.regexec(&match->regexps[jj], key, 0, NULL, 0)) { + if (!ret) { + ret = json_object_new_object(); + } + json_object_object_add(ret, key, + json_object_get(json_object_iter_peek_value(&it))); + } + } + } + } + return ret; +} + +/* This will take a hash of labels or annotations and will de_dot the keys. + * It will return a brand new hash. AFAICT, there is no safe way to + * iterate over the hash while modifying it in place. + */ +static struct json_object *de_dot_json_object(struct json_object *jobj, + const char *delim, size_t delim_len) { + struct json_object *ret = NULL; + struct json_object_iterator it = json_object_iter_begin(jobj); + struct json_object_iterator itEnd = json_object_iter_end(jobj); + es_str_t *new_es_key = NULL; + DEFiRet; + + ret = json_object_new_object(); + while (!json_object_iter_equal(&it, &itEnd)) { + const char *const key = json_object_iter_peek_name(&it); + const char *cc = strstr(key, "."); + if (NULL == cc) { + json_object_object_add(ret, key, + json_object_get(json_object_iter_peek_value(&it))); + } else { + char *new_key = NULL; + const char *prevcc = key; + new_es_key = es_newStrFromCStr(key, (es_size_t)(cc-prevcc)); + while (cc) { + if (es_addBuf(&new_es_key, (char *)delim, (es_size_t)delim_len)) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + cc += 1; /* one past . */ + prevcc = cc; /* beginning of next substring */ + if ((cc = strstr(prevcc, ".")) || (cc = strchr(prevcc, '\0'))) { + if (es_addBuf(&new_es_key, (char *)prevcc, (es_size_t)(cc-prevcc))) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + if (!*cc) + cc = NULL; /* EOS - done */ + } + } + new_key = es_str2cstr(new_es_key, NULL); + es_deleteStr(new_es_key); + new_es_key = NULL; + json_object_object_add(ret, new_key, + json_object_get(json_object_iter_peek_value(&it))); + free(new_key); + } + json_object_iter_next(&it); + } +finalize_it: + if (iRet != RS_RET_OK) { + json_object_put(ret); + ret = NULL; + } + if (new_es_key) + es_deleteStr(new_es_key); + return ret; +} + +/* given a "metadata" object field, do + * - make sure "annotations" field has only the matching keys + * - de_dot the "labels" and "annotations" fields keys + * This modifies the jMetadata object in place + */ +static void parse_labels_annotations(struct json_object *jMetadata, + annotation_match_t *match, sbool de_dot, + const char *delim, size_t delim_len) { + struct json_object *jo = NULL; + + if (fjson_object_object_get_ex(jMetadata, "annotations", &jo)) { + if ((jo = match_annotations(match, jo))) + json_object_object_add(jMetadata, "annotations", jo); + else + json_object_object_del(jMetadata, "annotations"); + } + /* dedot labels and annotations */ + if (de_dot) { + struct json_object *jo2 = NULL; + if (fjson_object_object_get_ex(jMetadata, "annotations", &jo)) { + if ((jo2 = de_dot_json_object(jo, delim, delim_len))) { + json_object_object_add(jMetadata, "annotations", jo2); + } + } + if (fjson_object_object_get_ex(jMetadata, "labels", &jo)) { + if ((jo2 = de_dot_json_object(jo, delim, delim_len))) { + json_object_object_add(jMetadata, "labels", jo2); + } + } + } +} + +#if HAVE_LOADSAMPLESFROMSTRING == 1 +static int array_to_rules(struct cnfarray *ar, char **rules) { + DEFiRet; + es_str_t *tmpstr = NULL; + es_size_t size = 0; + + if (rules == NULL) + FINALIZE; + *rules = NULL; + if (!ar->nmemb) + FINALIZE; + for (int jj = 0; jj < ar->nmemb; jj++) + size += es_strlen(ar->arr[jj]); + if (!size) + FINALIZE; + CHKmalloc(tmpstr = es_newStr(size)); + CHKiRet((es_addStr(&tmpstr, ar->arr[0]))); + CHKiRet((es_addBufConstcstr(&tmpstr, "\n"))); + for(int jj=1; jj < ar->nmemb; ++jj) { + CHKiRet((es_addStr(&tmpstr, ar->arr[jj]))); + CHKiRet((es_addBufConstcstr(&tmpstr, "\n"))); + } + CHKiRet((es_addBufConstcstr(&tmpstr, "\0"))); + CHKmalloc(*rules = es_str2cstr(tmpstr, NULL)); +finalize_it: + if (tmpstr) { + es_deleteStr(tmpstr); + } + if (iRet != RS_RET_OK) { + free(*rules); + *rules = NULL; + } + RETiRet; +} +#endif + +/* callback for liblognorm error messages */ +static void +errCallBack(void __attribute__((unused)) *cookie, const char *msg, + size_t __attribute__((unused)) lenMsg) +{ + errmsg.LogError(0, RS_RET_ERR_LIBLOGNORM, "liblognorm error: %s", msg); +} + +static rsRetVal +set_lnctx(ln_ctx *ctxln, char *instRules, uchar *instRulebase, char *modRules, uchar *modRulebase) +{ + DEFiRet; + if (ctxln == NULL) + FINALIZE; + CHKmalloc(*ctxln = ln_initCtx()); + ln_setErrMsgCB(*ctxln, errCallBack, NULL); + if(instRules) { +#if HAVE_LOADSAMPLESFROMSTRING == 1 + if(ln_loadSamplesFromString(*ctxln, instRules) !=0) { + errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rules '%s' " + "could not be loaded", instRules); + ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD); + } +#else + (void)instRules; +#endif + } else if(instRulebase) { + if(ln_loadSamples(*ctxln, (char*) instRulebase) != 0) { + errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rulebase '%s' " + "could not be loaded", instRulebase); + ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD); + } + } else if(modRules) { +#if HAVE_LOADSAMPLESFROMSTRING == 1 + if(ln_loadSamplesFromString(*ctxln, modRules) !=0) { + errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rules '%s' " + "could not be loaded", modRules); + ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD); + } +#else + (void)modRules; +#endif + } else if(modRulebase) { + if(ln_loadSamples(*ctxln, (char*) modRulebase) != 0) { + errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rulebase '%s' " + "could not be loaded", modRulebase); + ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD); + } + } +finalize_it: + if (iRet != RS_RET_OK){ + ln_exitCtx(*ctxln); + *ctxln = NULL; + } + RETiRet; +} BEGINbeginCnfLoad CODESTARTbeginCnfLoad @@ -146,6 +496,8 @@ ENDbeginCnfLoad BEGINsetModCnf struct cnfparamvals *pvals = NULL; int i; + FILE *fp; + int ret; CODESTARTsetModCnf pvals = nvlstGetParams(lst, &modpblk, NULL); if(pvals == NULL) { @@ -159,6 +511,7 @@ CODESTARTsetModCnf cnfparamsPrint(&modpblk, pvals); } + loadModConf->de_dot = DFLT_DE_DOT; for(i = 0 ; i < modpblk.nParams ; ++i) { if(!pvals[i].bUsed) { continue; @@ -173,6 +526,90 @@ CODESTARTsetModCnf free(loadModConf->dstMetadataPath); loadModConf->dstMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); /* todo: sanitize the path */ + } else if(!strcmp(modpblk.descr[i].name, "tls.cacert")) { + free(loadModConf->caCertFile); + loadModConf->caCertFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)loadModConf->caCertFile, "r"); + if(fp == NULL) { + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + iRet = RS_RET_NO_FILE_ACCESS; + errmsg.LogError(0, iRet, + "error: certificate file %s couldn't be accessed: %s\n", + loadModConf->caCertFile, errStr); + ABORT_FINALIZE(iRet); + } else { + fclose(fp); + } + } else if(!strcmp(modpblk.descr[i].name, "allowunsignedcerts")) { + loadModConf->allowUnsignedCerts = pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "token")) { + free(loadModConf->token); + loadModConf->token = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(modpblk.descr[i].name, "tokenfile")) { + free(loadModConf->tokenFile); + loadModConf->tokenFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)loadModConf->tokenFile, "r"); + if(fp == NULL) { + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + iRet = RS_RET_NO_FILE_ACCESS; + errmsg.LogError(0, iRet, + "error: token file %s couldn't be accessed: %s\n", + loadModConf->tokenFile, errStr); + ABORT_FINALIZE(iRet); + } else { + fclose(fp); + } + } else if(!strcmp(modpblk.descr[i].name, "annotation_match")) { + free_annotationmatch(&loadModConf->annotation_match); + if ((ret = init_annotationmatch(&loadModConf->annotation_match, pvals[i].val.d.ar))) + ABORT_FINALIZE(ret); + } else if(!strcmp(modpblk.descr[i].name, "de_dot")) { + loadModConf->de_dot = pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "de_dot_separator")) { + free(loadModConf->de_dot_separator); + loadModConf->de_dot_separator = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); +#if HAVE_LOADSAMPLESFROMSTRING == 1 + } else if(!strcmp(modpblk.descr[i].name, "filenamerules")) { + free(loadModConf->fnRules); + CHKiRet((array_to_rules(pvals[i].val.d.ar, &loadModConf->fnRules))); +#endif + } else if(!strcmp(modpblk.descr[i].name, "filenamerulebase")) { + free(loadModConf->fnRulebase); + loadModConf->fnRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)loadModConf->fnRulebase, "r"); + if(fp == NULL) { + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + iRet = RS_RET_NO_FILE_ACCESS; + errmsg.LogError(0, iRet, + "error: filenamerulebase file %s couldn't be accessed: %s\n", + loadModConf->fnRulebase, errStr); + ABORT_FINALIZE(iRet); + } else { + fclose(fp); + } +#if HAVE_LOADSAMPLESFROMSTRING == 1 + } else if(!strcmp(modpblk.descr[i].name, "containerrules")) { + free(loadModConf->contRules); + CHKiRet((array_to_rules(pvals[i].val.d.ar, &loadModConf->contRules))); +#endif + } else if(!strcmp(modpblk.descr[i].name, "containerrulebase")) { + free(loadModConf->contRulebase); + loadModConf->contRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)loadModConf->contRulebase, "r"); + if(fp == NULL) { + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + iRet = RS_RET_NO_FILE_ACCESS; + errmsg.LogError(0, iRet, + "error: containerrulebase file %s couldn't be accessed: %s\n", + loadModConf->contRulebase, errStr); + ABORT_FINALIZE(iRet); + } else { + fclose(fp); + } } else { dbgprintf("mmkubernetes: program error, non-handled " "param '%s' in module() block\n", modpblk.descr[i].name); @@ -180,12 +617,39 @@ CODESTARTsetModCnf } } +#if HAVE_LOADSAMPLESFROMSTRING == 1 + if (loadModConf->fnRules && loadModConf->fnRulebase) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "mmkubernetes: only 1 of filenamerules or filenamerulebase may be used"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + if (loadModConf->contRules && loadModConf->contRulebase) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "mmkubernetes: only 1 of containerrules or containerrulebase may be used"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } +#endif + /* set defaults */ if(loadModConf->srcMetadataPath == NULL) loadModConf->srcMetadataPath = (uchar *) strdup(DFLT_SRCMD_PATH); if(loadModConf->dstMetadataPath == NULL) loadModConf->dstMetadataPath = (uchar *) strdup(DFLT_DSTMD_PATH); - + if(loadModConf->de_dot_separator == NULL) + loadModConf->de_dot_separator = (uchar *) strdup(DFLT_DE_DOT_SEPARATOR); + if(loadModConf->de_dot_separator) + loadModConf->de_dot_separator_len = strlen((const char *)loadModConf->de_dot_separator); +#if HAVE_LOADSAMPLESFROMSTRING == 1 + if (loadModConf->fnRules == NULL && loadModConf->fnRulebase == NULL) + loadModConf->fnRules = strdup(DFLT_FILENAME_LNRULES); + if (loadModConf->contRules == NULL && loadModConf->contRulebase == NULL) + loadModConf->contRules = strdup(DFLT_CONTAINER_LNRULES); +#else + if (loadModConf->fnRulebase == NULL) + loadModConf->fnRulebase = (uchar *)strdup(DFLT_FILENAME_RULEBASE); + if (loadModConf->contRulebase == NULL) + loadModConf->contRulebase = (uchar *)strdup(DFLT_CONTAINER_RULEBASE); +#endif caches = calloc(1, sizeof(struct cache_s *)); finalize_it: @@ -202,38 +666,99 @@ ENDcreateInstance BEGINfreeInstance CODESTARTfreeInstance free(pData->kubernetesUrl); - free(pData->srcMetadataPath); + msgPropDescrDestruct(pData->srcMetadataDescr); + free(pData->srcMetadataDescr); free(pData->dstMetadataPath); - regexp.regfree(&pData->fnRegex); + free(pData->caCertFile); + free(pData->token); + free(pData->tokenFile); + free(pData->fnRules); + free(pData->fnRulebase); + ln_exitCtx(pData->fnCtxln); + free(pData->contRules); + free(pData->contRulebase); + ln_exitCtx(pData->contCtxln); + free_annotationmatch(&pData->annotation_match); + free(pData->de_dot_separator); + msgPropDescrDestruct(pData->contNameDescr); + free(pData->contNameDescr); + msgPropDescrDestruct(pData->contIdFullDescr); + free(pData->contIdFullDescr); ENDfreeInstance -size_t curlCB(char *data, size_t size, size_t nmemb, void *usrptr) +static size_t curlCB(char *data, size_t size, size_t nmemb, void *usrptr) { + DEFiRet; wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t *) usrptr; char * buf; size_t newlen; newlen = pWrkrData->curlRplyLen + size * nmemb; - buf = realloc(pWrkrData->curlRply, newlen); + CHKmalloc(buf = realloc(pWrkrData->curlRply, newlen)); memcpy(buf + pWrkrData->curlRplyLen, data, size * nmemb); pWrkrData->curlRply = buf; pWrkrData->curlRplyLen = newlen; +finalize_it: + if (iRet != RS_RET_OK) { + return 0; + } return size * nmemb; } BEGINcreateWrkrInstance CODESTARTcreateWrkrInstance CURL *ctx; - struct curl_slist *hdr; - - hdr = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + struct curl_slist *hdr = NULL; + char *tokenHdr = NULL; + FILE *fp = NULL; + char *token = NULL; + + hdr = curl_slist_append(hdr, "Content-Type: text/json; charset=utf-8"); + if (pWrkrData->pData->token) { + if ((-1 == asprintf(&tokenHdr, "Authorization: Bearer %s", pWrkrData->pData->token)) || + (!tokenHdr)) { + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + } else if (pWrkrData->pData->tokenFile) { + struct stat statbuf; + fp = fopen((const char*)pWrkrData->pData->tokenFile, "r"); + if (fp && !fstat(fileno(fp), &statbuf)) { + size_t bytesread; + CHKmalloc(token = malloc((statbuf.st_size+1)*sizeof(char))); + if (0 < (bytesread = fread(token, sizeof(char), statbuf.st_size, fp))) { + token[bytesread] = '\0'; + if ((-1 == asprintf(&tokenHdr, "Authorization: Bearer %s", token)) || + (!tokenHdr)) { + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + } + free(token); + token = NULL; + } + fclose(fp); + fp = NULL; + } + if (tokenHdr) { + hdr = curl_slist_append(hdr, tokenHdr); + free(tokenHdr); + } pWrkrData->curlHdr = hdr; ctx = curl_easy_init(); curl_easy_setopt(ctx, CURLOPT_HTTPHEADER, hdr); curl_easy_setopt(ctx, CURLOPT_WRITEFUNCTION, curlCB); curl_easy_setopt(ctx, CURLOPT_WRITEDATA, pWrkrData); + if(pWrkrData->pData->caCertFile) + curl_easy_setopt(ctx, CURLOPT_CAINFO, pWrkrData->pData->caCertFile); + if(pWrkrData->pData->allowUnsignedCerts) + curl_easy_setopt(ctx, CURLOPT_SSL_VERIFYPEER, 0); + pWrkrData->curlCtx = ctx; +finalize_it: + free(token); + if (fp) { + fclose(fp); + } ENDcreateWrkrInstance @@ -248,7 +773,9 @@ static struct cache_s *cacheNew(const uchar *url) { struct cache_s *cache; - cache = calloc(1, sizeof(struct cache_s)); + if (NULL == (cache = calloc(1, sizeof(struct cache_s)))) { + goto finalize_it; + } cache->kbUrl = url; cache->mdHt = create_hashtable(100, hash_from_string, key_equals_string, (void (*)(void *)) json_object_put); @@ -257,6 +784,7 @@ static struct cache_s *cacheNew(const uchar *url) cache->cacheMtx = malloc(sizeof(pthread_mutex_t)); pthread_mutex_init(cache->cacheMtx, NULL); +finalize_it: return cache; } @@ -273,7 +801,10 @@ static void cacheFree(struct cache_s *cache) BEGINnewActInst struct cnfparamvals *pvals = NULL; - int i, ret; + int i; + FILE *fp; + char *rxstr = NULL; + char *srcMetadataPath = NULL; CODESTARTnewActInst DBGPRINTF("newActInst (mmkubernetes)\n"); @@ -293,6 +824,8 @@ CODESTARTnewActInst CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); CHKiRet(createInstance(&pData)); + pData->de_dot = loadModConf->de_dot; + pData->allowUnsignedCerts = loadModConf->allowUnsignedCerts; for(i = 0 ; i < actpblk.nParams ; ++i) { if(!pvals[i].bUsed) { continue; @@ -300,13 +833,101 @@ CODESTARTnewActInst free(pData->kubernetesUrl); pData->kubernetesUrl = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "srcmetadatapath")) { - free(pData->srcMetadataPath); - pData->srcMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + msgPropDescrDestruct(pData->srcMetadataDescr); + free(pData->srcMetadataDescr); + CHKmalloc(pData->srcMetadataDescr = MALLOC(sizeof(msgPropDescr_t))); + srcMetadataPath = es_str2cstr(pvals[i].val.d.estr, NULL); + CHKiRet(msgPropDescrFill(pData->srcMetadataDescr, (uchar *)srcMetadataPath, + strlen(srcMetadataPath))); /* todo: sanitize the path */ } else if(!strcmp(actpblk.descr[i].name, "dstmetadatapath")) { free(pData->dstMetadataPath); pData->dstMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); /* todo: sanitize the path */ + } else if(!strcmp(actpblk.descr[i].name, "tls.cacert")) { + free(pData->caCertFile); + pData->caCertFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)pData->caCertFile, "r"); + if(fp == NULL) { + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + iRet = RS_RET_NO_FILE_ACCESS; + errmsg.LogError(0, iRet, + "error: certificate file %s couldn't be accessed: %s\n", + pData->caCertFile, errStr); + ABORT_FINALIZE(iRet); + } else { + fclose(fp); + } + } else if(!strcmp(actpblk.descr[i].name, "allowunsignedcerts")) { + pData->allowUnsignedCerts = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "token")) { + free(pData->token); + pData->token = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "tokenfile")) { + free(pData->tokenFile); + pData->tokenFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)pData->tokenFile, "r"); + if(fp == NULL) { + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + iRet = RS_RET_NO_FILE_ACCESS; + errmsg.LogError(0, iRet, + "error: token file %s couldn't be accessed: %s\n", + pData->tokenFile, errStr); + ABORT_FINALIZE(iRet); + } else { + fclose(fp); + } + } else if(!strcmp(actpblk.descr[i].name, "annotation_match")) { + free_annotationmatch(&pData->annotation_match); + if (RS_RET_OK != (iRet = init_annotationmatch(&pData->annotation_match, pvals[i].val.d.ar))) + ABORT_FINALIZE(iRet); + } else if(!strcmp(actpblk.descr[i].name, "de_dot")) { + pData->de_dot = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "de_dot_separator")) { + free(pData->de_dot_separator); + pData->de_dot_separator = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); +#if HAVE_LOADSAMPLESFROMSTRING == 1 + } else if(!strcmp(modpblk.descr[i].name, "filenamerules")) { + free(pData->fnRules); + CHKiRet((array_to_rules(pvals[i].val.d.ar, &pData->fnRules))); +#endif + } else if(!strcmp(modpblk.descr[i].name, "filenamerulebase")) { + free(pData->fnRulebase); + pData->fnRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)pData->fnRulebase, "r"); + if(fp == NULL) { + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + iRet = RS_RET_NO_FILE_ACCESS; + errmsg.LogError(0, iRet, + "error: filenamerulebase file %s couldn't be accessed: %s\n", + pData->fnRulebase, errStr); + ABORT_FINALIZE(iRet); + } else { + fclose(fp); + } +#if HAVE_LOADSAMPLESFROMSTRING == 1 + } else if(!strcmp(modpblk.descr[i].name, "containerrules")) { + free(pData->contRules); + CHKiRet((array_to_rules(pvals[i].val.d.ar, &pData->contRules))); +#endif + } else if(!strcmp(modpblk.descr[i].name, "containerrulebase")) { + free(pData->contRulebase); + pData->contRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)pData->contRulebase, "r"); + if(fp == NULL) { + char errStr[1024]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + iRet = RS_RET_NO_FILE_ACCESS; + errmsg.LogError(0, iRet, + "error: containerrulebase file %s couldn't be accessed: %s\n", + pData->contRulebase, errStr); + ABORT_FINALIZE(iRet); + } else { + fclose(fp); + } } else { dbgprintf("mmkubernetes: program error, non-handled " "param '%s' in action() block\n", actpblk.descr[i].name); @@ -314,22 +935,55 @@ CODESTARTnewActInst } } +#if HAVE_LOADSAMPLESFROMSTRING == 1 + if (pData->fnRules && pData->fnRulebase) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "mmkubernetes: only 1 of filenamerules or filenamerulebase may be used"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + if (pData->contRules && pData->contRulebase) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "mmkubernetes: only 1 of containerrules or containerrulebase may be used"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } +#endif + CHKiRet(set_lnctx(&pData->fnCtxln, pData->fnRules, pData->fnRulebase, + loadModConf->fnRules, loadModConf->fnRulebase)); + CHKiRet(set_lnctx(&pData->contCtxln, pData->contRules, pData->contRulebase, + loadModConf->contRules, loadModConf->contRulebase)); + if(pData->kubernetesUrl == NULL) { if(loadModConf->kubernetesUrl == NULL) ABORT_FINALIZE(RS_RET_CONFIG_ERROR); pData->kubernetesUrl = (uchar *) strdup((char *) loadModConf->kubernetesUrl); } - if(pData->srcMetadataPath == NULL) - pData->srcMetadataPath = (uchar *) strdup((char *) loadModConf->srcMetadataPath); + if(pData->srcMetadataDescr == NULL) { + CHKmalloc(pData->srcMetadataDescr = MALLOC(sizeof(msgPropDescr_t))); + CHKiRet(msgPropDescrFill(pData->srcMetadataDescr, loadModConf->srcMetadataPath, + strlen((char *)loadModConf->srcMetadataPath))); + } if(pData->dstMetadataPath == NULL) pData->dstMetadataPath = (uchar *) strdup((char *) loadModConf->dstMetadataPath); - - /* todo: make file regexp configurable */ - ret = regexp.regcomp(&pData->fnRegex, DFLT_FILENAME_REGEX, REG_EXTENDED); - if(ret) { - dbgprintf("mmkubernetes: regexp compilation failed with code %d.\n", ret); - ABORT_FINALIZE(RS_RET_ERR); - } + if(pData->caCertFile == NULL && loadModConf->caCertFile) + pData->caCertFile = (uchar *) strdup((char *) loadModConf->caCertFile); + if(pData->token == NULL && loadModConf->token) + pData->token = (uchar *) strdup((char *) loadModConf->token); + if(pData->tokenFile == NULL && loadModConf->tokenFile) + pData->tokenFile = (uchar *) strdup((char *) loadModConf->tokenFile); + if(pData->de_dot_separator == NULL && loadModConf->de_dot_separator) + pData->de_dot_separator = (uchar *) strdup((char *) loadModConf->de_dot_separator); + if((pData->annotation_match.nmemb == 0) && (loadModConf->annotation_match.nmemb > 0)) + copy_annotationmatch(&loadModConf->annotation_match, &pData->annotation_match); + + if(pData->de_dot_separator) + pData->de_dot_separator_len = strlen((const char *)pData->de_dot_separator); + + CHKmalloc(pData->contNameDescr = MALLOC(sizeof(msgPropDescr_t))); + CHKiRet(msgPropDescrFill(pData->contNameDescr, (uchar*) DFLT_CONTAINER_NAME, + strlen(DFLT_CONTAINER_NAME))); + CHKmalloc(pData->contIdFullDescr = MALLOC(sizeof(msgPropDescr_t))); + CHKiRet(msgPropDescrFill(pData->contIdFullDescr, (uchar*) DFLT_CONTAINER_ID_FULL, + strlen(DFLT_CONTAINER_NAME))); /* get the cache for this url */ for(i = 0; caches[i] != NULL; i++) { @@ -341,13 +995,15 @@ CODESTARTnewActInst } else { pData->cache = cacheNew(pData->kubernetesUrl); - caches = realloc(caches, (i + 2) * sizeof(struct cache_s *)); + CHKmalloc(caches = realloc(caches, (i + 2) * sizeof(struct cache_s *))); caches[i] = pData->cache; caches[i + 1] = NULL; } CODE_STD_FINALIZERnewActInst if(pvals != NULL) cnfparamvalsDestruct(pvals, &actpblk); + free(rxstr); + free(srcMetadataPath); ENDnewActInst @@ -388,6 +1044,15 @@ CODESTARTfreeCnf free(pModConf->kubernetesUrl); free(pModConf->srcMetadataPath); free(pModConf->dstMetadataPath); + free(pModConf->caCertFile); + free(pModConf->token); + free(pModConf->tokenFile); + free(pModConf->de_dot_separator); + free(pModConf->fnRules); + free(pModConf->fnRulebase); + free(pModConf->contRules); + free(pModConf->contRulebase); + free_annotationmatch(&pModConf->annotation_match); for(i = 0; caches[i] != NULL; i++) cacheFree(caches[i]); free(caches); @@ -398,8 +1063,20 @@ BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo dbgprintf("mmkubernetes\n"); dbgprintf("\tkubernetesUrl='%s'\n", pData->kubernetesUrl); - dbgprintf("\tsrcMetadataPath='%s'\n", pData->srcMetadataPath); + dbgprintf("\tsrcMetadataPath='%s'\n", pData->srcMetadataDescr->name); dbgprintf("\tdstMetadataPath='%s'\n", pData->dstMetadataPath); + dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile); + dbgprintf("\tallowUnsignedCerts='%d'\n", pData->allowUnsignedCerts); + dbgprintf("\ttoken='%s'\n", pData->token); + dbgprintf("\ttokenFile='%s'\n", pData->tokenFile); + dbgprintf("\tde_dot='%d'\n", pData->de_dot); + dbgprintf("\tde_dot_separator='%s'\n", pData->de_dot_separator); + dbgprintf("\tfilenamerulebase='%s'\n", pData->fnRulebase); + dbgprintf("\tcontainerrulebase='%s'\n", pData->contRulebase); +#if HAVE_LOADSAMPLESFROMSTRING == 1 + dbgprintf("\tfilenamerules='%s'\n", pData->fnRules); + dbgprintf("\tcontainerrules='%s'\n", pData->contRules); +#endif ENDdbgPrintInstInfo @@ -407,62 +1084,84 @@ BEGINtryResume CODESTARTtryResume ENDtryResume - static rsRetVal -extractMsgMetadata(smsg_t *pMsg, uchar *propName, regex_t *fnRegex, - char **podName, char **ns, char **contName, char **dockerID) +extractMsgMetadata(smsg_t *pMsg, instanceData *pData, struct json_object **json) { DEFiRet; - msgPropDescr_t prop; - char *filename, *p; - rs_size_t fnLen; - unsigned short freeFn = 0; - size_t nmatch = 8, len; - regmatch_t pmatch[nmatch]; + uchar *filename = NULL, *container_name = NULL, *container_id_full = NULL; + rs_size_t fnLen, container_name_len, container_id_full_len; + unsigned short freeFn = 0, free_container_name = 0, free_container_id_full = 0; + int lnret; + struct json_object *cnid = NULL; + + if (!json) + FINALIZE; + *json = NULL; + /* extract metadata from the CONTAINER_NAME field and see if CONTAINER_ID_FULL is present */ + container_name = MsgGetProp(pMsg, NULL, pData->contNameDescr, + &container_name_len, &free_container_name, NULL); + container_id_full = MsgGetProp( + pMsg, NULL, pData->contIdFullDescr, &container_id_full_len, &free_container_id_full, NULL); + + if (container_name && container_id_full && container_name_len && container_id_full_len) { + dbgprintf("mmkubernetes: CONTAINER_NAME: '%s' CONTAINER_ID_FULL: '%s'.\n", + container_name, container_id_full); + if ((lnret = ln_normalize(pData->contCtxln, (char*)container_name, + container_name_len, json))) { + ABORT_FINALIZE(RS_RET_ERR); + } + /* if we have fields for pod name, namespace name, container name, + * and container id, we are good to go */ + if (fjson_object_object_get_ex(*json, "pod_name", NULL) && + fjson_object_object_get_ex(*json, "namespace_name", NULL) && + fjson_object_object_get_ex(*json, "container_name", NULL)) { + /* add field for container id */ + json_object_object_add(*json, "container_id", + json_object_new_string_len((const char *)container_id_full, + container_id_full_len)); + ABORT_FINALIZE(RS_RET_OK); + } + } /* extract metadata from the file name */ - msgPropDescrFill(&prop, propName, strlen((char *) propName)); - filename = (char *) MsgGetProp(pMsg, NULL, &prop, &fnLen, &freeFn, NULL); + filename = MsgGetProp(pMsg, NULL, pData->srcMetadataDescr, &fnLen, &freeFn, NULL); dbgprintf("mmkubernetes: filename: '%s'.\n", filename); - msgPropDescrDestruct(&prop); if(filename == NULL) ABORT_FINALIZE(RS_RET_NOT_FOUND); - if(REG_NOMATCH == regexp.regexec(fnRegex, filename, nmatch, pmatch, 0)) - ABORT_FINALIZE(RS_RET_NOT_FOUND); - - if(pmatch[1].rm_so != -1) { - len = pmatch[1].rm_eo - pmatch[1].rm_so; - p = malloc(len + 1); - memcpy(p, filename + pmatch[1].rm_so, len); - p[len] = '\0'; - *podName = p; - } - if(pmatch[5].rm_so != -1) { - len = pmatch[5].rm_eo - pmatch[5].rm_so; - p = malloc(len + 1); - memcpy(p, filename + pmatch[5].rm_so, len); - p[len] = '\0'; - *ns = p; - } - if(pmatch[6].rm_so != -1) { - len = pmatch[6].rm_eo - pmatch[6].rm_so; - p = malloc(len + 1); - memcpy(p, filename + pmatch[6].rm_so, len); - p[len] = '\0'; - *contName = p; - } - if(pmatch[7].rm_so != -1) { - len = pmatch[7].rm_eo - pmatch[7].rm_so; - p = malloc(len + 1); - memcpy(p, filename + pmatch[7].rm_so, len); - p[len] = '\0'; - *dockerID = p; + if ((lnret = ln_normalize(pData->fnCtxln, (char*)filename, fnLen, json))) { + ABORT_FINALIZE(RS_RET_ERR); } - + /* if we have fields for pod name, namespace name, container name, + * and container id, we are good to go */ + if (fjson_object_object_get_ex(*json, "pod_name", NULL) && + fjson_object_object_get_ex(*json, "namespace_name", NULL) && + fjson_object_object_get_ex(*json, "container_name_and_id", &cnid)) { + /* parse container_name_and_id into container_name and container_id */ + const char *container_name_and_id = json_object_get_string(cnid); + const char *last_dash = NULL; + if (container_name_and_id && (last_dash = strrchr(container_name_and_id, '-')) && + *(last_dash + 1) && (last_dash != container_name_and_id)) { + json_object_object_add(*json, "container_name", + json_object_new_string_len(container_name_and_id, + (int)(last_dash-container_name_and_id))); + json_object_object_add(*json, "container_id", + json_object_new_string(last_dash + 1)); + ABORT_FINALIZE(RS_RET_OK); + } + } + ABORT_FINALIZE(RS_RET_NOT_FOUND); finalize_it: if(freeFn) free(filename); + if (free_container_name) + free(container_name); + if (free_container_id_full) + free(container_id_full); + if (iRet != RS_RET_OK) { + json_object_put(*json); + *json = NULL; + } RETiRet; } @@ -479,18 +1178,11 @@ queryKB(wrkrInstanceData_t *pWrkrData, char *url, struct json_object **rply) ccode = curl_easy_setopt(pWrkrData->curlCtx, CURLOPT_URL, url); if(ccode != CURLE_OK) ABORT_FINALIZE(RS_RET_ERR); - ccode = curl_easy_perform(pWrkrData->curlCtx); - switch(ccode) { - case CURLE_COULDNT_CONNECT: - case CURLE_COULDNT_RESOLVE_HOST: - case CURLE_COULDNT_RESOLVE_PROXY: - case CURLE_HTTP_RETURNED_ERROR: - case CURLE_WRITE_ERROR: - dbgprintf("mmkubernetes: curl connection failed " - "with code %d.\n", ccode); - ABORT_FINALIZE(RS_RET_ERR); - default: - break; + if (CURLE_OK != (ccode = curl_easy_perform(pWrkrData->curlCtx))) { + errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR, + "mmkubernetes: failed to connect to [%s] - %d:%s\n", + url, ccode, curl_easy_strerror(ccode)); + ABORT_FINALIZE(RS_RET_ERR); } /* parse retrieved data */ @@ -500,6 +1192,10 @@ queryKB(wrkrInstanceData_t *pWrkrData, char *url, struct json_object **rply) json_tokener_free(jt); if(!json_object_is_type(jo, json_type_object)) { json_object_put(jo); + jo = NULL; + errmsg.LogMsg(0, RS_RET_JSON_PARSE_ERR, LOG_INFO, + "mmkubernetes: unable to parse string as JSON:[%.*s]\n", + (int)pWrkrData->curlRplyLen, pWrkrData->curlRply); ABORT_FINALIZE(RS_RET_JSON_PARSE_ERR); } @@ -527,108 +1223,168 @@ BEGINdoAction_NoStrings BEGINdoAction smsg_t *pMsg = (smsg_t*) ppString[0]; #endif - char *podName = NULL, *ns = NULL, *containerName = NULL, - *dockerID = NULL, *mdKey = NULL; - struct json_object *jMetadata = NULL; + const char *podName = NULL, *ns = NULL, *containerName = NULL, + *containerID = NULL; + char *mdKey = NULL; + struct json_object *jMetadata = NULL, *jMetadataCopy = NULL, *jMsgMeta = NULL, + *jo = NULL; + int add_ns_metadata = 0; CODESTARTdoAction - CHKiRet_Hdlr(extractMsgMetadata(pMsg, pWrkrData->pData->srcMetadataPath, - &pWrkrData->pData->fnRegex, &podName, &ns, &containerName, &dockerID)) { + CHKiRet_Hdlr(extractMsgMetadata(pMsg, pWrkrData->pData, &jMsgMeta)) { ABORT_FINALIZE((iRet == RS_RET_NOT_FOUND) ? RS_RET_OK : iRet); } + if (fjson_object_object_get_ex(jMsgMeta, "pod_name", &jo)) + podName = json_object_get_string(jo); + if (fjson_object_object_get_ex(jMsgMeta, "namespace_name", &jo)) + ns = json_object_get_string(jo); + if (fjson_object_object_get_ex(jMsgMeta, "container_name", &jo)) + containerName = json_object_get_string(jo); + if (fjson_object_object_get_ex(jMsgMeta, "container_id", &jo)) + containerID = json_object_get_string(jo); assert(podName != NULL); assert(ns != NULL); assert(containerName != NULL); - assert(dockerID != NULL); + assert(containerID != NULL); dbgprintf("mmkubernetes:\n podName: '%s'\n namespace: '%s'\n containerName: '%s'\n" - " dockerID: '%s'\n", podName, ns, containerName, dockerID); + " containerID: '%s'\n", podName, ns, containerName, containerID); /* check cache for metadata */ - asprintf(&mdKey, "%s_%s_%s", ns, podName, containerName); + if ((-1 == asprintf(&mdKey, "%s_%s_%s", ns, podName, containerName)) || + (!mdKey)) { + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } pthread_mutex_lock(pWrkrData->pData->cache->cacheMtx); jMetadata = hashtable_search(pWrkrData->pData->cache->mdHt, mdKey); if(jMetadata == NULL) { char *url = NULL; - struct json_object *jReply = NULL, *jo = NULL, *jo2 = NULL, *jNewNS = NULL; + struct json_object *jReply = NULL, *jo2 = NULL, *jNsMeta = NULL, *jPodData = NULL; - /* check cache for namespace id */ - jo2 = hashtable_search(pWrkrData->pData->cache->nsHt, ns); - pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); + /* check cache for namespace metadata */ + jNsMeta = hashtable_search(pWrkrData->pData->cache->nsHt, (char *)ns); - if(jo2 == NULL) { + if(jNsMeta == NULL) { /* query kubernetes for namespace info */ /* todo: move url definitions elsewhere */ - asprintf(&url, "%s/api/v1/namespaces/%s", - (char *) pWrkrData->pData->kubernetesUrl, ns); + if ((-1 == asprintf(&url, "%s/api/v1/namespaces/%s", + (char *) pWrkrData->pData->kubernetesUrl, ns)) || + (!url)) { + pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } iRet = queryKB(pWrkrData, url, &jReply); free(url); - /* todo: opt to ignore the missing uid? */ - CHKiRet(iRet); + /* todo: implement support for the .orphaned namespace */ + if (iRet != RS_RET_OK) { + json_object_put(jReply); + jReply = NULL; + pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); + FINALIZE; + } - if(fjson_object_object_get_ex(jReply, "metadata", &jo2) - && fjson_object_object_get_ex(jo2, "uid", &jo2)) - jo2 = jNewNS = json_object_get(jo2); - else - jo2 = NULL; + if(fjson_object_object_get_ex(jReply, "metadata", &jNsMeta)) { + jNsMeta = json_object_get(jNsMeta); + parse_labels_annotations(jNsMeta, &pWrkrData->pData->annotation_match, + pWrkrData->pData->de_dot, + (const char *)pWrkrData->pData->de_dot_separator, + pWrkrData->pData->de_dot_separator_len); + add_ns_metadata = 1; + } else { + /* namespace with no metadata??? */ + errmsg.LogMsg(0, RS_RET_ERR, LOG_INFO, + "mmkubernetes: namespace [%s] has no metadata!\n", ns); + jNsMeta = NULL; + } json_object_put(jReply); + jReply = NULL; } - asprintf(&url, "%s/api/v1/namespaces/%s/pods/%s", - (char *) pWrkrData->pData->kubernetesUrl, ns, podName); + if ((-1 == asprintf(&url, "%s/api/v1/namespaces/%s/pods/%s", + (char *) pWrkrData->pData->kubernetesUrl, ns, podName)) || + (!url)) { + pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } iRet = queryKB(pWrkrData, url, &jReply); free(url); if(iRet != RS_RET_OK) { - if(jNewNS) { - pthread_mutex_lock(pWrkrData->pData->cache->cacheMtx); - hashtable_insert(pWrkrData->pData->cache->nsHt, ns, jNewNS); - ns = NULL; - pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); + if(jNsMeta && add_ns_metadata) { + hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta); } + json_object_put(jReply); + jReply = NULL; + pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); FINALIZE; } jo = json_object_new_object(); - if(jo2) + if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "uid", &jo2)) json_object_object_add(jo, "namespace_id", json_object_get(jo2)); - if(fjson_object_object_get_ex(jReply, "nodeName", &jo2)) - json_object_object_add(jo, "host", json_object_get(jo2)); - if(fjson_object_object_get_ex(jReply, "uid", &jo2)) - json_object_object_add(jo, "pod_id", json_object_get(jo2)); - /* todo: labels */ + if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "labels", &jo2)) + json_object_object_add(jo, "namespace_labels", json_object_get(jo2)); + if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "annotations", &jo2)) + json_object_object_add(jo, "namespace_annotations", json_object_get(jo2)); + if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "creationTimestamp", &jo2)) + json_object_object_add(jo, "creation_timestamp", json_object_get(jo2)); + if(fjson_object_object_get_ex(jReply, "metadata", &jPodData)) { + if(fjson_object_object_get_ex(jPodData, "uid", &jo2)) + json_object_object_add(jo, "pod_id", json_object_get(jo2)); + parse_labels_annotations(jPodData, &pWrkrData->pData->annotation_match, + pWrkrData->pData->de_dot, + (const char *)pWrkrData->pData->de_dot_separator, + pWrkrData->pData->de_dot_separator_len); + if(fjson_object_object_get_ex(jPodData, "annotations", &jo2)) + json_object_object_add(jo, "annotations", json_object_get(jo2)); + if(fjson_object_object_get_ex(jPodData, "labels", &jo2)) + json_object_object_add(jo, "labels", json_object_get(jo2)); + } + if(fjson_object_object_get_ex(jReply, "spec", &jPodData)) { + if(fjson_object_object_get_ex(jPodData, "nodeName", &jo2)) { + json_object_object_add(jo, "host", json_object_get(jo2)); + } + } json_object_put(jReply); - - json_object_object_add(jo, "namespace", json_object_new_string(ns)); - json_object_object_add(jo, "pod_name", json_object_new_string(podName)); - json_object_object_add(jo, "container_name", json_object_new_string(containerName)); + jReply = NULL; + + if (fjson_object_object_get_ex(jMsgMeta, "pod_name", &jo2)) + json_object_object_add(jo, "pod_name", json_object_get(jo2)); + if (fjson_object_object_get_ex(jMsgMeta, "namespace_name", &jo2)) + json_object_object_add(jo, "namespace_name", json_object_get(jo2)); + if (fjson_object_object_get_ex(jMsgMeta, "container_name", &jo2)) + json_object_object_add(jo, "container_name", json_object_get(jo2)); + json_object_object_add(jo, "master_url", + json_object_new_string((const char *)pWrkrData->pData->kubernetesUrl)); jMetadata = json_object_new_object(); json_object_object_add(jMetadata, "kubernetes", jo); jo = json_object_new_object(); - json_object_object_add(jo, "container_id", json_object_new_string(dockerID)); + if (fjson_object_object_get_ex(jMsgMeta, "container_id", &jo2)) + json_object_object_add(jo, "container_id", json_object_get(jo2)); json_object_object_add(jMetadata, "docker", jo); - pthread_mutex_lock(pWrkrData->pData->cache->cacheMtx); hashtable_insert(pWrkrData->pData->cache->mdHt, mdKey, jMetadata); mdKey = NULL; - if(jNewNS) { - hashtable_insert(pWrkrData->pData->cache->nsHt, ns, jNewNS); + if(jNsMeta && add_ns_metadata) { + hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta); ns = NULL; } - pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); - } else { - pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); } + /* make a copy of the metadata for the msg to own */ + /* todo: use json_object_deep_copy when implementation available in libfastjson */ + /* yes, this is expensive - but there is no other way to make this thread safe - we + * can't allow the msg to have a shared pointer to an element inside the cache, + * outside of the cache lock + */ + jMetadataCopy = json_tokener_parse(json_object_get_string(jMetadata)); + pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); /* the +1 is there to skip the leading '$' */ - msgAddJSON(pMsg, (uchar *) pWrkrData->pData->dstMetadataPath + 1, json_object_get(jMetadata), 0, 0); + msgAddJSON(pMsg, (uchar *) pWrkrData->pData->dstMetadataPath + 1, jMetadataCopy, 0, 0); finalize_it: - free(podName); - free(ns); - free(containerName); - free(dockerID); + json_object_put(jMsgMeta); free(mdKey); ENDdoAction diff --git a/contrib/mmkubernetes/sample.conf b/contrib/mmkubernetes/sample.conf index face6066f..4c400ed51 100644 --- a/contrib/mmkubernetes/sample.conf +++ b/contrib/mmkubernetes/sample.conf @@ -1,12 +1,19 @@ -module(load="mmkubernetes" kubernetesurl="http://localhost:5000") -module(load="imfile") - -input(type="imfile" file="/var/log/ctrs/*" tag="ctr" addmetadata="on") - -action(type="mmkubernetes") +module(load="imfile" mode="inotify") +module(load="mmkubernetes" kubernetesurl="https://localhost:8443" + tls.cacert="/etc/rsyslog.d/mmk8s.ca.crt" + tokenfile="/etc/rsyslog.d/mmk8s.token" annotation_match=["."]) template(name="tpl" type="list") { property(name="jsonmesg") constant(value="\n") } -action(type="omfile" file="/var/log/ctrs.log" template="tpl") + +ruleset(name="k8s") { + action(type="mmkubernetes") + action(type="omfile" file="/var/log/k8s.log" template="tpl") +} + +input(type="imfile" file="/var/log/containers/*.log" tag="kubernetes" addmetadata="on" ruleset="k8s") +if ($!_SYSTEMD_UNIT == "docker.service") and (strlen($!CONTAINER_NAME) > 0) then { + call k8s +} -- 2.16.4
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor