Skip to content

SQL Connector

The SQL connector writes data to relational databases.

Connector Type: SqlWriter

Supported Databases:

  • MySQL
  • PostgreSQL
  • SQL Server (MSSQL)
{
"type": "SqlWriter",
"config": {
"connectionString": "user:password@tcp(localhost:3306)/database_name",
"driver": "mysql",
"table": "sensor_data",
"writeAction": "Insert",
"mode": "ColumnMapping",
"columns": {
"temperature": "temp_column",
"pressure": "press_column",
"timestamp": "ts_column"
}
}
}
{
"type": "SqlWriter",
"config": {
"connectionString": "host=localhost port=5432 user=postgres password=pass dbname=mydb sslmode=disable",
"driver": "postgres",
"table": "measurements",
"writeAction": "Insert",
"mode": "ColumnMapping",
"columns": {
"temperature": "temperature",
"humidity": "humidity"
}
}
}
{
"type": "SqlWriter",
"config": {
"connectionString": "sqlserver://user:pass@localhost:1433?database=mydb",
"driver": "sqlserver",
"table": "sensor_readings",
"writeAction": "Upsert",
"mode": "ColumnMapping",
"columns": {
"sensor_id": "id",
"value": "reading_value"
}
}
}
  • connectionString: Database-specific connection string
  • driver: Database driver (mysql, postgres, sqlserver)
  • table: Target table name
  • writeAction: Write operation (Insert, Update, Upsert)
  • mode: Mapping mode (ColumnMapping)
  • columns: Map payload keys to database columns

Add new rows:

{
"writeAction": "Insert"
}

Update existing rows (requires WHERE condition):

{
"writeAction": "Update"
}

Insert or update if exists:

{
"writeAction": "Upsert"
}

Map Meddle payload keys to database columns:

{
"columns": {
"payload_key": "database_column",
"temperature": "temp_celsius",
"pressure": "pressure_bar",
"timestamp": "recorded_at"
}
}
user:password@tcp(host:port)/database?param=value
host=localhost port=5432 user=user password=pass dbname=db sslmode=disable
sqlserver://user:pass@host:port?database=db&param=value
{
"type": "SqlWriter",
"config": {
"connectionString": "user:pass@tcp(mysql.local:3306)/production",
"driver": "mysql",
"table": "machine_logs",
"writeAction": "Insert",
"mode": "ColumnMapping",
"columns": {
"machine_id": "machine_id",
"temperature": "temperature",
"speed": "rpm",
"status": "operational_status",
"timestamp": "log_time"
}
}
}
  1. Create appropriate indexes on frequently queried columns
  2. Use Upsert for idempotent operations
  3. Ensure table schema matches column mappings
  4. Use connection pooling for high-throughput scenarios
  5. Monitor database performance and query execution times
  • Verify connection string format
  • Check database server is running
  • Verify credentials
  • Check firewall rules
  • Ensure column names in mapping match database schema
  • Check table exists
  • Verify case sensitivity