# For the input, we're going to be pasting into the terminal input { stdin {} } filter { grok { # Here we are parsing out our timestamp, file and the rest of the message match => ["message", "\[(?.+)\] @(?[^:]+): (?.+)"] } date { match => ["timestamp", "ISO8601"] remove_field => ["timestamp"] } # Our starting point if [message] =~ "Found file!" { aggregate { task_id => "%{file}" # We set the key to our file name code => "" # This part is required, even if we leave it empty map_action => "create" # Our map variable is created } } # For all derivatives, we want to ensure that we record that their size if [message] =~ "Cutting" { grok { match => ["rest", "Cutting derivative (?.+)"] } aggregate { task_id => "%{file}" # We have to manually add our derivative to the map. # I'm making a new array if map['derivatives'] doesn't # exist and adding our derivatives to it code => "map['derivatives'] ||= []; map['derivatives'] << event['derivative']" map_action => "update" } } # On this final message, we want to capture our time # and any data from previous logs if [message] =~ "It took" { grok { match => ["rest", "It took (?[\d\.]+)"] } aggregate { task_id => "%{file}" # Now we take all that map data and put it on the event # so it becomes part of this final message code => "event['derivatives'] = map['derivatives']" map_action => "update" end_of_task => true # Ends the updates of the map timeout => 120 # It has no timeout by default } } } # The output will be to stdout with a debug codec output { stdout { codec => rubydebug } elasticsearch { # This plugin chooses localhost:9200 by default # I like to override the index for each resource index => "derivatives" } }