From 8edbda11f25801f30b7691b8d85e7924d1c3e2c1 Mon Sep 17 00:00:00 2001 From: Luis Casillas Date: Thu, 17 Apr 2014 18:55:54 -0700 Subject: [PATCH 01/10] Started refactoring the library to give the EventListener more control over whether to stop the BinaryLogClient. --- .../mysql/binlog/AbstractBinaryLogClient.java | 426 ++++++++++++ .../shyiko/mysql/binlog/BinaryLogClient.java | 610 +++++------------- .../binlog/jmx/BinaryLogClientStatistics.java | 9 +- 3 files changed, 588 insertions(+), 457 deletions(-) create mode 100644 src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java diff --git a/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java new file mode 100644 index 00000000..050ac753 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java @@ -0,0 +1,426 @@ +package com.github.shyiko.mysql.binlog; + +import com.github.shyiko.mysql.binlog.event.*; +import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; +import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean; +import com.github.shyiko.mysql.binlog.network.AuthenticationException; +import com.github.shyiko.mysql.binlog.network.SocketFactory; +import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket; +import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket; +import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel; +import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket; +import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand; + +import java.io.EOFException; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * @author Luis Casillas + */ +public abstract class AbstractBinaryLogClient implements BinaryLogClientMXBean { + protected final String hostname; + protected final int port; + protected final String schema; + protected final String username; + protected final String password; + private final Logger logger = Logger.getLogger(getClass().getName()); + private final Lock shutdownLock = new ReentrantLock(); + private long serverId = 65535; + private volatile String binlogFilename; + private volatile long binlogPosition; + private EventDeserializer eventDeserializer = new EventDeserializer(); + private SocketFactory socketFactory; + private PacketChannel channel; + private volatile boolean connected; + private boolean keepAlive = true; + private long keepAliveInterval = TimeUnit.MINUTES.toMillis(1); + private long keepAliveConnectTimeout = TimeUnit.SECONDS.toMillis(3); + private volatile ThreadPoolExecutor keepAliveThreadExecutor; + private long keepAliveThreadShutdownTimeout = TimeUnit.SECONDS.toMillis(6); + + + public AbstractBinaryLogClient(String username, int port, String hostname, String password, String schema) { + this.username = username; + this.port = port; + this.hostname = hostname; + this.password = password; + this.schema = schema; + } + + /** + * @return server id (65535 by default) + */ + public long getServerId() { + return serverId; + } + + /** + * @param serverId server id (in the range from 1 to 2^32 – 1). This value MUST be unique across whole replication + * group (that is, different from any other server id being used by any master or slave). Keep in mind that each + * binary log client (mysql-binlog-connector-java/BinaryLogClient, mysqlbinlog, etc) should be treated as a + * simplified slave and thus MUST also use a different server id. + */ + public void setServerId(long serverId) { + this.serverId = serverId; + } + + /** + * @return binary log filename, nullable. Note that this value is automatically tracked by the client and thus + * is subject to change (in response to {@link com.github.shyiko.mysql.binlog.event.EventType#ROTATE}, for example). + */ + public String getBinlogFilename() { + return binlogFilename; + } + + /** + * @param binlogFilename binary log filename (null indicates automatic resolution). + */ + public void setBinlogFilename(String binlogFilename) { + this.binlogFilename = binlogFilename; + } + + /** + * @return binary log position of the next event. Note that this value changes with each incoming event. + */ + public long getBinlogPosition() { + return binlogPosition; + } + + /** + * @param binlogPosition binary log position + */ + public void setBinlogPosition(long binlogPosition) { + this.binlogPosition = binlogPosition; + } + + /** + * @param keepAlive true if "keep alive" thread should be automatically started (recommended and true by default), + * false otherwise. + */ + public void setKeepAlive(boolean keepAlive) { + this.keepAlive = keepAlive; + } + + /** + * @param keepAliveInterval "keep alive" interval in milliseconds. + */ + public void setKeepAliveInterval(long keepAliveInterval) { + this.keepAliveInterval = keepAliveInterval; + } + + /** + * @param keepAliveConnectTimeout "keep alive" connect interval in milliseconds. + */ + public void setKeepAliveConnectTimeout(long keepAliveConnectTimeout) { + this.keepAliveConnectTimeout = keepAliveConnectTimeout; + } + + /** + * @param eventDeserializer custom event deserializer + */ + public void setEventDeserializer(EventDeserializer eventDeserializer) { + if (eventDeserializer == null) { + throw new IllegalArgumentException("Event deserializer cannot be NULL"); + } + this.eventDeserializer = eventDeserializer; + } + + /** + * @param socketFactory custom socket factory. If not provided, socket will be created with "new Socket()". + */ + public void setSocketFactory(SocketFactory socketFactory) { + this.socketFactory = socketFactory; + } + + /** + * Connect to the replication stream. Note that this method blocks until disconnected. + * @throws com.github.shyiko.mysql.binlog.network.AuthenticationException in case of failed authentication + * @throws java.io.IOException if anything goes wrong while trying to connect + */ + public void connect() throws IOException { + if (connected) { + throw new IllegalStateException("BinaryLogClient is already connected"); + } + try { + try { + Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket(); + socket.connect(new InetSocketAddress(hostname, port)); + channel = new PacketChannel(socket); + if (channel.getInputStream().peek() == -1) { + throw new EOFException(); + } + } catch (IOException e) { + throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port + + ". Please make sure it's running.", e); + } + GreetingPacket greetingPacket = new GreetingPacket(channel.read()); + AuthenticateCommand authenticateCommand = new AuthenticateCommand(schema, username, password, + greetingPacket.getScramble()); + authenticateCommand.setCollation(greetingPacket.getServerCollation()); + channel.write(authenticateCommand); + byte[] authenticationResult = channel.read(); + if (authenticationResult[0] != (byte) 0x00 /* ok */) { + if (authenticationResult[0] == (byte) 0xFF /* error */) { + byte[] bytes = Arrays.copyOfRange(authenticationResult, 1, authenticationResult.length); + throw new AuthenticationException(new ErrorPacket(bytes).getErrorMessage()); + } + throw new AuthenticationException("Unexpected authentication result (" + authenticationResult[0] + ")"); + } + if (binlogFilename == null) { + fetchBinlogFilenameAndPosition(); + } + ChecksumType checksumType = fetchBinlogChecksum(); + if (checksumType != ChecksumType.NONE) { + confirmSupportOfChecksum(checksumType); + } + channel.write(new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition)); + } catch (IOException e) { + if (channel != null && channel.isOpen()) { + channel.close(); + } + throw e; + } + connected = true; + if (logger.isLoggable(Level.INFO)) { + logger.info("Connected to " + hostname + ":" + port + " at " + binlogFilename + "/" + binlogPosition); + } + getLifecycleListener().onConnect(this); + if (keepAlive && !isKeepAliveThreadRunning()) { + spawnKeepAliveThread(); + } + listenForEventPackets(); + } + + private void spawnKeepAliveThread() { + keepAliveThreadExecutor = newSingleDaemonThreadExecutor("blc-keepalive-" + hostname + ":" + port); + keepAliveThreadExecutor.submit(new Runnable() { + @Override + public void run() { + while (true) { + try { + Thread.sleep(keepAliveInterval); + } catch (InterruptedException e) { + // expected in case of disconnect + } + shutdownLock.lock(); + try { + if (keepAliveThreadExecutor.isShutdown()) { + return; + } + try { + channel.write(new PingCommand()); + } catch (IOException e) { + if (logger.isLoggable(Level.INFO)) { + logger.info("Trying to restore lost connection to " + hostname + ":" + port); + } + try { + if (isConnected()) { + disconnectChannel(); + } + connect(keepAliveConnectTimeout); + } catch (Exception ce) { + if (logger.isLoggable(Level.WARNING)) { + logger.warning("Failed to restore connection to " + hostname + ":" + port + + ". Next attempt in " + keepAliveInterval + "ms"); + } + } + } + } finally { + shutdownLock.unlock(); + } + } + } + }); + } + + private ThreadPoolExecutor newSingleDaemonThreadExecutor(final String threadName) { + return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), new ThreadFactory() { + + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, threadName); + thread.setDaemon(true); + return thread; + } + }); + } + + protected boolean isKeepAliveThreadRunning() { + return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown(); + } + + + /** + * @return true if client is connected, false otherwise + */ + public boolean isConnected() { + return connected; + } + + private void fetchBinlogFilenameAndPosition() throws IOException { + ResultSetRowPacket[] resultSet; + channel.write(new QueryCommand("show master status")); + resultSet = readResultSet(); + if (resultSet.length == 0) { + throw new IOException("Failed to determine binlog filename/position"); + } + ResultSetRowPacket resultSetRow = resultSet[0]; + binlogFilename = resultSetRow.getValue(0); + binlogPosition = Long.parseLong(resultSetRow.getValue(1)); + } + + private ChecksumType fetchBinlogChecksum() throws IOException { + channel.write(new QueryCommand("show global variables like 'binlog_checksum'")); + ResultSetRowPacket[] resultSet = readResultSet(); + if (resultSet.length == 0) { + return ChecksumType.NONE; + } + return ChecksumType.valueOf(resultSet[0].getValue(1).toUpperCase()); + } + + private void confirmSupportOfChecksum(ChecksumType checksumType) throws IOException { + channel.write(new QueryCommand("set @master_binlog_checksum= @@global.binlog_checksum")); + byte[] statementResult = channel.read(); + if (statementResult[0] == (byte) 0xFF /* error */) { + byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length); + throw new IOException(new ErrorPacket(bytes).getErrorMessage()); + } + eventDeserializer.setChecksumType(checksumType); + } + + private void listenForEventPackets() throws IOException { + ByteArrayInputStream inputStream = channel.getInputStream(); + try { + while (inputStream.peek() != -1) { + int packetLength = inputStream.readInteger(3); + inputStream.skip(1); // 1 byte for sequence + int marker = inputStream.read(); + if (marker == 0xFF) { + ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1)); + throw new IOException(errorPacket.getErrorCode() + " - " + errorPacket.getErrorMessage()); + } + Event event; + try { + event = eventDeserializer.nextEvent(inputStream); + } catch (Exception e) { + if (isConnected()) { + getLifecycleListener().onEventDeserializationFailure(this, e); + } + continue; + } + if (isConnected()) { + notifyEventListener(event); + updateClientBinlogFilenameAndPosition(event); + } + } + } catch (Exception e) { + if (isConnected()) { + getLifecycleListener().onCommunicationFailure(this, e); + } + } finally { + if (isConnected()) { + disconnectChannel(); + } + } + } + + private void updateClientBinlogFilenameAndPosition(Event event) { + EventHeader eventHeader = event.getHeader(); + if (eventHeader.getEventType() == EventType.ROTATE) { + RotateEventData eventData = event.getData(); + if (eventData != null) { + binlogFilename = eventData.getBinlogFilename(); + binlogPosition = eventData.getBinlogPosition(); + } + } else + if (eventHeader instanceof EventHeaderV4) { + EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader; + long nextBinlogPosition = trackableEventHeader.getNextPosition(); + if (nextBinlogPosition > 0) { + binlogPosition = nextBinlogPosition; + } + } + } + + private ResultSetRowPacket[] readResultSet() throws IOException { + List resultSet = new LinkedList(); + while ((channel.read())[0] != (byte) 0xFE /* eof */) { /* skip */ } + for (byte[] bytes; (bytes = channel.read())[0] != (byte) 0xFE /* eof */; ) { + resultSet.add(new ResultSetRowPacket(bytes)); + } + return resultSet.toArray(new ResultSetRowPacket[resultSet.size()]); + } + + private void notifyEventListener(Event event) { + getEventListener().onEvent(event); + } + + /** + * Disconnect from the replication stream. + * Note that this does not cause binlogFilename/binlogPosition to be cleared out. + * As the result following {@link #connect()} resumes client from where it left off. + */ + public void disconnect() throws IOException { + shutdownLock.lock(); + try { + if (isKeepAliveThreadRunning()) { + keepAliveThreadExecutor.shutdownNow(); + } + disconnectChannel(); + } finally { + shutdownLock.unlock(); + } + if (isKeepAliveThreadRunning()) { + waitForKeepAliveThreadToBeTerminated(); + } + } + + private void waitForKeepAliveThreadToBeTerminated() { + boolean terminated = false; + try { + terminated = keepAliveThreadExecutor.awaitTermination(keepAliveThreadShutdownTimeout, + TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + if (logger.isLoggable(Level.WARNING)) { + logger.log(Level.WARNING, e.getMessage()); + } + } + if (!terminated) { + throw new IllegalStateException("BinaryLogClient was unable to shut keep alive thread down in " + + keepAliveThreadShutdownTimeout + "ms"); + } + } + + private void disconnectChannel() throws IOException { + try { + connected = false; + if (channel != null && channel.isOpen()) { + channel.close(); + } + } finally { + getLifecycleListener().onDisconnect(this); + } + } + + public abstract BinaryLogClient.LifecycleListener getLifecycleListener(); + + public abstract BinaryLogClient.EventListener getEventListener(); + +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 997b8d2a..a315fae7 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -61,40 +61,13 @@ * * @author Stanley Shyiko */ -public class BinaryLogClient implements BinaryLogClientMXBean { - - private final Logger logger = Logger.getLogger(getClass().getName()); - - private final String hostname; - private final int port; - private final String schema; - private final String username; - private final String password; - - private long serverId = 65535; - private volatile String binlogFilename; - private volatile long binlogPosition; - - private EventDeserializer eventDeserializer = new EventDeserializer(); - - private final List eventListeners = new LinkedList(); - private final List lifecycleListeners = new LinkedList(); - - private SocketFactory socketFactory; - - private PacketChannel channel; - private volatile boolean connected; +public class BinaryLogClient extends AbstractBinaryLogClient { + private final BroadcastEventListener eventListener = new BroadcastEventListener(); + private final BroadcastLifecycleListener lifecycleListener = new BroadcastLifecycleListener(); private ThreadFactory threadFactory; + private final Logger logger = Logger.getLogger(getClass().getName()); - private boolean keepAlive = true; - private long keepAliveInterval = TimeUnit.MINUTES.toMillis(1); - private long keepAliveConnectTimeout = TimeUnit.SECONDS.toMillis(3); - - private volatile ThreadPoolExecutor keepAliveThreadExecutor; - private long keepAliveThreadShutdownTimeout = TimeUnit.SECONDS.toMillis(6); - - private final Lock shutdownLock = new ReentrantLock(); /** * Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password). @@ -129,96 +102,7 @@ public BinaryLogClient(String hostname, int port, String username, String passwo * @param password password */ public BinaryLogClient(String hostname, int port, String schema, String username, String password) { - this.hostname = hostname; - this.port = port; - this.schema = schema; - this.username = username; - this.password = password; - } - - /** - * @return server id (65535 by default) - */ - public long getServerId() { - return serverId; - } - - /** - * @param serverId server id (in the range from 1 to 2^32 – 1). This value MUST be unique across whole replication - * group (that is, different from any other server id being used by any master or slave). Keep in mind that each - * binary log client (mysql-binlog-connector-java/BinaryLogClient, mysqlbinlog, etc) should be treated as a - * simplified slave and thus MUST also use a different server id. - */ - public void setServerId(long serverId) { - this.serverId = serverId; - } - - /** - * @return binary log filename, nullable. Note that this value is automatically tracked by the client and thus - * is subject to change (in response to {@link EventType#ROTATE}, for example). - */ - public String getBinlogFilename() { - return binlogFilename; - } - - /** - * @param binlogFilename binary log filename (null indicates automatic resolution). - */ - public void setBinlogFilename(String binlogFilename) { - this.binlogFilename = binlogFilename; - } - - /** - * @return binary log position of the next event. Note that this value changes with each incoming event. - */ - public long getBinlogPosition() { - return binlogPosition; - } - - /** - * @param binlogPosition binary log position - */ - public void setBinlogPosition(long binlogPosition) { - this.binlogPosition = binlogPosition; - } - - /** - * @param keepAlive true if "keep alive" thread should be automatically started (recommended and true by default), - * false otherwise. - */ - public void setKeepAlive(boolean keepAlive) { - this.keepAlive = keepAlive; - } - - /** - * @param keepAliveInterval "keep alive" interval in milliseconds. - */ - public void setKeepAliveInterval(long keepAliveInterval) { - this.keepAliveInterval = keepAliveInterval; - } - - /** - * @param keepAliveConnectTimeout "keep alive" connect interval in milliseconds. - */ - public void setKeepAliveConnectTimeout(long keepAliveConnectTimeout) { - this.keepAliveConnectTimeout = keepAliveConnectTimeout; - } - - /** - * @param eventDeserializer custom event deserializer - */ - public void setEventDeserializer(EventDeserializer eventDeserializer) { - if (eventDeserializer == null) { - throw new IllegalArgumentException("Event deserializer cannot be NULL"); - } - this.eventDeserializer = eventDeserializer; - } - - /** - * @param socketFactory custom socket factory. If not provided, socket will be created with "new Socket()". - */ - public void setSocketFactory(SocketFactory socketFactory) { - this.socketFactory = socketFactory; + super(username, port, hostname, password, schema); } /** @@ -229,140 +113,20 @@ public void setThreadFactory(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } - /** - * Connect to the replication stream. Note that this method blocks until disconnected. - * @throws AuthenticationException in case of failed authentication - * @throws IOException if anything goes wrong while trying to connect - */ - public void connect() throws IOException { - if (connected) { - throw new IllegalStateException("BinaryLogClient is already connected"); - } - try { - try { - Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket(); - socket.connect(new InetSocketAddress(hostname, port)); - channel = new PacketChannel(socket); - if (channel.getInputStream().peek() == -1) { - throw new EOFException(); - } - } catch (IOException e) { - throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port + - ". Please make sure it's running.", e); - } - GreetingPacket greetingPacket = new GreetingPacket(channel.read()); - AuthenticateCommand authenticateCommand = new AuthenticateCommand(schema, username, password, - greetingPacket.getScramble()); - authenticateCommand.setCollation(greetingPacket.getServerCollation()); - channel.write(authenticateCommand); - byte[] authenticationResult = channel.read(); - if (authenticationResult[0] != (byte) 0x00 /* ok */) { - if (authenticationResult[0] == (byte) 0xFF /* error */) { - byte[] bytes = Arrays.copyOfRange(authenticationResult, 1, authenticationResult.length); - throw new AuthenticationException(new ErrorPacket(bytes).getErrorMessage()); - } - throw new AuthenticationException("Unexpected authentication result (" + authenticationResult[0] + ")"); - } - if (binlogFilename == null) { - fetchBinlogFilenameAndPosition(); - } - ChecksumType checksumType = fetchBinlogChecksum(); - if (checksumType != ChecksumType.NONE) { - confirmSupportOfChecksum(checksumType); - } - channel.write(new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition)); - } catch (IOException e) { - if (channel != null && channel.isOpen()) { - channel.close(); - } - throw e; - } - connected = true; - if (logger.isLoggable(Level.INFO)) { - logger.info("Connected to " + hostname + ":" + port + " at " + binlogFilename + "/" + binlogPosition); - } - synchronized (lifecycleListeners) { - for (LifecycleListener lifecycleListener : lifecycleListeners) { - lifecycleListener.onConnect(this); - } - } - if (keepAlive && !isKeepAliveThreadRunning()) { - spawnKeepAliveThread(); - } - listenForEventPackets(); - } - - private void spawnKeepAliveThread() { - keepAliveThreadExecutor = newSingleDaemonThreadExecutor("blc-keepalive-" + hostname + ":" + port); - keepAliveThreadExecutor.submit(new Runnable() { - @Override - public void run() { - while (true) { - try { - Thread.sleep(keepAliveInterval); - } catch (InterruptedException e) { - // expected in case of disconnect - } - shutdownLock.lock(); - try { - if (keepAliveThreadExecutor.isShutdown()) { - return; - } - try { - channel.write(new PingCommand()); - } catch (IOException e) { - if (logger.isLoggable(Level.INFO)) { - logger.info("Trying to restore lost connection to " + hostname + ":" + port); - } - try { - if (isConnected()) { - disconnectChannel(); - } - connect(keepAliveConnectTimeout); - } catch (Exception ce) { - if (logger.isLoggable(Level.WARNING)) { - logger.warning("Failed to restore connection to " + hostname + ":" + port + - ". Next attempt in " + keepAliveInterval + "ms"); - } - } - } - } finally { - shutdownLock.unlock(); - } - } - } - }); - } - - private ThreadPoolExecutor newSingleDaemonThreadExecutor(final String threadName) { - return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(), new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread thread = new Thread(r, threadName); - thread.setDaemon(true); - return thread; - } - }); - } - - protected boolean isKeepAliveThreadRunning() { - return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown(); - } /** * Connect to the replication stream in a separate thread. * @param timeoutInMilliseconds timeout in milliseconds - * @throws AuthenticationException in case of failed authentication - * @throws IOException if anything goes wrong while trying to connect - * @throws TimeoutException if client wasn't able to connect in the requested period of time + * @throws com.github.shyiko.mysql.binlog.network.AuthenticationException in case of failed authentication + * @throws java.io.IOException if anything goes wrong while trying to connect + * @throws java.util.concurrent.TimeoutException if client wasn't able to connect in the requested period of time */ public void connect(long timeoutInMilliseconds) throws IOException, TimeoutException { final CountDownLatch countDownLatch = new CountDownLatch(1); - AbstractLifecycleListener connectListener = new AbstractLifecycleListener() { + BinaryLogClient.AbstractLifecycleListener connectListener = new BinaryLogClient.AbstractLifecycleListener() { @Override - public void onConnect(BinaryLogClient client) { + public void onConnect(AbstractBinaryLogClient client) { countDownLatch.countDown(); } }; @@ -399,268 +163,139 @@ public void run() { } } - /** - * @return true if client is connected, false otherwise - */ - public boolean isConnected() { - return connected; - } - - private void fetchBinlogFilenameAndPosition() throws IOException { - ResultSetRowPacket[] resultSet; - channel.write(new QueryCommand("show master status")); - resultSet = readResultSet(); - if (resultSet.length == 0) { - throw new IOException("Failed to determine binlog filename/position"); - } - ResultSetRowPacket resultSetRow = resultSet[0]; - binlogFilename = resultSetRow.getValue(0); - binlogPosition = Long.parseLong(resultSetRow.getValue(1)); - } - - private ChecksumType fetchBinlogChecksum() throws IOException { - channel.write(new QueryCommand("show global variables like 'binlog_checksum'")); - ResultSetRowPacket[] resultSet = readResultSet(); - if (resultSet.length == 0) { - return ChecksumType.NONE; - } - return ChecksumType.valueOf(resultSet[0].getValue(1).toUpperCase()); - } - - private void confirmSupportOfChecksum(ChecksumType checksumType) throws IOException { - channel.write(new QueryCommand("set @master_binlog_checksum= @@global.binlog_checksum")); - byte[] statementResult = channel.read(); - if (statementResult[0] == (byte) 0xFF /* error */) { - byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length); - throw new IOException(new ErrorPacket(bytes).getErrorMessage()); - } - eventDeserializer.setChecksumType(checksumType); - } - - private void listenForEventPackets() throws IOException { - ByteArrayInputStream inputStream = channel.getInputStream(); - try { - while (inputStream.peek() != -1) { - int packetLength = inputStream.readInteger(3); - inputStream.skip(1); // 1 byte for sequence - int marker = inputStream.read(); - if (marker == 0xFF) { - ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1)); - throw new IOException(errorPacket.getErrorCode() + " - " + errorPacket.getErrorMessage()); - } - Event event; - try { - event = eventDeserializer.nextEvent(inputStream); - } catch (Exception e) { - if (isConnected()) { - synchronized (lifecycleListeners) { - for (LifecycleListener lifecycleListener : lifecycleListeners) { - lifecycleListener.onEventDeserializationFailure(this, e); - } - } - } - continue; - } - if (isConnected()) { - notifyEventListeners(event); - updateClientBinlogFilenameAndPosition(event); - } - } - } catch (Exception e) { - if (isConnected()) { - synchronized (lifecycleListeners) { - for (LifecycleListener lifecycleListener : lifecycleListeners) { - lifecycleListener.onCommunicationFailure(this, e); - } - } - } - } finally { - if (isConnected()) { - disconnectChannel(); - } - } - } - - private void updateClientBinlogFilenameAndPosition(Event event) { - EventHeader eventHeader = event.getHeader(); - if (eventHeader.getEventType() == EventType.ROTATE) { - RotateEventData eventData = event.getData(); - if (eventData != null) { - binlogFilename = eventData.getBinlogFilename(); - binlogPosition = eventData.getBinlogPosition(); - } - } else - if (eventHeader instanceof EventHeaderV4) { - EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader; - long nextBinlogPosition = trackableEventHeader.getNextPosition(); - if (nextBinlogPosition > 0) { - binlogPosition = nextBinlogPosition; - } - } - } - - private ResultSetRowPacket[] readResultSet() throws IOException { - List resultSet = new LinkedList(); - while ((channel.read())[0] != (byte) 0xFE /* eof */) { /* skip */ } - for (byte[] bytes; (bytes = channel.read())[0] != (byte) 0xFE /* eof */; ) { - resultSet.add(new ResultSetRowPacket(bytes)); - } - return resultSet.toArray(new ResultSetRowPacket[resultSet.size()]); + @Override + public EventListener getEventListener() { + return eventListener; } /** * @return registered event listeners */ public List getEventListeners() { - return Collections.unmodifiableList(eventListeners); + return eventListener.getEventListeners(); } /** * Register event listener. Note that multiple event listeners will be called in order they * where registered. */ - public void registerEventListener(EventListener eventListener) { - synchronized (eventListeners) { - eventListeners.add(eventListener); - } + public void registerEventListener(EventListener listener) { + eventListener.registerEventListener(listener); } /** * Unregister all event listener of specific type. */ public void unregisterEventListener(Class listenerClass) { - synchronized (eventListeners) { - Iterator iterator = eventListeners.iterator(); - while (iterator.hasNext()) { - EventListener eventListener = iterator.next(); - if (listenerClass.isInstance(eventListener)) { - iterator.remove(); - } - } - } + eventListener.unregisterEventListener(listenerClass); } /** * Unregister single event listener. */ - public void unregisterEventListener(EventListener eventListener) { - synchronized (eventListeners) { - eventListeners.remove(eventListener); - } + public void unregisterEventListener(EventListener listener) { + eventListener.unregisterEventListener(listener); } - private void notifyEventListeners(Event event) { - synchronized (eventListeners) { - for (EventListener eventListener : eventListeners) { - try { - eventListener.onEvent(event); - } catch (Exception e) { - if (logger.isLoggable(Level.WARNING)) { - logger.log(Level.WARNING, eventListener + " choked on " + event, e); - } - } - } - } + @Override + public LifecycleListener getLifecycleListener() { + return lifecycleListener; } /** * @return registered lifecycle listeners */ public List getLifecycleListeners() { - return Collections.unmodifiableList(lifecycleListeners); + return lifecycleListener.getLifecycleListeners(); } /** * Register lifecycle listener. Note that multiple lifecycle listeners will be called in order they * where registered. */ - public void registerLifecycleListener(LifecycleListener lifecycleListener) { - synchronized (lifecycleListeners) { - lifecycleListeners.add(lifecycleListener); - } + public void registerLifecycleListener(LifecycleListener listener) { + lifecycleListener.registerLifecycleListener(listener); } /** * Unregister all lifecycle listener of specific type. */ public synchronized void unregisterLifecycleListener(Class listenerClass) { - synchronized (lifecycleListeners) { - Iterator iterator = lifecycleListeners.iterator(); - while (iterator.hasNext()) { - LifecycleListener lifecycleListener = iterator.next(); - if (listenerClass.isInstance(lifecycleListener)) { - iterator.remove(); - } - } - } + lifecycleListener.unregisterLifecycleListener(listenerClass); } /** * Unregister single lifecycle listener. */ - public synchronized void unregisterLifecycleListener(LifecycleListener eventListener) { - synchronized (lifecycleListeners) { - lifecycleListeners.remove(eventListener); - } + public synchronized void unregisterLifecycleListener(LifecycleListener listener) { + lifecycleListener.unregisterLifecycleListener(listener); } /** - * Disconnect from the replication stream. - * Note that this does not cause binlogFilename/binlogPosition to be cleared out. - * As the result following {@link #connect()} resumes client from where it left off. + * {@link BinaryLogClient}'s event listener. */ - public void disconnect() throws IOException { - shutdownLock.lock(); - try { - if (isKeepAliveThreadRunning()) { - keepAliveThreadExecutor.shutdownNow(); - } - disconnectChannel(); - } finally { - shutdownLock.unlock(); - } - if (isKeepAliveThreadRunning()) { - waitForKeepAliveThreadToBeTerminated(); - } + public interface EventListener { + + void onEvent(Event event); } - private void waitForKeepAliveThreadToBeTerminated() { - boolean terminated = false; - try { - terminated = keepAliveThreadExecutor.awaitTermination(keepAliveThreadShutdownTimeout, - TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - if (logger.isLoggable(Level.WARNING)) { - logger.log(Level.WARNING, e.getMessage()); + public class BroadcastEventListener implements EventListener { + private final List eventListeners = new LinkedList(); + + @Override + public void onEvent(Event event) { + for (BinaryLogClient.EventListener eventListener : eventListeners) { + try { + eventListener.onEvent(event); + } catch (Exception e) { + if (logger.isLoggable(Level.WARNING)) { + logger.log(Level.WARNING, eventListener + " choked on " + event, e); + } + } } } - if (!terminated) { - throw new IllegalStateException("BinaryLogClient was unable to shut keep alive thread down in " + - keepAliveThreadShutdownTimeout + "ms"); + + /** + * @return registered event listeners + */ + public List getEventListeners() { + return Collections.unmodifiableList(eventListeners); } - } - private void disconnectChannel() throws IOException { - try { - connected = false; - if (channel != null && channel.isOpen()) { - channel.close(); + /** + * Register event listener. Note that multiple event listeners will be called in order they + * where registered. + */ + public void registerEventListener(EventListener eventListener) { + synchronized (eventListeners) { + eventListeners.add(eventListener); } - } finally { - synchronized (lifecycleListeners) { - for (LifecycleListener lifecycleListener : lifecycleListeners) { - lifecycleListener.onDisconnect(this); + } + + /** + * Unregister all event listener of specific type. + */ + public void unregisterEventListener(Class listenerClass) { + synchronized (eventListeners) { + Iterator iterator = eventListeners.iterator(); + while (iterator.hasNext()) { + EventListener eventListener = iterator.next(); + if (listenerClass.isInstance(eventListener)) { + iterator.remove(); + } } } } - } - /** - * {@link BinaryLogClient}'s event listener. - */ - public interface EventListener { + /** + * Unregister single event listener. + */ + public void unregisterEventListener(EventListener eventListener) { + synchronized (eventListeners) { + eventListeners.remove(eventListener); + } + } - void onEvent(Event event); } /** @@ -671,24 +306,24 @@ public interface LifecycleListener { /** * Called once client has successfully logged in but before started to receive binlog events. */ - void onConnect(BinaryLogClient client); + void onConnect(AbstractBinaryLogClient client); /** - * It's guarantied to be called before {@link #onDisconnect(BinaryLogClient)}) in case of + * It's guarantied to be called before {@link #onDisconnect(AbstractBinaryLogClient)}) in case of * communication failure. */ - void onCommunicationFailure(BinaryLogClient client, Exception ex); + void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex); /** * Called in case of failed event deserialization. Note this type of error does NOT cause client to * disconnect. If you wish to stop receiving events you'll need to fire client.disconnect() manually. */ - void onEventDeserializationFailure(BinaryLogClient client, Exception ex); + void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex); /** * Called upon disconnect (regardless of the reason). */ - void onDisconnect(BinaryLogClient client); + void onDisconnect(AbstractBinaryLogClient client); } /** @@ -696,18 +331,87 @@ public interface LifecycleListener { */ public static abstract class AbstractLifecycleListener implements LifecycleListener { - public void onConnect(BinaryLogClient client) { + @Override + public void onConnect(AbstractBinaryLogClient client) { + } + + @Override + public void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex) { + } + + @Override + public void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex) { + } + + @Override + public void onDisconnect(AbstractBinaryLogClient client) { + } + + } + + public static class BroadcastLifecycleListener implements LifecycleListener { + final List lifecycleListeners = new LinkedList(); + + @Override + public void onConnect(AbstractBinaryLogClient client) { + throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO + } + + @Override + public void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex) { + throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO + } + + @Override + public void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex) { + throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO } - public void onCommunicationFailure(BinaryLogClient client, Exception ex) { + @Override + public void onDisconnect(AbstractBinaryLogClient client) { + throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO } - public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) { + /** + * @return registered lifecycle listeners + */ + public List getLifecycleListeners() { + return Collections.unmodifiableList(lifecycleListeners); } - public void onDisconnect(BinaryLogClient client) { + /** + * Register lifecycle listener. Note that multiple lifecycle listeners will be called in order they + * where registered. + */ + public void registerLifecycleListener(LifecycleListener listener) { + synchronized (lifecycleListeners) { + lifecycleListeners.add(listener); + } } + /** + * Unregister all lifecycle listener of specific type. + */ + public synchronized void unregisterLifecycleListener(Class listenerClass) { + synchronized (lifecycleListeners) { + Iterator iterator = lifecycleListeners.iterator(); + while (iterator.hasNext()) { + LifecycleListener lifecycleListener = iterator.next(); + if (listenerClass.isInstance(lifecycleListener)) { + iterator.remove(); + } + } + } + } + + /** + * Unregister single lifecycle listener. + */ + public synchronized void unregisterLifecycleListener(LifecycleListener listener) { + synchronized (lifecycleListeners) { + lifecycleListeners.remove(listener); + } + } } } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java b/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java index 211385e5..e02353dc 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java @@ -15,6 +15,7 @@ */ package com.github.shyiko.mysql.binlog.jmx; +import com.github.shyiko.mysql.binlog.AbstractBinaryLogClient; import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.EventHeader; @@ -99,7 +100,7 @@ public void onEvent(Event event) { } @Override - public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) { + public void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex) { numberOfSkippedEvents.getAndIncrement(); lastEventHeader.set(null); timestampOfLastEvent.set(getCurrentTimeMillis()); @@ -107,16 +108,16 @@ public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) } @Override - public void onDisconnect(BinaryLogClient client) { + public void onDisconnect(AbstractBinaryLogClient client) { numberOfDisconnects.getAndIncrement(); } @Override - public void onConnect(BinaryLogClient client) { + public void onConnect(AbstractBinaryLogClient client) { } @Override - public void onCommunicationFailure(BinaryLogClient client, Exception ex) { + public void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex) { } protected long getCurrentTimeMillis() { From f39aceb248a64bc7bae53d688a22cffd580a9c17 Mon Sep 17 00:00:00 2001 From: Luis Casillas Date: Thu, 17 Apr 2014 20:43:45 -0700 Subject: [PATCH 02/10] Improved factoring: 1. AbstractBinaryLogClient now no longer depends on EventListener or LifecycleListener. 2. LifecycleListener goes back to accepting BinaryLogClient arguments. Still haven't run tests. --- .../mysql/binlog/AbstractBinaryLogClient.java | 36 ++++-- .../shyiko/mysql/binlog/BinaryLogClient.java | 116 ++++++++++-------- .../binlog/jmx/BinaryLogClientStatistics.java | 9 +- 3 files changed, 100 insertions(+), 61 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java index 050ac753..6b2bda32 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java @@ -200,7 +200,7 @@ public void connect() throws IOException { if (logger.isLoggable(Level.INFO)) { logger.info("Connected to " + hostname + ":" + port + " at " + binlogFilename + "/" + binlogPosition); } - getLifecycleListener().onConnect(this); + onConnect(); if (keepAlive && !isKeepAliveThreadRunning()) { spawnKeepAliveThread(); } @@ -321,7 +321,7 @@ private void listenForEventPackets() throws IOException { event = eventDeserializer.nextEvent(inputStream); } catch (Exception e) { if (isConnected()) { - getLifecycleListener().onEventDeserializationFailure(this, e); + onEventDeserializationFailure(e); } continue; } @@ -332,7 +332,7 @@ private void listenForEventPackets() throws IOException { } } catch (Exception e) { if (isConnected()) { - getLifecycleListener().onCommunicationFailure(this, e); + onCommunicationFailure(e); } } finally { if (isConnected()) { @@ -369,7 +369,7 @@ private ResultSetRowPacket[] readResultSet() throws IOException { } private void notifyEventListener(Event event) { - getEventListener().onEvent(event); + onEvent(event); } /** @@ -415,12 +415,34 @@ private void disconnectChannel() throws IOException { channel.close(); } } finally { - getLifecycleListener().onDisconnect(this); + onDisconnect(); } } - public abstract BinaryLogClient.LifecycleListener getLifecycleListener(); + /** + * Invoked once for each {@link Event}, in the order they are processed. + */ + protected abstract void onEvent(Event event); - public abstract BinaryLogClient.EventListener getEventListener(); + /** + * Invoked when a connection is established. + */ + protected abstract void onConnect(); + + /** + * It's guarantied to be called before {@link #onDisconnect()}) in case of communication failure. + */ + protected abstract void onCommunicationFailure(Exception ex); + + /** + * Called in case of failed event deserialization. Note this type of error does NOT cause client to + * disconnect. If you wish to stop receiving events you'll need to fire client.disconnect() manually. + */ + protected abstract void onEventDeserializationFailure(Exception ex); + + /** + * Called upon disconnect (regardless of the reason). + */ + protected abstract void onDisconnect(); } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index a315fae7..db5cddc1 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -16,43 +16,17 @@ package com.github.shyiko.mysql.binlog; import com.github.shyiko.mysql.binlog.event.Event; -import com.github.shyiko.mysql.binlog.event.EventHeader; -import com.github.shyiko.mysql.binlog.event.EventHeaderV4; -import com.github.shyiko.mysql.binlog.event.EventType; -import com.github.shyiko.mysql.binlog.event.RotateEventData; -import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType; -import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; -import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; -import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean; -import com.github.shyiko.mysql.binlog.network.AuthenticationException; -import com.github.shyiko.mysql.binlog.network.SocketFactory; -import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket; -import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket; -import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel; -import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket; -import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand; -import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand; -import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand; -import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand; - -import java.io.EOFException; + import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -126,7 +100,7 @@ public void connect(long timeoutInMilliseconds) throws IOException, TimeoutExcep final CountDownLatch countDownLatch = new CountDownLatch(1); BinaryLogClient.AbstractLifecycleListener connectListener = new BinaryLogClient.AbstractLifecycleListener() { @Override - public void onConnect(AbstractBinaryLogClient client) { + public void onConnect(BinaryLogClient client) { countDownLatch.countDown(); } }; @@ -164,8 +138,8 @@ public void run() { } @Override - public EventListener getEventListener() { - return eventListener; + protected void onEvent(Event event) { + eventListener.onEvent(event); } /** @@ -197,9 +171,25 @@ public void unregisterEventListener(EventListener listener) { eventListener.unregisterEventListener(listener); } + @Override - public LifecycleListener getLifecycleListener() { - return lifecycleListener; + protected void onConnect() { + lifecycleListener.onConnect(this); + } + + @Override + protected void onCommunicationFailure(Exception ex) { + lifecycleListener.onCommunicationFailure(this, ex); + } + + @Override + protected void onEventDeserializationFailure(Exception ex) { + lifecycleListener.onEventDeserializationFailure(this, ex); + } + + @Override + protected void onDisconnect() { + lifecycleListener.onDisconnect(this); } /** @@ -239,9 +229,16 @@ public interface EventListener { void onEvent(Event event); } + /** + * An {@link EventListener} that rebroadcasts events to a dynamically managed list of other event listeners. + */ public class BroadcastEventListener implements EventListener { private final List eventListeners = new LinkedList(); + /** + * Rebroadcast the event to the child listeners. If any of the children throws an exception, we log it and + * continue with the next one. + */ @Override public void onEvent(Event event) { for (BinaryLogClient.EventListener eventListener : eventListeners) { @@ -305,25 +302,27 @@ public interface LifecycleListener { /** * Called once client has successfully logged in but before started to receive binlog events. + * @param client */ - void onConnect(AbstractBinaryLogClient client); + void onConnect(BinaryLogClient client); /** - * It's guarantied to be called before {@link #onDisconnect(AbstractBinaryLogClient)}) in case of + * It's guarantied to be called before {@link #onDisconnect(BinaryLogClient)}) in case of * communication failure. */ - void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex); + void onCommunicationFailure(BinaryLogClient client, Exception ex); /** * Called in case of failed event deserialization. Note this type of error does NOT cause client to * disconnect. If you wish to stop receiving events you'll need to fire client.disconnect() manually. */ - void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex); + void onEventDeserializationFailure(BinaryLogClient client, Exception ex); /** * Called upon disconnect (regardless of the reason). + * @param client */ - void onDisconnect(AbstractBinaryLogClient client); + void onDisconnect(BinaryLogClient client); } /** @@ -332,44 +331,63 @@ public interface LifecycleListener { public static abstract class AbstractLifecycleListener implements LifecycleListener { @Override - public void onConnect(AbstractBinaryLogClient client) { + public void onConnect(BinaryLogClient client) { } @Override - public void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex) { + public void onCommunicationFailure(BinaryLogClient client, Exception ex) { } @Override - public void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex) { + public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) { } @Override - public void onDisconnect(AbstractBinaryLogClient client) { + public void onDisconnect(BinaryLogClient client) { } } + /** + * A {@link LifecycleListener} that rebroadcasts events to a dynamic list of children. + */ public static class BroadcastLifecycleListener implements LifecycleListener { final List lifecycleListeners = new LinkedList(); @Override - public void onConnect(AbstractBinaryLogClient client) { - throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO + public void onConnect(BinaryLogClient client) { + synchronized(lifecycleListeners) { + for (LifecycleListener listener : lifecycleListeners) { + listener.onConnect(client); + } + } } @Override - public void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex) { - throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO + public void onCommunicationFailure(BinaryLogClient client, Exception ex) { + synchronized(lifecycleListeners) { + for (LifecycleListener listener : lifecycleListeners) { + listener.onCommunicationFailure(client, ex); + } + } } @Override - public void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex) { - throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO + public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) { + synchronized(lifecycleListeners) { + for (LifecycleListener listener : lifecycleListeners) { + listener.onEventDeserializationFailure(client, ex); + } + } } @Override - public void onDisconnect(AbstractBinaryLogClient client) { - throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO + public void onDisconnect(BinaryLogClient client) { + synchronized(lifecycleListeners) { + for (LifecycleListener listener : lifecycleListeners) { + listener.onDisconnect(client); + } + } } /** diff --git a/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java b/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java index e02353dc..211385e5 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java @@ -15,7 +15,6 @@ */ package com.github.shyiko.mysql.binlog.jmx; -import com.github.shyiko.mysql.binlog.AbstractBinaryLogClient; import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.EventHeader; @@ -100,7 +99,7 @@ public void onEvent(Event event) { } @Override - public void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex) { + public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) { numberOfSkippedEvents.getAndIncrement(); lastEventHeader.set(null); timestampOfLastEvent.set(getCurrentTimeMillis()); @@ -108,16 +107,16 @@ public void onEventDeserializationFailure(AbstractBinaryLogClient client, Except } @Override - public void onDisconnect(AbstractBinaryLogClient client) { + public void onDisconnect(BinaryLogClient client) { numberOfDisconnects.getAndIncrement(); } @Override - public void onConnect(AbstractBinaryLogClient client) { + public void onConnect(BinaryLogClient client) { } @Override - public void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex) { + public void onCommunicationFailure(BinaryLogClient client, Exception ex) { } protected long getCurrentTimeMillis() { From 6eaa9a32de6fd706fec441f87c39b0a325a97350 Mon Sep 17 00:00:00 2001 From: Luis Casillas Date: Thu, 17 Apr 2014 23:05:58 -0700 Subject: [PATCH 03/10] Tiny bug in integration test: it didn't like the -0700 timezone. --- .../shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java index 62e3510d..f06fa101 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java @@ -558,8 +558,10 @@ public void execute(Statement statement) throws SQLException { // fixing connection timezone to the "client's one" TimeZone currentTimeZone = TimeZone.getDefault(); int offset = currentTimeZone.getRawOffset() + currentTimeZone.getDSTSavings(); - String timeZoneAsAString = String.format("%s%02d:%02d", offset >= 0 ? "+" : "-", offset / 3600000, - (offset / 60000) % 60); + String timeZoneAsAString = String.format("%s%02d:%02d", + offset >= 0 ? "+" : "-", + Math.abs(offset / 3600000), + (offset / 60000) % 60); statement.execute("SET time_zone = '" + timeZoneAsAString + "'"); } }); From 20df3c23cbc1d6e78ef7e656f47a189bed9718fb Mon Sep 17 00:00:00 2001 From: Luis Casillas Date: Thu, 17 Apr 2014 23:09:20 -0700 Subject: [PATCH 04/10] Fix ConcurrentModificationException that I introduced. --- .../shyiko/mysql/binlog/BinaryLogClient.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index db5cddc1..0e98f0f3 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -240,13 +240,15 @@ public class BroadcastEventListener implements EventListener { * continue with the next one. */ @Override - public void onEvent(Event event) { - for (BinaryLogClient.EventListener eventListener : eventListeners) { - try { - eventListener.onEvent(event); - } catch (Exception e) { - if (logger.isLoggable(Level.WARNING)) { - logger.log(Level.WARNING, eventListener + " choked on " + event, e); + public synchronized void onEvent(Event event) { + synchronized (eventListeners) { + for (BinaryLogClient.EventListener eventListener : eventListeners) { + try { + eventListener.onEvent(event); + } catch (Exception e) { + if (logger.isLoggable(Level.WARNING)) { + logger.log(Level.WARNING, eventListener + " choked on " + event, e); + } } } } From b86504ba493a2c3eb0b075db6932551b4f531088 Mon Sep 17 00:00:00 2001 From: Luis Casillas Date: Thu, 17 Apr 2014 23:21:27 -0700 Subject: [PATCH 05/10] Now passes checkstyle. --- .../mysql/binlog/AbstractBinaryLogClient.java | 33 +++++++++++++++---- .../shyiko/mysql/binlog/BinaryLogClient.java | 14 +++----- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java index 6b2bda32..05112666 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java @@ -1,6 +1,25 @@ +/* + * Copyright 2013 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.shyiko.mysql.binlog; -import com.github.shyiko.mysql.binlog.event.*; +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventHeader; +import com.github.shyiko.mysql.binlog.event.EventHeaderV4; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.RotateEventData; import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; @@ -23,15 +42,19 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; /** - * @author Luis Casillas + * Abstract base class for MySQL replication stream client. + * + * @author Stanley Shyiko */ public abstract class AbstractBinaryLogClient implements BinaryLogClientMXBean { protected final String hostname; @@ -54,7 +77,6 @@ public abstract class AbstractBinaryLogClient implements BinaryLogClientMXBean { private volatile ThreadPoolExecutor keepAliveThreadExecutor; private long keepAliveThreadShutdownTimeout = TimeUnit.SECONDS.toMillis(6); - public AbstractBinaryLogClient(String username, int port, String hostname, String password, String schema) { this.username = username; this.port = port; @@ -266,7 +288,6 @@ protected boolean isKeepAliveThreadRunning() { return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown(); } - /** * @return true if client is connected, false otherwise */ diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 0e98f0f3..76cfbd3e 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -42,7 +42,6 @@ public class BinaryLogClient extends AbstractBinaryLogClient { private ThreadFactory threadFactory; private final Logger logger = Logger.getLogger(getClass().getName()); - /** * Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password). * @see BinaryLogClient#BinaryLogClient(String, int, String, String, String) @@ -87,8 +86,6 @@ public void setThreadFactory(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } - - /** * Connect to the replication stream in a separate thread. * @param timeoutInMilliseconds timeout in milliseconds @@ -171,7 +168,6 @@ public void unregisterEventListener(EventListener listener) { eventListener.unregisterEventListener(listener); } - @Override protected void onConnect() { lifecycleListener.onConnect(this); @@ -354,11 +350,11 @@ public void onDisconnect(BinaryLogClient client) { * A {@link LifecycleListener} that rebroadcasts events to a dynamic list of children. */ public static class BroadcastLifecycleListener implements LifecycleListener { - final List lifecycleListeners = new LinkedList(); + private final List lifecycleListeners = new LinkedList(); @Override public void onConnect(BinaryLogClient client) { - synchronized(lifecycleListeners) { + synchronized (lifecycleListeners) { for (LifecycleListener listener : lifecycleListeners) { listener.onConnect(client); } @@ -367,7 +363,7 @@ public void onConnect(BinaryLogClient client) { @Override public void onCommunicationFailure(BinaryLogClient client, Exception ex) { - synchronized(lifecycleListeners) { + synchronized (lifecycleListeners) { for (LifecycleListener listener : lifecycleListeners) { listener.onCommunicationFailure(client, ex); } @@ -376,7 +372,7 @@ public void onCommunicationFailure(BinaryLogClient client, Exception ex) { @Override public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) { - synchronized(lifecycleListeners) { + synchronized (lifecycleListeners) { for (LifecycleListener listener : lifecycleListeners) { listener.onEventDeserializationFailure(client, ex); } @@ -385,7 +381,7 @@ public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) @Override public void onDisconnect(BinaryLogClient client) { - synchronized(lifecycleListeners) { + synchronized (lifecycleListeners) { for (LifecycleListener listener : lifecycleListeners) { listener.onDisconnect(client); } From 8fa45cba99c113506e436a8f90930cd8af283df4 Mon Sep 17 00:00:00 2001 From: Luis Casillas Date: Thu, 17 Apr 2014 23:37:26 -0700 Subject: [PATCH 06/10] One of the serialization/deserialization test cases fails in the America/Los_Angeles timezone, because of Daylight Saving Time. I've added a comment to that effect. --- .../shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java index f06fa101..a495ec71 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java @@ -201,6 +201,8 @@ public void testDeserializationOfDifferentColumnTypes() throws Exception { new java.sql.Date(generateTime(1989, 3, 21, 0, 0, 0, 0))}); assertEquals(writeAndCaptureRow("datetime", "'1989-03-21 01:02:03.000000'"), new Serializable[]{ new java.util.Date(generateTime(1989, 3, 21, 1, 2, 3, 0))}); + // FIXME: '1989-03-18 01:02:03.000000' fails in the America/Los_Angeles timezone (Daylight Saving Time) + // '1989-05-18 01:02:03.000000' passes assertEquals(writeAndCaptureRow("timestamp", "'1989-03-18 01:02:03.000000'"), new Serializable[]{ new java.sql.Timestamp(generateTime(1989, 3, 18, 1, 2, 3, 0))}); assertEquals(writeAndCaptureRow("time", "'1:2:3.000000'"), new Serializable[]{ From 71abcc2dc7cdd1fb569564d8be7080ad00410f7e Mon Sep 17 00:00:00 2001 From: Luis Casillas Date: Thu, 17 Apr 2014 23:52:15 -0700 Subject: [PATCH 07/10] Undo the BroadcastEventListener and BroadcastLifecycleListener refactor, since now it only serves to make the code longer. --- .../shyiko/mysql/binlog/BinaryLogClient.java | 178 ++++++++---------- 1 file changed, 81 insertions(+), 97 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 76cfbd3e..6f4eca77 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -37,8 +37,8 @@ */ public class BinaryLogClient extends AbstractBinaryLogClient { - private final BroadcastEventListener eventListener = new BroadcastEventListener(); - private final BroadcastLifecycleListener lifecycleListener = new BroadcastLifecycleListener(); + private final List eventListeners = new LinkedList(); + private final List lifecycleListeners = new LinkedList(); private ThreadFactory threadFactory; private final Logger logger = Logger.getLogger(getClass().getName()); @@ -134,163 +134,147 @@ public void run() { } } - @Override - protected void onEvent(Event event) { - eventListener.onEvent(event); - } - /** * @return registered event listeners */ public List getEventListeners() { - return eventListener.getEventListeners(); + return Collections.unmodifiableList(eventListeners); } /** * Register event listener. Note that multiple event listeners will be called in order they * where registered. */ - public void registerEventListener(EventListener listener) { - eventListener.registerEventListener(listener); + public void registerEventListener(EventListener eventListener) { + synchronized (eventListeners) { + eventListeners.add(eventListener); + } } /** * Unregister all event listener of specific type. */ public void unregisterEventListener(Class listenerClass) { - eventListener.unregisterEventListener(listenerClass); + synchronized (eventListeners) { + Iterator iterator = eventListeners.iterator(); + while (iterator.hasNext()) { + EventListener eventListener = iterator.next(); + if (listenerClass.isInstance(eventListener)) { + iterator.remove(); + } + } + } } /** * Unregister single event listener. */ - public void unregisterEventListener(EventListener listener) { - eventListener.unregisterEventListener(listener); - } - - @Override - protected void onConnect() { - lifecycleListener.onConnect(this); - } - - @Override - protected void onCommunicationFailure(Exception ex) { - lifecycleListener.onCommunicationFailure(this, ex); - } - - @Override - protected void onEventDeserializationFailure(Exception ex) { - lifecycleListener.onEventDeserializationFailure(this, ex); + public void unregisterEventListener(EventListener eventListener) { + synchronized (eventListeners) { + eventListeners.remove(eventListener); + } } @Override - protected void onDisconnect() { - lifecycleListener.onDisconnect(this); + protected void onEvent(Event event) { + synchronized (eventListeners) { + for (EventListener eventListener : eventListeners) { + try { + eventListener.onEvent(event); + } catch (Exception e) { + if (logger.isLoggable(Level.WARNING)) { + logger.log(Level.WARNING, eventListener + " choked on " + event, e); + } + } + } + } } /** * @return registered lifecycle listeners */ public List getLifecycleListeners() { - return lifecycleListener.getLifecycleListeners(); + return Collections.unmodifiableList(lifecycleListeners); } /** * Register lifecycle listener. Note that multiple lifecycle listeners will be called in order they * where registered. */ - public void registerLifecycleListener(LifecycleListener listener) { - lifecycleListener.registerLifecycleListener(listener); + public void registerLifecycleListener(LifecycleListener lifecycleListener) { + synchronized (lifecycleListeners) { + lifecycleListeners.add(lifecycleListener); + } } /** * Unregister all lifecycle listener of specific type. */ public synchronized void unregisterLifecycleListener(Class listenerClass) { - lifecycleListener.unregisterLifecycleListener(listenerClass); + synchronized (lifecycleListeners) { + Iterator iterator = lifecycleListeners.iterator(); + while (iterator.hasNext()) { + LifecycleListener lifecycleListener = iterator.next(); + if (listenerClass.isInstance(lifecycleListener)) { + iterator.remove(); + } + } + } } /** * Unregister single lifecycle listener. */ - public synchronized void unregisterLifecycleListener(LifecycleListener listener) { - lifecycleListener.unregisterLifecycleListener(listener); + public synchronized void unregisterLifecycleListener(LifecycleListener eventListener) { + synchronized (lifecycleListeners) { + lifecycleListeners.remove(eventListener); + } } - /** - * {@link BinaryLogClient}'s event listener. - */ - public interface EventListener { - - void onEvent(Event event); + @Override + protected void onConnect() { + synchronized (lifecycleListeners) { + for (LifecycleListener lifecycleListener : lifecycleListeners) { + lifecycleListener.onConnect(this); + } + } } - /** - * An {@link EventListener} that rebroadcasts events to a dynamically managed list of other event listeners. - */ - public class BroadcastEventListener implements EventListener { - private final List eventListeners = new LinkedList(); - - /** - * Rebroadcast the event to the child listeners. If any of the children throws an exception, we log it and - * continue with the next one. - */ - @Override - public synchronized void onEvent(Event event) { - synchronized (eventListeners) { - for (BinaryLogClient.EventListener eventListener : eventListeners) { - try { - eventListener.onEvent(event); - } catch (Exception e) { - if (logger.isLoggable(Level.WARNING)) { - logger.log(Level.WARNING, eventListener + " choked on " + event, e); - } - } - } + @Override + protected void onCommunicationFailure(Exception ex) { + synchronized (lifecycleListeners) { + for (LifecycleListener lifecycleListener : lifecycleListeners) { + lifecycleListener.onCommunicationFailure(this, ex); } } - /** - * @return registered event listeners - */ - public List getEventListeners() { - return Collections.unmodifiableList(eventListeners); - } + } - /** - * Register event listener. Note that multiple event listeners will be called in order they - * where registered. - */ - public void registerEventListener(EventListener eventListener) { - synchronized (eventListeners) { - eventListeners.add(eventListener); + @Override + protected void onEventDeserializationFailure(Exception ex) { + synchronized (lifecycleListeners) { + for (LifecycleListener lifecycleListener : lifecycleListeners) { + lifecycleListener.onEventDeserializationFailure(this, ex); } } - /** - * Unregister all event listener of specific type. - */ - public void unregisterEventListener(Class listenerClass) { - synchronized (eventListeners) { - Iterator iterator = eventListeners.iterator(); - while (iterator.hasNext()) { - EventListener eventListener = iterator.next(); - if (listenerClass.isInstance(eventListener)) { - iterator.remove(); - } - } - } - } + } - /** - * Unregister single event listener. - */ - public void unregisterEventListener(EventListener eventListener) { - synchronized (eventListeners) { - eventListeners.remove(eventListener); + @Override + protected void onDisconnect() { + synchronized (lifecycleListeners) { + for (LifecycleListener lifecycleListener : lifecycleListeners) { + lifecycleListener.onDisconnect(this); } } + } + + /** + * {@link BinaryLogClient}'s event listener. + */ + public interface EventListener { + void onEvent(Event event); } /** From 620d0dfe32a81a5470cc01d5bf351749313579f9 Mon Sep 17 00:00:00 2001 From: Luis Casillas Date: Thu, 17 Apr 2014 23:57:58 -0700 Subject: [PATCH 08/10] Forgot to delete the BroadcastLifecycleListener. --- .../shyiko/mysql/binlog/BinaryLogClient.java | 86 ------------------- 1 file changed, 86 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 6f4eca77..854092ed 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -284,7 +284,6 @@ public interface LifecycleListener { /** * Called once client has successfully logged in but before started to receive binlog events. - * @param client */ void onConnect(BinaryLogClient client); @@ -302,7 +301,6 @@ public interface LifecycleListener { /** * Called upon disconnect (regardless of the reason). - * @param client */ void onDisconnect(BinaryLogClient client); } @@ -330,88 +328,4 @@ public void onDisconnect(BinaryLogClient client) { } - /** - * A {@link LifecycleListener} that rebroadcasts events to a dynamic list of children. - */ - public static class BroadcastLifecycleListener implements LifecycleListener { - private final List lifecycleListeners = new LinkedList(); - - @Override - public void onConnect(BinaryLogClient client) { - synchronized (lifecycleListeners) { - for (LifecycleListener listener : lifecycleListeners) { - listener.onConnect(client); - } - } - } - - @Override - public void onCommunicationFailure(BinaryLogClient client, Exception ex) { - synchronized (lifecycleListeners) { - for (LifecycleListener listener : lifecycleListeners) { - listener.onCommunicationFailure(client, ex); - } - } - } - - @Override - public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) { - synchronized (lifecycleListeners) { - for (LifecycleListener listener : lifecycleListeners) { - listener.onEventDeserializationFailure(client, ex); - } - } - } - - @Override - public void onDisconnect(BinaryLogClient client) { - synchronized (lifecycleListeners) { - for (LifecycleListener listener : lifecycleListeners) { - listener.onDisconnect(client); - } - } - } - - /** - * @return registered lifecycle listeners - */ - public List getLifecycleListeners() { - return Collections.unmodifiableList(lifecycleListeners); - } - - /** - * Register lifecycle listener. Note that multiple lifecycle listeners will be called in order they - * where registered. - */ - public void registerLifecycleListener(LifecycleListener listener) { - synchronized (lifecycleListeners) { - lifecycleListeners.add(listener); - } - } - - /** - * Unregister all lifecycle listener of specific type. - */ - public synchronized void unregisterLifecycleListener(Class listenerClass) { - synchronized (lifecycleListeners) { - Iterator iterator = lifecycleListeners.iterator(); - while (iterator.hasNext()) { - LifecycleListener lifecycleListener = iterator.next(); - if (listenerClass.isInstance(lifecycleListener)) { - iterator.remove(); - } - } - } - } - - /** - * Unregister single lifecycle listener. - */ - public synchronized void unregisterLifecycleListener(LifecycleListener listener) { - synchronized (lifecycleListeners) { - lifecycleListeners.remove(listener); - } - } - } - } From 0f98923a4c49c80f7a8a0732bacb9cf7ad59f824 Mon Sep 17 00:00:00 2001 From: Luis Casillas Date: Fri, 18 Apr 2014 00:02:34 -0700 Subject: [PATCH 09/10] Cosmetic changes to reduce diff noise for the pull request --- .../com/github/shyiko/mysql/binlog/BinaryLogClient.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 854092ed..460b5d81 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -37,10 +37,10 @@ */ public class BinaryLogClient extends AbstractBinaryLogClient { + private final Logger logger = Logger.getLogger(getClass().getName()); private final List eventListeners = new LinkedList(); private final List lifecycleListeners = new LinkedList(); private ThreadFactory threadFactory; - private final Logger logger = Logger.getLogger(getClass().getName()); /** * Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password). @@ -90,12 +90,12 @@ public void setThreadFactory(ThreadFactory threadFactory) { * Connect to the replication stream in a separate thread. * @param timeoutInMilliseconds timeout in milliseconds * @throws com.github.shyiko.mysql.binlog.network.AuthenticationException in case of failed authentication - * @throws java.io.IOException if anything goes wrong while trying to connect - * @throws java.util.concurrent.TimeoutException if client wasn't able to connect in the requested period of time + * @throws IOException if anything goes wrong while trying to connect + * @throws TimeoutException if client wasn't able to connect in the requested period of time */ public void connect(long timeoutInMilliseconds) throws IOException, TimeoutException { final CountDownLatch countDownLatch = new CountDownLatch(1); - BinaryLogClient.AbstractLifecycleListener connectListener = new BinaryLogClient.AbstractLifecycleListener() { + AbstractLifecycleListener connectListener = new BinaryLogClient.AbstractLifecycleListener() { @Override public void onConnect(BinaryLogClient client) { countDownLatch.countDown(); From 3adf1e5c7faf391c785d36700cd099f030d0d693 Mon Sep 17 00:00:00 2001 From: Luis Casillas Date: Fri, 18 Apr 2014 00:06:01 -0700 Subject: [PATCH 10/10] more cosmetics --- .../java/com/github/shyiko/mysql/binlog/BinaryLogClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 460b5d81..660e69c2 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -95,7 +95,7 @@ public void setThreadFactory(ThreadFactory threadFactory) { */ public void connect(long timeoutInMilliseconds) throws IOException, TimeoutException { final CountDownLatch countDownLatch = new CountDownLatch(1); - AbstractLifecycleListener connectListener = new BinaryLogClient.AbstractLifecycleListener() { + AbstractLifecycleListener connectListener = new AbstractLifecycleListener() { @Override public void onConnect(BinaryLogClient client) { countDownLatch.countDown();