57
loading...
This website collects cookies to deliver better user experience
dotnet new console -n eventhubpublishertest
cd eventhubpublishertest
code .
using System;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;
using System.Web;
namespace eventhubpublishertest
{
    /// <summary>
    /// Console sample that builds a Shared Access Signature (SAS) token scoped to a
    /// single Event Hubs publisher endpoint.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Builds the publisher-scoped resource URI from the sample settings and
        /// generates a SAS token for it.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            var eventHubHostName = "kenakamupublishertest.servicebus.windows.net";
            var eventHubName = "publisher-test";
            var keyName = "Send";
            var keyValue = "<your send policy key>";
            var publisher = "publisher1";

            // The token is signed for the ".../publishers/{publisher}" path, not for the whole hub.
            var sasToken = CreateToken($"https://{eventHubHostName}/{eventHubName}/publishers/{publisher}", keyName, keyValue);
        }

        /// <summary>
        /// Creates a SAS token for <paramref name="resourceUri"/> that expires one week from now.
        /// </summary>
        /// <param name="resourceUri">Resource URI the token is issued for; it is URL-encoded before signing.</param>
        /// <param name="keyName">Name of the shared access policy, emitted as the <c>skn</c> field.</param>
        /// <param name="key">Policy key; its UTF-8 bytes are used as the HMAC-SHA256 key.</param>
        /// <returns>
        /// A string of the form <c>SharedAccessSignature sr=...&amp;sig=...&amp;se=...&amp;skn=...</c>.
        /// </returns>
        private static string CreateToken(string resourceUri, string keyName, string key)
        {
            const long TokenLifetimeSeconds = 60 * 60 * 24 * 7;

            // Use 64-bit Unix time: a 32-bit int cast overflows once the epoch offset passes 2^31 seconds.
            long expirySeconds = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + TokenLifetimeSeconds;
            string expiry = expirySeconds.ToString(CultureInfo.InvariantCulture);

            string encodedResourceUri = HttpUtility.UrlEncode(resourceUri);
            string stringToSign = encodedResourceUri + "\n" + expiry;

            string signature;
            // HMACSHA256 is IDisposable; release it as soon as the hash is computed.
            using (var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key)))
            {
                signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
            }

            return String.Format(
                CultureInfo.InvariantCulture,
                "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}",
                encodedResourceUri,
                HttpUtility.UrlEncode(signature),
                expiry,
                keyName);
        }
    }
}
dotnet add package Azure.Messaging.EventHubs
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Web;
using Azure;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
namespace eventhubpublishertest
{
    /// <summary>
    /// Console sample that sends one event to an Event Hub through a publisher
    /// endpoint, authenticating with a publisher-scoped SAS token.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Generates a SAS token for the publisher endpoint, then sends a single
        /// "test" event through a producer client bound to that endpoint.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            var eventHubHostName = "kenakamupublishertest.servicebus.windows.net";
            var eventHubName = "publisher-test";
            var keyName = "Send";
            var keyValue = "<your send policy key>";
            var publisher = "publisher1";

            var sasToken = CreateToken($"https://{eventHubHostName}/{eventHubName}/publishers/{publisher}", keyName, keyValue);

            // The "event hub name" passed to the client is the publisher path, matching the token's scope.
            var producerClient = new EventHubProducerClient(
                eventHubHostName,
                $"{eventHubName}/publishers/{publisher}",
                new AzureSasCredential(sasToken));

            try
            {
                var sendEvents = new List<EventData>()
                {
                    new EventData(Encoding.ASCII.GetBytes("test")),
                };

                await producerClient.SendAsync(sendEvents).ConfigureAwait(false);
            }
            finally
            {
                // Close the client so its connection is released even when the send fails.
                await producerClient.CloseAsync().ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Creates a SAS token for <paramref name="resourceUri"/> that expires one week from now.
        /// </summary>
        /// <param name="resourceUri">Resource URI the token is issued for; it is URL-encoded before signing.</param>
        /// <param name="keyName">Name of the shared access policy, emitted as the <c>skn</c> field.</param>
        /// <param name="key">Policy key; its UTF-8 bytes are used as the HMAC-SHA256 key.</param>
        /// <returns>
        /// A string of the form <c>SharedAccessSignature sr=...&amp;sig=...&amp;se=...&amp;skn=...</c>.
        /// </returns>
        private static string CreateToken(string resourceUri, string keyName, string key)
        {
            const long TokenLifetimeSeconds = 60 * 60 * 24 * 7;

            // Use 64-bit Unix time: a 32-bit int cast overflows once the epoch offset passes 2^31 seconds.
            long expirySeconds = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + TokenLifetimeSeconds;
            string expiry = expirySeconds.ToString(CultureInfo.InvariantCulture);

            string encodedResourceUri = HttpUtility.UrlEncode(resourceUri);
            string stringToSign = encodedResourceUri + "\n" + expiry;

            string signature;
            // HMACSHA256 is IDisposable; release it as soon as the hash is computed.
            using (var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key)))
            {
                signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
            }

            return String.Format(
                CultureInfo.InvariantCulture,
                "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}",
                encodedResourceUri,
                HttpUtility.UrlEncode(signature),
                expiry,
                keyName);
        }
    }
}
dotnet new console -n eventhubconsumertest
cd eventhubconsumertest
dotnet add package Azure.Messaging.EventHubs
dotnet add package Azure.Messaging.EventHubs.Processor
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
namespace eventhubconsumertest
{
    /// <summary>
    /// Console sample that reads events from an Event Hub with an
    /// <see cref="EventProcessorClient"/>, checkpointing to a blob container.
    /// </summary>
    class Program
    {
        private const string ehubNamespaceConnectionString = "<event hub namespace connection string>";
        private const string eventHubName = "publisher-test";
        private const string blobStorageConnectionString = "<storage account connection string>";
        private const string blobContainerName = "checkpoint";
        static BlobContainerClient storageClient;
        static EventProcessorClient processor;

        /// <summary>
        /// Starts the processor on the default consumer group, lets it run for
        /// 30 seconds, then stops it.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
            storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
            processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);

            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;

            try
            {
                await processor.StartProcessingAsync();
                await Task.Delay(TimeSpan.FromSeconds(30));
            }
            finally
            {
                // Always stop the processor and detach the handlers, even if start-up failed.
                await processor.StopProcessingAsync();
                processor.ProcessEventAsync -= ProcessEventHandler;
                processor.ProcessErrorAsync -= ProcessErrorHandler;
            }
        }

        /// <summary>
        /// Prints the body and partition key of a received event, then updates the checkpoint.
        /// </summary>
        /// <param name="eventArgs">Processor-supplied arguments for the current event.</param>
        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // The handler can be invoked without an event; Data is not usable in that case.
            if (!eventArgs.HasEvent)
            {
                return;
            }

            // Write the body of the event to the console window
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
            // NOTE(review): presumably the publisher name surfaces as the partition key - confirm.
            Console.WriteLine("\tPublisher: {0}", eventArgs.Data.PartitionKey);

            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }

        /// <summary>
        /// Prints the partition id and exception message for an error reported by the processor.
        /// </summary>
        /// <param name="eventArgs">Processor-supplied details about the failure.</param>
        /// <returns>A completed task; the handler does no asynchronous work.</returns>
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }
    }
}
dotnet run
dotnet run