Revisiting InfluxDB

When I put together a Zato service to take temperature values from SmartThings and record them in InfluxDB I considered whether a new outgoing connection type in Zato would be the best way to do it.

On reflection, I think not. That’s only partly because Zato has a reasonably large codebase and – frankly – I wouldn’t know where to start. My Python’s not good enough yet to get up to speed on it quickly.

That’s not the only reason though.

There are any number of systems you might want to interact with, and InfluxDB is only one. Zato can’t support them all out of the box without becoming bloated. There’s no need. In my opinion the ESB should remain agnostic as to the endpoints. If data has to be massaged to fit a specific endpoint then that should be the responsibility of the services written within Zato.

There is scope for improving on what I did before though, and this is the method I’ve chosen.

When a service wants to write to InfluxDB, it calls another service with the data to be written and the database name. The database name in this case is the name of a Zato outgoing connection. The intermediate service formats the data for InfluxDB and passes it to the outgoing connection.

A service writing to InfluxDB will call it like this:

response = self.invoke('influxdb.write',
           { "database": "InfluxHome",
             "measurement": "measurement name",
             "value": value,
             "tags": { "tagname1": "tagvalue1",
                       "tagname2": "tagvalue2"}
             }, data_format=DATA_FORMAT.JSON, as_bunch=True)

The service being invoked – influxdb.write – takes the measurement name, value and an array of tags. I’m letting Influx set an automatic timestamp, but it would be easy to extend this to send an optional timestamp as well.

The database parameter is the name of the outgoing channel that defines the database. The outgoing connection defines both the hostname and, by use of the db query string parameter, the specific database on that host:


This means that if I want to write to a different InfluxDB database I only have to set up a new outgoing connection – I don’t have to modify the influx.write service. Services wanting to write to InfluxDB just have to know the outgoing connection name of “InfluxHome” which encapsulates the two things needed to define a database – the hostname and specific database name on that host.

The service influxdb.write service is defined in influxdb.py:

from zato.server.service import Service, Dict

class Write(Service):

    class SimpleIO:
        input_required = ('database', 'measurement', Dict('tags'), 'value')
        input_optional = (Dict('params'),)

         output_optional = ('error', )

    def handle(self):

        influxdata = [ self.request.input.measurement ]

        for (key,value) in self.request.input.tags.items():
             cleanvalue = value.replace(' ', '\ '
                              ).replace(',', '\,'
                              ).replace('=', '\=')

             influxdata.append(key + "=" + cleanvalue)

        payload = ",".join(influxdata) + " value=" + self.request.input.value

        self.logger.info(self.request.input.params)
        self.logger.info(payload)

        conn = self.outgoing.plain_http[self.request.input.database].conn
        response = conn.post(self.cid, payload, self.request.input.params)

        self.response.status_code = response.status_code
        self.response.payload = response.text

This builds up an InfluxDB payload and passes it to the plain HTTP outgoing connection named in the “database” input from the calling service.

By allowing the influxdb.write service to take “params” as an additional parameter it’s also possible to pass other options to InfluxDB, such as a specific retention policy. These will be merged with the db name in the query string.

The InfluxDB http API supports basic authentication, so that can also be added in the security definition of the outgoing connection.

The service can also be exposed as a plain HTTP connection so that it can be called from outside of Zato:

I only envisage using that for testing though, as the intent is for it to be called internally by other services that take incoming data from external services and need to write it to an InfluxDB database.

For my purposes this is sufficient as I don’t have a need to read from the Influx database through Zato. Influx is an end point that’s read by my Grafana dashboards, but it wouldn’t be that difficult to add additional classes to influxdb.py to read and manage the database if needed.

in Home Automation

Related Posts