2323import org .bukkit .scheduler .BukkitTask ;
2424import org .bukkit .util .Vector ;
2525import org .bukkit .event .block .BlockPhysicsEvent ;
26+ import org .bukkit .event .player .PlayerTeleportEvent ;
2627
2728import java .io .*;
2829import java .nio .charset .StandardCharsets ;
2930import java .util .*;
3031import java .util .concurrent .ConcurrentHashMap ;
32+ import java .util .concurrent .atomic .AtomicInteger ;
3133import java .util .concurrent .ConcurrentLinkedQueue ;
3234
3335import java .util .logging .Level ;
@@ -40,10 +42,9 @@ public class TuffX extends JavaPlugin implements Listener, PluginMessageListener
4042 private static final int CHUNKS_PER_TICK = 2 ;
4143
4244 private final Map <UUID , Queue <Vector >> requestQueue = new ConcurrentHashMap <>();
43- private final Map <UUID , Set <Vector >> currentlyQueued = new ConcurrentHashMap <>();
4445
45- private final Map <UUID , Boolean > initialLoadingPlayers = new ConcurrentHashMap <> ();
46- private final Map <UUID , Integer > initialChunksToProcess = new ConcurrentHashMap <>();
46+ private final Set <UUID > awaitingInitialBatch = ConcurrentHashMap . newKeySet ();
47+ private final Map <UUID , AtomicInteger > initialChunksToProcess = new ConcurrentHashMap <>();
4748
4849 private BukkitTask processorTask ;
4950
@@ -57,14 +58,16 @@ public void onEnable() {
5758 getServer ().getMessenger ().registerIncomingPluginChannel (this , CHANNEL , this );
5859 getServer ().getPluginManager ().registerEvents (this , this );
5960 if (this .viablockids == null ) this .viablockids = new ViaBlockIds (this );
60- startProcessorTask ();
6161 logFancyEnable ();
62+ startProcessorTask ();
6263 }
6364
6465 @ Override
6566 public void onDisable () {
6667 if (processorTask != null ) processorTask .cancel ();
6768 requestQueue .clear ();
69+ awaitingInitialBatch .clear ();
70+ initialChunksToProcess .clear ();
6871 getLogger ().info ("TuffX has been disabled." );
6972 }
7073
@@ -79,48 +82,48 @@ public void onPluginMessageReceived(String channel, Player player, byte[] messag
7982 byte [] actionBytes = new byte [actionLength ];
8083 in .readFully (actionBytes );
8184 String action = new String (actionBytes , StandardCharsets .UTF_8 );
82- handleIncomingPacket (player , new Location (player .getWorld (), x , y , z ), action , x , z );
85+ handleIncomingPacket (player , new Location (player .getWorld (), x , y , z ), action , x , z , in );
8386 } catch (IOException e ) {
8487 getLogger ().warning ("Failed to parse plugin message from " + player .getName () + ": " + e .getMessage ());
8588 }
8689 }
90+
91+ private void handleSingleChunkRequest (Player player , int chunkX , int chunkZ , UUID playerId ) {
92+ requestQueue .computeIfAbsent (player .getUniqueId (), k -> new ConcurrentLinkedQueue <>()).add (new Vector (chunkX , 0 , chunkZ ));
93+ }
8794
88- private void handleIncomingPacket (Player player , Location loc , String action , int chunkX , int chunkZ ) {
95+ private void handleIncomingPacket (Player player , Location loc , String action , int chunkX , int chunkZ , DataInputStream in ) throws IOException {
8996 UUID playerId = player .getUniqueId ();
9097 switch (action .toLowerCase ()) {
9198 case "request_chunk" :
92- Vector chunkVec = new Vector (chunkX , 0 , chunkZ );
93- Set <Vector > queuedSet = currentlyQueued .computeIfAbsent (playerId , k -> ConcurrentHashMap .newKeySet ());
99+ handleSingleChunkRequest (player ,chunkX ,chunkZ ,playerId );
100+ break ;
101+ case "request_chunk_batch" :
102+ if (awaitingInitialBatch .remove (playerId )) {
103+ int batchSize = in .readInt ();
104+ logDebug ("Received definitive initial batch of " + batchSize + " chunks. Queueing for processing." );
94105
95- if (queuedSet .add (chunkVec )) {
96- requestQueue .computeIfAbsent (playerId , k -> new ConcurrentLinkedQueue <>()).add (chunkVec );
106+ initialChunksToProcess .put (playerId , new AtomicInteger (batchSize ));
97107
98- if (initialLoadingPlayers .getOrDefault (playerId , false )) {
99- initialChunksToProcess .merge (playerId , 1 , Integer ::sum );
100- logDebug ("Player " + player .getName () + " needs chunk " + chunkX + "," + chunkZ + ". Total initial chunks to process: " + initialChunksToProcess .get (playerId ));
108+ Queue <Vector > playerQueue = requestQueue .computeIfAbsent (playerId , k -> new ConcurrentLinkedQueue <>());
109+ for (int i = 0 ; i < batchSize ; i ++) {
110+ playerQueue .add (new Vector (in .readInt (), 0 , in .readInt ()));
111+ }
112+
113+ if (batchSize == 0 ) {
114+ checkIfInitialLoadComplete (player );
115+ }
116+ } else {
117+ int batchSize = in .readInt ();
118+ Queue <Vector > playerQueue = requestQueue .computeIfAbsent (playerId , k -> new ConcurrentLinkedQueue <>());
119+ for (int i = 0 ; i < batchSize ; i ++) {
120+ playerQueue .add (new Vector (in .readInt (), 0 , in .readInt ()));
101121 }
102122 }
103123 break ;
104124 case "ready" :
105- logDebug ("Player " + player .getName () + " sent READY packet. Starting initial load sequence." );
106-
107- initialLoadingPlayers .put (playerId , true );
108- initialChunksToProcess .put (playerId , 0 );
109-
110- new BukkitRunnable () {
111- @ Override
112- public void run () {
113- if (initialLoadingPlayers .containsKey (playerId )) {
114- initialLoadingPlayers .put (playerId , false ); // Lock the state.
115- logDebug ("Player " + player .getName () + " initial chunk requests locked in at " + initialChunksToProcess .getOrDefault (playerId , 0 ) + " chunks." );
116-
117- if (initialChunksToProcess .getOrDefault (playerId , 0 ) == 0 ) {
118- checkIfInitialLoadComplete (player );
119- }
120- }
121- }
122- }.runTaskLater (this , 5L );
123-
125+ logDebug ("Player " + player .getName () + " is READY. Awaiting first chunk batch..." );
126+ awaitingInitialBatch .add (player .getUniqueId ());
124127 player .sendPluginMessage (this , CHANNEL , createBelowY0StatusPayload (true ));
125128 break ;
126129 case "use_on_block" :
@@ -150,30 +153,22 @@ private void startProcessorTask() {
150153 public void run () {
151154 for (UUID playerUUID : new HashSet <>(requestQueue .keySet ())) {
152155 Player player = getServer ().getPlayer (playerUUID );
153- Queue <Vector > queue = requestQueue .get (playerUUID );
154-
155156 if (player == null || !player .isOnline ()) {
156- requestQueue .remove (playerUUID );
157- initialLoadingPlayers .remove (playerUUID );
158- initialChunksToProcess .remove (playerUUID );
159- currentlyQueued .remove (playerUUID );
157+ cleanupPlayer (playerUUID );
160158 continue ;
161159 }
162160
163- if (queue == null || queue .isEmpty ()) {
164- continue ;
165- }
166-
167- for (int i = 0 ; i < CHUNKS_PER_TICK && !queue .isEmpty (); i ++) {
168- Vector vec = queue .poll ();
169- if (vec != null ) {
170- World world = player .getWorld ();
171- int cx = vec .getBlockX ();
172- int cz = vec .getBlockZ ();
173- if (world .isChunkLoaded (cx , cz )) {
174- processAndSendChunk (player , world .getChunkAt (cx , cz ));
175- } else {
176- queue .add (vec );
161+ Queue <Vector > queue = requestQueue .get (playerUUID );
162+ if (queue != null && !queue .isEmpty ()) {
163+ for (int i = 0 ; i < CHUNKS_PER_TICK && !queue .isEmpty (); i ++) {
164+ Vector vec = queue .poll ();
165+ if (vec != null ) {
166+ World world = player .getWorld ();
167+ if (world .isChunkLoaded (vec .getBlockX (), vec .getBlockZ ())) {
168+ processAndSendChunk (player , world .getChunkAt (vec .getBlockX (), vec .getBlockZ ()));
169+ } else {
170+ queue .add (vec );
171+ }
177172 }
178173 }
179174 }
@@ -209,13 +204,9 @@ public void run() {
209204 new BukkitRunnable () {
210205 @ Override
211206 public void run () {
212- if (!player .isOnline ()) return ;
213-
214- Set <Vector > queuedSet = currentlyQueued .get (player .getUniqueId ());
215- if (queuedSet != null ) {
216- queuedSet .remove (chunkVec );
207+ if (player .isOnline ()) {
208+ checkIfInitialLoadComplete (player );
217209 }
218- checkIfInitialLoadComplete (player );
219210 }
220211 }.runTask (TuffX .this );
221212 }
@@ -224,18 +215,17 @@ public void run() {
224215
225216 private void checkIfInitialLoadComplete (Player player ) {
226217 UUID playerId = player .getUniqueId ();
227-
228- if ( initialChunksToProcess . containsKey ( playerId )) {
229- int remaining = initialChunksToProcess . compute ( playerId , ( k , v ) -> ( v == null ) ? - 1 : v - 1 );
230-
218+ AtomicInteger counter = initialChunksToProcess . get ( playerId );
219+
220+ if ( counter != null ) {
221+ int remaining = counter . decrementAndGet ();
231222 logDebug ("Player " + player .getName () + " finished a chunk. Remaining initial chunks: " + remaining );
232223
233224 if (remaining <= 0 ) {
234- initialLoadingPlayers .remove (playerId );
235- initialChunksToProcess .remove (playerId );
236-
237- player .sendPluginMessage (this , CHANNEL , createLoadFinishedPayload ());
238225 logDebug ("INITIAL LOAD COMPLETE for " + player .getName () + ". Sent finished packet." );
226+ player .sendPluginMessage (this , CHANNEL , createLoadFinishedPayload ());
227+
228+ initialChunksToProcess .remove (playerId );
239229 }
240230 }
241231 }
@@ -253,14 +243,16 @@ private byte[] createLoadFinishedPayload() {
253243 }
254244 }
255245
256- @ EventHandler
257- public void onPlayerQuit (PlayerQuitEvent event ) {
258- UUID playerId = event .getPlayer ().getUniqueId ();
246+ private void cleanupPlayer (UUID playerId ) {
259247 requestQueue .remove (playerId );
260- initialLoadingPlayers .remove (playerId );
261- currentlyQueued .remove (playerId );
248+ awaitingInitialBatch .remove (playerId );
262249 initialChunksToProcess .remove (playerId );
263250 }
251+
252+ @ EventHandler
253+ public void onPlayerQuit (PlayerQuitEvent event ) {
254+ cleanupPlayer (event .getPlayer ().getUniqueId ());
255+ }
264256
265257 private byte [] createWelcomePayload (String message , int someNumber ) {
266258 try (ByteArrayOutputStream bout = new ByteArrayOutputStream (); DataOutputStream out = new DataOutputStream (bout )) {
0 commit comments