paint-brush
使用 TDEngine 和 GraphQL 创建时间序列数据库经过@patrickheneise
648 讀數
648 讀數

使用 TDEngine 和 GraphQL 创建时间序列数据库

经过 Patrick Heneise13m2023/10/22
Read on Terminal Reader

太長; 讀書

在本文中,我们将介绍 TDEngine 数据库和表的设置,以及如何创建 GraphQL 架构,使我们能够查询来自各种客户端和应用程序的数据。
featured image - 使用 TDEngine 和 GraphQL 创建时间序列数据库
Patrick Heneise HackerNoon profile picture
0-item
1-item

动机和介绍

作为 Nevados 软件团队的一部分,我们正在为 Nevados All Terrain Tracker® 构建一个操作和监控平台。太阳能跟踪器是一种将太阳能电池板朝向太阳的装置。每个太阳能跟踪器都会不断向我们的平台发送状态信息和读数,例如当前角度、温度、电压等,我们需要存储这些信息以进行分析和可视化。如果跟踪器配置为每 5 秒发送一次数据,则每个跟踪器每天有 17,280 个数据点,每个跟踪器每月有 518,400 个数据点。这总结了很多信息。这种数据被称为“时间序列数据”,对于软件中的所有复杂问题,都有多种解决方案(时间序列数据库)。最著名的是 InfluxDB 和 TimescaleDB。对于我们的平台,我们决定使用TDEngine ,这是一种相对较新的产品,针对 IoT 应用程序进行了优化,并使用 SQL 查询语言。


对于这个决定有几个论据: TDEngine

  • 是开源的
  • 针对物联网应用进行了优化
  • 使用SQL,这是我们熟悉的语言
  • 作为托管服务提供,我们可以专注于构建我们的应用程序
  • 很容易通过 Docker 在本地运行


在本文中,我们将介绍 TDEngine 数据库和表的设置,以及如何创建 GraphQL 架构,使我们能够查询来自各种客户端和应用程序的数据。

TDEngine 入门

开始使用 TDEngine 的最简单方法是使用他们的云服务。转到TDEngine并创建一个帐户。他们有一些我们可以使用的公共数据库,这对于制作演示或查询实验来说非常有用。


如果你想在本地运行 TDEngine,你可以使用 Docker 镜像和Telegraf从各种来源检索数据并将其发送到数据库,例如系统信息、ping 统计信息等。

 version: '3.9' services: tdengine: restart: always image: tdengine/tdengine:latest hostname: tdengine container_name: tdengine ports: - 6030:6030 - 6041:6041 - 6043-6049:6043-6049 - 6043-6049:6043-6049/udp volumes: - data:/var/lib/taos telegraf: image: telegraf:latest links: - tdengine env_file: .env volumes: - ./telegraf.conf:/etc/telegraf/telegraf.conf


查看Telegraf 配置的官方文档Telegraf 上的 TDEngine 文档。简而言之,连接到 MQTT 主题看起来像这样:

 [agent] interval = "5s" round_interval = true omit_hostname = true [[processors.printer]] [[outputs.http]] url = "http://127.0.0.1:6041/influxdb/v1/write?db=telegraf" method = "POST" timeout = "5s" username = "root" password = "taosdata" data_format = "influx" [[inputs.mqtt_consumer]] topics = [ "devices/+/trackers", ]

在本文中,我们将使用公共数据库,其中包含来自美国 5 个主要港口的船舶移动情况,而不是在本地设置所有内容并等待数据库填充信息。

将 TDEngine 与公共船舶运动数据结合使用

默认情况下,TDEngine 中的表具有隐式架构,这意味着该架构会适应写入数据库的数据。这对于引导非常有用,但最终,我们希望切换到显式模式以避免传入数据出现问题。需要一点时间来适应的一件事是他们的超级表(简称“STable”)的概念。 TDEngine 中有标签(键)和列(数据)。对于每个组合键,都会创建一个“表”。所有表都分组在 STable 中。

显示 tdengine 云表的屏幕截图


查看vessel数据库,他们有一个名为ais_data的稳定表,其中包含很多表。通常,我们不想针对每个表进行查询,而是始终使用 STable 从所有表中获取累积的数据。


TDEngine 有一个函数DESCRIBE ,它允许我们检查表或 STable 的模式。 ais_data具有以下架构:

显示 TDEngine 表架构的屏幕截图


STable 有两个键和六个数据列。键是mmsiname 。我们可以使用常规的SQL语句来查询数据:

 SELECT ts, name, latitude, longitude FROM vessel.ais_data LIMIT 100; ts name latitude longitude 2023-08-11T22:07:02.419Z GERONIMO 37.921673 -122.40928 2023-08-11T22:21:48.985Z GERONIMO 37.921688 -122.40926 2023-08-11T22:25:08.784Z GERONIMO 37.92169 -122.40926 ...


请记住,时间序列数据通常非常大,因此我们应该始终限制结果集。我们可以使用一些特定于时间序列的函数,例如PARTITION BY ,它按键对结果进行分组,对于获取最新更新的单个键很有用。例如:

 SELECT last_row(ts, name, latitude, longitude) FROM vessel.ais_data PARTITION BY name; ts name latitude longitude 2023-09-08T13:09:34.951Z SAN SABA 29.375961 -94.86894 2023-09-07T18:05:01.230Z SELENA 33.678585 -118.1954 2023-09-01T17:23:24.145Z SOME TUESDAY 33.676563 -118.230606 ... 


显示 tdengine 输出的屏幕截图


我建议阅读他们的SQL 文档以获取更多示例。在继续之前,请转到“编程”、“Node.js”并检索TDENGINE_CLOUD_URLTDENGINE_CLOUD_TOKEN变量。

GraphQL 与 Nexus.js、Fastify 和 Mercurius

GraphQL 如今非常出名,并且有很多关于它的好文章。我们选择该技术是因为我们收集和处理来自不同来源的信息,而 GraphQL 允许我们透明地将它们组合到单个 API 中。


我们将使用令人惊叹的Fastify框架(目前是 Node.js 应用程序的默认选择)和Mercurius适配器。 Mercurius 和 Fastify 团队共同努力提供无缝体验,这是注重性能的 GraphQL API 的绝佳选择。 GraphQL Nexus是一个构建/生成模式和解析器的工具,因此我们不必手动编写所有内容。


有一些设置代码等需要完成,我将在此处跳过。您可以在GitHub 上找到完整的示例 - tdengine-graphql-example


我想在这篇文章中详细阐述两件相当具体的事情:

  1. TDEngine 查询库
  2. 带有 Nexus 的 GraphQL 模式

TDEngine查询库

TDEngine 有一个Node.js 库,允许我们查询数据库。这使得连接和发送查询变得很容易,不幸的是,响应有点难以处理。所以我们写了一个小包装:


 'use strict' import tdengine from '@tdengine/rest' import { tdEngineToken, tdEngineUrl } from '../config.js' import parseFields from 'graphql-parse-fields' const { options: tdOptions, connect: tdConnect } = tdengine tdOptions.query = { token: tdEngineToken } tdOptions.url = tdEngineUrl export default function TdEngine(log) { this.log = log const conn = tdConnect(tdOptions) this.cursor = conn.cursor() } TdEngine.prototype.fetchData = async function fetchData(sql) { this.log.debug('fetchData()') this.log.debug(sql) const result = await this.cursor.query(sql) const data = result.getData() const errorCode = result.getErrCode() const columns = result.getMeta() if (errorCode !== 0) { this.log.error(`fetchData() error: ${result.getErrStr()}`) throw new Error(result.getErrStr()) } return data.map((r) => { const res = {} r.forEach((c, idx) => { const columnName = columns[idx].columnName .replace(/`/g, '') .replace('last_row(', '') .replace(')', '') if (c !== null) { res[columnName] = c } }) return res }) }


这将返回一个可以传递到 GraphQL 上下文中的 TDEngine 对象。我们将主要使用fetchData函数,在其中我们可以传入 SQL 查询并将结果作为对象数组返回。 TDEngine 分别返回元数据(列)、错误和数据。我们将使用元数据将列映射到常规对象列表中。这里的一个特殊情况是last_row函数。这些列以last_row(ts)last_row(name)等形式返回,我们希望删除last_row部分,以便属性将1:1 映射到GraphQL 模式。这是在columnName.replace部分完成的。

GraphQL 架构

不幸的是,TDEngine 没有像Postgraphile这样的模式生成器,我们不想编写和维护纯 GraphQL 模式,因此我们将使用 Nexus.js 来帮助我们实现这一点。我们将从两种基本类型开始: VesselMovementTimestamp (标量类型)。 TimestampTDDate是将日期显示为时间戳或日期字符串的两种不同类型。这对于客户端应用程序(以及开发过程中)很有用,因为它可以决定使用哪种格式。 asNexusMethod允许我们将该类型用作VesselMovement模式中的函数。我们可以在类型定义中解析TDDate以使用原始ts时间戳值。


 import { scalarType, objectType } from 'nexus' export const Timestamp = scalarType({ name: 'Timestamp', asNexusMethod: 'ts', description: 'TDEngine Timestamp', serialize(value) { return new Date(value).getTime() } }) export const TDDate = scalarType({ name: 'TDDate', asNexusMethod: 'tdDate', description: 'TDEngine Timestamp as Date', serialize(value) { return new Date(value).toJSON() } }) export const VesselMovement = objectType({ name: 'VesselMovement', definition(t) { t.ts('ts') t.tdDate('date', { resolve: (root) => root.ts }) t.string('mmsi') t.string('name') t.float('latitude') t.float('longitude') t.float('speed') t.float('heading') t.int('nav_status') } })


对于时间序列类型,我们使用MovementSeries后缀在界面中明确区分关系类型和时间序列类型。


现在我们可以定义查询。我们将从一个简单的查询开始,以获取 TDEngine 的最新动态:

 import { objectType } from 'nexus' export const GenericQueries = objectType({ name: 'Query', definition(t) { t.list.field('latestMovements', { type: 'VesselMovement', resolve: async (root, args, { tdEngine }, info) => { const fields = filterFields(info) return tdEngine.fetchData( `select last_row(${fields}) from vessel.ais_data partition by mmsi;` ) } }) } }) 


显示查询的 graphiql 输出的屏幕截图


GraphiQL是测试 API 和探索模式的绝佳工具,您可以通过在 Mercurius 中传递graphiql.enabled = true来启用它。通过查询,我们可以看到按mmsi分组的船舶的最新动态。不过,让我们更进一步。 GraphQL 的最大优点之一是它对客户端或应用程序来说是透明的层。我们可以从多个来源获取数据并将它们组合到同一个模式中。


不幸的是,我无法找到包含大量船舶信息的简单/免费 API。有Sinay ,但他们只在 Vessel 响应中提供namemmsiimo (我们已经在 TDEngine 中提供了)。为了举例,我们假设我们的数据库中没有该name ,我们需要从 Sinay 检索它。通过imo我们还可以查询船舶的二氧化碳排放量,或者可以使用另一个 API 来检索图像、旗帜或其他信息,所有这些都可以在Vessel类型中组合。


 export const Vessel = objectType({ name: 'Vessel', definition(t) { t.string('mmsi') t.string('name') t.nullable.string('imo') t.list.field('movements', { type: 'VesselMovement' }) } })


正如您在此处所看到的,我们可以将列表字段movements与来自 TDEngine 的时间序列数据一起包含在内。我们将添加另一个查询来获取船舶信息,解析器允许我们合并来自 TDEngine 和 Sinay 的数据:


 t.field('vessel', { type: 'Vessel', args: { mmsi: 'String' }, resolve: async (root, args, { tdEngine }, info) => { const waiting = [ getVesselInformation(args.mmsi), tdEngine.fetchData( `select * from vessel.ais_data where mmsi = '${args.mmsi}' order by ts desc limit 10;` ) ] const results = await Promise.all(waiting) return { ...results[0][0], movements: results[1] } } }) 


显示查询的 graphiql 输出的屏幕截图

🎉 这里我们有一个有效的 GraphQL API,它从 TDEngine 返回我们请求的特定容器的行。 getVesselInformation()是一个从 Sinay 获取数据的简单包装器。我们将 TDEngine 结果添加到movements属性中,GraphQL 将处理其余部分并将所有内容映射到模式。

注意:SQL注入

与任何 SQL 数据库一样,我们需要小心用户输入。在上面的示例中,我们直接使用mmsi输入,这使得该查询容易受到 SQL 注入攻击。为了举例,我们暂时忽略这一点,但在“现实世界”应用程序中,我们应该始终清理用户输入。有几个小型库可以清理字符串,在大多数情况下,我们只依赖数字(分页、限制等)和枚举(排序顺序),GraphQL 会为我们检查这些。


感谢 Dmitry Zaets 指出了这一点!

优化

有一些事情超出了本文的范围,但我想简单提一下:

Pothos 作为 Nexus.js 的精神继承者

当我们开始该项目时,Nexus.js 是生成 GraphQL 模式的最佳选择。虽然稳定并且功能有些完整,但它缺乏维护和更新。有一个名为Pothos的基于插件的 GraphQL 模式构建器,它更加现代并且得到积极维护。如果您要开始一个新项目,我可能建议使用 Pothos 而不是 Nexus.js。


感谢莫萨特勒指出这一点!

现场旋转变压器

正如您在上面的Vessel解析器中看到的,两个数据源都会立即获取并处理。这意味着如果查询仅针对name ,我们仍然会获取响应的movements 。如果查询仅针对movements ,我们仍然从 Sinay 获取名称并可能为请求付费。


这是 GraphQL 的反模式,我们可以通过使用字段信息仅获取请求的数据来提高性能。解析器将字段信息作为第四个参数,但它们很难使用。相反,我们可以使用graphql-parse-fields来获取请求字段的简单对象并调整解析器逻辑。

SQL查询优化

在我们的示例查询中,我们使用select *从数据库中获取所有列,即使不需要它们。这显然很糟糕,我们可以使用相同的字段解析器来优化 sql 查询:


 export function filterFields(info, context) { const invalidFields = ['__typename', 'date'] const parsedFields = parseFields(info) const fields = context ? parsedFields[context] : parsedFields const filteredFields = Object.keys(fields).filter( (f) => !invalidFields.includes(f) ) return filteredFields.join(',') }


该函数从 GraphQL 信息中返回一个以逗号分隔的字段列表。

 const fields = filterFields(info) return tdEngine.fetchData( `select last_row(${fields}) from vessel.ais_data partition by mmsi;` )


如果我们请求tslatitudelongitude ,查询将如下所示:

 select last_row(ts, latitude, longitude) from vessel.ais_data partition by mmsi;


如果该表中只有几列,这可能并不重要,但如果有更多的表和复杂的查询,这可能会对应用程序性能产生巨大的影响。

时间序列函数

TDEngine 有一些特定于时间序列的扩展,可用于提高性能。例如,要检索最新条目,可以使用传统的 SQL 查询:

 SELECT ts, name, latitude, longitude FROM vessel.ais_data order by ts desc limit 1;


执行需要 653 毫秒,而“TDEngine”查询只需要 145 毫秒:

 SELECT last_row(ts, name, latitude, longitude) FROM vessel.ais_data;


每个表都有配置选项来优化last_row/first_row函数和其他缓存设置。我建议阅读TDEngine 文档

结论

简单版本:在本文中,我们设置了 TDEngine 时间序列数据库并定义了 GraphQL 架构以允许客户端应用程序连接和查询数据。


还有很多事情要做。我们有一个样板项目,可以在透明的界面中将复杂的时间序列数据与关系数据结合起来。在 Nevados,我们使用 PostgreSQL 作为主数据库,并以与上面的movement示例相同的方式检索时间序列数据。这是将多个来源的数据组合到单个 API 中的好方法。另一个好处是仅在请求时才获取数据,这为客户端应用程序增加了很多灵活性。最后但并非最不重要的一点是,GraphQL 架构充当文档和合约,因此我们可以轻松勾选“API 文档”框。


如果您有任何问题或意见,请联系 BlueSky加入 GitHub 上的讨论


也发布在这里