What is a Pipe?
A Pipe is a self-contained data processing unit. Pipes are written in the Pipes language, a custom DSL developed by Panoptix.
Pipes are associated with Targets: they can run on a server, on distributed servers (often referred to as collectors), or in the cloud, practically anywhere. Currently they run as either Linux systemd or Docker services. This flexibility allows a distributed processing strategy, where functionality can be moved to the edge, or back to the centre, without rewriting definitions.
A Pipe is divided into three discrete steps, namely Inputs, Actions and Outputs. Input data comes from inputs, goes through processing actions and finally gets written to an output.
The essential parts of a Pipe definition are:
- name (the unique name of the Pipe)
- input (the data source)
- actions (an array of actions)
- output (the data sink)
Or, as expressed in YAML:

```yaml
name: uptime
input:
  exec:
    command: uptime
    interval: 10s
actions:
- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]
output:
  write: console
```
In this example, input data comes from executing the Unix `uptime` command every 10 seconds; the load averages (1 min, 5 min, 15 min) are extracted from the input line by the `extract` action using a regular expression. We then write the results out to the console.
So from a typical input '09:07:36 up 16 min, 1 user, load average: 0.08, 0.23, 0.31'
we get a JSON record that looks like
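A plausible record for that input would be the following (the field values are still strings at this point, since no conversion has been applied, and `remove: true` drops the original `_raw` field):

```json
{"m1": "0.08", "m5": "0.23", "m15": "0.31"}
```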
Time-series events normally have an associated timestamp, and must be in an appropriate form to be processed and queried by data analytics systems. If our `actions` steps were:
```yaml
- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]
- convert:
  - m1: num
  - m5: num
  - m15: num
- time:
    output-field: '@timestamp'
```
Then the result looks like this:
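Assuming the same input line, the record would now look something like the following (the exact timestamp value and format may differ):

```json
{"m1": 0.08, "m5": 0.23, "m15": 0.31, "@timestamp": "2019-06-21T09:07:36Z"}
```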
These records can then be indexed by Elasticsearch, and queried with tools such as Kibana or Grafana.
So a Pipe is "input -> action1 -> action2 -> ... -> output". Note that actions are optional: the only required top-level fields are `name`, `input`, and `output`.
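As an illustration only (this is not the Pipes runtime), the input -> actions -> output flow of the uptime example can be sketched in Python; the `extract` and `convert_num` helpers below are hypothetical stand-ins for the `extract` and `convert` actions:

```python
import json
import re
from datetime import datetime, timezone

def extract(event, input_field, pattern, output_fields, remove=True):
    """Mimic the 'extract' action: copy regex groups into named fields."""
    m = re.search(pattern, event[input_field])
    if m:
        event.update(dict(zip(output_fields, m.groups())))
        if remove:
            del event[input_field]
    return event

def convert_num(event, fields):
    """Mimic the 'convert' action with 'num' conversions."""
    for f in fields:
        event[f] = float(event[f])
    return event

# Input stage: the raw uptime line, quoted as JSON under '_raw'.
event = {"_raw": "09:07:36 up 16 min,  1 user,  load average: 0.08, 0.23, 0.31"}

# Action stages: extract, convert, add a timestamp.
event = extract(event, "_raw",
                r"load average: (\S+), (\S+), (\S+)", ["m1", "m5", "m15"])
event = convert_num(event, ["m1", "m5", "m15"])
event["@timestamp"] = datetime.now(timezone.utc).isoformat()

# Output stage: write to console.
print(json.dumps(event))
```

The real Pipes DSL expresses the same flow declaratively, without any hand-written glue code.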
Most action steps work with JSON data. Real-world data usually arrives as raw text, however, and the `exec` input will by default 'quote' it as JSON, like `{"_raw":"09:07:36 up 16 min,  1 user,  load average: 0.08, 0.23, 0.31"}`. The various extraction steps work on this raw data and generate sensible event fields.
Adding Pipes to the Hotrod System
Assuming you are logged in, then:
```
$ hotrod targets add Joburg
target Joburg has id e94ccdca-f379-447b-8c90-6976e77652ec
$ hotrod targets add Durban
target Durban has id 8a1a0a29-d8f8-4098-a016-9d08f841f9a4
$ hotrod targets list
 name   | id                                   | tags | pipes | last seen
--------+--------------------------------------+------+-------+-----------
 Joburg | e94ccdca-f379-447b-8c90-6976e77652ec |      |       |
 Durban | 8a1a0a29-d8f8-4098-a016-9d08f841f9a4 |      |       |
```
Unless you explicitly specify an id with `targets add`, new targets will be assigned a UUID as their unique identifier. Target names and ids must be unique.
A Pipe definition is a file `NAME.yml`, where NAME is the provided name of the Pipe. Pipe names must be unique, and the `hotrod` tool will also enforce that the name of the file matches. To enable versioning or annotation of a pipe definition, an additional full stop followed by some text is allowed in front of the extension, for example:
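A versioned copy of the uptime pipe (the version suffix here is a hypothetical illustration) could then be stored as:

```
uptime.v2.yml
```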
A pipe is loaded with the `pipes add` subcommand:
```
$ hotrod pipes add --file uptime.yml
$ hotrod pipes list
 name
--------
 uptime
$ hotrod pipes show uptime
name: uptime
input:
  exec:
    command: uptime
    interval: 2s
actions:
- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]
- convert:
  - m1: num
  - m5: num
  - m15: num
- time:
    output-field: '@timestamp'
output:
  write: console
```
This Pipe is not yet associated with any target, so we update a particular target:
```
$ hotrod targets update Joburg --add-pipe uptime
$ hotrod targets list
 name   | id                                   | tags | pipes  | last seen
--------+--------------------------------------+------+--------+-----------
 Joburg | e94ccdca-f379-447b-8c90-6976e77652ec |      | uptime |
 Durban | 8a1a0a29-d8f8-4098-a016-9d08f841f9a4 |      |        |
```
The Pipe is now staged for the "Joburg" target, and will shortly be deployed by the configured Hotrod Agent.
If you run `hotrod targets update Joburg --remove-pipe uptime`, the Pipe will be removed from the staging area and stopped on the Target by the Hotrod Agent.