import datetime
import datetime as dt
import json
import logging
import os
import jsonschema
from google.cloud import bigquery
from octue.cloud.storage import GoogleCloudStorageClient
from aerosense_tools.preprocess import RawSignal
from aerosense_tools.utils import remove_metadata_columns_and_set_datetime_index
logger = logging.getLogger(__name__)
DATASET_NAME = "aerosense-twined.greta"
ROW_LIMIT = 10000
SENSOR_COORDINATES_SCHEMA_URI = "https://jsonschema.registry.octue.com/aerosense/sensor-coordinates/0.1.4.json"
class BigQuery:
    """A collection of queries for working with the Aerosense BigQuery dataset.

    :param str project_name: the name of the Google Cloud project the BigQuery dataset belongs to
    :return None:
    """

    def __init__(self, project_name="aerosense-twined"):
        # A single client is created up front and reused by every query method on this instance.
        self.client = bigquery.Client(project=project_name)
[docs] def get_sensor_data(
self,
installation_reference,
node_id,
sensor_type_reference,
start=None,
finish=None,
row_limit=ROW_LIMIT,
):
"""Get sensor data for the given sensor type on the given node of the given installation over the given time
period. The time period defaults to the last day.
:param str installation_reference: the reference of the installation to get sensor data from
:param str node_id: the node on the installation to get sensor data from
:param str sensor_type_reference: the type of sensor from which to get the data
:param datetime.datetime|None start: defaults to 1 day before the given finish
:param datetime.datetime|None finish: defaults to the current datetime
:param int|None row_limit: if set to `None`, no row limit is applied; if set to an integer, the row limit is set to this; defaults to 10000
:return (pandas.Dataframe, bool): the sensor data and whether the data has been limited by a row limit
"""
table_name = f"{DATASET_NAME}.sensor_data_{sensor_type_reference}"
conditions = """
WHERE datetime BETWEEN @start AND @finish
AND installation_reference = @installation_reference
AND node_id = @node_id
"""
start, finish = self._get_time_period(start, finish)
query_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("start", "DATETIME", start),
bigquery.ScalarQueryParameter("finish", "DATETIME", finish),
bigquery.ScalarQueryParameter("installation_reference", "STRING", installation_reference),
bigquery.ScalarQueryParameter("node_id", "STRING", node_id),
]
)
data_query = f"""
SELECT *
FROM `{table_name}`
{conditions}
ORDER BY datetime
"""
data_limit_applied = False
if row_limit:
count_query = f"""
SELECT COUNT(datetime)
FROM `{table_name}`
{conditions}
"""
number_of_rows = list(self.client.query(count_query, job_config=query_config).result())[0][0]
if number_of_rows > row_limit:
data_query += f"\nLIMIT {row_limit}"
data_limit_applied = True
return (self.client.query(data_query, job_config=query_config).to_dataframe(), data_limit_applied)
[docs] def get_sensor_data_at_datetime(
self,
installation_reference,
node_id,
sensor_type_reference,
datetime,
tolerance=1,
):
"""Get sensor data for the given sensor type on the given node of the given installation at the given datetime.
The first datetime within a tolerance of ±0.5 * `tolerance` is used.
:param str installation_reference: the reference of the installation to get sensor data from
:param str node_id: the node on the installation to get sensor data from
:param str sensor_type_reference: the type of sensor from which to get the data
:param datetime.datetime|None datetime: the datetime to get the data at
:param float tolerance: the tolerance on the given datetime in seconds
:return pandas.Dataframe: the sensor data at the given datetime
"""
table_name = f"{DATASET_NAME}.sensor_data_{sensor_type_reference}"
start_datetime = datetime - dt.timedelta(seconds=tolerance / 2)
finish_datetime = datetime + dt.timedelta(seconds=tolerance / 2)
query_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("start_datetime", "DATETIME", start_datetime),
bigquery.ScalarQueryParameter("finish_datetime", "DATETIME", finish_datetime),
bigquery.ScalarQueryParameter("installation_reference", "STRING", installation_reference),
bigquery.ScalarQueryParameter("node_id", "STRING", node_id),
]
)
query = f"""
SELECT *
FROM `{table_name}`
WHERE datetime >= @start_datetime
AND datetime < @finish_datetime
AND installation_reference = @installation_reference
AND node_id = @node_id
ORDER BY datetime
"""
return self.client.query(query, job_config=query_config).to_dataframe()
[docs] def get_aggregated_connection_statistics(self, installation_reference, node_id, start=None, finish=None):
"""Get minute-wise aggregated connection statistics over the given time period. The time period defaults to the
last day.
:param str installation_reference: the reference of the installation to get sensor data from
:param str node_id: the node on the installation to get sensor data from
:param datetime.datetime|None start: defaults to 1 day before the given finish
:param datetime.datetime|None finish: defaults to the current datetime
:return pandas.Dataframe: the aggregated connection statistics
"""
query = f"""
SELECT datetime, filtered_rssi, raw_rssi, tx_power, allocated_heap_memory
FROM `{DATASET_NAME}.connection_statistics_agg`
WHERE datetime BETWEEN @start AND @finish
AND installation_reference = @installation_reference
AND node_id = @node_id
ORDER BY datetime
"""
start, finish = self._get_time_period(start, finish)
query_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("start", "DATETIME", start),
bigquery.ScalarQueryParameter("finish", "DATETIME", finish),
bigquery.ScalarQueryParameter("installation_reference", "STRING", installation_reference),
bigquery.ScalarQueryParameter("node_id", "STRING", node_id),
]
)
return self.client.query(query, job_config=query_config).to_dataframe()
[docs] def download_microphone_data_at_datetime(self, installation_reference, node_id, datetime, tolerance=1):
"""Download the microphone datafile for the given node of the given installation at the given datetime (within
the given tolerance). If more than one datetime is found within the tolerance, the datafile with the earliest
timestamp is downloaded.
:param str installation_reference: the reference of the installation to get microphone data from
:param str node_id: the node on the installation to get microphone data from
:param datetime.datetime datetime: the datetime to get the data for
:param float tolerance: the tolerance on the given datetime in seconds
:return str: the local path the microphone datafile was downloaded to
"""
start_datetime = datetime - dt.timedelta(seconds=tolerance / 2)
finish_datetime = datetime + dt.timedelta(seconds=tolerance / 2)
microphone_metadata = self.get_microphone_metadata(
installation_reference=installation_reference,
node_id=node_id,
start=start_datetime,
finish=finish_datetime,
)
cloud_path = microphone_metadata.iloc[0]["path"]
extension = os.path.splitext(cloud_path)[-1]
local_path = f"microphone-data-{datetime.isoformat()}" + extension
GoogleCloudStorageClient().download_to_file(local_path=local_path, cloud_path=cloud_path)
return os.path.abspath(local_path)
[docs] def get_installations(self):
"""Get the available installations.
:return list(dict): the available installations
"""
query = f"""
SELECT reference, turbine_id, location
FROM `{DATASET_NAME}.installation`
ORDER BY reference
"""
installations = self.client.query(query).to_dataframe().to_dict(orient="records")
return [
{"label": f"{row['reference']} (Turbine {row['turbine_id']})", "value": row["reference"]}
for row in installations
]
[docs] def get_sensor_types(self):
"""Get the available sensor types and their metadata.
:return list(dict): the available sensor types and their metadata
"""
query = f"""
SELECT name, metadata
FROM `{DATASET_NAME}.sensor_type`
ORDER BY name
"""
return {
sensor_type["name"]: json.loads(sensor_type["metadata"])
for sensor_type in self.client.query(query).to_dataframe().to_dict("records")
}
[docs] def get_nodes(self, installation_reference):
"""Get the IDs of the nodes installed on the given installation.
:param str installation_reference: the reference of the installation to get the node IDs of
:return list(str): the node IDs for the installation
"""
query = f"""
SELECT node_id FROM `{DATASET_NAME}.sensor_data`
WHERE installation_reference = @installation_reference
GROUP BY node_id
ORDER BY node_id
"""
query_config = bigquery.QueryJobConfig(
query_parameters=[bigquery.ScalarQueryParameter("installation_reference", "STRING", installation_reference)]
)
return self.client.query(query, job_config=query_config).to_dataframe()["node_id"].to_list()
[docs] def add_sensor_coordinates(self, coordinates):
"""Add the given sensor coordinates to the sensor coordinates table.
:param dict coordinates: the sensor coordinates
:return None:
"""
jsonschema.validate(coordinates, {"$ref": SENSOR_COORDINATES_SCHEMA_URI})
if self.get_sensor_coordinates(coordinates["reference"]):
raise ValueError(f"Sensor coordinates with the reference {coordinates['reference']!r} already exist.")
errors = self.client.insert_rows(
table=self.client.get_table(DATASET_NAME + ".sensor_coordinates"),
rows=[
{
"reference": coordinates["reference"],
"kind": coordinates["kind"],
"geometry": json.dumps(coordinates["geometry"]),
}
],
)
if errors:
raise ValueError(errors)
[docs] def update_sensor_coordinates(self, reference, kind, geometry):
"""Update the given sensor coordinates in the sensor coordinates table.
:param str reference: the reference of the coordinates to update
:param str kind: the kind of the new coordinates
:param dict geometry: the new coordinates
:return None:
"""
coordinates = {"reference": reference, "kind": kind, "geometry": geometry}
jsonschema.validate(coordinates, {"$ref": SENSOR_COORDINATES_SCHEMA_URI})
query_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("kind", "STRING", kind),
bigquery.ScalarQueryParameter("geometry", "JSON", json.dumps(geometry)),
bigquery.ScalarQueryParameter("reference", "STRING", reference),
]
)
self.client.query(
f"""UPDATE {DATASET_NAME}.sensor_coordinates
SET kind = @kind, geometry = @geometry
WHERE reference = @reference;
""",
job_config=query_config,
)
[docs] def get_sensor_coordinates(self, reference=None):
"""Get the sensor coordinates with the given reference from the sensor coordinates table if they exist. If no
reference is given, get all sensor coordinates.
:param str|None reference: the reference of the coordinates to get
:return dict|None: the sensor coordinates if they exist
"""
if not reference:
return self.client.query(f"SELECT * FROM {DATASET_NAME}.sensor_coordinates").result().to_dataframe()
else:
query_config = bigquery.QueryJobConfig(
query_parameters=[bigquery.ScalarQueryParameter("reference", "STRING", reference)]
)
result = self.client.query(
f"""SELECT * FROM {DATASET_NAME}.sensor_coordinates
WHERE reference = @reference;
""",
job_config=query_config,
).result()
if result.total_rows == 0:
return None
result = dict(result.to_dataframe().iloc[0])
result["geometry"] = json.loads(result["geometry"])
return result
[docs] def get_measurement_sessions(
self,
installation_reference,
node_id,
sensor_type_reference,
start=None,
finish=None,
):
"""Get the measurement sessions that exist for the given sensor type, node, and installation between the given
start and finish datetimes.
:param str installation_reference: the reference of the installation to get measurement sessions for
:param str node_id: the ID of the node to get measurement sessions for
:param str sensor_type_reference: the type of sensor to get measurement sessions for
:param datetime.datetime|None start: the time after which the sessions start
:param datetime.datetime|None finish: the time before which the sessions end
:return pandas.DataFrame: the measurement sessions
"""
start, finish = self._get_time_period(start, finish)
query_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("installation_reference", "STRING", installation_reference),
bigquery.ScalarQueryParameter("node_id", "STRING", node_id),
bigquery.ScalarQueryParameter("sensor_type_reference", "STRING", sensor_type_reference),
bigquery.ScalarQueryParameter("start", "DATETIME", start),
bigquery.ScalarQueryParameter("finish", "DATETIME", finish),
]
)
result = self.client.query(
f"""
SELECT * FROM {DATASET_NAME}.sessions
WHERE installation_reference = @installation_reference
AND node_id = @node_id
AND sensor_type_reference = @sensor_type_reference
AND start_datetime > @start
AND finish_datetime <= @finish
ORDER BY start_datetime
""",
job_config=query_config,
).result()
return result.to_dataframe()
[docs] def extract_and_add_new_measurement_sessions(self, sensors=None):
"""Extract new measurement sessions from the database for the given sensors and add them to the sessions table.
If no sensors are given, sessions for the following sensors are searched for:
- connection_statistics
- magnetometer
- connection
- barometer
- barometer_thermometer
- accelerometer
- gyroscope
- battery_info
- differential_barometer
:param list(str)|None sensors: the sensors to search for new measurement sessions for
:return None:
"""
sessions_table_name = DATASET_NAME + ".sessions"
sensors = sensors or (
"connection_statistics",
"magnetometer",
"barometer",
"barometer_thermometer",
"accelerometer",
"gyroscope",
"battery_info",
"differential_barometer",
)
installations = [installation["value"] for installation in self.get_installations()]
for installation_reference in installations:
nodes = self.get_nodes(installation_reference)
for node_id in nodes:
for sensor_type_reference in sensors:
logger.info(
"Getting latest extracted session finish datetime for installation %r, node %r, sensor type %r.",
installation_reference,
node_id,
sensor_type_reference,
)
result = (
self.client.query(
f"""
SELECT finish_datetime FROM {sessions_table_name}
WHERE installation_reference = @installation_reference
AND node_id = @node_id
AND sensor_type_reference = sensor_type_reference
ORDER BY finish_datetime DESC
LIMIT 1
""",
job_config=bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter(
"installation_reference", "STRING", installation_reference
),
bigquery.ScalarQueryParameter("node_id", "STRING", node_id),
bigquery.ScalarQueryParameter(
"sensor_type_reference", "STRING", sensor_type_reference
),
]
),
)
.result()
.to_dataframe()
)
try:
latest_session_finish_datetime = result.iloc[0]["finish_datetime"].to_pydatetime()
except IndexError:
self._log_no_sessions(installation_reference, node_id, sensor_type_reference, "existing")
continue
if sensor_type_reference == "connection_statistics":
sensor_data_df = self.get_aggregated_connection_statistics(
installation_reference=installation_reference,
node_id=node_id,
start=latest_session_finish_datetime,
finish=datetime.datetime.now(),
)
else:
sensor_data_df, _ = self.get_sensor_data(
installation_reference=installation_reference,
node_id=node_id,
sensor_type_reference=sensor_type_reference,
start=latest_session_finish_datetime,
finish=datetime.datetime.now(),
)
if sensor_data_df.empty:
self._log_no_sessions(installation_reference, node_id, sensor_type_reference)
continue
sensor_data_df = remove_metadata_columns_and_set_datetime_index(sensor_data_df)
_, measurement_sessions = RawSignal(
dataframe=sensor_data_df,
sensor_type=sensor_type_reference,
).extract_measurement_sessions()
if measurement_sessions.empty:
self._log_no_sessions(installation_reference, node_id, sensor_type_reference)
continue
# Add columns needed for sessions table.
measurement_sessions["installation_reference"] = installation_reference
measurement_sessions["node_id"] = node_id
measurement_sessions["sensor_type_reference"] = sensor_type_reference
# Reorder columns.
measurement_sessions = measurement_sessions[
[
"installation_reference",
"node_id",
"sensor_type_reference",
"start_datetime",
"finish_datetime",
]
]
# Add new sessions to sessions table.
self.client.load_table_from_dataframe(
dataframe=measurement_sessions,
destination=sessions_table_name,
).result()
logger.info(
"New sessions added for installation %r, node %r, sensor type %r.",
installation_reference,
node_id,
sensor_type_reference,
)
[docs] def query(self, query_string):
"""Query the dataset with an arbitrary query.
:param str query_string: the query to use
:return pd.DataFrame: the results of the query
"""
return self.client.query(query_string).to_dataframe()
def _get_time_period(self, start=None, finish=None):
"""Get a time period of:
- The past day if no arguments are given
- The given start until the current datetime if only the start is given
- The day previous to the finish if only the finish is given
- The given start and finish if both are given
:param datetime.datetime|None start: defaults to 1 day before the given finish
:param datetime.datetime|None finish: defaults to the current datetime
:return (datetime.datetime, datetime.datetime):
"""
finish = finish or dt.datetime.now()
start = start or finish - dt.timedelta(days=1)
return start, finish
def _log_no_sessions(self, installation_reference, node_id, sensor_type_reference, session_type="new"):
"""Log that none of the given session type are available.
:param str installation_reference:
:param str node_id:
:param str sensor_type_reference:
:param str session_type:
:return None:
"""
logger.info(
"No %s sessions available for installation %r, node %r, sensor type %r.",
session_type,
installation_reference,
node_id,
sensor_type_reference,
)