Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
SUSE:SLE-15-SP7:GA
rsyslog.12076
0005-WIP-Add-the-mmkubernetes-plugin.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 0005-WIP-Add-the-mmkubernetes-plugin.patch of Package rsyslog.12076
From 77886e21292d8220f93b3404236da0e8f7159255 Mon Sep 17 00:00:00 2001 From: Tomas Heinrich <theinric@redhat.com> Date: Tue, 8 Mar 2016 17:07:55 +0100 Subject: [PATCH] WIP - Add the mmkubernetes plugin DO NOT MERGE - This is a work-in-progress. --- Makefile.am | 5 + configure.ac | 21 ++ contrib/mmkubernetes/Makefile.am | 6 + contrib/mmkubernetes/mmkubernetes.c | 673 ++++++++++++++++++++++++++++++++++++ contrib/mmkubernetes/sample.conf | 12 + 5 files changed, 717 insertions(+) create mode 100644 contrib/mmkubernetes/Makefile.am create mode 100644 contrib/mmkubernetes/mmkubernetes.c create mode 100644 contrib/mmkubernetes/sample.conf diff --git a/Makefile.am b/Makefile.am index 1f73ae5a7..c874c1cdc 100644 --- a/Makefile.am +++ b/Makefile.am @@ -283,6 +283,11 @@ if ENABLE_OMTCL SUBDIRS += contrib/omtcl endif +# mmkubernetes +if ENABLE_MMKUBERNETES +SUBDIRS += contrib/mmkubernetes +endif + # tests are added as last element, because tests may need different # modules that need to be generated first SUBDIRS += tests diff --git a/configure.ac b/configure.ac index d36809c74..bb737bbc6 100644 --- a/configure.ac +++ b/configure.ac @@ -2116,6 +2116,25 @@ AM_CONDITIONAL(ENABLE_OMTCL, test x$enable_omtcl = xyes) # END TCL SUPPORT +# mmkubernetes - Kubernetes metadata support + +AC_ARG_ENABLE(mmkubernetes, + [AS_HELP_STRING([--enable-mmkubernetes], + [Enable compilation of the mmkubernetes module @<:@default=no@:>@])], + [case "${enableval}" in + yes) enable_mmkubernetes="yes" ;; + no) enable_mmkubernetes="no" ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-mmkubernetes) ;; + esac], + [enable_mmkubernetes=no] +) +if test "x$enable_mmkubernetes" = "xyes"; then + PKG_CHECK_MODULES([CURL], [libcurl]) +fi +AM_CONDITIONAL(ENABLE_MMKUBERNETES, test x$enable_mmkubernetes = xyes) + +# END Kubernetes metadata support + # man pages have_to_generate_man_pages="no" git_src_have_to_generate_man_pages="yes" # default to use when building from git source @@ -2247,6 +2266,7 @@ AC_CONFIG_FILES([Makefile \ contrib/omhttpfs/Makefile \ contrib/omamqp1/Makefile \ contrib/omtcl/Makefile \ + contrib/mmkubernetes/Makefile \ tests/Makefile]) AC_OUTPUT @@ -2324,6 +2344,7 @@ echo " mmsequence enabled: $enable_mmsequence" echo " mmdblookup enabled: $enable_mmdblookup" echo " mmfields enabled: $enable_mmfields" echo " mmrm1stspace module enabled: $enable_mmrm1stspace" +echo " mmkubernetes enabled: $enable_mmkubernetes" echo echo "---{ database support }---" echo " MySql support enabled: $enable_mysql" diff --git a/contrib/mmkubernetes/Makefile.am b/contrib/mmkubernetes/Makefile.am new file mode 100644 index 000000000..02af334d4 --- /dev/null +++ b/contrib/mmkubernetes/Makefile.am @@ -0,0 +1,6 @@ +pkglib_LTLIBRARIES = mmkubernetes.la + +mmkubernetes_la_SOURCES = mmkubernetes.c +mmkubernetes_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CURL_CFLAGS) +mmkubernetes_la_LDFLAGS = -module -avoid-version +mmkubernetes_la_LIBADD = $(CURL_LIBS) diff --git a/contrib/mmkubernetes/mmkubernetes.c b/contrib/mmkubernetes/mmkubernetes.c new file mode 100644 index 000000000..0d5502db6 --- /dev/null +++ b/contrib/mmkubernetes/mmkubernetes.c @@ -0,0 +1,673 @@ +/* mmkubernetes.c + * This is a message modification module. It uses metadata obtained + * from the message to query Kubernetes and obtain additional metadata + * relating to the container instance. + * + * Inspired by: + * https://github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter + * + * NOTE: read comments in module-template.h for details on the calling interface! + * + * Copyright 2016 Red Hat Inc. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* todo: + * - cache cleanup + * - SIGHUP handling + * - authentication + * - statistics generation + * - other missing configuration options + * - documentation + * - more robust error-handling + debugging information + * - batching, parallel queries, failover, ... + */ + +/* needed for asprintf */ +#ifndef _GNU_SOURCE +# define _GNU_SOURCE +#endif + +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <errno.h> +#include <unistd.h> +#include <libestr.h> +#include <json.h> +#include <curl/curl.h> +#include <curl/easy.h> +#include <pthread.h> +#include "conf.h" +#include "syslogd-types.h" +#include "module-template.h" +#include "errmsg.h" +#include "regexp.h" +#include "hashtable.h" + +/* static data */ +MODULE_TYPE_OUTPUT /* this is technically an output plugin */ +MODULE_TYPE_KEEP /* releasing the module would cause a leak through libcurl */ +MODULE_CNFNAME("mmkubernetes") +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(regexp) + +#define DFLT_FILENAME_REGEX "var\\.log\\.containers\\.([a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_([^_]+)_(.+)-([a-z0-9]{64})\\.log$" +#define DFLT_SRCMD_PATH "$!metadata!filename" +#define DFLT_DSTMD_PATH "$!metadata" + +static struct cache_s { + const uchar *kbUrl; + struct hashtable *mdHt; + struct hashtable *nsHt; + pthread_mutex_t *cacheMtx; +} **caches; + +/* module configuration data */ +struct modConfData_s { + rsconf_t *pConf; /* our overall config object */ + uchar *kubernetesUrl; /**< where to place queries */ + uchar *srcMetadataPath; /**< where to get data for kubernetes queries */ + uchar *dstMetadataPath; /**< where to put metadata obtained from kubernetes */ +}; + +/* action (instance) configuration data */ +typedef struct _instanceData { + uchar *kubernetesUrl; /**< where to place queries */ + uchar *srcMetadataPath; /**< where to get data for kubernetes queries */ + uchar *dstMetadataPath; /**< where to put metadata obtained from kubernetes */ + regex_t fnRegex; + struct cache_s *cache; +} instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; + CURL *curlCtx; + struct curl_slist *curlHdr; + char *curlRply; + size_t curlRplyLen; +} wrkrInstanceData_t; + +/* module parameters (v6 config format) */ +static struct cnfparamdescr modpdescr[] = { + { "kubernetesurl", eCmdHdlrString, 0 }, + { "srcmetadatapath", eCmdHdlrString, 0 }, + { "dstmetadatapath", eCmdHdlrString, 0 } +}; +static struct cnfparamblk modpblk = { + CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr +}; + +/* action (instance) parameters (v6 config format) */ +static struct cnfparamdescr actpdescr[] = { + { "kubernetesurl", eCmdHdlrString, 0 }, + { "srcmetadatapath", eCmdHdlrString, 0 }, + { "dstmetadatapath", eCmdHdlrString, 0 } +}; +static struct cnfparamblk actpblk = + { CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr + }; + +static modConfData_t *loadModConf = NULL; /* modConf ptr to use for the current load process */ +static modConfData_t *runModConf = NULL; /* modConf ptr to use for the current exec process */ + + +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; +ENDbeginCnfLoad + + +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "mmkubernetes: " + "error processing module config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("module (global) param blk for mmkubernetes:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for(i = 0 ; i < modpblk.nParams ; ++i) { + if(!pvals[i].bUsed) { + continue; + } else if(!strcmp(modpblk.descr[i].name, "kubernetesurl")) { + free(loadModConf->kubernetesUrl); + loadModConf->kubernetesUrl = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(modpblk.descr[i].name, "srcmetadatapath")) { + free(loadModConf->srcMetadataPath); + loadModConf->srcMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + /* todo: sanitize the path */ + } else if(!strcmp(modpblk.descr[i].name, "dstmetadatapath")) { + free(loadModConf->dstMetadataPath); + loadModConf->dstMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + /* todo: sanitize the path */ + } else { + dbgprintf("mmkubernetes: program error, non-handled " + "param '%s' in module() block\n", modpblk.descr[i].name); + /* todo: error message? */ + } + } + + /* set defaults */ + if(loadModConf->srcMetadataPath == NULL) + loadModConf->srcMetadataPath = (uchar *) strdup(DFLT_SRCMD_PATH); + if(loadModConf->dstMetadataPath == NULL) + loadModConf->dstMetadataPath = (uchar *) strdup(DFLT_DSTMD_PATH); + + caches = calloc(1, sizeof(struct cache_s *)); + +finalize_it: + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + + +BEGINfreeInstance +CODESTARTfreeInstance + free(pData->kubernetesUrl); + free(pData->srcMetadataPath); + free(pData->dstMetadataPath); + regexp.regfree(&pData->fnRegex); +ENDfreeInstance + +size_t curlCB(char *data, size_t size, size_t nmemb, void *usrptr) +{ + wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t *) usrptr; + char * buf; + size_t newlen; + + newlen = pWrkrData->curlRplyLen + size * nmemb; + buf = realloc(pWrkrData->curlRply, newlen); + memcpy(buf + pWrkrData->curlRplyLen, data, size * nmemb); + pWrkrData->curlRply = buf; + pWrkrData->curlRplyLen = newlen; + + return size * nmemb; +} + +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + CURL *ctx; + struct curl_slist *hdr; + + hdr = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + pWrkrData->curlHdr = hdr; + ctx = curl_easy_init(); + curl_easy_setopt(ctx, CURLOPT_HTTPHEADER, hdr); + curl_easy_setopt(ctx, CURLOPT_WRITEFUNCTION, curlCB); + curl_easy_setopt(ctx, CURLOPT_WRITEDATA, pWrkrData); + pWrkrData->curlCtx = ctx; +ENDcreateWrkrInstance + + +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + curl_easy_cleanup(pWrkrData->curlCtx); + curl_slist_free_all(pWrkrData->curlHdr); +ENDfreeWrkrInstance + + +static struct cache_s *cacheNew(const uchar *url) +{ + struct cache_s *cache; + + cache = calloc(1, sizeof(struct cache_s)); + cache->kbUrl = url; + cache->mdHt = create_hashtable(100, hash_from_string, + key_equals_string, (void (*)(void *)) json_object_put); + cache->nsHt = create_hashtable(100, hash_from_string, + key_equals_string, (void (*)(void *)) json_object_put); + cache->cacheMtx = malloc(sizeof(pthread_mutex_t)); + pthread_mutex_init(cache->cacheMtx, NULL); + + return cache; +} + + +static void cacheFree(struct cache_s *cache) +{ + hashtable_destroy(cache->mdHt, 1); + hashtable_destroy(cache->nsHt, 1); + pthread_mutex_destroy(cache->cacheMtx); + free(cache->cacheMtx); + free(cache); +} + + +BEGINnewActInst + struct cnfparamvals *pvals = NULL; + int i, ret; +CODESTARTnewActInst + DBGPRINTF("newActInst (mmkubernetes)\n"); + + pvals = nvlstGetParams(lst, &actpblk, NULL); + if(pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "mmkubernetes: " + "error processing config parameters [action(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if(Debug) { + dbgprintf("action param blk in mmkubernetes:\n"); + cnfparamsPrint(&actpblk, pvals); + } + + CODE_STD_STRING_REQUESTnewActInst(1) + CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); + CHKiRet(createInstance(&pData)); + + for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) { + continue; + } else if(!strcmp(actpblk.descr[i].name, "kubernetesurl")) { + free(pData->kubernetesUrl); + pData->kubernetesUrl = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "srcmetadatapath")) { + free(pData->srcMetadataPath); + pData->srcMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + /* todo: sanitize the path */ + } else if(!strcmp(actpblk.descr[i].name, "dstmetadatapath")) { + free(pData->dstMetadataPath); + pData->dstMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); + /* todo: sanitize the path */ + } else { + dbgprintf("mmkubernetes: program error, non-handled " + "param '%s' in action() block\n", actpblk.descr[i].name); + /* todo: error message? */ + } + } + + if(pData->kubernetesUrl == NULL) { + if(loadModConf->kubernetesUrl == NULL) + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + pData->kubernetesUrl = (uchar *) strdup((char *) loadModConf->kubernetesUrl); + } + if(pData->srcMetadataPath == NULL) + pData->srcMetadataPath = (uchar *) strdup((char *) loadModConf->srcMetadataPath); + if(pData->dstMetadataPath == NULL) + pData->dstMetadataPath = (uchar *) strdup((char *) loadModConf->dstMetadataPath); + + /* todo: make file regexp configurable */ + ret = regexp.regcomp(&pData->fnRegex, DFLT_FILENAME_REGEX, REG_EXTENDED); + if(ret) { + dbgprintf("mmkubernetes: regexp compilation failed with code %d.\n", ret); + ABORT_FINALIZE(RS_RET_ERR); + } + + /* get the cache for this url */ + for(i = 0; caches[i] != NULL; i++) { + if(!strcmp((char *) pData->kubernetesUrl, (char *) caches[i]->kbUrl)) + break; + } + if(caches[i] != NULL) { + pData->cache = caches[i]; + } else { + pData->cache = cacheNew(pData->kubernetesUrl); + + caches = realloc(caches, (i + 2) * sizeof(struct cache_s *)); + caches[i] = pData->cache; + caches[i + 1] = NULL; + } +CODE_STD_FINALIZERnewActInst + if(pvals != NULL) + cnfparamvalsDestruct(pvals, &actpblk); +ENDnewActInst + + +/* legacy config format is not supported */ +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(strncmp((char *) p, ":mmkubernetes:", sizeof(":mmkubernetes:") - 1)) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "mmkubernetes supports only v6+ config format, use: " + "action(type=\"mmkubernetes\" ...)"); + } + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + + +BEGINendCnfLoad +CODESTARTendCnfLoad +ENDendCnfLoad + + +BEGINcheckCnf +CODESTARTcheckCnf +ENDcheckCnf + + +BEGINactivateCnf +CODESTARTactivateCnf + runModConf = pModConf; +ENDactivateCnf + + +BEGINfreeCnf +CODESTARTfreeCnf + int i; + + free(pModConf->kubernetesUrl); + free(pModConf->srcMetadataPath); + free(pModConf->dstMetadataPath); + for(i = 0; caches[i] != NULL; i++) + cacheFree(caches[i]); + free(caches); +ENDfreeCnf + + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo + dbgprintf("mmkubernetes\n"); + dbgprintf("\tkubernetesUrl='%s'\n", pData->kubernetesUrl); + dbgprintf("\tsrcMetadataPath='%s'\n", pData->srcMetadataPath); + dbgprintf("\tdstMetadataPath='%s'\n", pData->dstMetadataPath); +ENDdbgPrintInstInfo + + +BEGINtryResume +CODESTARTtryResume +ENDtryResume + + +static rsRetVal +extractMsgMetadata(smsg_t *pMsg, uchar *propName, regex_t *fnRegex, + char **podName, char **ns, char **contName, char **dockerID) +{ + DEFiRet; + msgPropDescr_t prop; + char *filename, *p; + rs_size_t fnLen; + unsigned short freeFn = 0; + size_t nmatch = 8, len; + regmatch_t pmatch[nmatch]; + + /* extract metadata from the file name */ + msgPropDescrFill(&prop, propName, strlen((char *) propName)); + filename = (char *) MsgGetProp(pMsg, NULL, &prop, &fnLen, &freeFn, NULL); + dbgprintf("mmkubernetes: filename: '%s'.\n", filename); + msgPropDescrDestruct(&prop); + if(filename == NULL) + ABORT_FINALIZE(RS_RET_NOT_FOUND); + + if(REG_NOMATCH == regexp.regexec(fnRegex, filename, nmatch, pmatch, 0)) + ABORT_FINALIZE(RS_RET_NOT_FOUND); + + if(pmatch[1].rm_so != -1) { + len = pmatch[1].rm_eo - pmatch[1].rm_so; + p = malloc(len + 1); + memcpy(p, filename + pmatch[1].rm_so, len); + p[len] = '\0'; + *podName = p; + } + if(pmatch[5].rm_so != -1) { + len = pmatch[5].rm_eo - pmatch[5].rm_so; + p = malloc(len + 1); + memcpy(p, filename + pmatch[5].rm_so, len); + p[len] = '\0'; + *ns = p; + } + if(pmatch[6].rm_so != -1) { + len = pmatch[6].rm_eo - pmatch[6].rm_so; + p = malloc(len + 1); + memcpy(p, filename + pmatch[6].rm_so, len); + p[len] = '\0'; + *contName = p; + } + if(pmatch[7].rm_so != -1) { + len = pmatch[7].rm_eo - pmatch[7].rm_so; + p = malloc(len + 1); + memcpy(p, filename + pmatch[7].rm_so, len); + p[len] = '\0'; + *dockerID = p; + } + +finalize_it: + if(freeFn) + free(filename); + RETiRet; +} + + +static rsRetVal +queryKB(wrkrInstanceData_t *pWrkrData, char *url, struct json_object **rply) +{ + DEFiRet; + CURLcode ccode; + struct json_tokener *jt = NULL; + struct json_object *jo; + + /* query kubernetes for pod info */ + ccode = curl_easy_setopt(pWrkrData->curlCtx, CURLOPT_URL, url); + if(ccode != CURLE_OK) + ABORT_FINALIZE(RS_RET_ERR); + ccode = curl_easy_perform(pWrkrData->curlCtx); + switch(ccode) { + case CURLE_COULDNT_CONNECT: + case CURLE_COULDNT_RESOLVE_HOST: + case CURLE_COULDNT_RESOLVE_PROXY: + case CURLE_HTTP_RETURNED_ERROR: + case CURLE_WRITE_ERROR: + dbgprintf("mmkubernetes: curl connection failed " + "with code %d.\n", ccode); + ABORT_FINALIZE(RS_RET_ERR); + default: + break; + } + + /* parse retrieved data */ + jt = json_tokener_new(); + json_tokener_reset(jt); + jo = json_tokener_parse_ex(jt, pWrkrData->curlRply, pWrkrData->curlRplyLen); + json_tokener_free(jt); + if(!json_object_is_type(jo, json_type_object)) { + json_object_put(jo); + ABORT_FINALIZE(RS_RET_JSON_PARSE_ERR); + } + + dbgprintf("mmkubernetes: queryKB reply:\n%s\n", + json_object_to_json_string_ext(jo, JSON_C_TO_STRING_PRETTY)); + + *rply = jo; + +finalize_it: + if(pWrkrData->curlRply != NULL) { + free(pWrkrData->curlRply); + pWrkrData->curlRply = NULL; + pWrkrData->curlRplyLen = 0; + } + RETiRet; +} + + +/* versions < 8.16.0 don't support BEGINdoAction_NoStrings */ +#if defined(BEGINdoAction_NoStrings) +BEGINdoAction_NoStrings + smsg_t **ppMsg = (smsg_t **) pMsgData; + smsg_t *pMsg = ppMsg[0]; +#else +BEGINdoAction + smsg_t *pMsg = (smsg_t*) ppString[0]; +#endif + char *podName = NULL, *ns = NULL, *containerName = NULL, + *dockerID = NULL, *mdKey = NULL; + struct json_object *jMetadata = NULL; +CODESTARTdoAction + CHKiRet_Hdlr(extractMsgMetadata(pMsg, pWrkrData->pData->srcMetadataPath, + &pWrkrData->pData->fnRegex, &podName, &ns, &containerName, &dockerID)) { + ABORT_FINALIZE((iRet == RS_RET_NOT_FOUND) ? RS_RET_OK : iRet); + } + + assert(podName != NULL); + assert(ns != NULL); + assert(containerName != NULL); + assert(dockerID != NULL); + + dbgprintf("mmkubernetes:\n podName: '%s'\n namespace: '%s'\n containerName: '%s'\n" + " dockerID: '%s'\n", podName, ns, containerName, dockerID); + + /* check cache for metadata */ + asprintf(&mdKey, "%s_%s_%s", ns, podName, containerName); + pthread_mutex_lock(pWrkrData->pData->cache->cacheMtx); + jMetadata = hashtable_search(pWrkrData->pData->cache->mdHt, mdKey); + + if(jMetadata == NULL) { + char *url = NULL; + struct json_object *jReply = NULL, *jo = NULL, *jo2 = NULL, *jNewNS = NULL; + + /* check cache for namespace id */ + jo2 = hashtable_search(pWrkrData->pData->cache->nsHt, ns); + pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); + + if(jo2 == NULL) { + /* query kubernetes for namespace info */ + /* todo: move url definitions elsewhere */ + asprintf(&url, "%s/api/v1/namespaces/%s", + (char *) pWrkrData->pData->kubernetesUrl, ns); + iRet = queryKB(pWrkrData, url, &jReply); + free(url); + /* todo: opt to ignore the missing uid? */ + CHKiRet(iRet); + + if(fjson_object_object_get_ex(jReply, "metadata", &jo2) + && fjson_object_object_get_ex(jo2, "uid", &jo2)) + jo2 = jNewNS = json_object_get(jo2); + else + jo2 = NULL; + + json_object_put(jReply); + } + + asprintf(&url, "%s/api/v1/namespaces/%s/pods/%s", + (char *) pWrkrData->pData->kubernetesUrl, ns, podName); + iRet = queryKB(pWrkrData, url, &jReply); + free(url); + if(iRet != RS_RET_OK) { + if(jNewNS) { + pthread_mutex_lock(pWrkrData->pData->cache->cacheMtx); + hashtable_insert(pWrkrData->pData->cache->nsHt, ns, jNewNS); + ns = NULL; + pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); + } + FINALIZE; + } + + jo = json_object_new_object(); + if(jo2) + json_object_object_add(jo, "namespace_id", json_object_get(jo2)); + if(fjson_object_object_get_ex(jReply, "nodeName", &jo2)) + json_object_object_add(jo, "host", json_object_get(jo2)); + if(fjson_object_object_get_ex(jReply, "uid", &jo2)) + json_object_object_add(jo, "pod_id", json_object_get(jo2)); + /* todo: labels */ + json_object_put(jReply); + + json_object_object_add(jo, "namespace", json_object_new_string(ns)); + json_object_object_add(jo, "pod_name", json_object_new_string(podName)); + json_object_object_add(jo, "container_name", json_object_new_string(containerName)); + jMetadata = json_object_new_object(); + json_object_object_add(jMetadata, "kubernetes", jo); + jo = json_object_new_object(); + json_object_object_add(jo, "container_id", json_object_new_string(dockerID)); + json_object_object_add(jMetadata, "docker", jo); + + pthread_mutex_lock(pWrkrData->pData->cache->cacheMtx); + hashtable_insert(pWrkrData->pData->cache->mdHt, mdKey, jMetadata); + mdKey = NULL; + if(jNewNS) { + hashtable_insert(pWrkrData->pData->cache->nsHt, ns, jNewNS); + ns = NULL; + } + pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); + } else { + pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); + } + + /* the +1 is there to skip the leading '$' */ + msgAddJSON(pMsg, (uchar *) pWrkrData->pData->dstMetadataPath + 1, json_object_get(jMetadata), 0, 0); + +finalize_it: + free(podName); + free(ns); + free(containerName); + free(dockerID); + free(mdKey); +ENDdoAction + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature +ENDisCompatibleWithFeature + + +/* all the macros bellow have to be in a specific order */ +BEGINmodExit +CODESTARTmodExit + curl_global_cleanup(); + + objRelease(regexp, LM_REGEXP_FILENAME); + objRelease(errmsg, CORE_COMPONENT); +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES +CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +ENDqueryEtryPt + + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + DBGPRINTF("mmkubernetes: module compiled with rsyslog version %s.\n", VERSION); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(regexp, LM_REGEXP_FILENAME)); + + /* CURL_GLOBAL_ALL initializes more than is needed but the + * libcurl documentation discourages use of other values + */ + curl_global_init(CURL_GLOBAL_ALL); +ENDmodInit diff --git a/contrib/mmkubernetes/sample.conf b/contrib/mmkubernetes/sample.conf new file mode 100644 index 000000000..face6066f --- /dev/null +++ b/contrib/mmkubernetes/sample.conf @@ -0,0 +1,12 @@ +module(load="mmkubernetes" kubernetesurl="http://localhost:5000") +module(load="imfile") + +input(type="imfile" file="/var/log/ctrs/*" tag="ctr" addmetadata="on") + +action(type="mmkubernetes") + +template(name="tpl" type="list") { + property(name="jsonmesg") + constant(value="\n") +} +action(type="omfile" file="/var/log/ctrs.log" template="tpl") -- 2.16.4
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor