Overview
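This collection of code examples demonstrates real-world implementation patterns in FrameworX 10.1. Each example illustrates error handling, best practices, and performance considerations for a common industrial automation scenario. The listings concentrate on the control logic itself: helper methods and supporting types that an example calls but does not define (for example StopMotor, LoadRecipe, or create_payload) must be supplied by your solution before the code will run. Examples are provided in C#, Python, and JavaScript.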
Industrial Control Examples
Motor Control with Interlocks
csharp
// C# - Complete motor control logic with safety interlocks
/// <summary>
/// Scan-based start/stop control for one motor whose tags share the prefix passed to the
/// constructor. Call <see cref="ExecuteControl"/> once per scan.
/// </summary>
/// <remarks>
/// LogInfo, LogWarning, LogError, RaiseAlarm, StopMotor and HandleFault are called here but
/// not defined in this class; they are presumably inherited from ScriptBase or supplied by
/// the solution - TODO confirm.
/// </remarks>
public class MotorControl : ScriptBase
{
    private const double StartInhibitSeconds = 30;
    private const double StartFeedbackTimeoutSeconds = 5;
    private const int FeedbackPollMilliseconds = 100;
    private const int MaxStartAttempts = 3;
    private const int FaultCodeStartFailure = 101;
    private const int FaultCodeOvercurrent = 102;
    private const double OverloadFactor = 1.15;
    private const int OverloadTripCount = 60;
    private const double MinLubeOilPressure = 20;
    private const double MaxTemperature = 80;

    private readonly string motorTag;
    private DateTime lastStartTime;
    private int startAttempts;

    /// <summary>Creates a controller for the motor whose tags start with <paramref name="tagBase"/>.</summary>
    /// <param name="tagBase">Tag name prefix, e.g. "Pump1" for "Pump1.StartCmd".</param>
    public MotorControl(string tagBase)
    {
        motorTag = tagBase;
    }

    /// <summary>
    /// Runs one control scan: evaluates interlocks, services start and stop requests,
    /// monitors a running motor and hands latched faults to HandleFault.
    /// </summary>
    public void ExecuteControl()
    {
        // Snapshot of the command/status tags used for this scan
        var motor = new
        {
            StartCmd = @Tag[$"{motorTag}.StartCmd"],
            StopCmd = @Tag[$"{motorTag}.StopCmd"],
            Running = @Tag[$"{motorTag}.Running"],
            Faulted = @Tag[$"{motorTag}.Faulted"],
            LocalMode = @Tag[$"{motorTag}.LocalMode"]
        };

        bool interlocksOk = CheckInterlocks();
        @Tag[$"{motorTag}.InterlockActive"] = !interlocksOk;

        // A tripped interlock must also stop a motor that is already running. Local mode
        // alone is not a trip (the motor is under local control), but an emergency stop is.
        bool tripRequired = false;
        if (!interlocksOk)
        {
            if (motor.StartCmd)
            {
                LogWarning($"Motor {motorTag} start blocked by interlock");
                @Tag[$"{motorTag}.StartCmd"] = false;
            }

            if (!motor.LocalMode || @Tag.Safety.EmergencyStop)
            {
                tripRequired = true;
            }
        }
        else if (motor.StartCmd && !motor.Running && !motor.Faulted)
        {
            // A latched fault blocks further start attempts until it has been handled
            if (StartMotor())
            {
                @Tag[$"{motorTag}.Hours.StartCount"]++;
                lastStartTime = DateTime.Now;
            }
        }

        // Stop requests are honoured even while an interlock is active
        bool stopIssued = false;
        if (motor.Running && (motor.StopCmd || tripRequired))
        {
            if (tripRequired)
            {
                LogWarning($"Motor {motorTag} stopped by interlock");
            }

            StopMotor();
            stopIssued = true;
        }

        // Monitor running motor (skipped on the scan in which it was stopped)
        if (motor.Running && !stopIssued)
        {
            MonitorMotor();
        }

        // Handle faults
        if (motor.Faulted)
        {
            HandleFault();
        }
    }

    /// <summary>
    /// Evaluates safety, process and upstream interlocks and publishes the blocking reason
    /// to the InterlockReason tag (empty when nothing blocks).
    /// </summary>
    /// <returns>true when no interlock is active.</returns>
    private bool CheckInterlocks()
    {
        string reason = null;

        // Safety interlocks first, then process, then upstream
        if (@Tag.Safety.EmergencyStop)
        {
            reason = "Emergency stop active";
        }
        else if (@Tag[$"{motorTag}.LocalMode"])
        {
            reason = "Local mode selected";
        }
        else if (@Tag[$"{motorTag}.LubeOilPressure"] < MinLubeOilPressure)
        {
            reason = "Low lube oil pressure";
        }
        else if (@Tag[$"{motorTag}.Temperature"] > MaxTemperature)
        {
            reason = "High temperature";
        }
        else if (@Tag[$"{motorTag}.RequiresUpstream"] && !@Tag.Upstream.Running)
        {
            reason = "Upstream not running";
        }

        // Always written, so a cleared interlock does not leave a stale reason behind
        @Tag[$"{motorTag}.InterlockReason"] = reason ?? string.Empty;
        return reason == null;
    }

    /// <summary>
    /// Issues the start output and waits for running feedback.
    /// </summary>
    /// <remarks>
    /// Blocks the calling thread for up to the feedback timeout while polling.
    /// After <see cref="MaxStartAttempts"/> consecutive failures the motor is faulted.
    /// </remarks>
    /// <returns>true when running feedback was received before the timeout.</returns>
    private bool StartMotor()
    {
        // Check start inhibit timer
        var sinceLastStart = DateTime.Now - lastStartTime;
        if (sinceLastStart.TotalSeconds < StartInhibitSeconds)
        {
            LogInfo($"Start inhibit active. Wait {StartInhibitSeconds - sinceLastStart.TotalSeconds:F0} seconds");
            return false;
        }

        // Send start command to PLC
        @Tag[$"{motorTag}.StartOutput"] = true;
        Thread.Sleep(FeedbackPollMilliseconds);
        @Tag[$"{motorTag}.StopOutput"] = false;

        // Wait for feedback
        var deadline = DateTime.Now.AddSeconds(StartFeedbackTimeoutSeconds);
        while (DateTime.Now < deadline)
        {
            if (@Tag[$"{motorTag}.Running"])
            {
                LogInfo($"Motor {motorTag} started successfully");
                startAttempts = 0;
                return true;
            }

            Thread.Sleep(FeedbackPollMilliseconds);
        }

        // Start failed: drop the start output so it is not left energized
        @Tag[$"{motorTag}.StartOutput"] = false;
        startAttempts++;
        LogError($"Motor {motorTag} failed to start (attempt {startAttempts})");

        if (startAttempts >= MaxStartAttempts)
        {
            @Tag[$"{motorTag}.Faulted"] = true;
            @Tag[$"{motorTag}.FaultCode"] = FaultCodeStartFailure;
            RaiseAlarm($"{motorTag}_StartFail", AlarmPriority.High,
                $"Motor {motorTag} failed to start after 3 attempts");
        }

        return false;
    }

    /// <summary>
    /// Accumulates runtime hours and trips the motor on sustained overcurrent.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the runtime increment and the overload count both assume this method
    /// runs once per second - confirm against the task's scan rate.
    /// </remarks>
    private void MonitorMotor()
    {
        var current = @Tag[$"{motorTag}.Current"];
        var nominalCurrent = @Tag[$"{motorTag}.NominalCurrent"];

        // Update runtime hours (one scan = 1/3600 h)
        @Tag[$"{motorTag}.Hours.Runtime"] += 1.0 / 3600;

        // Check overcurrent
        if (current > nominalCurrent * OverloadFactor)
        {
            @Tag[$"{motorTag}.OverloadTimer"]++;
            if (@Tag[$"{motorTag}.OverloadTimer"] > OverloadTripCount)
            {
                LogError($"Motor {motorTag} overcurrent shutdown");
                StopMotor();
                @Tag[$"{motorTag}.Faulted"] = true;
                @Tag[$"{motorTag}.FaultCode"] = FaultCodeOvercurrent;
                // Restart the overload count so the next run starts clean
                @Tag[$"{motorTag}.OverloadTimer"] = 0;
            }
        }
        else
        {
            @Tag[$"{motorTag}.OverloadTimer"] = 0;
        }
    }
}
PID Control Loop
python
# Python - PID controller with anti-windup and bumpless transfer
class PIDController:
def __init__(self, tag_base, kp=1.0, ki=0.5, kd=0.1):
self.tag_base = tag_base
self.kp = kp
self.ki = ki
self.kd = kd
self.integral = 0
self.last_error = 0
self.last_output = 0
self.manual_mode = False
self.dt = 0.1 # 100ms scan time
def execute(self):
"""Main PID execution"""
# Read tags
setpoint = @Tag[f"{self.tag_base}.SP"]
process_value = @Tag[f"{self.tag_base}.PV"]
manual_mode = @Tag[f"{self.tag_base}.ManualMode"]
manual_output = @Tag[f"{self.tag_base}.ManualOutput"]
output_min = @Tag[f"{self.tag_base}.OutputMin"]
output_max = @Tag[f"{self.tag_base}.OutputMax"]
# Handle mode transitions (bumpless transfer)
if manual_mode != self.manual_mode:
if manual_mode:
# Switching to manual
self.last_output = @Tag[f"{self.tag_base}.Output"]
else:
# Switching to auto - reset integral
self.integral = manual_output / self.ki if self.ki != 0 else 0
self.last_error = setpoint - process_value
self.manual_mode = manual_mode
# Calculate output
if manual_mode:
output = manual_output
else:
output = self.calculate_pid(setpoint, process_value,
output_min, output_max)
# Write output
@Tag[f"{self.tag_base}.Output"] = output
self.last_output = output
# Update diagnostics
self.update_diagnostics(setpoint, process_value, output)
def calculate_pid(self, sp, pv, out_min, out_max):
"""PID calculation with anti-windup"""
error = sp - pv
# Proportional term
p_term = self.kp * error
# Integral term with anti-windup
self.integral += error * self.dt
# Limit integral (anti-windup)
integral_limit = (out_max - out_min) / self.ki if self.ki != 0 else 0
self.integral = max(-integral_limit, min(integral_limit, self.integral))
i_term = self.ki * self.integral
# Derivative term with filtering
if self.dt > 0:
derivative = (error - self.last_error) / self.dt
# Low-pass filter on derivative
alpha = 0.1 # Filter constant
filtered_derivative = alpha * derivative + (1 - alpha) * (self.last_error / self.dt)
d_term = self.kd * filtered_derivative
else:
d_term = 0
# Calculate total output
output = p_term + i_term + d_term
# Output limiting
output = max(out_min, min(out_max, output))
# Back-calculation for anti-windup
if output != (p_term + i_term + d_term):
# Output was limited, adjust integral
excess = output - (p_term + i_term + d_term)
self.integral += excess / self.ki if self.ki != 0 else 0
self.last_error = error
return output
def update_diagnostics(self, sp, pv, output):
"""Update diagnostic tags"""
error = sp - pv
@Tag[f"{self.tag_base}.Error"] = error
@Tag[f"{self.tag_base}.ErrorPercent"] = (error / sp * 100) if sp != 0 else 0
@Tag[f"{self.tag_base}.PTerm"] = self.kp * error
@Tag[f"{self.tag_base}.ITerm"] = self.ki * self.integral
@Tag[f"{self.tag_base}.DTerm"] = self.kd * (error - self.last_error) / self.dt
@Tag[f"{self.tag_base}.Saturated"] = (
output == @Tag[f"{self.tag_base}.OutputMin"] or
output == @Tag[f"{self.tag_base}.OutputMax"]
)
# Usage
controller = PIDController("TempControl", kp=2.0, ki=0.5, kd=0.2)
controller.execute()
Data Processing Examples
Batch Processing and Reporting
csharp
// C# - Complete batch processing with report generation
/// <summary>
/// Runs a recipe phase by phase, records the result of every phase and writes a text
/// report when the batch finishes.
/// </summary>
/// <remarks>
/// LoadRecipe, CollectPhaseData, CalculateBatchResult, ArchiveBatchData,
/// GenerateExceptionReport, GeneratePDFReport, LogEvent and LogError are called but not
/// defined in this class; they must be supplied by the solution.
/// </remarks>
public class BatchProcessor
{
    private const string ReportDirectory = @"C:\Reports";
    private const int ReportRuleWidth = 50;
    private const int PhasePollMilliseconds = 100;

    private BatchData currentBatch;
    private List<BatchPhase> phases;

    /// <summary>Executes the batch identified by <paramref name="batchId"/>.</summary>
    /// <remarks>
    /// Failures are not rethrown: they are written to the batch status tags and passed to
    /// GenerateExceptionReport.
    /// </remarks>
    public async Task ProcessBatch(string batchId)
    {
        // Created before any tag access so the catch block always has a batch to mark failed
        currentBatch = new BatchData
        {
            BatchId = batchId,
            StartTime = DateTime.Now
        };

        try
        {
            // Initialize batch
            currentBatch.Recipe = @Tag.Batch.Recipe;
            currentBatch.Operator = @Tag.System.CurrentUser;
            @Tag.Batch.Status = "Starting";
            LogEvent("Batch", $"Starting batch {batchId}");

            // Load recipe
            var recipe = await LoadRecipe(currentBatch.Recipe);
            phases = recipe.Phases;

            // Execute phases
            foreach (var phase in phases)
            {
                @Tag.Batch.CurrentPhase = phase.Name;
                bool succeeded = await ExecutePhase(phase);

                // Record phase data before acting on the result, so the failing phase
                // also appears in the batch record
                currentBatch.PhaseData.Add(new PhaseData
                {
                    PhaseName = phase.Name,
                    StartTime = phase.StartTime,
                    EndTime = phase.EndTime,
                    Parameters = phase.ActualParameters,
                    Result = phase.Result
                });

                if (!succeeded)
                {
                    throw new InvalidOperationException($"Phase {phase.Name} failed");
                }
            }

            // Batch complete
            currentBatch.EndTime = DateTime.Now;
            currentBatch.Status = "Complete";
            currentBatch.Result = CalculateBatchResult();

            // Generate report
            await GenerateBatchReport(currentBatch);

            // Archive batch data
            await ArchiveBatchData(currentBatch);

            @Tag.Batch.Status = "Complete";
            LogEvent("Batch", $"Batch {batchId} completed successfully");
        }
        catch (Exception ex)
        {
            currentBatch.EndTime = DateTime.Now;
            currentBatch.Status = "Failed";
            currentBatch.ErrorMessage = ex.Message;
            @Tag.Batch.Status = "Failed";
            @Tag.Batch.Error = ex.Message;
            LogError($"Batch {batchId} failed: {ex.Message}");

            // Generate exception report
            await GenerateExceptionReport(currentBatch, ex);
        }
    }

    /// <summary>
    /// Writes the phase parameters, starts the phase and polls until it completes, faults
    /// or exceeds <c>phase.MaxDuration</c>.
    /// </summary>
    /// <returns>true only when the phase reported completion.</returns>
    private async Task<bool> ExecutePhase(BatchPhase phase)
    {
        phase.StartTime = DateTime.Now;
        try
        {
            // Set phase parameters
            foreach (var param in phase.Parameters)
            {
                @Tag[$"Phase.{param.Key}"] = param.Value;
            }

            // Start phase logic
            @Tag.Phase.Start = true;

            // UTC deadline, so a daylight-saving shift cannot shorten or extend the timeout
            var deadline = DateTime.UtcNow.Add(phase.MaxDuration);
            while (DateTime.UtcNow < deadline)
            {
                // Check phase complete
                if (@Tag.Phase.Complete)
                {
                    phase.Result = "Success";
                    // Collect actual values
                    phase.ActualParameters = CollectPhaseData();
                    return true;
                }

                // Check for phase fault
                if (@Tag.Phase.Fault)
                {
                    phase.Result = "Fault";
                    phase.ErrorMessage = @Tag.Phase.FaultMessage;
                    return false;
                }

                await Task.Delay(PhasePollMilliseconds);
            }

            // Timeout
            phase.Result = "Timeout";
            return false;
        }
        catch (Exception ex)
        {
            phase.Result = "Exception";
            phase.ErrorMessage = ex.Message;
            return false;
        }
        finally
        {
            // Every exit path gets an end time and releases the start request
            phase.EndTime = DateTime.Now;
            @Tag.Phase.Start = false;
        }
    }

    /// <summary>Builds the plain-text batch report and saves it under the report directory.</summary>
    private async Task GenerateBatchReport(BatchData batch)
    {
        var report = new StringBuilder();
        report.AppendLine("BATCH PRODUCTION REPORT");
        report.AppendLine(new string('=', ReportRuleWidth));
        report.AppendLine($"Batch ID: {batch.BatchId}");
        report.AppendLine($"Recipe: {batch.Recipe}");
        report.AppendLine($"Operator: {batch.Operator}");
        report.AppendLine($"Start Time: {batch.StartTime:yyyy-MM-dd HH:mm:ss}");
        report.AppendLine($"End Time: {batch.EndTime:yyyy-MM-dd HH:mm:ss}");
        report.AppendLine($"Duration: {(batch.EndTime - batch.StartTime).TotalMinutes:F1} minutes");
        report.AppendLine($"Status: {batch.Status}");
        report.AppendLine();

        report.AppendLine("PHASE SUMMARY");
        report.AppendLine(new string('-', ReportRuleWidth));
        foreach (var phase in batch.PhaseData)
        {
            report.AppendLine($"Phase: {phase.PhaseName}");
            report.AppendLine($" Duration: {(phase.EndTime - phase.StartTime).TotalMinutes:F1} min");
            report.AppendLine($" Result: {phase.Result}");

            // Parameters are only collected for phases that completed
            if (phase.Parameters != null && phase.Parameters.Any())
            {
                report.AppendLine(" Parameters:");
                foreach (var param in phase.Parameters)
                {
                    report.AppendLine($" {param.Key}: {param.Value}");
                }
            }
        }

        report.AppendLine();
        report.AppendLine("QUALITY DATA");
        report.AppendLine(new string('-', ReportRuleWidth));
        report.AppendLine($"Sample Results: {batch.QualityResults}");
        report.AppendLine($"Quality Score: {batch.QualityScore:F1}%");

        // Save report
        var fileName = $"Batch_{ToSafeFileNamePart(batch.BatchId)}_{batch.EndTime:yyyyMMdd_HHmmss}.txt";
        Directory.CreateDirectory(ReportDirectory);
        await File.WriteAllTextAsync(Path.Combine(ReportDirectory, fileName), report.ToString());

        // Also save as PDF if configured
        if (@Tag.Reports.GeneratePDF)
        {
            await GeneratePDFReport(batch, report.ToString());
        }
    }

    /// <summary>
    /// Replaces characters that are not valid in a file name, so a batch id cannot escape
    /// the report directory or make the write fail.
    /// </summary>
    private static string ToSafeFileNamePart(string value)
    {
        if (string.IsNullOrEmpty(value))
        {
            return "unknown";
        }

        var invalid = Path.GetInvalidFileNameChars();
        var chars = value.ToCharArray();
        for (int i = 0; i < chars.Length; i++)
        {
            if (Array.IndexOf(invalid, chars[i]) >= 0)
            {
                chars[i] = '_';
            }
        }

        return new string(chars);
    }
}
Real-Time Data Analytics
python
# Python - Real-time statistical process control
import datetime
from collections import deque

import numpy as np
import pandas as pd
from scipy import stats
class RealTimeAnalytics:
def __init__(self, window_size=100):
self.window_size = window_size
self.data_buffer = deque(maxlen=window_size)
self.control_limits_calculated = False
self.ucl = 0
self.lcl = 0
self.center_line = 0
def process_new_value(self, value):
"""Process new data point for SPC"""
# Add to buffer
self.data_buffer.append({
'timestamp': datetime.datetime.now(),
'value': value,
'quality': @Tag.Process.Quality
})
# Need minimum samples for statistics
if len(self.data_buffer) < 20:
return
# Convert to DataFrame for analysis
df = pd.DataFrame(self.data_buffer)
# Calculate statistics
mean = df['value'].mean()
std = df['value'].std()
# Calculate control limits (if stable)
if not self.control_limits_calculated:
if self.is_process_stable(df['value'].values):
self.calculate_control_limits(mean, std)
self.control_limits_calculated = True
# Check for out of control conditions
violations = self.check_control_rules(df['value'].values)
# Update tags
@Tag.SPC.Mean = mean
@Tag.SPC.StdDev = std
@Tag.SPC.UCL = self.ucl
@Tag.SPC.LCL = self.lcl
@Tag.SPC.CenterLine = self.center_line
# Handle violations
if violations:
self.handle_violations(violations, value)
# Calculate capability indices
if self.control_limits_calculated:
self.calculate_capability(df['value'].values)
def is_process_stable(self, data):
"""Check if process is stable using runs test"""
median = np.median(data)
runs, n1, n2 = self.count_runs(data, median)
# Expected runs
expected_runs = (2 * n1 * n2) / (n1 + n2) + 1
variance = (2 * n1 * n2 * (2 * n1 * n2 - n1 - n2)) / \
((n1 + n2) ** 2 * (n1 + n2 - 1))
if variance > 0:
z_score = (runs - expected_runs) / np.sqrt(variance)
p_value = 2 * (1 - stats.norm.cdf(abs(z_score)))
# Process is stable if p-value > 0.05
return p_value > 0.05
return False
def calculate_control_limits(self, mean, std):
"""Calculate control limits"""
self.center_line = mean
self.ucl = mean + 3 * std
self.lcl = mean - 3 * std
# Also calculate warning limits
@Tag.SPC.UWL = mean + 2 * std
@Tag.SPC.LWL = mean - 2 * std
def check_control_rules(self, data):
"""Western Electric rules for control charts"""
violations = []
if len(data) < 8:
return violations
recent = data[-8:]
mean = self.center_line
std = (self.ucl - self.center_line) / 3
# Rule 1: One point beyond 3-sigma
if data[-1] > self.ucl or data[-1] < self.lcl:
violations.append("Rule1: Point beyond control limits")
# Rule 2: Two of three beyond 2-sigma
two_sigma_upper = mean + 2 * std
two_sigma_lower = mean - 2 * std
last_three = data[-3:]
above_2sigma = sum(1 for x in last_three if x > two_sigma_upper)
below_2sigma = sum(1 for x in last_three if x < two_sigma_lower)
if above_2sigma >= 2 or below_2sigma >= 2:
violations.append("Rule2: Two of three beyond 2-sigma")
# Rule 3: Four of five beyond 1-sigma
one_sigma_upper = mean + std
one_sigma_lower = mean - std
last_five = data[-5:]
above_1sigma = sum(1 for x in last_five if x > one_sigma_upper)
below_1sigma = sum(1 for x in last_five if x < one_sigma_lower)
if above_1sigma >= 4 or below_1sigma >= 4:
violations.append("Rule3: Four of five beyond 1-sigma")
# Rule 4: Eight consecutive on one side of center
above_center = all(x > mean for x in recent)
below_center = all(x < mean for x in recent)
if above_center or below_center:
violations.append("Rule4: Eight consecutive on one side")
return violations
def calculate_capability(self, data):
"""Calculate process capability indices"""
usl = @Tag.Spec.USL
lsl = @Tag.Spec.LSL
if usl == 0 and lsl == 0:
return
mean = np.mean(data)
std = np.std(data)
# Cp - Process capability
if usl != 0 and lsl != 0:
cp = (usl - lsl) / (6 * std)
@Tag.SPC.Cp = cp
# Cpk - Process capability index
if usl != 0:
cpu = (usl - mean) / (3 * std)
else:
cpu = float('inf')
if lsl != 0:
cpl = (mean - lsl) / (3 * std)
else:
cpl = float('inf')
cpk = min(cpu, cpl)
@Tag.SPC.Cpk = cpk
# Determine capability status
if cpk >= 1.33:
@Tag.SPC.Status = "Capable"
elif cpk >= 1.0:
@Tag.SPC.Status = "Marginal"
else:
@Tag.SPC.Status = "Not Capable"
# Usage
analytics = RealTimeAnalytics(window_size=100)
analytics.process_new_value(@Tag.Process.Value)
Integration Examples
REST API Integration
csharp
// C# - Complete REST API client with retry logic
/// <summary>
/// Posts a production snapshot to an external REST endpoint, retrying transient failures
/// with exponential backoff.
/// </summary>
/// <remarks>
/// GetActiveAlarms, GetDowntimeEvents and the Log* methods are called but not defined in
/// this class; they must be supplied by the solution. The bearer token is read once, when
/// the instance is constructed.
/// </remarks>
public class APIIntegration
{
    private readonly HttpClient httpClient;
    private readonly int maxRetries = 3;
    private readonly int baseDelay = 1000; // milliseconds

    public APIIntegration()
    {
        httpClient = new HttpClient();
        httpClient.DefaultRequestHeaders.Add("Authorization",
            $"Bearer {@Tag.API.Token}");
        httpClient.Timeout = TimeSpan.FromSeconds(30);
    }

    /// <summary>Collects the current production tags and sends them to the API.</summary>
    /// <returns>true when the API accepted the data; false on any error (never throws).</returns>
    public async Task<bool> SendProductionData()
    {
        try
        {
            // Collect production data
            var productionData = new
            {
                timestamp = DateTime.UtcNow,
                line = @Tag.Production.Line,
                product = @Tag.Production.Product,
                quantity = @Tag.Production.Quantity,
                quality = @Tag.Production.Quality,
                efficiency = @Tag.KPI.OEE,
                // "operator" is a C# keyword; the @ prefix is not part of the member name,
                // so the property is still serialized as "operator"
                @operator = @Tag.System.CurrentUser,
                shift = @Tag.Production.Shift,
                metrics = new
                {
                    availability = @Tag.KPI.Availability,
                    performance = @Tag.KPI.Performance,
                    quality = @Tag.KPI.Quality
                },
                alarms = GetActiveAlarms(),
                downtime = GetDowntimeEvents()
            };

            // Send with retry logic
            using (var response = await SendWithRetry(
                "https://api.company.com/production",
                productionData))
            {
                if (!response.IsSuccessStatusCode)
                {
                    @Tag.API.LastSyncStatus = $"Failed: {response.StatusCode}";
                    LogError($"API returned error: {response.StatusCode}");
                    return false;
                }

                var result = await response.Content.ReadAsStringAsync();
                dynamic json = JsonConvert.DeserializeObject(result);
                // An empty response body deserializes to null
                string recordId = json?.recordId;

                @Tag.API.LastSyncTime = DateTime.Now;
                @Tag.API.LastSyncStatus = "Success";
                @Tag.API.RecordId = recordId;
                LogInfo($"Production data sent successfully. Record ID: {recordId}");
                return true;
            }
        }
        catch (Exception ex)
        {
            @Tag.API.LastSyncStatus = $"Exception: {ex.Message}";
            LogError($"API integration failed: {ex.Message}");
            return false;
        }
    }

    /// <summary>
    /// Posts <paramref name="data"/> as JSON, retrying server errors, timeouts and network
    /// errors up to <c>maxRetries</c> times.
    /// </summary>
    /// <returns>The first response that is a success or a 4xx client error.</returns>
    /// <exception cref="HttpRequestException">All attempts failed.</exception>
    /// <exception cref="TaskCanceledException">The final attempt timed out.</exception>
    private async Task<HttpResponseMessage> SendWithRetry(string url, object data)
    {
        var json = JsonConvert.SerializeObject(data);
        var lastStatus = "none";

        for (int attempt = 0; attempt < maxRetries; attempt++)
        {
            bool isLastAttempt = attempt == maxRetries - 1;
            try
            {
                // New content per attempt: a content object may already be disposed or
                // consumed by the previous send
                var content = new StringContent(json, Encoding.UTF8, "application/json");
                var response = await httpClient.PostAsync(url, content);

                // Return on success or client error (no retry needed)
                int status = (int)response.StatusCode;
                if (response.IsSuccessStatusCode || (status >= 400 && status < 500))
                {
                    return response;
                }

                // Server error - release this response before retrying
                lastStatus = status.ToString();
                response.Dispose();
            }
            catch (TaskCanceledException)
            {
                LogError("API call timeout");
                if (isLastAttempt)
                    throw;
            }
            catch (HttpRequestException ex)
            {
                LogError($"Network error: {ex.Message}");
                if (isLastAttempt)
                    throw;
            }

            // Exponential backoff before every retry, whatever caused the failure
            if (!isLastAttempt)
            {
                int delay = baseDelay * (1 << attempt);
                LogWarning($"API call failed, retrying in {delay}ms...");
                await Task.Delay(delay);
            }
        }

        throw new HttpRequestException(
            $"API call failed after {maxRetries} attempts (last status: {lastStatus})");
    }
}
MQTT Integration with Sparkplug B
python
# Python - MQTT Sparkplug B implementation
import paho.mqtt.client as mqtt
import json
import struct
import time
from google.protobuf import descriptor_pb2
from sparkplug_b import *
class SparkplugBClient:
def __init__(self, broker, port, group_id, node_id):
self.broker = broker
self.port = port
self.group_id = group_id
self.node_id = node_id
self.client = mqtt.Client()
self.seq = 0
self.bdSeq = 0
self.connected = False
# Set callbacks
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
# Device registry
self.devices = {}
def connect(self):
"""Connect to MQTT broker"""
try:
self.client.connect(self.broker, self.port, 60)
self.client.loop_start()
# Wait for connection
timeout = time.time() + 10
while not self.connected and time.time() < timeout:
time.sleep(0.1)
if self.connected:
self.publish_node_birth()
return True
else:
raise Exception("Connection timeout")
except Exception as e:
fx.system.log_error(f"MQTT connection failed: {e}")
return False
def on_connect(self, client, userdata, flags, rc):
"""Handle connection"""
if rc == 0:
self.connected = True
fx.system.log_info("Connected to MQTT broker")
# Subscribe to commands
command_topic = f"spBv1.0/{self.group_id}/NCMD/{self.node_id}"
client.subscribe(command_topic)
else:
fx.system.log_error(f"Connection failed with code {rc}")
def publish_node_birth(self):
"""Publish node birth certificate"""
payload = self.create_payload("NBIRTH")
# Add node metrics
self.add_metric(payload, "Properties/Version", "1.0", MetricDataType.String)
self.add_metric(payload, "Node Control/Rebirth", False, MetricDataType.Boolean)
self.add_metric(payload, "Node Control/Reboot", False, MetricDataType.Boolean)
self.add_metric(payload, "Node Control/Scan Rate", 1000, MetricDataType.Int32)
# Add process tags as metrics
self.add_process_metrics(payload)
# Publish
topic = f"spBv1.0/{self.group_id}/NBIRTH/{self.node_id}"
self.client.publish(topic, payload.SerializeToString(), qos=1, retain=False)
fx.system.log_info("Node birth published")
def add_process_metrics(self, payload):
"""Add process tags as Sparkplug metrics"""
# Map FrameworX tags to Sparkplug metrics
tag_mappings = [
("Process/Temperature", @Tag.Process.Temperature, MetricDataType.Float),
("Process/Pressure", @Tag.Process.Pressure, MetricDataType.Float),
("Process/Flow", @Tag.Process.Flow, MetricDataType.Float),
("Process/Level", @Tag.Process.Level, MetricDataType.Float),
("Motor/Running", @Tag.Motor.Running, MetricDataType.Boolean),
("Motor/Speed", @Tag.Motor.Speed, MetricDataType.Float),
("Motor/Current", @Tag.Motor.Current, MetricDataType.Float),
("Production/Count", @Tag.Production.Count, MetricDataType.Int64),
("Production/Rate", @Tag.Production.Rate, MetricDataType.Float),
("Alarms/Active", @Tag.Alarms.ActiveCount, MetricDataType.Int32)
]
for name, value, datatype in tag_mappings:
self.add_metric(payload, name, value, datatype)
def publish_node_data(self):
"""Publish NDATA message with current values"""
payload = self.create_payload("NDATA")
# Add updated metrics
self.add_process_metrics(payload)
# Publish
topic = f"spBv1.0/{self.group_id}/NDATA/{self.node_id}"
self.client.publish(topic, payload.SerializeToString(), qos=0, retain=False)
self.seq = (self.seq + 1) % 256
def publish_device_data(self, device_id):
"""Publish device data"""
if device_id not in self.devices:
# First time - publish DBIRTH
self.publish_device_birth(device_id)
# Publish DDATA
payload = self.create_payload("DDATA")
# Add device-specific metrics
device_tag_base = f"Device.{device_id}"
self.add_metric(payload, "Status", @Tag[f"{device_tag_base}.Status"],
MetricDataType.String)
self.add_metric(payload, "Value", @Tag[f"{device_tag_base}.Value"],
MetricDataType.Float)
self.add_metric(payload, "Quality", @Tag[f"{device_tag_base}.Quality"],
MetricDataType.Int32)
topic = f"spBv1.0/{self.group_id}/DDATA/{self.node_id}/{device_id}"
self.client.publish(topic, payload.SerializeToString(), qos=0, retain=False)
def run(self):
"""Main run loop"""
if not self.connect():
return
try:
while self.connected:
# Publish node data every second
self.publish_node_data()
# Check for device updates
for device_id in self.get_active_devices():
if self.device_has_changes(device_id):
self.publish_device_data(device_id)
time.sleep(1)
except KeyboardInterrupt:
fx.system.log_info("Shutting down MQTT client")
finally:
self.disconnect()
def disconnect(self):
"""Disconnect and publish death certificate"""
if self.connected:
# Publish NDEATH
payload = self.create_payload("NDEATH")
topic = f"spBv1.0/{self.group_id}/NDEATH/{self.node_id}"
self.client.publish(topic, payload.SerializeToString(), qos=1, retain=False)
self.client.disconnect()
self.client.loop_stop()
self.connected = False
# Usage
client = SparkplugBClient("mqtt.broker.com", 1883, "FrameworX", "Node01")
client.run()
User Interface Examples
Dynamic Dashboard Creation
javascript
// JavaScript - Create dynamic dashboard with real-time updates
/**
 * Builds a KPI / trend / alarm dashboard inside a container element and refreshes it on
 * a timer.
 *
 * Depends on the browser DOM, fetch, a global Chart constructor (Chart.js) and a global
 * acknowledgeAlarm(id) function, none of which are defined here.
 */
class DynamicDashboard {
    /** @param {string} containerId id of the element that will hold the dashboard */
    constructor(containerId) {
        this.container = document.getElementById(containerId);
        this.widgets = [];
        this.updateInterval = 1000; // 1 second
        this.charts = {};
        this.updateTimer = null;
    }

    /** Creates the layout and all widgets, then starts the periodic refresh. */
    initialize() {
        // Create dashboard layout
        this.createLayout();

        // Add widgets based on configuration
        this.addKPIWidget('oee', 'OEE', 'Tag.KPI.OEE', '%');
        this.addKPIWidget('production', 'Production', 'Tag.Production.Count', 'units');
        this.addKPIWidget('quality', 'Quality', 'Tag.Quality.Rate', '%');

        // Add trend chart
        this.addTrendChart('trend1', 'Process Trends', [
            { tag: 'Tag.Process.Temperature', label: 'Temperature', color: '#FF6B6B' },
            { tag: 'Tag.Process.Pressure', label: 'Pressure', color: '#4ECDC4' },
            { tag: 'Tag.Process.Flow', label: 'Flow', color: '#45B7D1' }
        ]);

        // Add alarm widget
        this.addAlarmWidget('alarms', 'Active Alarms');

        // Start real-time updates
        this.startUpdates();
    }

    createLayout() {
        this.container.innerHTML = `
            <div class="dashboard-grid">
                <div class="kpi-row" id="kpi-container"></div>
                <div class="chart-row">
                    <div class="chart-container" id="trend1"></div>
                </div>
                <div class="alarm-row">
                    <div class="alarm-container" id="alarms"></div>
                </div>
            </div>
        `;
    }

    addKPIWidget(id, title, tagPath, unit) {
        const kpiContainer = document.getElementById('kpi-container');
        const widget = document.createElement('div');
        widget.className = 'kpi-widget';
        widget.id = `kpi-${id}`;
        // Static skeleton only; caller-supplied text is assigned as text, never as markup
        widget.innerHTML = `
            <div class="kpi-title"></div>
            <div class="kpi-value">--</div>
            <div class="kpi-unit"></div>
            <div class="kpi-trend"></div>
        `;
        widget.querySelector('.kpi-title').textContent = title;
        widget.querySelector('.kpi-value').setAttribute('data-tag', tagPath);
        widget.querySelector('.kpi-unit').textContent = unit;
        kpiContainer.appendChild(widget);

        this.widgets.push({
            id: id,
            type: 'kpi',
            element: widget,
            tagPath: tagPath,
            history: []
        });
    }

    addTrendChart(id, title, series) {
        const container = document.getElementById(id);

        // Create chart using Chart.js
        // NOTE(review): the 'time' x-scale normally needs a Chart.js date adapter to be
        // loaded as well - confirm for the Chart.js version in use.
        const canvas = document.createElement('canvas');
        container.appendChild(canvas);
        const ctx = canvas.getContext('2d');
        const chart = new Chart(ctx, {
            type: 'line',
            data: {
                labels: [],
                datasets: series.map(s => ({
                    label: s.label,
                    data: [],
                    borderColor: s.color,
                    backgroundColor: s.color + '20',
                    tension: 0.4,
                    fill: false
                }))
            },
            options: {
                responsive: true,
                maintainAspectRatio: false,
                plugins: {
                    title: {
                        display: true,
                        text: title
                    },
                    legend: {
                        display: true,
                        position: 'top'
                    }
                },
                scales: {
                    x: {
                        type: 'time',
                        time: {
                            unit: 'minute',
                            displayFormats: {
                                minute: 'HH:mm'
                            }
                        },
                        title: {
                            display: true,
                            text: 'Time'
                        }
                    },
                    y: {
                        title: {
                            display: true,
                            text: 'Value'
                        }
                    }
                },
                animation: {
                    duration: 0
                }
            }
        });

        this.charts[id] = chart;
        this.widgets.push({
            id: id,
            type: 'chart',
            chart: chart,
            series: series,
            maxPoints: 100
        });
    }

    addAlarmWidget(id, title) {
        const container = document.getElementById(id);
        container.innerHTML = `
            <div class="alarm-widget">
                <div class="alarm-title"></div>
                <div class="alarm-list" id="alarm-list"></div>
            </div>
        `;
        container.querySelector('.alarm-title').textContent = title;

        this.widgets.push({
            id: id,
            type: 'alarm',
            element: container
        });
    }

    /** Starts the refresh timer; a refresh that is still running is never overlapped. */
    async startUpdates() {
        if (this.updateTimer !== null) {
            return;
        }
        let refreshing = false;
        this.updateTimer = setInterval(async () => {
            if (refreshing) {
                return; // previous cycle is still waiting on the server
            }
            refreshing = true;
            try {
                await this.updateWidgets();
            } finally {
                refreshing = false;
            }
        }, this.updateInterval);
    }

    /** Stops the refresh timer started by startUpdates(). */
    stopUpdates() {
        if (this.updateTimer !== null) {
            clearInterval(this.updateTimer);
            this.updateTimer = null;
        }
    }

    async updateWidgets() {
        for (const widget of this.widgets) {
            switch (widget.type) {
                case 'kpi':
                    await this.updateKPI(widget);
                    break;
                case 'chart':
                    await this.updateChart(widget);
                    break;
                case 'alarm':
                    await this.updateAlarms(widget);
                    break;
            }
        }
    }

    async updateKPI(widget) {
        try {
            // Get tag value from server
            const value = await this.getTagValue(widget.tagPath);
            const valueElement = widget.element.querySelector('.kpi-value');

            // A missing or non-numeric value is shown as "no data" instead of throwing
            if (typeof value !== 'number' || Number.isNaN(value)) {
                valueElement.textContent = '--';
                return;
            }

            // Update display
            valueElement.textContent = value.toFixed(1);

            // Update trend indicator
            widget.history.push(value);
            if (widget.history.length > 10) {
                widget.history.shift();
            }
            if (widget.history.length >= 2) {
                const trend = widget.history[widget.history.length - 1] -
                    widget.history[widget.history.length - 2];
                const trendElement = widget.element.querySelector('.kpi-trend');
                // Arrows are written as escapes so they survive any file re-encoding
                if (trend > 0) {
                    trendElement.textContent = '\u2191'; // up arrow
                    trendElement.className = 'kpi-trend up';
                } else if (trend < 0) {
                    trendElement.textContent = '\u2193'; // down arrow
                    trendElement.className = 'kpi-trend down';
                } else {
                    trendElement.textContent = '\u2192'; // right arrow
                    trendElement.className = 'kpi-trend stable';
                }
            }

            // Apply color based on thresholds
            this.applyKPIColor(widget, value);
        } catch (error) {
            console.error(`Error updating KPI ${widget.id}:`, error);
        }
    }

    async updateChart(widget) {
        try {
            const timestamp = new Date();
            for (let i = 0; i < widget.series.length; i++) {
                const value = await this.getTagValue(widget.series[i].tag);
                const points = widget.chart.data.datasets[i].data;

                // Add new data point
                points.push({
                    x: timestamp,
                    y: value
                });

                // Remove old points
                if (points.length > widget.maxPoints) {
                    points.shift();
                }
            }

            // Update chart
            widget.chart.update('none');
        } catch (error) {
            console.error(`Error updating chart ${widget.id}:`, error);
        }
    }

    async updateAlarms(widget) {
        try {
            const alarms = await this.getActiveAlarms();
            const listElement = widget.element.querySelector('.alarm-list');

            // Alarm fields come from the server, so they are inserted as text nodes and
            // the ACK handler is attached in code rather than through inline markup
            const fragment = document.createDocumentFragment();
            for (const alarm of alarms) {
                const item = document.createElement('div');
                const priority = String(alarm.priority).replace(/[^\w-]/g, '');
                item.className = `alarm-item priority-${priority}`;

                const fields = [
                    ['alarm-time', this.formatTime(alarm.timestamp)],
                    ['alarm-name', alarm.name],
                    ['alarm-message', alarm.message]
                ];
                for (const [className, text] of fields) {
                    const span = document.createElement('span');
                    span.className = className;
                    span.textContent = text;
                    item.appendChild(span);
                }

                const ackButton = document.createElement('button');
                ackButton.textContent = 'ACK';
                ackButton.addEventListener('click', () => acknowledgeAlarm(alarm.id));
                item.appendChild(ackButton);

                fragment.appendChild(item);
            }

            listElement.textContent = '';
            listElement.appendChild(fragment);
        } catch (error) {
            console.error('Error updating alarms:', error);
        }
    }

    async getTagValue(tagPath) {
        // Simulate API call to get tag value
        const response = await fetch(`/api/tags/${encodeURIComponent(tagPath)}`);
        if (!response.ok) {
            throw new Error(`Tag request for ${tagPath} failed with status ${response.status}`);
        }
        const data = await response.json();
        return data.value;
    }

    async getActiveAlarms() {
        const response = await fetch('/api/alarms/active');
        if (!response.ok) {
            throw new Error(`Alarm request failed with status ${response.status}`);
        }
        return await response.json();
    }

    applyKPIColor(widget, value) {
        const element = widget.element.querySelector('.kpi-value');

        // Define thresholds based on widget ID
        const thresholds = {
            'oee': { good: 85, warning: 70 },
            'quality': { good: 95, warning: 90 },
            'production': { good: 100, warning: 80 }
        };

        const threshold = thresholds[widget.id];
        if (threshold) {
            if (value >= threshold.good) {
                element.className = 'kpi-value good';
            } else if (value >= threshold.warning) {
                element.className = 'kpi-value warning';
            } else {
                element.className = 'kpi-value critical';
            }
        }
    }

    formatTime(timestamp) {
        const date = new Date(timestamp);
        return date.toLocaleTimeString('en-US', {
            hour: '2-digit',
            minute: '2-digit'
        });
    }
}
// Initialize dashboard once the DOM is available: the constructor looks the container
// element up immediately, which fails if this script runs before the page is parsed
let dashboard = null;

function startDashboard() {
    dashboard = new DynamicDashboard('dashboard-root');
    dashboard.initialize();
}

if (document.readyState === 'loading') {
    document.addEventListener('DOMContentLoaded', startDashboard);
} else {
    startDashboard();
}
Complete Solution Example
Water Treatment Plant Automation
csharp
// C# - Complete water treatment plant control system
/// <summary>
/// Top-level state machine for a water treatment plant: runs the sequence for the current
/// plant state once per scan and supervises pumps, dosing, filtration and water quality.
/// </summary>
/// <remarks>
/// The fields below are not assigned anywhere in this listing, and the sequence, logging,
/// alarm and calculation helpers it calls are not defined here; all of them must be
/// supplied by the solution.
/// </remarks>
public class WaterTreatmentPlant
{
    private const int ScanIntervalMilliseconds = 100; // 100ms scan rate

    private readonly string plantId;
    private PlantState currentState;
    private Dictionary<string, PumpController> pumps;
    private Dictionary<string, ValveController> valves;
    private ChemicalDosing chemicalSystem;
    private FiltrationSystem filtration;

    /// <summary>Runs the plant control scan loop until cancellation is requested.</summary>
    /// <param name="cancellationToken">
    /// Optional token that ends the loop; with the default token the loop runs forever.
    /// </param>
    public async Task RunPlantControl(
        System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken))
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                // Read plant state
                UpdatePlantState();

                // Execute control logic based on state
                switch (currentState)
                {
                    case PlantState.Startup:
                        await ExecuteStartupSequence();
                        break;
                    case PlantState.Normal:
                        await ExecuteNormalOperation();
                        break;
                    case PlantState.Backwash:
                        await ExecuteBackwashSequence();
                        break;
                    case PlantState.Emergency:
                        await ExecuteEmergencyShutdown();
                        break;
                    case PlantState.Maintenance:
                        await ExecuteMaintenanceMode();
                        break;
                    default:
                        LogError($"Plant control error: unhandled state {currentState}");
                        break;
                }

                // Update SCADA displays
                UpdateSCADA();

                // Check for state transitions
                CheckStateTransitions();
            }
            catch (Exception ex)
            {
                LogError($"Plant control error: {ex.Message}");
                currentState = PlantState.Emergency;
            }

            // The scan delay sits outside the try block so that a scan which keeps
            // failing still waits one interval instead of spinning and flooding the log
            try
            {
                await Task.Delay(ScanIntervalMilliseconds, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    /// <summary>One scan of normal operation, from influent flow to distribution.</summary>
    private async Task ExecuteNormalOperation()
    {
        // Influent flow control
        var influentFlow = @Tag.Influent.Flow;
        var targetFlow = @Tag.Influent.Setpoint;

        // Control influent pumps
        var pumpControl = CalculatePumpControl(influentFlow, targetFlow);
        await ControlInfluentPumps(pumpControl);

        // Chemical dosing control
        await chemicalSystem.ControlDosing(influentFlow);

        // Filtration control
        await filtration.ControlFiltration();

        // Clear well level control
        var clearWellLevel = @Tag.ClearWell.Level;
        await ControlDistributionPumps(clearWellLevel);

        // Monitor water quality
        MonitorWaterQuality();

        // Check filter backwash requirements
        if (filtration.RequiresBackwash())
        {
            @Tag.Plant.RequestBackwash = true;
        }
    }

    /// <summary>
    /// Starts or stops at most one pump per call to approach the required pump count, then
    /// applies the requested speed to every running pump.
    /// </summary>
    /// <returns>Always true.</returns>
    private async Task<bool> ControlInfluentPumps(PumpControlData control)
    {
        // Determine number of pumps needed
        int pumpsRequired = CalculatePumpsRequired(control.FlowDemand);
        int pumpsRunning = pumps.Count(p => p.Value.IsRunning);

        // Start/stop pumps as needed
        if (pumpsRequired > pumpsRunning)
        {
            // Start the available pump with the fewest run hours (evens out wear)
            var nextPump = pumps.Values
                .Where(p => !p.IsRunning && p.IsAvailable)
                .OrderBy(p => p.RunHours)
                .FirstOrDefault();
            if (nextPump != null)
            {
                await nextPump.Start();
                LogInfo($"Started pump {nextPump.Id}");
            }
        }
        else if (pumpsRequired < pumpsRunning)
        {
            // Stop pump with most run hours
            var pumpToStop = pumps.Values
                .Where(p => p.IsRunning)
                .OrderByDescending(p => p.RunHours)
                .FirstOrDefault();
            if (pumpToStop != null)
            {
                await pumpToStop.Stop();
                LogInfo($"Stopped pump {pumpToStop.Id}");
            }
        }

        // Adjust VFD speeds for running pumps
        foreach (var pump in pumps.Values.Where(p => p.IsRunning))
        {
            pump.SetSpeed(control.PumpSpeed);
        }

        return true;
    }

    /// <summary>
    /// Compares the quality tags with their limits, raises alarms for violations and
    /// forwards the readings to the historian and compliance calculations.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this runs every scan, so RaiseAlarm is called repeatedly while a
    /// limit stays violated - confirm that RaiseAlarm de-duplicates active alarms.
    /// </remarks>
    private void MonitorWaterQuality()
    {
        // Read quality parameters
        var quality = new WaterQuality
        {
            Turbidity = @Tag.Quality.Turbidity,
            pH = @Tag.Quality.pH,
            Chlorine = @Tag.Quality.Chlorine,
            Temperature = @Tag.Quality.Temperature
        };

        // Check against limits
        if (quality.Turbidity > @Tag.Limits.TurbidityMax)
        {
            RaiseAlarm("HighTurbidity", AlarmPriority.High,
                $"Turbidity {quality.Turbidity} NTU exceeds limit");
        }

        if (quality.pH < @Tag.Limits.pHMin || quality.pH > @Tag.Limits.pHMax)
        {
            RaiseAlarm("pHOutOfRange", AlarmPriority.Medium,
                $"pH {quality.pH} out of range");
        }

        if (quality.Chlorine < @Tag.Limits.ChlorineMin)
        {
            RaiseAlarm("LowChlorine", AlarmPriority.High,
                $"Chlorine {quality.Chlorine} ppm below minimum");
        }

        // Log to historian
        LogQualityData(quality);

        // Update compliance calculations
        UpdateComplianceMetrics(quality);
    }
}
// Chemical dosing subsystem
/// <summary>
/// Calculates flow-paced dosing rates and drives the chemical metering pumps.
/// </summary>
/// <remarks>
/// CalculateCoagulantDose, CalculatePolymerDose, ControlMeteringPump and
/// CheckChemicalTankLevels are called but not defined in this listing.
/// </remarks>
public class ChemicalDosing
{
    // Gain applied to the chlorine residual error when trimming the base dose
    private const double ChlorineTrimGain = 0.5;

    /// <summary>Updates all metering pumps for the given influent flow rate.</summary>
    /// <param name="flowRate">Current influent flow used for flow pacing.</param>
    public async Task ControlDosing(double flowRate)
    {
        // Calculate required dosing rates
        var chlorineDose = CalculateChlorineDose(flowRate);
        var coagulantDose = CalculateCoagulantDose(flowRate);
        var polymerDose = CalculatePolymerDose(flowRate);

        // Control metering pumps
        await ControlMeteringPump("Chlorine", chlorineDose);
        await ControlMeteringPump("Coagulant", coagulantDose);
        await ControlMeteringPump("Polymer", polymerDose);

        // Monitor tank levels
        CheckChemicalTankLevels();
    }

    /// <summary>
    /// Chlorine dosing rate: base dose trimmed in proportion to the residual error, scaled
    /// by flow and limited to the range 0 .. ChlorineMaxDose.
    /// </summary>
    private double CalculateChlorineDose(double flowRate)
    {
        // Get target residual
        var targetResidual = @Tag.Chemical.ChlorineSetpoint;
        var currentResidual = @Tag.Quality.ChlorineResidual;

        // Proportional trim of the base dose (no integral or derivative action)
        var error = targetResidual - currentResidual;
        var dose = @Tag.Chemical.ChlorineBaseDose + (error * ChlorineTrimGain);

        // Flow pacing
        // NOTE(review): the original comment says the /1000 converts to GPH - confirm units
        dose = dose * flowRate / 1000;

        // Limit dosing rate
        return Math.Max(0, Math.Min(dose, @Tag.Chemical.ChlorineMaxDose));
    }
}
AI Assistant Data
<details> <summary>Structured Information for AI Tools</summary>
json
{
"page": "Code Examples",
"type": "Complete Code Examples",
"purpose": "Production-ready code examples for FrameworX",
"categories": {
"industrial": ["Motor control", "PID loops", "Batch processing"],
"dataProcessing": ["Analytics", "SPC", "Reporting"],
"integration": ["REST API", "MQTT", "Database"],
"userInterface": ["Dashboards", "Dynamic displays", "Mobile"],
"complete": ["Water treatment", "Manufacturing", "Building automation"]
},
"languages": ["C#", "Python", "JavaScript", "VB.NET"],
"features": {
"errorHandling": "Comprehensive try-catch blocks",
"logging": "Detailed logging throughout",
"performance": "Optimized algorithms",
"security": "Input validation and authentication",
"documentation": "Inline comments"
},
"patterns": [
"State machines",
"Retry logic",
"Async operations",
"Event-driven",
"Object-oriented"
]
}