Add scalable session thresholds

This commit is contained in:
Simponic 2022-12-30 05:46:35 -07:00
parent 60eea1b4ed
commit 42425b0226
Signed by untrusted user who does not match committer: simponic
GPG Key ID: 52B3774857EB24B1
16 changed files with 262 additions and 44 deletions

View File

@ -1,11 +1,11 @@
import Config import Config
config :chessh, RateLimits, config :chessh, RateLimits,
jail_timeout_ms: 5000, jail_timeout_ms: 10_000,
jail_attempt_threshold: 3 jail_attempt_threshold: 3
config :chessh, Chessh.Repo, config :chessh, Chessh.Repo,
database: "chessh-test", database: "chesshtest",
username: "postgres", username: "postgres",
password: "postgres", password: "postgres",
hostname: "localhost", hostname: "localhost",

View File

@ -1,7 +1,16 @@
defmodule Chessh.Auth.KeyAuthenticator do defmodule Chessh.Auth.KeyAuthenticator do
alias Chessh.{Key, Repo} alias Chessh.{Key, Repo, Player}
import Ecto.Query import Ecto.Query
def authenticate(player = %Player{}, public_key) do
!!Repo.one(
from(k in Key,
where: k.key == ^Key.encode_key(public_key),
where: k.player_id == ^player.id
)
)
end
def authenticate(username, public_key) do def authenticate(username, public_key) do
!!Repo.one( !!Repo.one(
from(k in Key, from(k in Key,

View File

@ -1,9 +1,14 @@
defmodule Chessh.Auth.PasswordAuthenticator do defmodule Chessh.Auth.PasswordAuthenticator do
alias Chessh.{Player, Repo} alias Chessh.{Player, Repo}
def authenticate(player = %Player{}, password) do
Player.valid_password?(player, password)
end
def authenticate(username, password) do def authenticate(username, password) do
case Repo.get_by(Player, username: username) do case Repo.get_by(Player, username: username) do
x -> Player.valid_password?(x, password) player -> authenticate(player, password)
nil -> false
end end
end end
end end

View File

@ -5,7 +5,7 @@ defmodule Chessh.Node do
@primary_key {:id, :string, []} @primary_key {:id, :string, []}
schema "nodes" do schema "nodes" do
field(:last_start, :utc_datetime) field(:last_start, :utc_datetime_usec)
end end
def changeset(node, attrs) do def changeset(node, attrs) do
@ -18,7 +18,7 @@ defmodule Chessh.Node do
nil -> %Chessh.Node{id: node_id} nil -> %Chessh.Node{id: node_id}
node -> node node -> node
end end
|> Chessh.Node.changeset(%{last_start: DateTime.utc_now()}) |> changeset(%{last_start: DateTime.utc_now()})
|> Repo.insert_or_update() |> Repo.insert_or_update()
end end
end end

View File

@ -9,11 +9,18 @@ defmodule Chessh.Player do
field(:password, :string, virtual: true) field(:password, :string, virtual: true)
field(:hashed_password, :string) field(:hashed_password, :string)
field(:authentications, :integer, default: 0)
has_many(:keys, Chessh.Key) has_many(:keys, Chessh.Key)
timestamps() timestamps()
end end
def authentications_changeset(player, attrs) do
player
|> cast(attrs, [:authentications])
end
def registration_changeset(player, attrs, opts \\ []) do def registration_changeset(player, attrs, opts \\ []) do
player player
|> cast(attrs, [:username, :password]) |> cast(attrs, [:username, :password])

View File

@ -1,10 +1,12 @@
defmodule Chessh.PlayerSession do defmodule Chessh.PlayerSession do
alias Chessh.Repo alias Chessh.{Repo, Player, PlayerSession, Utils}
use Ecto.Schema use Ecto.Schema
import Ecto.{Query, Changeset} import Ecto.{Query, Changeset}
require Logger
schema "player_sessions" do schema "player_sessions" do
field(:login, :utc_datetime) field(:process, :string)
field(:login, :utc_datetime_usec)
belongs_to(:node, Chessh.Node, type: :string) belongs_to(:node, Chessh.Node, type: :string)
belongs_to(:player, Chessh.Player) belongs_to(:player, Chessh.Player)
@ -17,7 +19,7 @@ defmodule Chessh.PlayerSession do
def concurrent_sessions(player) do def concurrent_sessions(player) do
Repo.aggregate( Repo.aggregate(
from(p in Chessh.PlayerSession, from(p in PlayerSession,
where: p.player_id == ^player.id where: p.player_id == ^player.id
), ),
:count :count
@ -31,4 +33,48 @@ defmodule Chessh.PlayerSession do
) )
) )
end end
def player_within_concurrent_sessions_and_satisfies(username, auth_fn) do
max_sessions =
Application.get_env(:chessh, RateLimits)
|> Keyword.get(:max_concurrent_user_sessions)
Repo.transaction(fn ->
case Repo.one(
from(p in Player,
where: p.username == ^String.Chars.to_string(username),
lock: "FOR UPDATE"
)
) do
nil ->
Logger.error("Player with username #{username} does not exist")
send(self(), {:authed, false})
player ->
authed =
auth_fn.(player) &&
PlayerSession.concurrent_sessions(player) < max_sessions
Repo.insert(%PlayerSession{
login: DateTime.utc_now(),
node_id: System.fetch_env!("NODE_ID"),
player: player,
# TODO: This PID may be wrong - need to determine if this PID is shared with disconnectfun
process: Utils.pid_to_str(self())
})
player
|> Player.authentications_changeset(%{authentications: player.authentications + 1})
|> Repo.update()
send(self(), {:authed, authed})
end
end)
receive do
{:authed, authed} -> authed
after
3_000 -> false
end
end
end end

26
lib/chessh/ssh/cli.ex Normal file
View File

@ -0,0 +1,26 @@
defmodule Chessh.SSH.Cli do
@behaviour :ssh_server_channel
def init() do
{:ok, %{}}
end
def handle_msg(message, state) do
{:ok, state}
end
def handle_ssh_msg(message, state) do
{:ok, state}
end
def handle_ssh_msg(
{:ssh_cm, _connection_handler, {:exit_signal, channel_id, signal, err, lang}},
state
) do
{:stop, channel_id, state}
end
def terminate(reason, state) do
:ok
end
end

View File

@ -1,6 +1,10 @@
defmodule Chessh.SSH.Daemon do defmodule Chessh.SSH.Daemon do
alias Chessh.{Repo, PlayerSession, Player, Utils}
alias Chessh.Auth.PasswordAuthenticator alias Chessh.Auth.PasswordAuthenticator
use GenServer use GenServer
import Ecto.Query
require Logger
def start_link(_) do def start_link(_) do
GenServer.start_link(__MODULE__, %{ GenServer.start_link(__MODULE__, %{
@ -13,33 +17,33 @@ defmodule Chessh.SSH.Daemon do
{:ok, state} {:ok, state}
end end
def pwd_authenticate(username, password) do def pwd_authenticate(username, password, {ip, _port}) do
# TODO - check concurrent sessions
PasswordAuthenticator.authenticate(
String.Chars.to_string(username),
String.Chars.to_string(password)
)
end
def pwd_authenticate(username, password, inet) do
[jail_timeout_ms, jail_attempt_threshold] = [jail_timeout_ms, jail_attempt_threshold] =
Application.get_env(:chessh, RateLimits) Application.get_env(:chessh, RateLimits)
|> Keyword.take([:jail_timeout_ms, :jail_attempt_threshold]) |> Keyword.take([:jail_timeout_ms, :jail_attempt_threshold])
|> Keyword.values() |> Keyword.values()
{ip, _port} = inet
rateId = "failed_password_attempts:#{Enum.join(Tuple.to_list(ip), ".")}" rateId = "failed_password_attempts:#{Enum.join(Tuple.to_list(ip), ".")}"
if pwd_authenticate(username, password) do case PasswordAuthenticator.authenticate(
true String.Chars.to_string(username),
else String.Chars.to_string(password)
case Hammer.check_rate_inc(rateId, jail_timeout_ms, jail_attempt_threshold, 1) do ) do
{:allow, _count} -> false ->
false case Hammer.check_rate_inc(rateId, jail_timeout_ms, jail_attempt_threshold, 1) do
{:allow, _count} ->
false
{:deny, _limit} -> {:deny, _limit} ->
:disconnect :disconnect
end end
x ->
if PlayerSession.player_within_concurrent_sessions_and_satisfies(username, fn _player ->
x
end),
do: true,
else: :disconnect
end end
end end
@ -53,10 +57,13 @@ defmodule Chessh.SSH.Daemon do
case :ssh.daemon( case :ssh.daemon(
port, port,
# shell: fn _username, _peer -> Process.sleep(5000) end,
system_dir: key_dir, system_dir: key_dir,
pwdfun: &pwd_authenticate/4, pwdfun: &pwd_authenticate/4,
key_cb: Chessh.SSH.ServerKey, key_cb: Chessh.SSH.ServerKey,
# disconnectfun: ssh_cli: {Chessh.SSH.Cli, []},
# connectfun: &on_connect/3,
disconnectfun: &on_disconnect/1,
id_string: :random, id_string: :random,
subsystems: [], subsystems: [],
parallel_login: true, parallel_login: true,
@ -74,4 +81,32 @@ defmodule Chessh.SSH.Daemon do
end end
def handle_info(_, state), do: {:noreply, state} def handle_info(_, state), do: {:noreply, state}
# defp on_connect(username, _inet, _method) do
# Logger.debug("#{inspect(self())} connected and is authenticated as #{username}")
#
# case Repo.get_by(Player, username: String.Chars.to_string(username)) do
# nil ->
# nil
#
# player ->
# Repo.insert(%PlayerSession{
# login: DateTime.utc_now(),
# node_id: System.fetch_env!("NODE_ID"),
# player: player,
# process: pid_to_str(self())
# })
# end
# end
defp on_disconnect(_reason) do
Logger.debug("#{inspect(self())} disconnected")
Repo.delete_all(
from(p in PlayerSession,
where: p.node_id == ^System.fetch_env!("NODE_ID"),
where: p.process == ^Utils.pid_to_str(self())
)
)
end
end end

View File

@ -1,9 +1,14 @@
defmodule Chessh.SSH.ServerKey do defmodule Chessh.SSH.ServerKey do
alias Chessh.PlayerSession
alias Chessh.Auth.KeyAuthenticator alias Chessh.Auth.KeyAuthenticator
@behaviour :ssh_server_key_api @behaviour :ssh_server_key_api
def is_auth_key(key, username, _daemon_options) do def is_auth_key(key, username, _daemon_options) do
KeyAuthenticator.authenticate(username, key) PlayerSession.player_within_concurrent_sessions_and_satisfies(
username,
&KeyAuthenticator.authenticate(&1, key)
)
end end
def host_key(algorithm, daemon_options) do def host_key(algorithm, daemon_options) do

9
lib/chessh/utils.ex Normal file
View File

@ -0,0 +1,9 @@
defmodule Chessh.Utils do
def pid_to_str(pid) do
pid
|> :erlang.pid_to_list()
|> List.delete_at(0)
|> List.delete_at(-1)
|> to_string()
end
end

View File

@ -4,7 +4,7 @@ defmodule Chessh.Repo.Migrations.AddNode do
def change do def change do
create table(:nodes, primary_key: false) do create table(:nodes, primary_key: false) do
add(:id, :string, primary_key: true) add(:id, :string, primary_key: true)
add(:last_start, :utc_datetime) add(:last_start, :utc_datetime_usec)
end end
end end
end end

View File

@ -3,9 +3,13 @@ defmodule Chessh.Repo.Migrations.AddUserSession do
def change do def change do
create table(:player_sessions) do create table(:player_sessions) do
add(:login, :utc_datetime) add(:process, :string)
add(:login, :utc_datetime_usec)
add(:player_id, references(:players)) add(:player_id, references(:players))
add(:node_id, references(:nodes, type: :string)) add(:node_id, references(:nodes, type: :string))
end end
create(unique_index(:player_sessions, [:process, :node_id]))
end end
end end

View File

@ -0,0 +1,9 @@
defmodule Chessh.Repo.Migrations.AddAuthenticatingColumnToPlayer do
use Ecto.Migration
def change do
alter table(:players) do
add(:authentications, :integer, default: 0)
end
end
end

View File

@ -8,7 +8,7 @@ defmodule Chessh.Auth.PasswordAuthenticatorTest do
Ecto.Adapters.SQL.Sandbox.checkout(Repo) Ecto.Adapters.SQL.Sandbox.checkout(Repo)
Ecto.Adapters.SQL.Sandbox.mode(Repo, {:shared, self()}) Ecto.Adapters.SQL.Sandbox.mode(Repo, {:shared, self()})
{:ok, _user} = Repo.insert(Player.registration_changeset(%Player{}, @valid_user)) {:ok, _player} = Repo.insert(Player.registration_changeset(%Player{}, @valid_user))
:ok :ok
end end
@ -24,4 +24,13 @@ defmodule Chessh.Auth.PasswordAuthenticatorTest do
"a_bad_password" "a_bad_password"
) )
end end
test "Password can authenticate a user instance" do
player = Repo.get_by(Player, username: "logan")
assert Chessh.Auth.PasswordAuthenticator.authenticate(
player,
@valid_user.password
)
end
end end

View File

@ -1,6 +1,6 @@
defmodule Chessh.SSH.AuthTest do defmodule Chessh.SSH.AuthTest do
use ExUnit.Case use ExUnit.Case, async: false
alias Chessh.{Player, Repo, Key} alias(Chessh.{Player, Repo, Key, PlayerSession})
@localhost '127.0.0.1' @localhost '127.0.0.1'
@localhost_inet {{127, 0, 0, 1}, 1} @localhost_inet {{127, 0, 0, 1}, 1}
@ -26,6 +26,14 @@ defmodule Chessh.SSH.AuthTest do
:ok :ok
end end
def cleanup() do
Process.sleep(1_000)
PlayerSession.delete_all_on_node(System.fetch_env!("NODE_ID"))
# Wait for (what I believe to be the) DB Connection queue to clear?
Process.sleep(1_000)
end
test "Password attempts are rate limited" do test "Password attempts are rate limited" do
jail_attempt_threshold = jail_attempt_threshold =
Application.get_env(:chessh, RateLimits) Application.get_env(:chessh, RateLimits)
@ -49,7 +57,7 @@ defmodule Chessh.SSH.AuthTest do
test_pid = self() test_pid = self()
Task.Supervisor.start_child(sup, fn -> Task.Supervisor.start_child(sup, fn ->
{:ok, _pid} = {:ok, conn} =
:ssh.connect(@localhost, Application.fetch_env!(:chessh, :port), :ssh.connect(@localhost, Application.fetch_env!(:chessh, :port),
user: String.to_charlist(@valid_user.username), user: String.to_charlist(@valid_user.username),
password: String.to_charlist(@valid_user.password), password: String.to_charlist(@valid_user.password),
@ -57,11 +65,12 @@ defmodule Chessh.SSH.AuthTest do
silently_accept_hosts: true silently_accept_hosts: true
) )
:ssh.close(conn)
send(test_pid, :connected_via_password) send(test_pid, :connected_via_password)
end) end)
Task.Supervisor.start_child(sup, fn -> Task.Supervisor.start_child(sup, fn ->
{:ok, _pid} = {:ok, conn} =
:ssh.connect(@localhost, Application.fetch_env!(:chessh, :port), :ssh.connect(@localhost, Application.fetch_env!(:chessh, :port),
user: String.to_charlist(@valid_user.username), user: String.to_charlist(@valid_user.username),
auth_methods: 'publickey', auth_methods: 'publickey',
@ -69,15 +78,61 @@ defmodule Chessh.SSH.AuthTest do
user_dir: String.to_charlist(@client_test_keys_dir) user_dir: String.to_charlist(@client_test_keys_dir)
) )
:ssh.close(conn)
send(test_pid, :connected_via_public_key) send(test_pid, :connected_via_public_key)
end) end)
assert_receive(:connected_via_password, 1000) assert_receive(:connected_via_password, 2_000)
assert_receive(:connected_via_public_key, 1000) assert_receive(:connected_via_public_key, 2_000)
cleanup()
end end
# TODO test "INTEGRATION - Player cannot have more than specified concurrent sessions" do
# test "INTEGRATION - User cannot have more than specified concurrent sessions" do max_concurrent_user_sessions =
# :ok Application.get_env(:chessh, RateLimits)
# end |> Keyword.get(:max_concurrent_user_sessions)
player = Repo.get_by(Player, username: @valid_user.username)
{:ok, sup} = Task.Supervisor.start_link()
test_pid = self()
Enum.reduce(0..(max_concurrent_user_sessions + 1), fn i, _ ->
Task.Supervisor.start_child(sup, fn ->
case :ssh.connect(@localhost, Application.fetch_env!(:chessh, :port),
user: String.to_charlist(@valid_user.username),
password: String.to_charlist(@valid_user.password),
auth_methods: if(rem(i, 2) == 0, do: 'publickey', else: 'password'),
silently_accept_hosts: true,
user_dir: String.to_charlist(@client_test_keys_dir)
) do
{:ok, conn} ->
send(
test_pid,
{:attempted, {:ok, conn}}
)
x ->
send(test_pid, {:attempted, x})
end
end)
end)
Enum.reduce(0..max_concurrent_user_sessions, fn _, _ ->
assert_receive({:attempted, {:ok, _conn}}, 2000)
end)
assert_receive(
{:attempted, {:error, 'Unable to connect using the available authentication methods'}},
2000
)
# Give it time to send back the disconnection payload after session was opened
# but over threshold
:timer.sleep(100)
assert PlayerSession.concurrent_sessions(player) == max_concurrent_user_sessions
cleanup()
end
end end

View File

@ -1,3 +1,2 @@
ExUnit.start() ExUnit.start()
Ecto.Adapters.SQL.Sandbox.mode(Chessh.Repo, :manual) Ecto.Adapters.SQL.Sandbox.mode(Chessh.Repo, :manual)