Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions malsim/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@
run_simulation,
)

from malsim.scenario import load_scenario
from malsim.scenario import (
Scenario
)

__title__ = 'malsim'
__version__ = '1.2.0'
__authors__ = ['Andrei Buhaiu', 'Joakim Loxdal', 'Jakob Nyberg', 'Nikolaos Kakouros']
__license__ = 'Apache 2.0'
__docformat__ = 'restructuredtext en'

__all__ = ['MalSimulator', 'MalSimulatorSettings', 'run_simulation', 'load_scenario']
__all__ = ['MalSimulator', 'MalSimulatorSettings', 'run_simulation', 'Scenario']


# TODO: Make sure logging dir exists and make it configurable
Expand Down
4 changes: 2 additions & 2 deletions malsim/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
MalSimulator,
MalSimulatorSettings,
run_simulation,
load_scenario,
Scenario
)
from .mal_simulator import TTCMode

Expand Down Expand Up @@ -52,7 +52,7 @@ def main() -> None:
help='If set, simulator will send actions to malsim-gui',
)
args = parser.parse_args()
scenario = load_scenario(args.scenario_file)
scenario = Scenario.load_from_file(args.scenario_file)
sim = MalSimulator.from_scenario(
scenario,
MalSimulatorSettings(
Expand Down
38 changes: 20 additions & 18 deletions malsim/envs/gym_envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from gymnasium.core import RenderFrame
import numpy as np

from ..scenario import load_scenario
from ..mal_simulator import MalSimulator, AgentType
from ..scenario import Scenario, AgentConfig, AgentType
from ..mal_simulator import MalSimulator
from ..envs import MalSimVectorizedObsEnv
from ..agents import DecisionAgent

Expand All @@ -31,11 +31,12 @@ def __init__(self, scenario_file: str, **kwargs: Any) -> None:
self.render_mode = kwargs.pop('render_mode', None)

# Create a simulator from the scenario given
scenario = load_scenario(scenario_file, **kwargs)
scenario = Scenario.load_from_file(scenario_file, **kwargs)
self.sim = MalSimVectorizedObsEnv(MalSimulator(scenario.attack_graph))

attacker_agents = [
agent for agent in scenario.agents if agent['type'] == AgentType.ATTACKER
agent for agent in scenario.agents.values()
if agent.type == AgentType.ATTACKER
]

assert len(attacker_agents) == 1, (
Expand All @@ -44,10 +45,10 @@ def __init__(self, scenario_file: str, **kwargs: Any) -> None:
)

attacker_agent = attacker_agents[0]
self.attacker_agent_name = attacker_agent['name']
self.attacker_agent_name = attacker_agent.name

self.sim.register_attacker(
self.attacker_agent_name, attacker_agent['entry_points']
self.attacker_agent_name, attacker_agent.entry_points
)
self.sim.reset()

Expand Down Expand Up @@ -103,7 +104,7 @@ def __init__(self, scenario_file: str, **kwargs: Any) -> None:
self.randomize = kwargs.pop('randomize_attacker_behavior', False)
self.render_mode = kwargs.pop('render_mode', None)

scenario = load_scenario(scenario_file)
scenario = Scenario.load_from_file(scenario_file)

self.scenario_agents = scenario.agents
self.sim = MalSimVectorizedObsEnv(
Expand All @@ -124,27 +125,28 @@ def __init__(self, scenario_file: str, **kwargs: Any) -> None:
self.observation_space = self.sim.observation_space(self.defender_agent_name)
self.action_space = self.sim.action_space(self.defender_agent_name)

def _register_attacker_agents(self, agents: list[dict[str, Any]]) -> None:
def _register_attacker_agents(self, agents: dict[str, AgentConfig]) -> None:
"""Register attackers in simulator"""
for agent_config in agents:
if agent_config['type'] == AgentType.ATTACKER:
for agent_config in agents.values():
if agent_config.type == AgentType.ATTACKER:
self.sim.register_attacker(
agent_config['name'], agent_config['entry_points']
agent_config.name, agent_config.entry_points
)

def _create_attacker_decision_agents(
self, agents: list[dict[str, Any]], seed: Optional[int] = None
) -> dict[str, DecisionAgent]:
self, agents: dict[str, AgentConfig], seed: Optional[int] = None
) -> dict[str, DecisionAgent]:
"""Create decision agents for each attacker"""

attacker_agents = {}
for agent_config in agents:
if agent_config['type'] == AgentType.ATTACKER:
agent_name = agent_config['name']
if agent_config['agent_class']:
attacker_agents[agent_name] = agent_config['agent_class'](
for agent_config in agents.values():
if agent_config.type == AgentType.ATTACKER and agent_config.policy_class:
agent_name = agent_config.name
attacker_agents[agent_name] = (
agent_config.policy_class(
{'seed': seed, 'randomize': self.randomize}
)
)
return attacker_agents

def reset(
Expand Down
46 changes: 13 additions & 33 deletions malsim/mal_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@
)

from malsim.scenario import (
AgentType,
AgentConfig,
AttackerAgentConfig,
DefenderAgentConfig,
load_scenario,
Scenario
)

from malsim.visualization.malsim_gui_client import MalSimGUIClient

if TYPE_CHECKING:
from malsim.scenario import Scenario
from malsim.agents import DecisionAgent


Expand Down Expand Up @@ -292,17 +290,6 @@ def from_scenario(
) -> MalSimulator:
"""Create a MalSimulator object from a Scenario"""

def register_agent_dict(agent_config: dict[str, Any]) -> None:
"""Register an agent specified in a dictionary"""
if agent_config['type'] == AgentType.ATTACKER:
sim.register_attacker(
agent_config['name'],
agent_config['entry_points'],
agent_config.get('goals'),
)
elif agent_config['type'] == AgentType.DEFENDER:
sim.register_defender(agent_config['name'])

def register_agent_config(agent_config: AgentConfig) -> None:
"""Register an agent config in simulator"""
if isinstance(agent_config, AttackerAgentConfig):
Expand All @@ -314,7 +301,7 @@ def register_agent_config(agent_config: AgentConfig) -> None:

if isinstance(scenario, str):
# Load scenario if file was given
scenario = load_scenario(scenario)
scenario = Scenario.load_from_file(scenario)

sim = cls(
scenario.attack_graph,
Expand All @@ -330,11 +317,8 @@ def register_agent_config(agent_config: AgentConfig) -> None:
)

if register_agents:
for agent_config in scenario.agents:
if isinstance(agent_config, dict):
register_agent_dict(agent_config)
elif isinstance(agent_config, AgentConfig):
register_agent_config(agent_config)
for agent_config in scenario.agents.values():
register_agent_config(agent_config)

return sim

Expand Down Expand Up @@ -1281,14 +1265,14 @@ def render(self) -> None:


def run_simulation(
sim: MalSimulator, agents: list[dict[str, Any]]
) -> dict[str, list[AttackGraphNode]]:
sim: MalSimulator, agents: dict[str, AgentConfig]
) -> dict[str, list[AttackGraphNode]]:
"""Run a simulation with agents

Return selected actions by each agent in each step
"""
agent_actions: dict[str, list[AttackGraphNode]] = {}
total_rewards = {agent_config['name']: 0.0 for agent_config in agents}
total_rewards = {agent_name: 0.0 for agent_name in agents}

logger.info('Starting CLI env simulator.')
states = sim.reset()
Expand All @@ -1298,9 +1282,8 @@ def run_simulation(
actions: dict[str, list[AttackGraphNode]] = {}

# Select actions for each agent
for agent_config in agents:
decision_agent: Optional[DecisionAgent] = agent_config['agent']
agent_name = agent_config['name']
for agent_name, agent_config in agents.items():
decision_agent: Optional[DecisionAgent] = agent_config.policy
if decision_agent is None:
print(
f'Agent "{agent_name}" has no decision agent class '
Expand All @@ -1320,18 +1303,15 @@ def run_simulation(

# Perform next step of simulation
states = sim.step(actions)
for agent_config in agents:
total_rewards[agent_config['name']] += sim.agent_reward(
agent_config['name']
)
for agent_name in agents:
total_rewards[agent_name] += sim.agent_reward(agent_name)

print('---')

print(f'Simulation over after {sim.cur_iter} steps.')

# Print total rewards
for agent_config in agents:
agent_name = agent_config['name']
print(f'Total reward "{agent_name}"', total_rewards[agent_config['name']])
for agent_name in agents:
print(f'Total reward "{agent_name}"', total_rewards[agent_name])

return agent_actions
Loading