iotswarm package

Submodules

iotswarm.db module

This module holds all implementation for databases. Currently only supporting Oracle.

class iotswarm.db.BaseDatabase(inherit_logger: logging.Logger | None = None)

Bases: ABC

Base class for implementing database objects

Parameters

inherit_logger – Assigns the passed logger to instance.

abstract query_latest_from_site() List
class iotswarm.db.MockDB(inherit_logger: logging.Logger | None = None)

Bases: BaseDatabase

static query_latest_from_site() List
class iotswarm.db.CosmosDB(inherit_logger: logging.Logger | None = None)

Bases: BaseDatabase

Base class for databases using COSMOS_UK data.

connection: object

Connection to database.

site_data_query: CosmosQuery

SQL query for retrieving a single record.

site_id_query: CosmosQuery

SQL query for retrieving list of site IDs

query_latest_from_site()
class iotswarm.db.Oracle(inherit_logger: logging.Logger | None = None)

Bases: CosmosDB

Class for handling oracledb logic and retrieving values from DB.

connection: Connection

Connection to oracle database.

site_data_query: CosmosQuery = 'SELECT * FROM COSMOS.{table}\nWHERE site_id = :site_id \nORDER BY date_time DESC \nFETCH NEXT 1 ROWS ONLY'

SQL query for retrieving a single record.

site_id_query: CosmosQuery = 'SELECT UNIQUE(site_id) FROM COSMOS.{table}'

SQL query for retrieving list of site IDs

async classmethod create(dsn: str, user: str, password: str = None, inherit_logger: logging.Logger | None = None, **kwargs)
Factory method for initialising the class.

Initialization is done through the create() method: Oracle.create(…).

Parameters
  • dsn – Oracle data source name.

  • user – Username used for query.

  • pw – User password for auth.

  • inherit_logger – Uses the given logger if provided

async query_latest_from_site(site_id: str, table: CosmosTable) dict

Requests the latest data from a table for a specific site.

Parameters
  • site_id – ID of the site to retrieve records from.

  • table – A valid table from the database

Returns

A dict containing the database columns as keys, and the values as values.

Returns None if no data retrieved.

Return type

dict | None

async query_site_ids(table: CosmosTable, max_sites: int | None = None) list

query_site_ids returns a list of site IDs from COSMOS database

Parameters
  • table – A valid table from the database

  • max_sites – Maximum number of sites to retreive

Returns

A list of site ID strings.

Return type

List[str]

class iotswarm.db.LoopingCsvDB(csv_file: str | pathlib.Path)

Bases: BaseDatabase

A database that reads from csv files and loops through items for a given table or site. The site and index is remembered via a dictionary key and incremented each time data is requested.

db_file: str | pathlib.Path

Path to the database file.

connection: DataFrame

Connection to the pd object holding data.

query_latest_from_site(site_id: str, index: int) dict

Queries the datbase for a SITE_ID incrementing by 1 each time called for a specific site. If the end is reached, it loops back to the start.

Parameters
  • site_id – ID of the site to query for.

  • index – An offset index to query.

Returns

A dict of the data row.

query_site_ids(max_sites: int | None = None) list

query_site_ids returns a list of site IDs from the database

Parameters

max_sites – Maximum number of sites to retreive

Returns

A list of site ID strings.

Return type

List[str]

class iotswarm.db.LoopingSQLite3(db_file: str | pathlib.Path)

Bases: CosmosDB, LoopingCsvDB

A database that reads from .db files using sqlite3 and loops through entries in sequential order. There is a script that generates the .db file in the __assets__/data directory relative to this file. .csv datasets should be downloaded from the accompanying S3 bucket before running.

connection: Connection

Connection to the database.

site_data_query: CosmosQuery = 'SELECT * FROM {table}\nWHERE site_id = :site_id\nLIMIT 1 OFFSET :offset'

SQL query for retrieving a single record.

site_id_query: CosmosQuery = 'SELECT DISTINCT(site_id) FROM {table}'

SQL query for retrieving list of site IDs

query_latest_from_site(site_id: str, table: CosmosTable, index: int) dict

Queries the datbase for a SITE_ID incrementing by 1 each time called for a specific site. If the end is reached, it loops back to the start.

Parameters
  • site_id – ID of the site to query for.

  • table – A valid table from the database

  • index – Offset of index.

Returns

A dict of the data row.

query_site_ids(table: CosmosTable, max_sites: int | None = None) list

query_site_ids returns a list of site IDs from COSMOS database

Parameters
  • table – A valid table from the database

  • max_sites – Maximum number of sites to retreive

Returns

A list of site ID strings.

Return type

List[str]

iotswarm.devices module

This module hold logic for device implementation. Currently only a single device time implemented.

class iotswarm.devices.BaseDevice(device_id: str, data_source: BaseDatabase, connection: MessagingBaseClass, *, sleep_time: int | None = None, max_cycles: int | None = None, delay_start: bool | None = None, inherit_logger: logging.Logger | None = None, table: iotswarm.queries.CosmosTable | None = None, mqtt_topic: str | None = None, mqtt_prefix: str | None = None, mqtt_suffix: str | None = None, no_send_probability: int = 0)

Bases: object

Base class for sensing devices.

device_type: str = 'base-device'

Name of the device type

cycle: int = 0

Current cycle.

mqtt_base_topic: str

Base topic for mqtt topic.

swarm: object | None = None

The session applied

device_id: str

ID of the site.

table: CosmosTable

SQL table used in queries if Oracle or LoopingSQLite3 selected as data_source.

data_source: BaseDatabase

Specifies the source of data to use.

connection: MessagingBaseClass

Connection to the data receiver.

max_cycles: int = 0

Maximum number of data transfer cycles before shutting down.

sleep_time: int = 60

Time to sleep for each time data is sent.

delay_start: bool = False

Adds a random delay to first invocation from 0 - sleep_time.

property mqtt_topic: str

Builds the mqtt topic.

mqtt_prefix: str

Prefix added to mqtt message.

mqtt_suffix: str

Suffix added to mqtt message.

property no_send_probability: int

Defines the chance of data not being sent, can be 0 - 100

async run()

The main invocation of the method. Expects a Oracle object to do work on and a table to retrieve. Runs asynchronously until max_cycles is reached.

Parameters

message_connection – The message object to send data through

class iotswarm.devices.CR1000XDevice(*args, serial_number: str | None = None, os_version: str | None = None, program_name: str | None = None, table_name: str | None = None, **kwargs)

Bases: BaseDevice

Represents a CR1000X datalogger.

device_type: str = 'CR1000X'

Name of the device type

serial_number: str = '00000'

Serial number of the device instance.

os_version: str = 'CR1000X.Std.07.02'

Operating system installed on the device.

program_name: str = 'CPU:iotswarm-1.6.3.CR1X'

Name of logger program being run.

table_name: str = 'default'

Name of table being submitted by logger.

class iotswarm.devices.XMLDataTypes(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: Enum

Enum class representing XML datatypes with rankings used for selecting maximum type needed for a range of values.

null = {'rank': 0, 'schema': 'xsi:nil'}
string = {'rank': 1, 'schema': 'xsd:string'}
boolean = {'rank': 2, 'schema': 'xsd:boolean'}
dateTime = {'rank': 3, 'schema': 'xsd:dateTime'}
short = {'rank': 4, 'schema': 'xsd:short'}
int = {'rank': 5, 'schema': 'xsd:int'}
long = {'rank': 6, 'schema': 'xsd:long'}
integer = {'rank': 7, 'schema': 'xsd:integer'}
float = {'rank': 8, 'schema': 'xsd:float'}
double = {'rank': 9, 'schema': 'xsd:double'}
class iotswarm.devices.CR1000XField(name: str, data_type: str | None = None, units: str | None = None, process: str | None = None, settable: bool | None = None, data_values: list[any] | None = None)

Bases: object

Represents the field part of a CR1000X payload. Each sensor gets a field.

name: str = ''

Name of the field.

data_type: str

XML datatype of the field.

units: str = ''

Scientific units of the field.

process: str = 'Smp'

Process used for aggregating the data. Defaults to “Smp” meaning “Sample”.

settable: bool = False

iotswarm.example module

iotswarm.loggers module

Logging module for defining custom log handlers.

class iotswarm.loggers.TimedRotatingFileHandler(*args, **kwargs)

Bases: TimedRotatingFileHandler

TimedRotatingFileHandler Handler for rotating logs on a timed basis.

Extended this handler to ensure log file and directory are created according to platform.

iotswarm.queries module

This module contains query constants for retrieving data from the COSMOS database.

class iotswarm.queries.CosmosTable(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: StrEnum

Enums of approved COSMOS database tables.

LEVEL_1_SOILMET_30MIN = 'LEVEL1_SOILMET_30MIN'
LEVEL_1_NMDB_1HOUR = 'LEVEL1_NMDB_1HOUR'
LEVEL_1_PRECIP_1MIN = 'LEVEL1_PRECIP_1MIN'
LEVEL_1_PRECIP_RAINE_1MIN = 'LEVEL1_PRECIP_RAINE_1MIN'
COSMOS_STATUS_1HOUR = 'COSMOS_STATUS_1HOUR'
class iotswarm.queries.CosmosQuery(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: StrEnum

Enums of common queries in each databasing language.

SQLITE_LOOPED_DATA = 'SELECT * FROM {table}\nWHERE site_id = :site_id\nLIMIT 1 OFFSET :offset'

Query for retreiving data from a given table in sqlite format.

SELECT * FROM <table>
WHERE site_id = :site_id
LIMIT 1 OFFSET :offset
ORACLE_LATEST_DATA = 'SELECT * FROM COSMOS.{table}\nWHERE site_id = :site_id \nORDER BY date_time DESC \nFETCH NEXT 1 ROWS ONLY'

Query for retreiving data from a given table in oracle format.

SELECT * FROM <table>
ORDER BY date_time DESC
FETCH NEXT 1 ROWS ONLY
SQLITE_SITE_IDS = 'SELECT DISTINCT(site_id) FROM {table}'

Queries unique `site_id `s from a given table.

SELECT DISTINCT(site_id) FROM <table>
ORACLE_SITE_IDS = 'SELECT UNIQUE(site_id) FROM COSMOS.{table}'

Queries unique `site_id `s from a given table.

SELECT UNQIUE(site_id) FROM <table>

iotswarm.swarm module

This is the core module for orchestrating swarms of IoT devices. One swarm defined currently for using COSMOS data.

class iotswarm.swarm.Swarm(devices: List[BaseDevice], name: str | None = None, base_directory: str | pathlib.Path | None = None)

Bases: object

Manages a swarm of IoT devices and runs the main loop of all devices. Can receive any number or combination of devices.

devices: List[BaseDevice]

List of site objects.

name: str

Name of swarm applied in logs.

base_directory: Path = PosixPath('/home/runner/.local/share/iot_swarm/swarms')

The base directory where swarms are stored.

async run() None

Main function for running the swarm. Sends the query and message connection object. Runs until all sites reach their maximum cycle. If any site has no maximum, it runs forever.

write_self(replace: bool = False) None

Writes the swarm state to file.

Parameters
  • replace – When True it replaces the swarm. Execption is

  • raised if the file exists and replace is False.

classmethod destroy_swarm(swarm: object) None

Destroys a swarm file.

list_swarms() List[str]

Returns list of swarms from instance directory

classmethod load_swarm(swarm_id: str) Self

Loads a swarm from dill file.