diff options
Diffstat (limited to 'modules/multiplayer/scene_replication_interface.cpp')
-rw-r--r-- | modules/multiplayer/scene_replication_interface.cpp | 435 |
1 files changed, 296 insertions, 139 deletions
diff --git a/modules/multiplayer/scene_replication_interface.cpp b/modules/multiplayer/scene_replication_interface.cpp index 6e3dbfab47..df9985916b 100644 --- a/modules/multiplayer/scene_replication_interface.cpp +++ b/modules/multiplayer/scene_replication_interface.cpp @@ -30,21 +30,47 @@ #include "scene_replication_interface.h" +#include "scene_multiplayer.h" + #include "core/io/marshalls.h" #include "scene/main/node.h" - -#include "multiplayer_spawner.h" -#include "multiplayer_synchronizer.h" -#include "scene_multiplayer.h" +#include "scene/scene_string_names.h" #define MAKE_ROOM(m_amount) \ if (packet_cache.size() < m_amount) \ packet_cache.resize(m_amount); -void SceneReplicationInterface::_free_remotes(int p_id) { - const HashMap<uint32_t, ObjectID> remotes = rep_state->peer_get_remotes(p_id); - for (const KeyValue<uint32_t, ObjectID> &E : remotes) { - Node *node = rep_state->get_node(E.value); +SceneReplicationInterface::TrackedNode &SceneReplicationInterface::_track(const ObjectID &p_id) { + if (!tracked_nodes.has(p_id)) { + tracked_nodes[p_id] = TrackedNode(p_id); + Node *node = get_id_as<Node>(p_id); + node->connect(SceneStringNames::get_singleton()->tree_exited, callable_mp(this, &SceneReplicationInterface::_untrack).bind(p_id), Node::CONNECT_ONE_SHOT); + } + return tracked_nodes[p_id]; +} + +void SceneReplicationInterface::_untrack(const ObjectID &p_id) { + if (!tracked_nodes.has(p_id)) { + return; + } + uint32_t net_id = tracked_nodes[p_id].net_id; + uint32_t peer = tracked_nodes[p_id].remote_peer; + tracked_nodes.erase(p_id); + // If it was spawned by a remote, remove it from the received nodes. + if (peer && peers_info.has(peer)) { + peers_info[peer].recv_nodes.erase(net_id); + } + // If we spawned or synced it, we need to remove it from any peer it was sent to. + if (net_id || peer == 0) { + for (KeyValue<int, PeerInfo> &E : peers_info) { + E.value.spawn_nodes.erase(p_id); + } + } +} + +void SceneReplicationInterface::_free_remotes(const PeerInfo &p_info) { + for (const KeyValue<uint32_t, ObjectID> &E : p_info.recv_nodes) { + Node *node = tracked_nodes.has(E.value) ? get_id_as<Node>(E.value) : nullptr; ERR_CONTINUE(!node); node->queue_delete(); } @@ -52,34 +78,48 @@ void SceneReplicationInterface::_free_remotes(int p_id) { void SceneReplicationInterface::on_peer_change(int p_id, bool p_connected) { if (p_connected) { - rep_state->on_peer_change(p_id, p_connected); - for (const ObjectID &oid : rep_state->get_spawned_nodes()) { + peers_info[p_id] = PeerInfo(); + for (const ObjectID &oid : spawned_nodes) { _update_spawn_visibility(p_id, oid); } - for (const ObjectID &oid : rep_state->get_synced_nodes()) { - MultiplayerSynchronizer *sync = rep_state->get_synchronizer(oid); - ERR_CONTINUE(!sync); // ERR_BUG - if (sync->is_multiplayer_authority()) { - _update_sync_visibility(p_id, oid); - } + for (const ObjectID &oid : sync_nodes) { + _update_sync_visibility(p_id, get_id_as<MultiplayerSynchronizer>(oid)); } } else { - _free_remotes(p_id); - rep_state->on_peer_change(p_id, p_connected); + ERR_FAIL_COND(!peers_info.has(p_id)); + _free_remotes(peers_info[p_id]); + peers_info.erase(p_id); } } void SceneReplicationInterface::on_reset() { - for (int pid : rep_state->get_peers()) { - _free_remotes(pid); + for (const KeyValue<int, PeerInfo> &E : peers_info) { + _free_remotes(E.value); + } + peers_info.clear(); + // Tracked nodes are cleared on deletion, here we only reset the ids so they can be later re-assigned. + for (KeyValue<ObjectID, TrackedNode> &E : tracked_nodes) { + TrackedNode &tobj = E.value; + tobj.net_id = 0; + tobj.remote_peer = 0; } - rep_state->reset(); + for (const ObjectID &oid : sync_nodes) { + MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid); + ERR_CONTINUE(!sync); + sync->reset(); + } + last_net_id = 0; } void SceneReplicationInterface::on_network_process() { uint64_t msec = OS::get_singleton()->get_ticks_msec(); - for (int peer : rep_state->get_peers()) { - _send_sync(peer, msec); + for (KeyValue<int, PeerInfo> &E : peers_info) { + const HashSet<ObjectID> to_sync = E.value.sync_nodes; + if (to_sync.is_empty()) { + continue; // Nothing to sync + } + uint16_t sync_net_time = ++E.value.last_sent_sync; + _send_sync(E.key, to_sync, sync_net_time, msec); } } @@ -88,14 +128,19 @@ Error SceneReplicationInterface::on_spawn(Object *p_obj, Variant p_config) { ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER); MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(p_config.get_validated_object()); ERR_FAIL_COND_V(!spawner, ERR_INVALID_PARAMETER); - Error err = rep_state->config_add_spawn(node, spawner); - ERR_FAIL_COND_V(err != OK, err); + // Track node. const ObjectID oid = node->get_instance_id(); + TrackedNode &tobj = _track(oid); + ERR_FAIL_COND_V(tobj.spawner != ObjectID(), ERR_ALREADY_IN_USE); + tobj.spawner = spawner->get_instance_id(); + spawned_nodes.insert(oid); + if (multiplayer->has_multiplayer_peer() && spawner->is_multiplayer_authority()) { - rep_state->ensure_net_id(oid); + if (tobj.net_id == 0) { + tobj.net_id = ++last_net_id; + } _update_spawn_visibility(0, oid); } - ERR_FAIL_COND_V(err != OK, err); return OK; } @@ -109,14 +154,22 @@ Error SceneReplicationInterface::on_despawn(Object *p_obj, Variant p_config) { Error err = _make_despawn_packet(node, len); ERR_FAIL_COND_V(err != OK, ERR_BUG); const ObjectID oid = p_obj->get_instance_id(); - for (int pid : rep_state->get_peers()) { - if (!rep_state->is_peer_spawn(pid, oid)) { + for (const KeyValue<int, PeerInfo> &E : peers_info) { + if (!E.value.spawn_nodes.has(oid)) { continue; } - _send_raw(packet_cache.ptr(), len, pid, true); + _send_raw(packet_cache.ptr(), len, E.key, true); } // Also remove spawner tracking from the replication state. - return rep_state->config_del_spawn(node, spawner); + ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_INVALID_PARAMETER); + TrackedNode &tobj = _track(oid); + ERR_FAIL_COND_V(tobj.spawner != spawner->get_instance_id(), ERR_INVALID_PARAMETER); + tobj.spawner = ObjectID(); + spawned_nodes.erase(oid); + for (KeyValue<int, PeerInfo> &E : peers_info) { + E.value.spawn_nodes.erase(oid); + } + return OK; } Error SceneReplicationInterface::on_replication_start(Object *p_obj, Variant p_config) { @@ -125,25 +178,40 @@ Error SceneReplicationInterface::on_replication_start(Object *p_obj, Variant p_c MultiplayerSynchronizer *sync = Object::cast_to<MultiplayerSynchronizer>(p_config.get_validated_object()); ERR_FAIL_COND_V(!sync, ERR_INVALID_PARAMETER); - // Add to synchronizer list and setup visibility. - rep_state->config_add_sync(node, sync); - const ObjectID oid = node->get_instance_id(); - sync->connect("visibility_changed", callable_mp(this, &SceneReplicationInterface::_visibility_changed).bind(oid)); - if (multiplayer->has_multiplayer_peer() && sync->is_multiplayer_authority()) { - _update_sync_visibility(0, oid); - } - - // Try to apply initial state if spawning (hack to apply if before ready). - if (pending_spawn == p_obj->get_instance_id()) { - pending_spawn = ObjectID(); // Make sure this only happens once. - const List<NodePath> props = sync->get_replication_config()->get_spawn_properties(); - Vector<Variant> vars; - vars.resize(props.size()); - int consumed; - Error err = MultiplayerAPI::decode_and_decompress_variants(vars, pending_buffer, pending_buffer_size, consumed); - ERR_FAIL_COND_V(err, err); - err = MultiplayerSynchronizer::set_state(props, node, vars); - ERR_FAIL_COND_V(err, err); + // Add to synchronizer list. + TrackedNode &tobj = _track(p_obj->get_instance_id()); + const ObjectID sid = sync->get_instance_id(); + tobj.synchronizers.insert(sid); + sync_nodes.insert(sid); + + // Update visibility. + sync->connect("visibility_changed", callable_mp(this, &SceneReplicationInterface::_visibility_changed).bind(sync->get_instance_id())); + _update_sync_visibility(0, sync); + + if (pending_spawn == p_obj->get_instance_id() && sync->get_multiplayer_authority() == pending_spawn_remote) { + // Try to apply synchronizer Net ID + ERR_FAIL_COND_V_MSG(pending_sync_net_ids.is_empty(), ERR_INVALID_DATA, vformat("The MultiplayerSynchronizer at path \"%s\" is unable to process the pending spawn since it has no network ID. This might happen when changing the multiplayer authority during the \"_ready\" callback. Make sure to only change the authority of multiplayer synchronizers during \"_enter_tree\" or the \"_spawn_custom\" callback of their multiplayer spawner.", sync->get_path())); + ERR_FAIL_COND_V(!peers_info.has(pending_spawn_remote), ERR_INVALID_DATA); + uint32_t net_id = pending_sync_net_ids[0]; + pending_sync_net_ids.pop_front(); + peers_info[pending_spawn_remote].recv_sync_ids[net_id] = sync->get_instance_id(); + + // Try to apply spawn state (before ready). + if (pending_buffer_size > 0) { + ERR_FAIL_COND_V(!node || sync->get_replication_config().is_null(), ERR_UNCONFIGURED); + int consumed = 0; + const List<NodePath> props = sync->get_replication_config()->get_spawn_properties(); + Vector<Variant> vars; + vars.resize(props.size()); + Error err = MultiplayerAPI::decode_and_decompress_variants(vars, pending_buffer, pending_buffer_size, consumed); + ERR_FAIL_COND_V(err, err); + if (consumed > 0) { + pending_buffer += consumed; + pending_buffer_size -= consumed; + err = MultiplayerSynchronizer::set_state(props, node, vars); + ERR_FAIL_COND_V(err, err); + } + } } return OK; } @@ -154,59 +222,98 @@ Error SceneReplicationInterface::on_replication_stop(Object *p_obj, Variant p_co MultiplayerSynchronizer *sync = Object::cast_to<MultiplayerSynchronizer>(p_config.get_validated_object()); ERR_FAIL_COND_V(!sync, ERR_INVALID_PARAMETER); sync->disconnect("visibility_changed", callable_mp(this, &SceneReplicationInterface::_visibility_changed)); - return rep_state->config_del_sync(node, sync); + // Untrack synchronizer. + const ObjectID oid = node->get_instance_id(); + const ObjectID sid = sync->get_instance_id(); + ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_INVALID_PARAMETER); + TrackedNode &tobj = _track(oid); + tobj.synchronizers.erase(sid); + sync_nodes.erase(sid); + for (KeyValue<int, PeerInfo> &E : peers_info) { + E.value.sync_nodes.erase(sid); + if (sync->get_net_id()) { + E.value.recv_sync_ids.erase(sync->get_net_id()); + } + } + return OK; } -void SceneReplicationInterface::_visibility_changed(int p_peer, ObjectID p_oid) { - if (rep_state->is_spawned_node(p_oid)) { - _update_spawn_visibility(p_peer, p_oid); - } - if (rep_state->is_synced_node(p_oid)) { - _update_sync_visibility(p_peer, p_oid); +void SceneReplicationInterface::_visibility_changed(int p_peer, ObjectID p_sid) { + MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(p_sid); + ERR_FAIL_COND(!sync); // Bug. + Node *node = sync->get_root_node(); + ERR_FAIL_COND(!node); // Bug. + const ObjectID oid = node->get_instance_id(); + if (spawned_nodes.has(oid)) { + _update_spawn_visibility(p_peer, oid); } + _update_sync_visibility(p_peer, sync); } -Error SceneReplicationInterface::_update_sync_visibility(int p_peer, const ObjectID &p_oid) { - MultiplayerSynchronizer *sync = rep_state->get_synchronizer(p_oid); - ERR_FAIL_COND_V(!sync || !sync->is_multiplayer_authority(), ERR_BUG); - bool is_visible = sync->is_visible_to(p_peer); +Error SceneReplicationInterface::_update_sync_visibility(int p_peer, MultiplayerSynchronizer *p_sync) { + ERR_FAIL_COND_V(!p_sync, ERR_BUG); + if (!multiplayer->has_multiplayer_peer() || !p_sync->is_multiplayer_authority()) { + return OK; + } + + const ObjectID &sid = p_sync->get_instance_id(); + bool is_visible = p_sync->is_visible_to(p_peer); if (p_peer == 0) { - for (int pid : rep_state->get_peers()) { + for (KeyValue<int, PeerInfo> &E : peers_info) { // Might be visible to this specific peer. - is_visible = is_visible || sync->is_visible_to(pid); - if (rep_state->is_peer_sync(pid, p_oid) == is_visible) { + is_visible = is_visible || p_sync->is_visible_to(E.key); + if (is_visible == E.value.sync_nodes.has(sid)) { continue; } if (is_visible) { - rep_state->peer_add_sync(pid, p_oid); + E.value.sync_nodes.insert(sid); } else { - rep_state->peer_del_sync(pid, p_oid); + E.value.sync_nodes.erase(sid); } } return OK; } else { - if (is_visible == rep_state->is_peer_sync(p_peer, p_oid)) { + ERR_FAIL_COND_V(!peers_info.has(p_peer), ERR_INVALID_PARAMETER); + if (is_visible == peers_info[p_peer].sync_nodes.has(sid)) { return OK; } if (is_visible) { - return rep_state->peer_add_sync(p_peer, p_oid); + peers_info[p_peer].sync_nodes.insert(sid); } else { - return rep_state->peer_del_sync(p_peer, p_oid); + peers_info[p_peer].sync_nodes.erase(sid); } + return OK; } } Error SceneReplicationInterface::_update_spawn_visibility(int p_peer, const ObjectID &p_oid) { - MultiplayerSpawner *spawner = rep_state->get_spawner(p_oid); - MultiplayerSynchronizer *sync = rep_state->get_synchronizer(p_oid); - Node *node = Object::cast_to<Node>(ObjectDB::get_instance(p_oid)); + const TrackedNode *tnode = tracked_nodes.getptr(p_oid); + ERR_FAIL_COND_V(!tnode, ERR_BUG); + MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tnode->spawner); + Node *node = get_id_as<Node>(p_oid); ERR_FAIL_COND_V(!node || !spawner || !spawner->is_multiplayer_authority(), ERR_BUG); - bool is_visible = !sync || sync->is_visible_to(p_peer); + ERR_FAIL_COND_V(!tracked_nodes.has(p_oid), ERR_BUG); + const HashSet<ObjectID> synchronizers = tracked_nodes[p_oid].synchronizers; + bool is_visible = true; + for (const ObjectID &sid : synchronizers) { + MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid); + ERR_CONTINUE(!sync); + if (!sync->is_multiplayer_authority()) { + continue; + } + // Spawn visibility is composed using OR when multiple synchronizers are present. + if (sync->is_visible_to(p_peer)) { + is_visible = true; + break; + } + is_visible = false; + } // Spawn (and despawn) when needed. HashSet<int> to_spawn; HashSet<int> to_despawn; if (p_peer) { - if (is_visible == rep_state->is_peer_spawn(p_peer, p_oid)) { + ERR_FAIL_COND_V(!peers_info.has(p_peer), ERR_INVALID_PARAMETER); + if (is_visible == peers_info[p_peer].spawn_nodes.has(p_oid)) { return OK; } if (is_visible) { @@ -216,33 +323,37 @@ Error SceneReplicationInterface::_update_spawn_visibility(int p_peer, const Obje } } else { // Check visibility for each peers. - for (int pid : rep_state->get_peers()) { - bool peer_visible = is_visible || sync->is_visible_to(pid); - if (peer_visible == rep_state->is_peer_spawn(pid, p_oid)) { - continue; - } - if (peer_visible) { - to_spawn.insert(pid); + for (const KeyValue<int, PeerInfo> &E : peers_info) { + if (is_visible) { + // This is fast, since the the object is visibile to everyone, we don't need to check each peer. + if (E.value.spawn_nodes.has(p_oid)) { + // Already spawned. + continue; + } + to_spawn.insert(E.key); } else { - to_despawn.insert(pid); + // Need to check visibility for each peer. + _update_spawn_visibility(E.key, p_oid); } } } if (to_spawn.size()) { int len = 0; - _make_spawn_packet(node, len); + _make_spawn_packet(node, spawner, len); for (int pid : to_spawn) { + ERR_CONTINUE(!peers_info.has(pid)); int path_id; multiplayer->get_path_cache()->send_object_cache(spawner, pid, path_id); _send_raw(packet_cache.ptr(), len, pid, true); - rep_state->peer_add_spawn(pid, p_oid); + peers_info[pid].spawn_nodes.insert(p_oid); } } if (to_despawn.size()) { int len = 0; _make_despawn_packet(node, len); for (int pid : to_despawn) { - rep_state->peer_del_spawn(pid, p_oid); + ERR_CONTINUE(!peers_info.has(pid)); + peers_info[pid].spawn_nodes.erase(p_oid); _send_raw(packet_cache.ptr(), len, pid, true); } } @@ -265,20 +376,20 @@ Error SceneReplicationInterface::_send_raw(const uint8_t *p_buffer, int p_size, return peer->put_packet(p_buffer, p_size); } -Error SceneReplicationInterface::_make_spawn_packet(Node *p_node, int &r_len) { - ERR_FAIL_COND_V(!multiplayer, ERR_BUG); +Error SceneReplicationInterface::_make_spawn_packet(Node *p_node, MultiplayerSpawner *p_spawner, int &r_len) { + ERR_FAIL_COND_V(!multiplayer || !p_node || !p_spawner, ERR_BUG); const ObjectID oid = p_node->get_instance_id(); - MultiplayerSpawner *spawner = rep_state->get_spawner(oid); - ERR_FAIL_COND_V(!spawner || !p_node, ERR_BUG); + const TrackedNode *tnode = tracked_nodes.getptr(oid); + ERR_FAIL_COND_V(!tnode, ERR_INVALID_PARAMETER); - uint32_t nid = rep_state->get_net_id(oid); + uint32_t nid = tnode->net_id; ERR_FAIL_COND_V(!nid, ERR_UNCONFIGURED); // Prepare custom arg and scene_id - uint8_t scene_id = spawner->find_spawnable_scene_index_from_object(oid); + uint8_t scene_id = p_spawner->find_spawnable_scene_index_from_object(oid); bool is_custom = scene_id == MultiplayerSpawner::INVALID_ID; - Variant spawn_arg = spawner->get_spawn_argument(oid); + Variant spawn_arg = p_spawner->get_spawn_argument(oid); int spawn_arg_size = 0; if (is_custom) { Error err = MultiplayerAPI::encode_and_compress_variant(spawn_arg, nullptr, spawn_arg_size, false); @@ -286,31 +397,51 @@ Error SceneReplicationInterface::_make_spawn_packet(Node *p_node, int &r_len) { } // Prepare spawn state. + List<NodePath> state_props; + List<uint32_t> sync_ids; + const HashSet<ObjectID> synchronizers = tnode->synchronizers; + for (const ObjectID &sid : synchronizers) { + MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid); + if (!sync->is_multiplayer_authority()) { + continue; + } + ERR_CONTINUE(!sync); + ERR_FAIL_COND_V(sync->get_replication_config().is_null(), ERR_BUG); + for (const NodePath &prop : sync->get_replication_config()->get_spawn_properties()) { + state_props.push_back(prop); + } + // Ensure the synchronizer has an ID. + if (sync->get_net_id() == 0) { + sync->set_net_id(++last_net_id); + } + sync_ids.push_back(sync->get_net_id()); + } int state_size = 0; Vector<Variant> state_vars; Vector<const Variant *> state_varp; - MultiplayerSynchronizer *synchronizer = rep_state->get_synchronizer(oid); - if (synchronizer) { - ERR_FAIL_COND_V(synchronizer->get_replication_config().is_null(), ERR_BUG); - const List<NodePath> props = synchronizer->get_replication_config()->get_spawn_properties(); - Error err = MultiplayerSynchronizer::get_state(props, p_node, state_vars, state_varp); + if (state_props.size()) { + Error err = MultiplayerSynchronizer::get_state(state_props, p_node, state_vars, state_varp); ERR_FAIL_COND_V_MSG(err != OK, err, "Unable to retrieve spawn state."); err = MultiplayerAPI::encode_and_compress_variants(state_varp.ptrw(), state_varp.size(), nullptr, state_size); ERR_FAIL_COND_V_MSG(err != OK, err, "Unable to encode spawn state."); } // Encode scene ID, path ID, net ID, node name. - int path_id = multiplayer->get_path_cache()->make_object_cache(spawner); + int path_id = multiplayer->get_path_cache()->make_object_cache(p_spawner); CharString cname = p_node->get_name().operator String().utf8(); int nlen = encode_cstring(cname.get_data(), nullptr); - MAKE_ROOM(1 + 1 + 4 + 4 + 4 + nlen + (is_custom ? 4 + spawn_arg_size : 0) + state_size); + MAKE_ROOM(1 + 1 + 4 + 4 + 4 + 4 * sync_ids.size() + 4 + nlen + (is_custom ? 4 + spawn_arg_size : 0) + state_size); uint8_t *ptr = packet_cache.ptrw(); ptr[0] = (uint8_t)SceneMultiplayer::NETWORK_COMMAND_SPAWN; ptr[1] = scene_id; int ofs = 2; ofs += encode_uint32(path_id, &ptr[ofs]); ofs += encode_uint32(nid, &ptr[ofs]); + ofs += encode_uint32(sync_ids.size(), &ptr[ofs]); ofs += encode_uint32(nlen, &ptr[ofs]); + for (uint32_t snid : sync_ids) { + ofs += encode_uint32(snid, &ptr[ofs]); + } ofs += encode_cstring(cname.get_data(), &ptr[ofs]); // Write args if (is_custom) { @@ -331,18 +462,20 @@ Error SceneReplicationInterface::_make_spawn_packet(Node *p_node, int &r_len) { Error SceneReplicationInterface::_make_despawn_packet(Node *p_node, int &r_len) { const ObjectID oid = p_node->get_instance_id(); + const TrackedNode *tnode = tracked_nodes.getptr(oid); + ERR_FAIL_COND_V(!tnode, ERR_INVALID_PARAMETER); MAKE_ROOM(5); uint8_t *ptr = packet_cache.ptrw(); ptr[0] = (uint8_t)SceneMultiplayer::NETWORK_COMMAND_DESPAWN; int ofs = 1; - uint32_t nid = rep_state->get_net_id(oid); + uint32_t nid = tnode->net_id; ofs += encode_uint32(nid, &ptr[ofs]); r_len = ofs; return OK; } Error SceneReplicationInterface::on_spawn_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) { - ERR_FAIL_COND_V_MSG(p_buffer_len < 14, ERR_INVALID_DATA, "Invalid spawn packet received"); + ERR_FAIL_COND_V_MSG(p_buffer_len < 18, ERR_INVALID_DATA, "Invalid spawn packet received"); int ofs = 1; // The spawn/despawn command. uint8_t scene_id = p_buffer[ofs]; ofs += 1; @@ -354,9 +487,16 @@ Error SceneReplicationInterface::on_spawn_receive(int p_from, const uint8_t *p_b uint32_t net_id = decode_uint32(&p_buffer[ofs]); ofs += 4; + uint32_t sync_len = decode_uint32(&p_buffer[ofs]); + ofs += 4; uint32_t name_len = decode_uint32(&p_buffer[ofs]); ofs += 4; - ERR_FAIL_COND_V_MSG(name_len > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA, vformat("Invalid spawn packet size: %d, wants: %d", p_buffer_len, ofs + name_len)); + ERR_FAIL_COND_V_MSG(name_len + (sync_len * 4) > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA, vformat("Invalid spawn packet size: %d, wants: %d", p_buffer_len, ofs + name_len + (sync_len * 4))); + List<uint32_t> sync_ids; + for (uint32_t i = 0; i < sync_len; i++) { + sync_ids.push_back(decode_uint32(&p_buffer[ofs])); + ofs += 4; + } ERR_FAIL_COND_V_MSG(name_len < 1, ERR_INVALID_DATA, "Zero spawn name size."); // We need to make sure no trickery happens here, but we want to allow autogenerated ("@") node names. @@ -387,20 +527,35 @@ Error SceneReplicationInterface::on_spawn_receive(int p_from, const uint8_t *p_b } ERR_FAIL_COND_V(!node, ERR_UNAUTHORIZED); node->set_name(name); - rep_state->peer_add_remote(p_from, net_id, node, spawner); + + // Add and track remote + ERR_FAIL_COND_V(!peers_info.has(p_from), ERR_UNAVAILABLE); + ERR_FAIL_COND_V(peers_info[p_from].recv_nodes.has(net_id), ERR_ALREADY_IN_USE); + ObjectID oid = node->get_instance_id(); + TrackedNode &tobj = _track(oid); + tobj.spawner = spawner->get_instance_id(); + tobj.net_id = net_id; + tobj.remote_peer = p_from; + peers_info[p_from].recv_nodes[net_id] = oid; + // The initial state will be applied during the sync config (i.e. before _ready). - int state_len = p_buffer_len - ofs; - if (state_len) { - pending_spawn = node->get_instance_id(); - pending_buffer = &p_buffer[ofs]; - pending_buffer_size = state_len; - } + pending_spawn = node->get_instance_id(); + pending_spawn_remote = p_from; + pending_buffer_size = p_buffer_len - ofs; + pending_buffer = pending_buffer_size > 0 ? &p_buffer[ofs] : nullptr; + pending_sync_net_ids = sync_ids; + parent->add_child(node); spawner->emit_signal(SNAME("spawned"), node); pending_spawn = ObjectID(); + pending_spawn_remote = 0; pending_buffer = nullptr; pending_buffer_size = 0; + if (pending_sync_net_ids.size()) { + pending_sync_net_ids.clear(); + ERR_FAIL_V(ERR_INVALID_DATA); // Should have been consumed. + } return OK; } @@ -409,12 +564,18 @@ Error SceneReplicationInterface::on_despawn_receive(int p_from, const uint8_t *p int ofs = 1; // The spawn/despawn command. uint32_t net_id = decode_uint32(&p_buffer[ofs]); ofs += 4; - Node *node = nullptr; - Error err = rep_state->peer_del_remote(p_from, net_id, &node); - ERR_FAIL_COND_V(err != OK, err); + + // Untrack remote + ERR_FAIL_COND_V(!peers_info.has(p_from), ERR_UNAUTHORIZED); + PeerInfo &pinfo = peers_info[p_from]; + ERR_FAIL_COND_V(!pinfo.recv_nodes.has(net_id), ERR_UNAUTHORIZED); + Node *node = get_id_as<Node>(pinfo.recv_nodes[net_id]); ERR_FAIL_COND_V(!node, ERR_BUG); + pinfo.recv_nodes.erase(net_id); - MultiplayerSpawner *spawner = rep_state->get_spawner(node->get_instance_id()); + const ObjectID oid = node->get_instance_id(); + ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_BUG); + MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tracked_nodes[oid].spawner); ERR_FAIL_COND_V(!spawner, ERR_DOES_NOT_EXIST); ERR_FAIL_COND_V(p_from != spawner->get_multiplayer_authority(), ERR_UNAUTHORIZED); @@ -427,27 +588,24 @@ Error SceneReplicationInterface::on_despawn_receive(int p_from, const uint8_t *p return OK; } -void SceneReplicationInterface::_send_sync(int p_peer, uint64_t p_msec) { - const HashSet<ObjectID> &to_sync = rep_state->get_peer_sync_nodes(p_peer); - if (to_sync.is_empty()) { - return; - } +void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p_synchronizers, uint16_t p_sync_net_time, uint64_t p_msec) { MAKE_ROOM(sync_mtu); uint8_t *ptr = packet_cache.ptrw(); ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC; int ofs = 1; - ofs += encode_uint16(rep_state->peer_sync_next(p_peer), &ptr[1]); + ofs += encode_uint16(p_sync_net_time, &ptr[1]); // Can only send updates for already notified nodes. // This is a lazy implementation, we could optimize much more here with by grouping by replication config. - for (const ObjectID &oid : to_sync) { - if (!rep_state->update_sync_time(oid, p_msec)) { + for (const ObjectID &oid : p_synchronizers) { + MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid); + ERR_CONTINUE(!sync || !sync->get_replication_config().is_valid() || !sync->is_multiplayer_authority()); + if (!sync->update_outbound_sync_time(p_msec)) { continue; // nothing to sync. } - MultiplayerSynchronizer *sync = rep_state->get_synchronizer(oid); - ERR_CONTINUE(!sync || !sync->get_replication_config().is_valid()); - Node *node = rep_state->get_node(oid); + + Node *node = sync->get_root_node(); ERR_CONTINUE(!node); - uint32_t net_id = rep_state->get_net_id(oid); + uint32_t net_id = sync->get_net_id(); if (net_id == 0 || (net_id & 0x80000000)) { int path_id = 0; bool verified = multiplayer->get_path_cache()->send_object_cache(sync, p_peer, path_id); @@ -455,7 +613,7 @@ void SceneReplicationInterface::_send_sync(int p_peer, uint64_t p_msec) { if (net_id == 0) { // First time path based ID. net_id = path_id | 0x80000000; - rep_state->set_net_id(oid, net_id | 0x80000000); + sync->set_net_id(net_id | 0x80000000); } if (!verified) { // The path based sync is not yet confirmed, skipping. @@ -478,7 +636,7 @@ void SceneReplicationInterface::_send_sync(int p_peer, uint64_t p_msec) { ofs = 3; } if (size) { - ofs += encode_uint32(rep_state->get_net_id(oid), &ptr[ofs]); + ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]); ofs += encode_uint32(size, &ptr[ofs]); MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), &ptr[ofs], size); ofs += size; @@ -494,33 +652,32 @@ Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_bu ERR_FAIL_COND_V_MSG(p_buffer_len < 11, ERR_INVALID_DATA, "Invalid sync packet received"); uint16_t time = decode_uint16(&p_buffer[1]); int ofs = 3; - rep_state->peer_sync_recv(p_from, time); while (ofs + 8 < p_buffer_len) { uint32_t net_id = decode_uint32(&p_buffer[ofs]); ofs += 4; uint32_t size = decode_uint32(&p_buffer[ofs]); ofs += 4; - Node *node = nullptr; + MultiplayerSynchronizer *sync = nullptr; if (net_id & 0x80000000) { - MultiplayerSynchronizer *sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer->get_path_cache()->get_cached_object(p_from, net_id & 0x7FFFFFFF)); - ERR_FAIL_COND_V(!sync || sync->get_multiplayer_authority() != p_from, ERR_UNAUTHORIZED); - node = sync->get_node(sync->get_root_path()); - } else { - node = rep_state->peer_get_remote(p_from, net_id); + sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer->get_path_cache()->get_cached_object(p_from, net_id & 0x7FFFFFFF)); + } else if (peers_info[p_from].recv_sync_ids.has(net_id)) { + const ObjectID &sid = peers_info[p_from].recv_sync_ids[net_id]; + sync = get_id_as<MultiplayerSynchronizer>(sid); } - if (!node) { + if (!sync) { // Not received yet. ofs += size; continue; } - const ObjectID oid = node->get_instance_id(); - if (!rep_state->update_last_node_sync(oid, time)) { + Node *node = sync->get_root_node(); + if (sync->get_multiplayer_authority() != p_from || !node) { + ERR_CONTINUE(true); + } + if (!sync->update_inbound_sync_time(time)) { // State is too old. ofs += size; continue; } - MultiplayerSynchronizer *sync = rep_state->get_synchronizer(oid); - ERR_FAIL_COND_V(!sync, ERR_BUG); ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_BUG); const List<NodePath> props = sync->get_replication_config()->get_sync_properties(); Vector<Variant> vars; |