/*
 * Key/value pair appended to every record by cb_filter.
 *
 * Each *_LEN macro is the length of the matching string literal without the
 * terminating NUL; keep the two in sync when changing a literal.
 */
#ifndef A_NEW_KEY
/*
 * NOTE(review): cb_filter references A_NEW_KEY / A_NEW_KEY_LEN, but no
 * definition is visible in this file. "key" is a placeholder default; confirm
 * the intended key name. The guard lets an earlier definition take precedence.
 */
#define A_NEW_KEY "key"
#define A_NEW_KEY_LEN 3
#endif
#define A_NEW_VALUE "value"
#define A_NEW_VALUE_LEN 5
static int cb_filter(const void *data, size_t bytes,
const char *tag, int tag_len,
void **out_buf, size_t *out_size,
struct flb_filter_instance *f_ins,
struct flb_config *config)
msgpack_sbuffer tmp_sbuf;
/* Create temporary msgpack buffer */
msgpack_sbuffer_init(&tmp_sbuf);
msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);
/* Iterate over each item */
msgpack_unpacked_init(&result);
while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
* Each record is a msgpack array [timestamp, map] of the
* timestamp and record map. We 'unpack' each record, and then re-pack
* it with the new fields added.
if (result.data.type != MSGPACK_OBJECT_ARRAY) {
/* unpack the array of [timestamp, map] */
flb_time_pop_from_msgpack(&tm, &result, &obj);
/* obj should now be the record map */
if (obj->type != MSGPACK_OBJECT_MAP) {
/* re-pack the array into a new buffer */
msgpack_pack_array(&tmp_pck, 2);
flb_time_append_to_msgpack(&tm, &tmp_pck, 0);
/* new record map size is old size + the new keys we will add */
total_records = obj->via.map.size + new_keys;
msgpack_pack_map(&tmp_pck, total_records);
/* iterate through the old record map and add it to the new buffer */
for(i=0; i < obj->via.map.size; i++) {
msgpack_pack_object(&tmp_pck, (kv+i)->key);
msgpack_pack_object(&tmp_pck, (kv+i)->val);
msgpack_pack_str(&tmp_pck, A_NEW_KEY_LEN);
msgpack_pack_str_body(&tmp_pck, A_NEW_KEY, A_NEW_KEY_LEN);
msgpack_pack_str(&tmp_pck, A_NEW_VALUE_LEN);
msgpack_pack_str_body(&tmp_pck, A_NEW_VALUE, A_NEW_VALUE_LEN);
msgpack_unpacked_destroy(&result);
*out_buf = tmp_sbuf.data;
*out_size = tmp_sbuf.size;
return FLB_FILTER_MODIFIED;