| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- using System.Data;
- using System.Threading;
- using System.Threading.Tasks;
- using Content.Server.Administration.Managers;
- using Microsoft.EntityFrameworkCore;
- using Npgsql;
- namespace Content.Server.Database;
- /// Listens for ban_notification messages, which contain the player id and the banning server id, using PostgreSQL LISTEN/NOTIFY.
- /// Players for whom a ban_notification is received get banned, unless the server id in the notification payload matches the current server's id.
- public sealed partial class ServerDbPostgres
- {
/// <summary>
/// Every notify channel the listener issues a <c>LISTEN</c> command for.
/// </summary>
private static readonly string[] NotificationChannels = new string[]
{
    BanManager.BanNotificationChannel,
    MultiServerKickManager.NotificationChannel,
};

/// <summary>
/// Amount added to <see cref="_reconnectWaitTime"/> each time the listener loop logs an error.
/// </summary>
private static readonly TimeSpan ReconnectWaitIncrease = TimeSpan.FromSeconds(10);

/// <summary>
/// Source of the token handed to the listener loop; cancelled by <see cref="Shutdown"/>.
/// </summary>
private readonly CancellationTokenSource _notificationTokenSource = new CancellationTokenSource();

/// <summary>
/// Connection the listener loop runs on; <c>null</c> until <see cref="InitNotificationListener"/> assigns it.
/// </summary>
private NpgsqlConnection? _notificationConnection;

/// <summary>
/// Current back-off delay of the listener loop. Starts at zero, grows after each listener error
/// and is reset to zero once the listener has recovered.
/// </summary>
private TimeSpan _reconnectWaitTime = TimeSpan.Zero;
/// <summary>
/// Creates the notification connection, attaches <see cref="OnNotification"/> to it and
/// starts <see cref="NotificationListener"/> via <see cref="Task.Run(Func{Task}, CancellationToken)"/>.
/// </summary>
/// <param name="connectionString">Connection string passed to the new <see cref="NpgsqlConnection"/>.</param>
private void InitNotificationListener(string connectionString)
{
    var connection = new NpgsqlConnection(connectionString);
    connection.Notification += OnNotification;
    _notificationConnection = connection;

    var stopToken = _notificationTokenSource.Token;

    // Fire-and-forget: the returned task is neither stored nor awaited.
    _ = Task.Run(() => NotificationListener(stopToken), stopToken);
}
/// <summary>
/// Runs the listen loop: closes a broken connection, (re)opens a closed one, issues <c>LISTEN</c>
/// for every entry in <see cref="NotificationChannels"/> and then waits for notifications until
/// an error occurs or <paramref name="cancellationToken"/> is cancelled.
/// </summary>
/// <remarks>
/// Errors other than cancellation are logged and the loop retries after a wait that grows by
/// <see cref="ReconnectWaitIncrease"/> per failure. The connection is disposed on every exit path,
/// including cancellation.
/// </remarks>
/// <param name="cancellationToken">Token that stops the loop when cancelled.</param>
private async Task NotificationListener(CancellationToken cancellationToken)
{
    var connection = _notificationConnection;
    if (connection == null)
        return;

    _notifyLog.Verbose("Starting notification listener");

    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                // Back off before every retry, not only before reopening a closed connection:
                // a failure that leaves the connection open (for example a failing LISTEN)
                // would otherwise be retried in a tight loop.
                if (_reconnectWaitTime != TimeSpan.Zero)
                {
                    _notifyLog.Verbose($"_reconnectWaitTime is {_reconnectWaitTime}");
                    await Task.Delay(_reconnectWaitTime, cancellationToken);
                }

                if (connection.State == ConnectionState.Broken)
                {
                    _notifyLog.Debug("Notification listener entered broken state, closing...");
                    await connection.CloseAsync();
                }

                if (connection.State == ConnectionState.Closed)
                {
                    _notifyLog.Debug("Opening notification listener connection...");
                    await connection.OpenAsync(cancellationToken);
                    _notifyLog.Verbose($"Notification connection opened...");
                }

                foreach (var channel in NotificationChannels)
                {
                    _notifyLog.Verbose($"Listening on channel {channel}");
                    await using var cmd = new NpgsqlCommand($"LISTEN {channel}", connection);
                    await cmd.ExecuteNonQueryAsync(cancellationToken);
                }

                // Connection is open and subscribed to all channels: clear the back-off.
                _reconnectWaitTime = TimeSpan.Zero;

                while (!cancellationToken.IsCancellationRequested)
                {
                    _notifyLog.Verbose("Waiting on notifications...");
                    await connection.WaitAsync(cancellationToken);
                }
            }
            catch (OperationCanceledException)
            {
                // Abort loop on cancel; the finally block below still disposes the connection.
                _notifyLog.Verbose($"Shutting down notification listener due to cancellation");
                return;
            }
            catch (Exception e)
            {
                _reconnectWaitTime += ReconnectWaitIncrease;
                _notifyLog.Error($"Error in notification listener: {e}");
            }
        }
    }
    finally
    {
        // Runs for the cancellation return above as well as for a normal loop exit,
        // so the connection is never left undisposed after shutdown.
        await connection.DisposeAsync();
    }
}
/// <summary>
/// Handler attached to the notification connection's <c>Notification</c> event. Logs the channel
/// and passes channel and payload on to <c>NotificationReceived</c> as a <c>DatabaseNotification</c>.
/// </summary>
/// <param name="_">Event sender; unused.</param>
/// <param name="notification">Event data carrying the channel name and payload.</param>
private void OnNotification(object _, NpgsqlNotificationEventArgs notification)
{
    var channel = notification.Channel;
    _notifyLog.Verbose($"Received notification on channel {channel}");

    var forwarded = new DatabaseNotification
    {
        Channel = channel,
        Payload = notification.Payload,
    };

    NotificationReceived(forwarded);
}
/// <summary>
/// Sends <paramref name="notification"/> by executing <c>SELECT pg_notify(channel, payload)</c>
/// through the database context obtained from <c>GetDbImpl</c>.
/// </summary>
/// <param name="notification">Supplies the channel and payload passed to <c>pg_notify</c>.</param>
public override async Task SendNotification(DatabaseNotification notification)
{
    await using var db = await GetDbImpl();

    // Deliberately a FormattableString, not a string: ExecuteSqlAsync turns the interpolated
    // values into query parameters instead of splicing them into the SQL text.
    FormattableString notifySql = $"SELECT pg_notify({notification.Channel}, {notification.Payload})";

    await db.PgDbContext.Database.ExecuteSqlAsync(notifySql);
}
/// <summary>
/// Cancels the listener's token and, if a notification connection was created,
/// detaches <see cref="OnNotification"/> from it.
/// </summary>
public override void Shutdown()
{
    _notificationTokenSource.Cancel();

    if (_notificationConnection is { } connection)
        connection.Notification -= OnNotification;
}
- }
|