Salta ai contenuti

Connettore Aggregation

Il connettore Aggregation mette in buffer i payload in ingresso in una finestra ed emette un singolo payload aggregato per ogni flush della finestra. Supporta finestre basate su tempo e su conteggio con più funzioni di aggregazione per campo.

Tipo Connettore: MeddleAggregation

Finestre temporali fisse e non sovrapposte. Il buffer viene completamente svuotato ad ogni flush.

{
"type": "MeddleAggregation",
"config": {
"windowType": "tumbling",
"windowSize": 60000,
"fields": [
{ "key": "temperature", "function": "avg", "outputKey": "temperature_avg" },
{ "key": "temperature", "function": "max", "outputKey": "temperature_max" }
]
}
}

Ogni 60 secondi, emette la media e il massimo di temperature raccolti durante la finestra, quindi resetta il buffer.

Finestre temporali sovrapposte. Dopo ogni flush, la seconda metà del buffer viene mantenuta in modo che la prossima emissione si sovrapponga alla precedente.

{
"type": "MeddleAggregation",
"config": {
"windowType": "sliding",
"windowSize": 60000,
"slideSize": 30000,
"fields": [
{ "key": "pressure", "function": "avg", "outputKey": "pressure_avg" }
]
}
}

Utile per moving average più morbide dove ogni emissione condivide metà dei propri campioni con la finestra precedente.

Effettua il flush non appena il buffer contiene countSize payload, indipendentemente dal tempo trascorso.

{
"type": "MeddleAggregation",
"config": {
"windowType": "count",
"countSize": 100,
"fields": [
{ "key": "vibration", "function": "max", "outputKey": "vibration_peak" },
{ "key": "vibration", "function": "count", "outputKey": "samples" }
]
}
}

Emette un payload ogni 100 campioni ricevuti.

  • windowType (richiesto): tumbling, sliding, o count
  • windowSize: Durata della finestra in millisecondi (richiesta per tumbling e sliding)
  • slideSize: Intervallo di slide in millisecondi (usato con sliding)
  • countSize: Numero di campioni per finestra (richiesto per count)
  • fields (richiesto, min 1): Array di definizioni di campi di aggregazione
  • groupByKeys: Lista opzionale di chiavi del payload per raggruppare le aggregazioni
  • key: Chiave del payload di input da cui leggere
  • function: Una tra sum, avg, min, max, count, first, last
  • outputKey: Chiave sotto cui viene scritto il valore aggregato
  • sum — Somma dei valori numerici nella finestra
  • avg — Media aritmetica dei valori numerici
  • min / max — Valore numerico minimo / massimo
  • count — Numero di payload nella finestra
  • first / last — Primo / ultimo valore osservato per la chiave (qualsiasi tipo)

I valori non numerici vengono saltati per le funzioni numeriche e segnalati come errori di mismatch di tipo sul canale di errore.

  1. Attenuazione di letture rumorose dei sensori con una media sliding
  2. KPI al minuto o all’ora per dashboard
  3. Batching di eventi prima di scrivere su database o broker di messaggi
  4. Rilevamento di picchi su un conteggio fisso di campioni
{
"type": "MeddleAggregation",
"config": {
"windowType": "tumbling",
"windowSize": 30000,
"fields": [
{ "key": "temperature", "function": "avg", "outputKey": "temperature_avg" },
{ "key": "temperature", "function": "min", "outputKey": "temperature_min" },
{ "key": "temperature", "function": "max", "outputKey": "temperature_max" },
{ "key": "status", "function": "last", "outputKey": "status_last" }
]
}
}
  • Scegli tumbling per report periodici puliti; sliding per trend più morbidi
  • Usa finestre count per stream ad alta frequenza dove il tempo non è l’unità naturale
  • Mantieni finestre abbastanza piccole da limitare la memoria ma abbastanza grandi da essere statisticamente significative
  • Usa first / last per trasportare contesto non numerico (es. stato della macchina) insieme ad aggregati numerici