2626import org .bukkit .event .player .PlayerTeleportEvent ;
2727import org .bukkit .event .player .PlayerChangedWorldEvent ;
2828
29+ import java .util .concurrent .ExecutorService ;
30+ import java .util .concurrent .Executors ;
31+ import java .util .concurrent .TimeUnit ;
32+
2933import java .io .*;
3034import java .nio .charset .StandardCharsets ;
3135import java .util .*;
@@ -52,6 +56,8 @@ public class TuffX extends JavaPlugin implements Listener, PluginMessageListener
5256
5357 private boolean debug ;
5458
59+ private ExecutorService chunkProcessorPool ;
60+
5561 private void logDebug (String message ) {
5662 if (debug ) getLogger ().log (Level .INFO , "[TuffX-Debug] " + message );
5763 }
@@ -70,6 +76,19 @@ public void onEnable() {
7076 getServer ().getPluginManager ().registerEvents (this , this );
7177 if (this .viablockids == null ) this .viablockids = new ViaBlockIds (this );
7278 logFancyEnable ();
79+
80+ int configuredThreads = getConfig ().getInt ("chunk-processor-threads" , -1 );
81+ int threadCount ;
82+ if (configuredThreads <= 0 ) {
83+ threadCount = Math .max (1 , Math .min (4 , Runtime .getRuntime ().availableProcessors () / 2 ));
84+ logDebug ("Auto-detected and using " + threadCount + " threads for the chunk processor pool." );
85+ } else {
86+ threadCount = configuredThreads ;
87+ getLogger ().info ("Using user-configured thread count of " + threadCount + " for the chunk processor pool." );
88+ }
89+
90+ this .chunkProcessorPool = Executors .newFixedThreadPool (threadCount );
91+
7392 startProcessorTask ();
7493 }
7594
@@ -78,6 +97,20 @@ public record ChunkSectionCoord(int cx, int cy, int cz) {}
7897 @ Override
7998 public void onDisable () {
8099 if (processorTask != null ) processorTask .cancel ();
100+
101+ if (chunkProcessorPool != null ) {
102+ chunkProcessorPool .shutdown ();
103+ try {
104+ if (!chunkProcessorPool .awaitTermination (5 , TimeUnit .SECONDS )) {
105+ getLogger ().warning ("Chunk processor pool did not shut down cleanly, forcing shutdown." );
106+ chunkProcessorPool .shutdownNow ();
107+ }
108+ } catch (InterruptedException e ) {
109+ chunkProcessorPool .shutdownNow ();
110+ Thread .currentThread ().interrupt ();
111+ }
112+ }
113+
81114 requestQueue .clear ();
82115 awaitingInitialBatch .clear ();
83116 initialChunksToProcess .clear ();
@@ -216,55 +249,57 @@ public void onPlayerChangeWorld(PlayerChangedWorldEvent event) {
216249
217250 Queue <Vector > playerQueue = requestQueue .get (playerId );
218251 if (playerQueue != null && !playerQueue .isEmpty ()) {
219- logDebug ("Player " + player .getName () + " changed worlds. Clearing " + playerQueue .size () + " pending chunk requests." );
220- playerQueue .clear ();
221- }
222-
223- if (initialChunksToProcess .remove (playerId ) != null ) {
224- logDebug ("Player " + player .getName () + " was in the middle of an initial chunk load. The process has been cancelled." );
225- awaitingInitialBatch .remove (playerId );
226- player .sendPluginMessage (this , CHANNEL , createLoadFinishedPayload ());
227- }
228-
229- player .sendPluginMessage (this , CHANNEL , createDimensionPayload ());
252+ logDebug ("Player " + player .getName () + " changed worlds. Clearing " + playerQueue .size () + " pending chunk requests." );
253+ playerQueue .clear ();
254+ }
255+
256+ if (initialChunksToProcess .remove (playerId ) != null ) {
257+ logDebug ("Player " + player .getName () + " was in the middle of an initial chunk load. The process has been cancelled." );
258+ awaitingInitialBatch .remove (playerId );
259+ player .sendPluginMessage (this , CHANNEL , createLoadFinishedPayload ());
260+ }
230261
231- player .sendPluginMessage (this , CHANNEL , createBelowY0StatusPayload (enabledWorlds .contains (player .getWorld ().getName ())));
232- }
262+ player .sendPluginMessage (this , CHANNEL , createDimensionPayload ());
233263
234- private void processAndSendChunk ( final Player player , final Chunk chunk ) {
235- if ( chunk == null || ! player . isOnline ()) return ;
264+ player . sendPluginMessage ( this , CHANNEL , createBelowY0StatusPayload ( enabledWorlds . contains ( player . getWorld (). getName ())));
265+ }
236266
237- final Vector chunkVec = new Vector (chunk .getX (), 0 , chunk .getZ ());
238- logDebug ("Processing chunk " + chunk .getX () + "," + chunk .getZ () + " for " + player .getName ());
267+ private void processAndSendChunk (final Player player , final Chunk chunk ) {
268+ if (chunk == null || !player .isOnline () || chunkProcessorPool .isShutdown ()) {
269+ return ;
270+ }
239271
240- new BukkitRunnable () {
241- @ Override
242- public void run () {
243- final ChunkSnapshot snapshot = chunk .getChunkSnapshot (true , false , false );
244- final Map <BlockData , int []> conversionCache = new HashMap <>();
272+ chunkProcessorPool .submit (() -> {
273+ final List <byte []> processedPayloads = new ArrayList <>();
274+ final ChunkSnapshot snapshot = chunk .getChunkSnapshot (true , false , false );
275+ final Map <BlockData , int []> conversionCache = new HashMap <>();
245276
246- for (int sectionY = -4 ; sectionY < 0 ; sectionY ++) {
247- if (!player .isOnline ()) break ;
248- try {
249- byte [] payload = createSectionPayload (snapshot , chunk .getX (), chunk .getZ (), sectionY , conversionCache );
250- if (payload != null ) {
251- player .sendPluginMessage (TuffX .this , CHANNEL , payload );
252- }
253- } catch (IOException e ) {
254- getLogger ().severe ("Payload creation failed for " + chunk .getX () + "," + chunk .getZ () + ": " + e .getMessage ());
277+ for (int sectionY = -4 ; sectionY < 0 ; sectionY ++) {
278+ if (!player .isOnline ()) {
279+ return ;
280+ }
281+ try {
282+ byte [] payload = createSectionPayload (snapshot , chunk .getX (), chunk .getZ (), sectionY , conversionCache );
283+ if (payload != null ) {
284+ processedPayloads .add (payload );
255285 }
286+ } catch (IOException e ) {
287+ getLogger ().severe ("Payload creation failed for " + chunk .getX () + "," + chunk .getZ () + ": " + e .getMessage ());
256288 }
289+ }
257290
258- new BukkitRunnable () {
259- @ Override
260- public void run () {
261- if (player .isOnline ()) {
262- checkIfInitialLoadComplete (player );
291+ new BukkitRunnable () {
292+ @ Override
293+ public void run () {
294+ if (player .isOnline ()) {
295+ for (byte [] payload : processedPayloads ) {
296+ player .sendPluginMessage (TuffX .this , CHANNEL , payload );
263297 }
298+ checkIfInitialLoadComplete (player );
264299 }
265- }. runTask ( TuffX . this );
266- }
267- }. runTaskAsynchronously ( this );
300+ }
301+ }. runTask ( this );
302+ });
268303 }
269304
270305 private void checkIfInitialLoadComplete (Player player ) {
@@ -318,36 +353,53 @@ private byte[] createWelcomePayload(String message, int someNumber) {
318353 }
319354
320355 private byte [] createSectionPayload (ChunkSnapshot snapshot , int cx , int cz , int sectionY , Map <BlockData , int []> cache ) throws IOException {
321- try (ByteArrayOutputStream bout = new ByteArrayOutputStream (12300 ); DataOutputStream out = new DataOutputStream (bout )) {
356+ short [] blockDataArray = new short [4096 ];
357+ byte [] lightDataArray = new byte [4096 ];
358+
359+ boolean hasAnythingToSend = false ;
360+ int baseY = sectionY * 16 ;
361+ int index = 0 ;
362+
363+ for (int y = 0 ; y < 16 ; y ++) {
364+ for (int z = 0 ; z < 16 ; z ++) {
365+ for (int x = 0 ; x < 16 ; x ++) {
366+ int worldY = baseY + y ;
367+
368+ BlockData blockData = snapshot .getBlockData (x , worldY , z );
369+ int [] legacyData = cache .computeIfAbsent (blockData , viablockids ::toLegacy );
370+
371+ short legacyBlock = (short ) ((legacyData [1 ] << 12 ) | (legacyData [0 ] & 0xFFF ));
372+ byte packedLight = (byte ) ((snapshot .getBlockSkyLight (x , worldY , z ) << 4 ) | snapshot .getBlockEmittedLight (x , worldY , z ));
373+
374+ blockDataArray [index ] = legacyBlock ;
375+ lightDataArray [index ] = packedLight ;
376+
377+ if (legacyBlock != 0 || packedLight != 0 ) {
378+ hasAnythingToSend = true ;
379+ }
380+ index ++;
381+ }
382+ }
383+ }
384+
385+ if (!hasAnythingToSend ) {
386+ return null ;
387+ }
388+
389+ try (ByteArrayOutputStream bout = new ByteArrayOutputStream (8256 );
390+ DataOutputStream out = new DataOutputStream (bout )) {
391+
322392 out .writeUTF ("chunk_data" );
323393 out .writeInt (cx );
324394 out .writeInt (cz );
325395 out .writeInt (sectionY );
326396
327- boolean hasAnythingToSend = false ;
328- int baseY = sectionY * 16 ;
329-
330- for (int y = 0 ; y < 16 ; y ++) {
331- for (int z = 0 ; z < 16 ; z ++) {
332- for (int x = 0 ; x < 16 ; x ++) {
333- int worldY = baseY + y ;
334-
335- BlockData blockData = snapshot .getBlockData (x , worldY , z );
336- int [] legacyData = cache .computeIfAbsent (blockData , viablockids ::toLegacy );
337- out .writeShort ((short ) ((legacyData [1 ] << 12 ) | (legacyData [0 ] & 0xFFF )));
338-
339- int blockLight = snapshot .getBlockEmittedLight (x , worldY , z );
340- int skyLight = snapshot .getBlockSkyLight (x , worldY , z );
341- out .writeByte ((byte ) ((skyLight << 4 ) | blockLight ));
342-
343- if (legacyData [0 ] != 0 || blockLight != 0 || skyLight != 0 ) {
344- hasAnythingToSend = true ;
345- }
346- }
347- }
397+ for (int i = 0 ; i < 4096 ; i ++) {
398+ out .writeShort (blockDataArray [i ]);
399+ out .writeByte (lightDataArray [i ]);
348400 }
349401
350- return hasAnythingToSend ? bout .toByteArray () : null ;
402+ return bout .toByteArray ();
351403 }
352404 }
353405
0 commit comments