73
loading...
This website collects cookies to deliver better user experience
MQTT (short for Message Queuing Telemetry Transport) is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT today is used in a wide variety of industries, such as automotive, manufacturing, telecommunications, oil and gas, etc.
.sln
file and two c# console projects, as it follows:.
├── MQTTFirstLook.Broker
├── MQTTFirstLook.Client
└── MQTTFirstLook.sln
MQTTServer
that listen to the port 707 on localhost.using System;
using System.Text;
using MQTTnet;
using MQTTnet.Server;
using Serilog;
void Main(string[] args)
method.// Create the options for our MQTT Broker
MqttServerOptionsBuilder options = new MqttServerOptionsBuilder()
// set endpoint to localhost
.WithDefaultEndpoint()
// port used will be 707
.WithDefaultEndpointPort(707)
// handler for new connections
.WithConnectionValidator(OnNewConnection)
// handler for new messages
.WithApplicationMessageInterceptor(OnNewMessage);
// creates a new mqtt server
IMqttServer mqttServer = new MqttFactory().CreateMqttServer();
// start the server with options
mqttServer.StartAsync(options.Build()).GetAwaiter().GetResult();
// keep application running until user press a key
Console.ReadLine();
public static void OnNewConnection(MqttConnectionValidatorContext context)
{
Log.Logger.Information(
"New connection: ClientId = {clientId}, Endpoint = {endpoint}",
context.ClientId,
context.Endpoint);
}
public static void OnNewMessage(MqttApplicationMessageInterceptorContext context)
{
var payload = context.ApplicationMessage?.Payload == null ? null : Encoding.UTF8.GetString(context.ApplicationMessage?.Payload);
MessageCounter++;
Log.Logger.Information(
"MessageId: {MessageCounter} - TimeStamp: {TimeStamp} -- Message: ClientId = {clientId}, Topic = {topic}, Payload = {payload}, QoS = {qos}, Retain-Flag = {retainFlag}",
MessageCounter,
DateTime.Now,
context.ClientId,
context.ApplicationMessage?.Topic,
payload,
context.ApplicationMessage?.QualityOfServiceLevel,
context.ApplicationMessage?.Retain);
}
MQTTClient
instance. This instance will connect to our Broker on localhost:707 and send messages to the topic Dev.to/topic/json
using System;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Extensions.ManagedClient;
using Newtonsoft.Json;
using Serilog;
void Main(string[] args)
method.// Creates a new client
MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder()
.WithClientId("Dev.To")
.WithTcpServer("localhost", 707);
// Create client options objects
ManagedMqttClientOptions options = new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(60))
.WithClientOptions(builder.Build())
.Build();
// Creates the client object
IManagedMqttClient _mqttClient = new MqttFactory().CreateManagedMqttClient();
// Set up handlers
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected);
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected);
_mqttClient.ConnectingFailedHandler = new ConnectingFailedHandlerDelegate(OnConnectingFailed);
// Starts a connection with the Broker
_mqttClient.StartAsync(options).GetAwaiter().GetResult();
// Send a new message to the broker every second
while (true)
{
string json = JsonConvert.SerializeObject(new { message = "Heyo :)", sent= DateTimeOffset.UtcNow });
_mqttClient.PublishAsync("dev.to/topic/json", json);
Task.Delay(1000).GetAwaiter().GetResult();
}
public static void OnConnected(MqttClientConnectedEventArgs obj)
{
Log.Logger.Information("Successfully connected.");
}
public static void OnConnectingFailed(ManagedProcessFailedEventArgs obj)
{
Log.Logger.Warning("Couldn't connect to broker.");
}
public static void OnDisconnected(MqttClientDisconnectedEventArgs obj)
{
Log.Logger.Information("Successfully disconnected.");
}