The Pipe Is More Important Than the Dashboard
Here is a pattern we see in every failed Monitor deployment: the team spends three weeks building beautiful dashboards and thirty minutes thinking about how data gets into them.
Then the dashboards show stale data. Or wrong data. Or no data at all.
The unsexy truth is that data ingestion is the foundation. Get it wrong and nothing downstream works -- not your dashboards, not your analytics, not your alerts. Get it right and everything else falls into place.
"We had 200 sensors online for six months before we realized 30 of them were sending data in Fahrenheit while our device type expected Celsius. Every alert threshold was wrong."
This post is about not making that mistake. We cover the protocols, the patterns, the pitfalls, and the practices that separate a clean data pipeline from a data graveyard.
Who this is for: IoT engineers implementing device connectivity, data architects designing ingestion pipelines, Maximo administrators managing device fleets, and reliability engineers who need to trust the data their analytics run on.
MQTT: The Workhorse Protocol
MQTT is the primary protocol for device communication in Maximo Monitor, and for good reason. It was designed for exactly this scenario: resource-constrained devices sending telemetry over unreliable networks.
Connection Parameters
Broker: {org-id}.messaging.internetofthings.ibmcloud.com
Port: 8883 (TLS -- always use this)
Client ID: d:{org-id}:{device-type}:{device-id}
Username: use-token-auth
Password: {authentication-token}
Topic Structure
Device to Cloud (publishing events):
iot-2/evt/{event-id}/fmt/{format}
event-id: Your event identifier. Use status for regular telemetry, alert for edge-detected anomalies.
format: Almost always json. Binary (bin) and XML (xml) are supported but rarely used.
Cloud to Device (receiving commands):
iot-2/cmd/{command-id}/fmt/{format}
QoS: The Decision That Matters
QoS Level — Delivery — Overhead — When to Use
0 — At most once — Lowest — High-frequency telemetry where missing one reading is acceptable (vibration sampling at 1kHz)
1 — At least once — Medium — Standard monitoring. Use this. Guaranteed delivery, possible duplicates (your analytics can handle duplicates)
2 — Exactly once — Highest — Critical events like safety shutdowns or billing-grade metering. Rarely needed.
Key insight: QoS 1 is the right choice for 90% of monitoring use cases. It guarantees your data arrives without the handshake overhead of QoS 2. If you get occasional duplicates, your analytics engine handles deduplication.
A Production-Ready MQTT Client
Here is a client that handles reconnection, TLS, and bidirectional communication. This is closer to what you need in production than the "hello world" examples:
import paho.mqtt.client as mqtt
import json
import ssl
from datetime import datetime, timezone

class MonitorDeviceClient:
    def __init__(self, org_id, device_type, device_id, auth_token):
        self.org_id = org_id
        self.device_type = device_type
        self.device_id = device_id

        client_id = f"d:{org_id}:{device_type}:{device_id}"
        self.client = mqtt.Client(client_id)
        self.client.username_pw_set("use-token-auth", auth_token)

        # TLS -- non-negotiable in production
        self.client.tls_set(
            cert_reqs=ssl.CERT_REQUIRED,
            tls_version=ssl.PROTOCOL_TLS
        )

        # Callbacks
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.client.on_message = self._on_command

        # Auto-reconnect
        self.client.reconnect_delay_set(min_delay=1, max_delay=120)

    def _on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            # Subscribe to commands on every connect (handles reconnects)
            client.subscribe("iot-2/cmd/+/fmt/json")
        else:
            print(f"Connection failed with code {rc}")

    def _on_disconnect(self, client, userdata, rc):
        if rc != 0:
            print("Unexpected disconnect. Will auto-reconnect.")

    def _on_command(self, client, userdata, msg):
        command = json.loads(msg.payload)
        # Handle commands from Monitor (e.g., reconfigure, restart)
        self.handle_command(command)

    def connect(self):
        broker = f"{self.org_id}.messaging.internetofthings.ibmcloud.com"
        self.client.connect(broker, 8883, keepalive=60)
        self.client.loop_start()

    def send_telemetry(self, data):
        payload = {
            "d": data,
            "ts": datetime.now(timezone.utc).isoformat()
        }
        self.client.publish(
            "iot-2/evt/status/fmt/json",
            json.dumps(payload),
            qos=1
        )

    def handle_command(self, command):
        # Override in subclass for your device logic
        pass

    def disconnect(self):
        self.client.loop_stop()
        self.client.disconnect()

HTTP: The Fallback Protocol
Not every device can maintain a persistent MQTT connection. Some sit behind firewalls that only allow outbound HTTPS. Some are batch systems that collect readings and upload periodically. HTTP is your answer.
When to Use HTTP Instead of MQTT
- Devices behind corporate firewalls that block port 8883
- Batch upload of historical data (CSV imports, system migrations)
- Integration with existing systems that already speak REST
- Serverless functions triggered on a schedule
Sending Events via HTTP
POST https://{org-id}.messaging.internetofthings.ibmcloud.com/api/v0002/device/types/{type}/devices/{id}/events/{event-id}
Headers:
Content-Type: application/json
Authorization: Basic {base64(use-token-auth:token)}
Body:
{
  "d": {
    "temperature": 24.5,
    "humidity": 55.2,
    "pressure": 1013.25
  }
}
The trade-off is clear: HTTP is simpler to implement but lacks MQTT's persistent connection, bidirectional capability, and bandwidth efficiency. For standard monitoring, MQTT wins. For edge cases, HTTP fills the gap.
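To make the request format concrete, here is a small helper that builds that HTTP call, as a sketch: the org ID, device type, device ID, and token values used below are placeholders, not real credentials.

```python
import base64

# Sketch: assemble the URL, headers, and body for a device event POST.
# All identifiers below are illustrative placeholders.
def build_event_request(org_id, device_type, device_id, event_id, token, data):
    url = (f"https://{org_id}.messaging.internetofthings.ibmcloud.com"
           f"/api/v0002/device/types/{device_type}/devices/{device_id}"
           f"/events/{event_id}")
    credentials = base64.b64encode(
        f"use-token-auth:{token}".encode()).decode()
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Basic {credentials}",
    }
    body = {"d": data}
    return url, headers, body

url, headers, body = build_event_request(
    "abc123", "EnvSensor", "ENV-0001", "status", "my-token",
    {"temperature": 24.5, "humidity": 55.2})
# Then: requests.post(url, headers=headers, json=body)
```

Keeping request assembly separate from the actual `requests.post` call makes it easy to unit-test the URL and auth header without touching the network.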
Gateway Architecture: Connecting Legacy Assets
Here is the reality most plants face: your newest equipment speaks MQTT natively. Your most critical equipment was installed in 1997 and speaks Modbus over serial.
Gateways bridge that gap.
LEGACY EQUIPMENT GATEWAY MAXIMO MONITOR
───────────────── ──────── ──────────────
[PLC - Modbus] ──serial──► ┌────────┐
[BACnet HVAC] ──BACnet──► │ Edge │ ──MQTT──► Watson IoT
[OPC-UA Server] ──OPC-UA──► │ Gateway│ Platform
[4-20mA Sensor] ──analog──► └────────┘
Gateway Client ID Format
Gateways register differently from regular devices:
Client ID: g:{org-id}:{gateway-type}:{gateway-id}
The gateway publishes events on behalf of downstream devices:
Topic: iot-2/type/{device-type}/id/{device-id}/evt/{event-id}/fmt/json
This means every legacy device appears as a first-class citizen in Monitor -- with its own identity, data stream, and dashboard presence -- even though the gateway handles the actual communication.
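In code, the only difference from a regular device is the topic: the gateway names the downstream device explicitly. A minimal sketch (the device type and ID are illustrative):

```python
# Sketch: build the topic a gateway uses to publish on behalf of a
# downstream device. The type/ID values here are illustrative.
def downstream_event_topic(device_type, device_id, event_id, fmt="json"):
    # The downstream device is addressed in the topic, not the client ID
    return (f"iot-2/type/{device_type}/id/{device_id}"
            f"/evt/{event_id}/fmt/{fmt}")

topic = downstream_event_topic("ModbusPLC", "PLC-LINE1-01", "status")
# With a connected paho client (client ID g:{org-id}:{gateway-type}:{gateway-id}):
# gateway_client.publish(topic, json.dumps({"d": reading}), qos=1)
```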
Gateway Use Cases
- Modbus/RTU serial devices -- Industrial PLCs, VFDs, motor controllers
- BACnet building automation -- HVAC controllers, lighting systems
- OPC-UA industrial servers -- SCADA systems, DCS platforms
- Analog sensors -- 4-20mA and 0-10V signals via ADC conversion
- Bluetooth/Zigbee mesh -- Battery-powered wireless sensor networks
- Bandwidth optimization -- Aggregate 100 sensor readings into batched transmissions
Key insight: A single gateway can represent hundreds of downstream devices. One Raspberry Pi running a Modbus-to-MQTT bridge can bring an entire legacy production line into Monitor.
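The core of such a bridge is a register map plus a poll loop. The sketch below stubs out the Modbus read (a real bridge would call a Modbus library here); the register addresses, scaling factors, and `read_registers` helper are all illustrative assumptions:

```python
# Sketch of a Modbus-to-MQTT bridge poll. read_registers() is a stub
# standing in for a real Modbus client call; the register map and
# scale factors are illustrative assumptions.
REGISTER_MAP = {
    "motor_temp_c": {"address": 100, "scale": 0.1},
    "motor_rpm":    {"address": 101, "scale": 1.0},
}

def read_registers(address, count=1):
    # Stub: a real bridge would query the PLC over serial here
    return [723] if address == 100 else [1480]

def poll_device(device_type, device_id):
    """Read mapped registers and build the event the gateway publishes."""
    reading = {}
    for name, reg in REGISTER_MAP.items():
        raw = read_registers(reg["address"])[0]
        reading[name] = raw * reg["scale"]
    topic = f"iot-2/type/{device_type}/id/{device_id}/evt/status/fmt/json"
    return topic, {"d": reading}

topic, payload = poll_device("ModbusPLC", "PLC-LINE1-01")
# payload["d"] now carries scaled engineering units, not raw registers
```

Scaling at the gateway (raw register 723 becoming 72.3 °C) is exactly the kind of unit conversion that, done inconsistently, produces the Fahrenheit/Celsius incident from the opening quote.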
Device Type Design: The Schema That Defines Everything
This is where most teams underinvest and pay for it later. Your device type schema defines:
- What data each device can send
- What types those values must be
- What ranges are considered valid
- How data is stored and queried
Supported Data Types
Type — Use Case — Example
NUMBER — Measurements, counters — 24.5, 1542
STRING — Status codes, identifiers — "running", "zone-a"
BOOLEAN — Binary states — true, false
GEOLOCATION — GPS coordinates — {"latitude": 40.7, "longitude": -74.0}
JSON — Complex nested objects — {"config": {"threshold": 50}}
Schema Design Best Practices
Include range definitions. They catch bad data at ingestion.
{
  "metrics": [
    {
      "name": "temperature",
      "description": "Bearing surface temperature",
      "type": "NUMBER",
      "unit": "celsius",
      "range": { "min": -40, "max": 200 }
    },
    {
      "name": "vibration_rms",
      "description": "RMS vibration velocity",
      "type": "NUMBER",
      "unit": "mm/s",
      "range": { "min": 0, "max": 50 }
    },
    {
      "name": "status",
      "description": "Operational state",
      "type": "STRING",
      "enum": ["running", "idle", "fault", "maintenance"]
    }
  ]
}
Name metrics for humans, not machines. bearing_temp_c is better than t1. Six months from now, when you are building a dashboard, you will thank yourself.
Plan for metrics you do not have yet. If you think you might add current sensors later, include the metric in the schema now. Adding metrics is easy. Renaming existing ones is painful.
Bulk Device Registration
You are not going to register 500 devices through the UI. Use the bulk registration API.
import requests

def register_devices_bulk(org_id, api_key, api_token, devices):
    url = (f"https://{org_id}.internetofthings.ibmcloud.com"
           f"/api/v0002/bulk/devices/add")
    response = requests.post(
        url,
        auth=(api_key, api_token),
        headers={"Content-Type": "application/json"},
        json=devices
    )
    if response.status_code == 201:
        return response.json()
    else:
        raise Exception(f"Bulk registration failed: {response.text}")

# Generate device records
devices = [
    {
        "typeId": "VibrationSensor",
        "deviceId": f"VIB-{str(i).zfill(4)}",
        "deviceInfo": {
            "serialNumber": f"SN{20000+i}",
            "manufacturer": "SensorCorp",
            "model": "VS-500"
        },
        "metadata": {
            "location": f"Line-{(i % 4) + 1}",
            "asset": f"MOTOR-{str(i).zfill(3)}",
            "installDate": "2026-02-19",
            "tags": ["production", "critical"]
        }
    }
    for i in range(1, 501)
]

results = register_devices_bulk("abc123", "a-abc123-key", "token", devices)
print(f"Registered {len(results)} devices")

Device Lifecycle Management
Devices have states. Manage them:
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Inactive │────►│ Active │────►│ Blocked │
└──────────┘ └──────────┘ └──────────┘
▲ │ │
└───────────────┴─────────────────┘
                (Reactivate)

- Inactive: Registered but not yet deployed -- awaiting field installation
- Active: Sending and receiving data normally
- Blocked: Credentials compromised or device decommissioned -- rejects all connections
Real-Time vs. Batch: Choosing Your Ingestion Pattern
Real-Time Streaming
Best for: safety monitoring, operational dashboards, threshold alerts.
Every reading transmits immediately. Higher bandwidth, lowest latency. This is the default for most monitoring scenarios.
Batch Ingestion
Best for: historical data imports, periodic sensors (daily weather readings), bandwidth-constrained environments (satellite-connected remote sites).
[
{"d": {"temperature": 24.1}, "ts": "2026-02-19T14:00:00.000Z"},
{"d": {"temperature": 24.3}, "ts": "2026-02-19T14:05:00.000Z"},
{"d": {"temperature": 24.5}, "ts": "2026-02-19T14:10:00.000Z"}
]
Adaptive Hybrid Pattern
The smart approach: stream critical readings immediately, batch everything else.
import time

class AdaptiveDataSender:
    def __init__(self, client, batch_size=100, max_wait_seconds=60):
        self.client = client
        self.batch_size = batch_size
        self.max_wait = max_wait_seconds
        self.buffer = []
        self.last_send = time.time()

    def add_reading(self, data):
        # Critical values bypass the buffer
        if self._is_critical(data):
            self.client.send_telemetry(data)
            return
        self.buffer.append(data)
        # Flush on size or time
        if (len(self.buffer) >= self.batch_size or
                time.time() - self.last_send > self.max_wait):
            self._flush()

    def _is_critical(self, data):
        return (data.get("temperature", 0) > 95 or
                data.get("vibration_rms", 0) > 10 or
                data.get("status") == "fault")

    def _flush(self):
        if self.buffer:
            self.client.send_telemetry({"batch": self.buffer})
            self.buffer = []
            self.last_send = time.time()

Data Quality: The Foundation Your Analytics Stand On
Bad data in Monitor does not just produce wrong charts. It produces wrong alerts. Which produce wrong work orders. Which send technicians to the wrong equipment. Which costs money and erodes trust.
Validate at the Edge
Do not send data to the cloud and hope the analytics sort it out. Validate before transmission.
def validate_reading(data, schema):
    errors = []
    for metric, config in schema.items():
        if metric not in data:
            if config.get("required", False):
                errors.append(f"Missing required: {metric}")
            continue
        value = data[metric]
        # Type check
        if config["type"] == "NUMBER" and not isinstance(value, (int, float)):
            errors.append(f"{metric}: expected number, got {type(value).__name__}")
        # Range check
        if "range" in config and isinstance(value, (int, float)):
            if value < config["range"]["min"] or value > config["range"]["max"]:
                errors.append(f"{metric}: {value} outside range "
                              f"[{config['range']['min']}, {config['range']['max']}]")
    return errors

Handle Missing Data Deliberately
Sensors drop readings. Gateways lose connectivity. It happens. The question is what you do about it.
Strategy — When to Use — Risk
Last known value — Slow-changing metrics (ambient temp) — Masks failures
Interpolation — Smooth metrics with predictable trends — Hides anomalies
Null/None — Safety-critical or billing data — Gaps in charts
Default value — Non-critical operational flags — Misleading if overused
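The "last known value" row deserves a guard rail: cap how long a stale value may stand in, so a dead sensor eventually surfaces as a gap instead of a frozen reading. A sketch (the five-minute cutoff is an assumption to tune per metric):

```python
import time

# Sketch: last-known-value fill with a staleness cutoff, so a dead
# sensor eventually shows up as None instead of a frozen reading.
class LastKnownValue:
    def __init__(self, max_age_seconds=300):
        self.max_age = max_age_seconds
        self.value = None
        self.updated_at = None

    def update(self, value, now=None):
        self.value = value
        self.updated_at = now if now is not None else time.time()

    def current(self, now=None):
        now = now if now is not None else time.time()
        if self.updated_at is None or now - self.updated_at > self.max_age:
            return None  # too stale: surface the gap, don't mask it
        return self.value

lkv = LastKnownValue(max_age_seconds=300)
lkv.update(24.5, now=1000)
lkv.current(now=1100)   # 24.5 -- within the staleness window
lkv.current(now=1400)   # None -- the gap becomes visible
```

This turns "masks failures" from a permanent risk into a bounded one: the fill strategy works for short dropouts and fails loudly for long ones.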
Timestamp Discipline
This one rule prevents 80% of time-series headaches:
Always use UTC. Always use ISO 8601.
from datetime import datetime, timezone

# Correct
timestamp = datetime.now(timezone.utc).isoformat()
# "2026-02-19T14:30:00.000000+00:00"

Never send local timestamps. Never send epoch seconds without specifying UTC. Never let two devices in different time zones send timestamps in their local time. The resulting chart will make no sense.
Scaling Device Management
When you go from 10 devices to 10,000, you need structure.
Logical Organization
Organization: CORP
├── Site: PLANT-NORTH
│ ├── Area: Production-A
│ │ ├── Line-1 (50 sensors)
│ │ └── Line-2 (50 sensors)
│ └── Area: Utilities
│ ├── HVAC (20 sensors)
│ └── Power (30 sensors)
└── Site: PLANT-SOUTH
    └── ...
Tag Everything
Tags are your filter mechanism at scale:
{
  "deviceId": "VIB-0042",
  "metadata": {
    "tags": ["production", "critical", "line-1"],
    "costCenter": "CC-1234",
    "maintenanceTeam": "team-alpha",
    "calibrationDue": "2026-08-15"
  }
}
When you have 5,000 devices, you do not browse a list. You filter by tags: "Show me all critical devices on Line-1 that are overdue for calibration."
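Whether you filter server-side or client-side, the logic is set membership. A sketch over device records shaped like the metadata above:

```python
# Sketch: filter a device list (records shaped like the metadata
# example above) down to those carrying ALL of the required tags.
def devices_with_tags(devices, required_tags):
    required = set(required_tags)
    return [d for d in devices
            if required <= set(d.get("metadata", {}).get("tags", []))]

fleet = [
    {"deviceId": "VIB-0042",
     "metadata": {"tags": ["production", "critical", "line-1"]}},
    {"deviceId": "VIB-0043",
     "metadata": {"tags": ["production", "line-2"]}},
]
matches = devices_with_tags(fleet, ["critical", "line-1"])  # just VIB-0042
```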
Monitor Device Health
Devices fail silently. A sensor that stops sending data does not raise an alarm -- it just disappears from your charts. Build watchdog queries:
import requests
from datetime import datetime, timedelta

def find_stale_devices(org_id, api_key, api_token, device_type,
                       stale_hours=1):
    """Find devices that haven't reported recently"""
    url = (f"https://{org_id}.internetofthings.ibmcloud.com"
           f"/api/v0002/device/types/{device_type}/devices")
    response = requests.get(url, auth=(api_key, api_token))
    devices = response.json()["results"]

    threshold = datetime.utcnow() - timedelta(hours=stale_hours)
    stale = []
    for device in devices:
        last_event = device.get("status", {}).get("lastEvent", {})
        if last_event:
            last_time = datetime.fromisoformat(
                last_event["timestamp"].replace("Z", "+00:00")
            )
            if last_time.replace(tzinfo=None) < threshold:
                stale.append({
                    "deviceId": device["deviceId"],
                    "lastSeen": last_event["timestamp"],
                    "hoursAgo": round(
                        (datetime.utcnow() - last_time.replace(tzinfo=None))
                        .total_seconds() / 3600, 1
                    )
                })
    return stale

Key insight: Run this query daily. Build a "Device Health" dashboard that shows which sensors have gone quiet. A 2% device failure rate across 1,000 sensors means 20 blind spots you do not know about.
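To make that daily run actionable, route the stale list to the people who can fix it. A sketch that groups stale devices by maintenance team; the team-lookup dict is an illustrative assumption (in practice you would read it from the `maintenanceTeam` metadata field):

```python
# Sketch: group the output of find_stale_devices() by maintenance team
# for a "Device Health" report. The team lookup is an assumed mapping;
# in practice it would come from device metadata.
def summarize_stale(stale, team_of_device):
    summary = {}
    for entry in stale:
        team = team_of_device.get(entry["deviceId"], "unassigned")
        summary.setdefault(team, []).append(entry["deviceId"])
    return summary

stale = [
    {"deviceId": "VIB-0042", "lastSeen": "2026-02-19T02:00:00Z", "hoursAgo": 12.5},
    {"deviceId": "VIB-0107", "lastSeen": "2026-02-18T22:00:00Z", "hoursAgo": 16.5},
]
teams = {"VIB-0042": "team-alpha"}
report = summarize_stale(stale, teams)
# {"team-alpha": ["VIB-0042"], "unassigned": ["VIB-0107"]}
```

The "unassigned" bucket is itself a data-quality signal: a device with no maintenance team tag violates commandment 6 below.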
The 7 Commandments of Data Ingestion
- Schema first, sensors second. Design your device types before you install hardware.
- MQTT for real-time, HTTP for exceptions. Do not overcomplicate the protocol choice.
- QoS 1 unless you have a specific reason otherwise. It is the right balance.
- Validate at the edge. Bad data caught at the source costs nothing. Bad data caught in analytics costs credibility.
- UTC timestamps, no exceptions. Future you will thank present you.
- Tag aggressively. Every device should have location, criticality, maintenance team, and calibration date.
- Monitor the monitors. If a sensor goes silent, you need to know within hours, not weeks.
What Comes Next
Your data is flowing cleanly into Monitor. Your devices are organized and tagged. Your quality validation is catching problems at the edge.
Now let's make it visible.
In Part 4: Dashboards and Visualization, we cover:
- Dashboard types for different audiences (executive, operational, analytical)
- Widget selection and configuration
- Custom KPI creation
- Drill-down navigation patterns
- Performance optimization for large device fleets
Series Navigation
Part — Title
1 — Introduction to IBM Maximo Monitor
2 — Getting Started with Maximo Monitor
3 — Data Ingestion and Device Management (You are here)
4 — Dashboards and Visualization
5 — Analytics and AI Integration
8 — Best Practices and Case Studies
Built by practitioners. For practitioners. No fluff.
TheMaximoGuys -- Maximo expertise, delivered different.