Exchange_1C_Counterparty/HRM_MQ_Consumer_Service/HRMConsumerService.cs
2025-07-10 16:35:13 +05:00

78 lines
2.6 KiB
C#

using HRM_MQ_Consumer_Service.Parsers;
using Microsoft.Extensions.Configuration;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Simple.OData.Client;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Configuration;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.ServiceProcess;
using System.Text;
using System.Threading.Tasks;
namespace HRM_MQ_Consumer_Service
{
public partial class HRMConsumerService : ServiceBase
{
private ConnectionFactory connectionFactory = new ConnectionFactory
{
HostName = ConfigurationManager.AppSettings["rabbitmq_host_name"],
UserName = ConfigurationManager.AppSettings["rabbitmq_user_name"],
Password = ConfigurationManager.AppSettings["rabbitmq_password"],
VirtualHost = ConfigurationManager.AppSettings["rabbitmq_virtualhost"]
};
private IConnection _connection;
private IChannel _channel;
private AsyncEventingBasicConsumer _consumer;
private CounterpartyJSONParser _counterpartyParser;
public HRMConsumerService()
{
InitializeComponent();
}
internal void TestStatupAndStop(string[] args)
{
this.OnStart(args);
Console.ReadLine();
this.OnStop();
}
protected override async void OnStart(string[] args)
{
string queue = ConfigurationManager.AppSettings["rabbitmq_queue_name"];
_connection = await connectionFactory.CreateConnectionAsync();
_channel = await _connection.CreateChannelAsync();
await _channel.QueueDeclareAsync(
queue: queue,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
_consumer = new AsyncEventingBasicConsumer(_channel);
_counterpartyParser = new CounterpartyJSONParser();
_consumer.ReceivedAsync += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var result = await _counterpartyParser.Parse(message);
//var result = await _counterpartyParser.Parse(message);
//Console.WriteLine($"Recieved {message}");
};
await _channel.BasicConsumeAsync(
queue: queue,
autoAck: true,
consumer: _consumer);
}
protected override void OnStop()
{
_channel?.Dispose();
_connection?.Dispose();
}
}
}