1
0

ServerDbPostgres.Notifications.cs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. using System.Data;
  2. using System.Threading;
  3. using System.Threading.Tasks;
  4. using Content.Server.Administration.Managers;
  5. using Microsoft.EntityFrameworkCore;
  6. using Npgsql;
  7. namespace Content.Server.Database;
  8. /// Listens for ban_notification containing the player id and the banning server id using postgres listen/notify.
  9. /// Players a ban_notification got received for get banned, except when the current server id and the one in the notification payload match.
  10. public sealed partial class ServerDbPostgres
  11. {
  12. /// <summary>
  13. /// The list of notify channels to subscribe to.
  14. /// </summary>
  15. private static readonly string[] NotificationChannels =
  16. [
  17. BanManager.BanNotificationChannel,
  18. MultiServerKickManager.NotificationChannel,
  19. ];
  20. private static readonly TimeSpan ReconnectWaitIncrease = TimeSpan.FromSeconds(10);
  21. private readonly CancellationTokenSource _notificationTokenSource = new();
  22. private NpgsqlConnection? _notificationConnection;
  23. private TimeSpan _reconnectWaitTime = TimeSpan.Zero;
  24. /// <summary>
  25. /// Sets up the database connection and the notification handler
  26. /// </summary>
  27. private void InitNotificationListener(string connectionString)
  28. {
  29. _notificationConnection = new NpgsqlConnection(connectionString);
  30. _notificationConnection.Notification += OnNotification;
  31. var cancellationToken = _notificationTokenSource.Token;
  32. Task.Run(() => NotificationListener(cancellationToken), cancellationToken);
  33. }
  34. /// <summary>
  35. /// Listens to the notification channel with basic error handling and reopens the connection if it got closed
  36. /// </summary>
  37. private async Task NotificationListener(CancellationToken cancellationToken)
  38. {
  39. if (_notificationConnection == null)
  40. return;
  41. _notifyLog.Verbose("Starting notification listener");
  42. while (!cancellationToken.IsCancellationRequested)
  43. {
  44. try
  45. {
  46. if (_notificationConnection.State == ConnectionState.Broken)
  47. {
  48. _notifyLog.Debug("Notification listener entered broken state, closing...");
  49. await _notificationConnection.CloseAsync();
  50. }
  51. if (_notificationConnection.State == ConnectionState.Closed)
  52. {
  53. _notifyLog.Debug("Opening notification listener connection...");
  54. if (_reconnectWaitTime != TimeSpan.Zero)
  55. {
  56. _notifyLog.Verbose($"_reconnectWaitTime is {_reconnectWaitTime}");
  57. await Task.Delay(_reconnectWaitTime, cancellationToken);
  58. }
  59. await _notificationConnection.OpenAsync(cancellationToken);
  60. _reconnectWaitTime = TimeSpan.Zero;
  61. _notifyLog.Verbose($"Notification connection opened...");
  62. }
  63. foreach (var channel in NotificationChannels)
  64. {
  65. _notifyLog.Verbose($"Listening on channel {channel}");
  66. await using var cmd = new NpgsqlCommand($"LISTEN {channel}", _notificationConnection);
  67. await cmd.ExecuteNonQueryAsync(cancellationToken);
  68. }
  69. while (!cancellationToken.IsCancellationRequested)
  70. {
  71. _notifyLog.Verbose("Waiting on notifications...");
  72. await _notificationConnection.WaitAsync(cancellationToken);
  73. }
  74. }
  75. catch (OperationCanceledException)
  76. {
  77. // Abort loop on cancel.
  78. _notifyLog.Verbose($"Shutting down notification listener due to cancellation");
  79. return;
  80. }
  81. catch (Exception e)
  82. {
  83. _reconnectWaitTime += ReconnectWaitIncrease;
  84. _notifyLog.Error($"Error in notification listener: {e}");
  85. }
  86. }
  87. _notificationConnection.Dispose();
  88. }
  89. private void OnNotification(object _, NpgsqlNotificationEventArgs notification)
  90. {
  91. _notifyLog.Verbose($"Received notification on channel {notification.Channel}");
  92. NotificationReceived(new DatabaseNotification
  93. {
  94. Channel = notification.Channel,
  95. Payload = notification.Payload,
  96. });
  97. }
  98. public override async Task SendNotification(DatabaseNotification notification)
  99. {
  100. await using var db = await GetDbImpl();
  101. await db.PgDbContext.Database.ExecuteSqlAsync(
  102. $"SELECT pg_notify({notification.Channel}, {notification.Payload})");
  103. }
  104. public override void Shutdown()
  105. {
  106. _notificationTokenSource.Cancel();
  107. if (_notificationConnection == null)
  108. return;
  109. _notificationConnection.Notification -= OnNotification;
  110. }
  111. }