diff --git a/src/main/java/org/araymond/joal/web/messages/outgoing/StompMessageTypes.java b/src/main/java/org/araymond/joal/web/messages/outgoing/StompMessageTypes.java index b1bda5e..89607ca 100644 --- a/src/main/java/org/araymond/joal/web/messages/outgoing/StompMessageTypes.java +++ b/src/main/java/org/araymond/joal/web/messages/outgoing/StompMessageTypes.java @@ -1,69 +1,73 @@ -package org.araymond.joal.web.messages.outgoing; - -import org.araymond.joal.web.messages.outgoing.impl.announce.FailedToAnnouncePayload; -import org.araymond.joal.web.messages.outgoing.impl.announce.SuccessfullyAnnouncePayload; -import org.araymond.joal.web.messages.outgoing.impl.announce.TooManyAnnouncesFailedPayload; -import org.araymond.joal.web.messages.outgoing.impl.announce.WillAnnouncePayload; -import org.araymond.joal.web.messages.outgoing.impl.config.ConfigHasBeenLoadedPayload; -import org.araymond.joal.web.messages.outgoing.impl.config.ConfigIsInDirtyStatePayload; -import org.araymond.joal.web.messages.outgoing.impl.config.InvalidConfigPayload; -import org.araymond.joal.web.messages.outgoing.impl.config.ListOfClientFilesPayload; -import org.araymond.joal.web.messages.outgoing.impl.files.FailedToAddTorrentFilePayload; -import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileAddedPayload; -import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileDeletedPayload; -import org.araymond.joal.web.messages.outgoing.impl.global.state.GlobalSeedStartedPayload; -import org.araymond.joal.web.messages.outgoing.impl.global.state.GlobalSeedStoppedPayload; - -import java.util.HashMap; -import java.util.Map; - -/** - * Created by raymo on 29/06/2017. - */ -public enum StompMessageTypes { - //announce - FAILED_TO_ANNOUNCE(FailedToAnnouncePayload.class), - SUCCESSFULLY_ANNOUNCE(SuccessfullyAnnouncePayload.class), - TOO_MANY_ANNOUNCES_FAILED(TooManyAnnouncesFailedPayload.class), - WILL_ANNOUNCE(WillAnnouncePayload.class), - - //config - CONFIG_HAS_BEEN_LOADED(ConfigHasBeenLoadedPayload.class), - CONFIG_IS_IN_DIRTY_STATE(ConfigIsInDirtyStatePayload.class), - INVALID_CONFIG(InvalidConfigPayload.class), - LIST_OF_CLIENT_FILES(ListOfClientFilesPayload.class), - - // files - TORRENT_FILE_ADDED(TorrentFileAddedPayload.class), - TORRENT_FILE_DELETED(TorrentFileDeletedPayload.class), - FAILED_TO_ADD_TORRENT_FILE(FailedToAddTorrentFilePayload.class), - - //global.state - GLOBAL_SEED_STARTED(GlobalSeedStartedPayload.class), - GLOBAL_SEED_STOPPED(GlobalSeedStoppedPayload.class); - - private static final Map, StompMessageTypes> classToType = new HashMap<>(); - private final Class clazz; - - static { - for (final StompMessageTypes type : StompMessageTypes.values()) { - classToType.put(type.clazz, type); - } - } - - StompMessageTypes(final Class clazz) { - this.clazz = clazz; - } - - static StompMessageTypes typeFor(final MessagePayload payload) { - return typeFor(payload.getClass()); - } - - static StompMessageTypes typeFor(final Class clazz) { - final StompMessageTypes type = classToType.get(clazz); - if (type == null) { - throw new IllegalArgumentException(clazz.getSimpleName() + " is not mapped with a StompMessageType."); - } - return type; - } -} +package org.araymond.joal.web.messages.outgoing; + +import org.araymond.joal.web.messages.outgoing.impl.announce.FailedToAnnouncePayload; +import org.araymond.joal.web.messages.outgoing.impl.announce.SuccessfullyAnnouncePayload; +import org.araymond.joal.web.messages.outgoing.impl.announce.TooManyAnnouncesFailedPayload; +import org.araymond.joal.web.messages.outgoing.impl.announce.WillAnnouncePayload; +import org.araymond.joal.web.messages.outgoing.impl.config.ConfigHasBeenLoadedPayload; +import org.araymond.joal.web.messages.outgoing.impl.config.ConfigIsInDirtyStatePayload; +import org.araymond.joal.web.messages.outgoing.impl.config.InvalidConfigPayload; +import org.araymond.joal.web.messages.outgoing.impl.config.ListOfClientFilesPayload; +import org.araymond.joal.web.messages.outgoing.impl.files.FailedToAddTorrentFilePayload; +import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileAddedPayload; +import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileDeletedPayload; +import org.araymond.joal.web.messages.outgoing.impl.global.state.GlobalSeedStartedPayload; +import org.araymond.joal.web.messages.outgoing.impl.global.state.GlobalSeedStoppedPayload; +import org.araymond.joal.web.messages.outgoing.impl.speed.SeedingSpeedHasChangedPayload; + +import java.util.HashMap; +import java.util.Map; + +/** + * Created by raymo on 29/06/2017. + */ +public enum StompMessageTypes { + //announce + FAILED_TO_ANNOUNCE(FailedToAnnouncePayload.class), + SUCCESSFULLY_ANNOUNCE(SuccessfullyAnnouncePayload.class), + TOO_MANY_ANNOUNCES_FAILED(TooManyAnnouncesFailedPayload.class), + WILL_ANNOUNCE(WillAnnouncePayload.class), + + //config + CONFIG_HAS_BEEN_LOADED(ConfigHasBeenLoadedPayload.class), + CONFIG_IS_IN_DIRTY_STATE(ConfigIsInDirtyStatePayload.class), + INVALID_CONFIG(InvalidConfigPayload.class), + LIST_OF_CLIENT_FILES(ListOfClientFilesPayload.class), + + // files + TORRENT_FILE_ADDED(TorrentFileAddedPayload.class), + TORRENT_FILE_DELETED(TorrentFileDeletedPayload.class), + FAILED_TO_ADD_TORRENT_FILE(FailedToAddTorrentFilePayload.class), + + //global.state + GLOBAL_SEED_STARTED(GlobalSeedStartedPayload.class), + GLOBAL_SEED_STOPPED(GlobalSeedStoppedPayload.class), + + // speed + SEEDING_SPEED_HAS_CHANGED(SeedingSpeedHasChangedPayload.class); + + private static final Map, StompMessageTypes> classToType = new HashMap<>(); + private final Class clazz; + + static { + for (final StompMessageTypes type : StompMessageTypes.values()) { + classToType.put(type.clazz, type); + } + } + + StompMessageTypes(final Class clazz) { + this.clazz = clazz; + } + + static StompMessageTypes typeFor(final MessagePayload payload) { + return typeFor(payload.getClass()); + } + + static StompMessageTypes typeFor(final Class clazz) { + final StompMessageTypes type = classToType.get(clazz); + if (type == null) { + throw new IllegalArgumentException(clazz.getSimpleName() + " is not mapped with a StompMessageType."); + } + return type; + } +} diff --git a/src/main/java/org/araymond/joal/web/messages/outgoing/impl/speed/SeedingSpeedHasChangedPayload.java b/src/main/java/org/araymond/joal/web/messages/outgoing/impl/speed/SeedingSpeedHasChangedPayload.java new file mode 100644 index 0000000..f9ae037 --- /dev/null +++ b/src/main/java/org/araymond/joal/web/messages/outgoing/impl/speed/SeedingSpeedHasChangedPayload.java @@ -0,0 +1,41 @@ +package org.araymond.joal.web.messages.outgoing.impl.speed; + +import org.araymond.joal.core.events.speed.SeedingSpeedsHasChangedEvent; +import org.araymond.joal.core.torrent.torrent.InfoHash; +import org.araymond.joal.web.messages.outgoing.MessagePayload; + +import java.util.AbstractMap; +import java.util.List; +import java.util.stream.Collectors; + +public class SeedingSpeedHasChangedPayload implements MessagePayload { + private final List speeds; + + public SeedingSpeedHasChangedPayload(final SeedingSpeedsHasChangedEvent event) { + this.speeds = event.getSpeeds().entrySet().stream() + .map(entry -> new SpeedPayload(entry.getKey(), String.valueOf(entry.getValue().getBytesPerSeconds()))) + .collect(Collectors.toList()); + } + + public List getSpeeds() { + return speeds; + } + + public static final class SpeedPayload { + private final InfoHash infoHash; + private final String bytesPerSeconds; + + public SpeedPayload(final InfoHash infoHash, final String bytesPerSeconds) { + this.infoHash = infoHash; + this.bytesPerSeconds = bytesPerSeconds; + } + + public InfoHash getInfoHash() { + return infoHash; + } + + public String getBytesPerSeconds() { + return bytesPerSeconds; + } + } +} diff --git a/src/main/java/org/araymond/joal/web/services/JoalMessageSendingTemplate.java b/src/main/java/org/araymond/joal/web/services/JoalMessageSendingTemplate.java index 85b4c05..e61b1e2 100644 --- a/src/main/java/org/araymond/joal/web/services/JoalMessageSendingTemplate.java +++ b/src/main/java/org/araymond/joal/web/services/JoalMessageSendingTemplate.java @@ -1,164 +1,168 @@ -package org.araymond.joal.web.services; - -import org.araymond.joal.core.torrent.torrent.InfoHash; -import org.araymond.joal.web.annotations.ConditionalOnWebUi; -import org.araymond.joal.web.messages.outgoing.MessagePayload; -import org.araymond.joal.web.messages.outgoing.StompMessage; -import org.araymond.joal.web.messages.outgoing.impl.announce.AnnouncePayload; -import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileAddedPayload; -import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileDeletedPayload; -import org.springframework.messaging.MessagingException; -import org.springframework.messaging.simp.SimpMessageSendingOperations; -import org.springframework.stereotype.Service; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.araymond.joal.web.messages.outgoing.StompMessageTypes.*; - -/** - * Created by raymo on 29/06/2017. - */ -@ConditionalOnWebUi -@Service -public class JoalMessageSendingTemplate { - - private final SimpMessageSendingOperations messageSendingOperations; - private final List replayablePayloads; - private final ReentrantReadWriteLock lock; - - public JoalMessageSendingTemplate(final SimpMessageSendingOperations messageSendingOperations) { - this.messageSendingOperations = messageSendingOperations; - this.replayablePayloads = new ArrayList<>(30); - lock = new ReentrantReadWriteLock(true); - } - - public List getReplayablePayloads() { - try { - lock.readLock().lock(); - return Collections.unmodifiableList(replayablePayloads); - } finally { - lock.readLock().unlock(); - } - } - - public void convertAndSend(final String s, final MessagePayload payload) throws MessagingException { - final StompMessage stompMessage = StompMessage.wrap(payload); - addToReplayable(stompMessage); - messageSendingOperations.convertAndSend(s, stompMessage); - } - - - private void addToReplayable(final StompMessage stompMessage) { - try { - lock.writeLock().lock(); - switch (stompMessage.getType()) { - case GLOBAL_SEED_STARTED: { - replayablePayloads.removeIf(message -> message.getType() == GLOBAL_SEED_STARTED || message.getType() == GLOBAL_SEED_STOPPED); - replayablePayloads.add(stompMessage); - break; - } - case GLOBAL_SEED_STOPPED: { - replayablePayloads.removeIf(message -> - message.getType() != CONFIG_HAS_BEEN_LOADED - && message.getType() != CONFIG_IS_IN_DIRTY_STATE - && message.getType() != LIST_OF_CLIENT_FILES - ); - replayablePayloads.add(stompMessage); - break; - } - case CONFIG_HAS_BEEN_LOADED: { - replayablePayloads.removeIf(message -> message.getType() == CONFIG_HAS_BEEN_LOADED && message.getType() == CONFIG_IS_IN_DIRTY_STATE); - replayablePayloads.add(stompMessage); - break; - } - case CONFIG_IS_IN_DIRTY_STATE: { - replayablePayloads.removeIf(message -> message.getType() == CONFIG_IS_IN_DIRTY_STATE); - replayablePayloads.add(stompMessage); - break; - } - case INVALID_CONFIG: { - break; - } - case LIST_OF_CLIENT_FILES: { - replayablePayloads.removeIf(message -> message.getType() == LIST_OF_CLIENT_FILES); - replayablePayloads.add(stompMessage); - break; - } - case TORRENT_FILE_ADDED: { - replayablePayloads.add(stompMessage); - break; - } - case TORRENT_FILE_DELETED: { - replayablePayloads.removeIf(message -> { - if (message.getType() != TORRENT_FILE_ADDED) { - return false; - } - final TorrentFileDeletedPayload newMsg = (TorrentFileDeletedPayload) stompMessage.getPayload(); - return ((TorrentFileAddedPayload) message.getPayload()).getInfoHash().equals(newMsg.getInfoHash()); - }); - break; - } - case FAILED_TO_ADD_TORRENT_FILE: { - break; - } - case WILL_ANNOUNCE: { - final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash(); - replayablePayloads.removeIf(message -> { - //noinspection SimplifiableIfStatement - if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) { - return false; - } - - return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash); - }); - replayablePayloads.add(stompMessage); - break; - } - case SUCCESSFULLY_ANNOUNCE: { - final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash(); - replayablePayloads.removeIf(message -> { - //noinspection SimplifiableIfStatement - if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) { - return false; - } - - return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash); - }); - replayablePayloads.add(stompMessage); - break; - } - case FAILED_TO_ANNOUNCE: { - final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash(); - replayablePayloads.removeIf(message -> { - //noinspection SimplifiableIfStatement - if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) { - return false; - } - - return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash); - }); - replayablePayloads.add(stompMessage); - break; - } - case TOO_MANY_ANNOUNCES_FAILED: { - final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash(); - replayablePayloads.removeIf(message -> { - //noinspection SimplifiableIfStatement - if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) { - return false; - } - - return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash); - }); - break; - } - } - } finally { - lock.writeLock().unlock(); - } - } - -} +package org.araymond.joal.web.services; + +import org.araymond.joal.core.torrent.torrent.InfoHash; +import org.araymond.joal.web.annotations.ConditionalOnWebUi; +import org.araymond.joal.web.messages.outgoing.MessagePayload; +import org.araymond.joal.web.messages.outgoing.StompMessage; +import org.araymond.joal.web.messages.outgoing.impl.announce.AnnouncePayload; +import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileAddedPayload; +import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileDeletedPayload; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.simp.SimpMessageSendingOperations; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.araymond.joal.web.messages.outgoing.StompMessageTypes.*; + +/** + * Created by raymo on 29/06/2017. + */ +@ConditionalOnWebUi +@Service +public class JoalMessageSendingTemplate { + + private final SimpMessageSendingOperations messageSendingOperations; + private final List replayablePayloads; + private final ReentrantReadWriteLock lock; + + public JoalMessageSendingTemplate(final SimpMessageSendingOperations messageSendingOperations) { + this.messageSendingOperations = messageSendingOperations; + this.replayablePayloads = new ArrayList<>(30); + lock = new ReentrantReadWriteLock(true); + } + + public List getReplayablePayloads() { + try { + lock.readLock().lock(); + return Collections.unmodifiableList(replayablePayloads); + } finally { + lock.readLock().unlock(); + } + } + + public void convertAndSend(final String s, final MessagePayload payload) throws MessagingException { + final StompMessage stompMessage = StompMessage.wrap(payload); + addToReplayable(stompMessage); + messageSendingOperations.convertAndSend(s, stompMessage); + } + + + private void addToReplayable(final StompMessage stompMessage) { + try { + lock.writeLock().lock(); + switch (stompMessage.getType()) { + case GLOBAL_SEED_STARTED: { + replayablePayloads.removeIf(message -> message.getType() == GLOBAL_SEED_STARTED || message.getType() == GLOBAL_SEED_STOPPED); + replayablePayloads.add(stompMessage); + break; + } + case GLOBAL_SEED_STOPPED: { + replayablePayloads.removeIf(message -> + message.getType() != CONFIG_HAS_BEEN_LOADED + && message.getType() != CONFIG_IS_IN_DIRTY_STATE + && message.getType() != LIST_OF_CLIENT_FILES + ); + replayablePayloads.add(stompMessage); + break; + } + case CONFIG_HAS_BEEN_LOADED: { + replayablePayloads.removeIf(message -> message.getType() == CONFIG_HAS_BEEN_LOADED && message.getType() == CONFIG_IS_IN_DIRTY_STATE); + replayablePayloads.add(stompMessage); + break; + } + case CONFIG_IS_IN_DIRTY_STATE: { + replayablePayloads.removeIf(message -> message.getType() == CONFIG_IS_IN_DIRTY_STATE); + replayablePayloads.add(stompMessage); + break; + } + case INVALID_CONFIG: { + break; + } + case LIST_OF_CLIENT_FILES: { + replayablePayloads.removeIf(message -> message.getType() == LIST_OF_CLIENT_FILES); + replayablePayloads.add(stompMessage); + break; + } + case TORRENT_FILE_ADDED: { + replayablePayloads.add(stompMessage); + break; + } + case TORRENT_FILE_DELETED: { + replayablePayloads.removeIf(message -> { + if (message.getType() != TORRENT_FILE_ADDED) { + return false; + } + final TorrentFileDeletedPayload newMsg = (TorrentFileDeletedPayload) stompMessage.getPayload(); + return ((TorrentFileAddedPayload) message.getPayload()).getInfoHash().equals(newMsg.getInfoHash()); + }); + break; + } + case FAILED_TO_ADD_TORRENT_FILE: { + break; + } + case WILL_ANNOUNCE: { + final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash(); + replayablePayloads.removeIf(message -> { + //noinspection SimplifiableIfStatement + if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) { + return false; + } + + return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash); + }); + replayablePayloads.add(stompMessage); + break; + } + case SUCCESSFULLY_ANNOUNCE: { + final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash(); + replayablePayloads.removeIf(message -> { + //noinspection SimplifiableIfStatement + if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) { + return false; + } + + return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash); + }); + replayablePayloads.add(stompMessage); + break; + } + case FAILED_TO_ANNOUNCE: { + final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash(); + replayablePayloads.removeIf(message -> { + //noinspection SimplifiableIfStatement + if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) { + return false; + } + + return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash); + }); + replayablePayloads.add(stompMessage); + break; + } + case TOO_MANY_ANNOUNCES_FAILED: { + final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash(); + replayablePayloads.removeIf(message -> { + //noinspection SimplifiableIfStatement + if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) { + return false; + } + + return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash); + }); + break; + } + case SEEDING_SPEED_HAS_CHANGED: { + this.replayablePayloads.removeIf(message -> message.getType() == SEEDING_SPEED_HAS_CHANGED); + this.replayablePayloads.add(stompMessage); + } + } + } finally { + lock.writeLock().unlock(); + } + } + +} diff --git a/src/main/java/org/araymond/joal/web/services/corelistener/WebSpeedEventListener.java b/src/main/java/org/araymond/joal/web/services/corelistener/WebSpeedEventListener.java new file mode 100644 index 0000000..bd5617d --- /dev/null +++ b/src/main/java/org/araymond/joal/web/services/corelistener/WebSpeedEventListener.java @@ -0,0 +1,37 @@ +package org.araymond.joal.web.services.corelistener; + +import org.araymond.joal.core.events.speed.SeedingSpeedsHasChangedEvent; +import org.araymond.joal.web.annotations.ConditionalOnWebUi; +import org.araymond.joal.web.messages.outgoing.impl.speed.SeedingSpeedHasChangedPayload; +import org.araymond.joal.web.services.JoalMessageSendingTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.event.EventListener; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Service; + +import javax.inject.Inject; + +/** + * Created by raymo on 25/06/2017. + */ +@ConditionalOnWebUi +@Service +public class WebSpeedEventListener extends WebEventListener { + private static final Logger logger = LoggerFactory.getLogger(WebSpeedEventListener.class); + + @Inject + public WebSpeedEventListener(final JoalMessageSendingTemplate messagingTemplate) { + super(messagingTemplate); + } + + @Order(Ordered.LOWEST_PRECEDENCE) + @EventListener + void failedToAnnounce(final SeedingSpeedsHasChangedEvent event) { + logger.debug("Send SeedingSpeedHasChangedPayload to clients."); + + this.messagingTemplate.convertAndSend("/speed", new SeedingSpeedHasChangedPayload(event)); + } + +}