Add speed payload

This commit is contained in:
anthonyraymond 2018-02-08 00:25:18 +01:00
parent a933989eb8
commit 9502eabdb1
4 changed files with 319 additions and 233 deletions

View file

@ -1,69 +1,73 @@
package org.araymond.joal.web.messages.outgoing;
import org.araymond.joal.web.messages.outgoing.impl.announce.FailedToAnnouncePayload;
import org.araymond.joal.web.messages.outgoing.impl.announce.SuccessfullyAnnouncePayload;
import org.araymond.joal.web.messages.outgoing.impl.announce.TooManyAnnouncesFailedPayload;
import org.araymond.joal.web.messages.outgoing.impl.announce.WillAnnouncePayload;
import org.araymond.joal.web.messages.outgoing.impl.config.ConfigHasBeenLoadedPayload;
import org.araymond.joal.web.messages.outgoing.impl.config.ConfigIsInDirtyStatePayload;
import org.araymond.joal.web.messages.outgoing.impl.config.InvalidConfigPayload;
import org.araymond.joal.web.messages.outgoing.impl.config.ListOfClientFilesPayload;
import org.araymond.joal.web.messages.outgoing.impl.files.FailedToAddTorrentFilePayload;
import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileAddedPayload;
import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileDeletedPayload;
import org.araymond.joal.web.messages.outgoing.impl.global.state.GlobalSeedStartedPayload;
import org.araymond.joal.web.messages.outgoing.impl.global.state.GlobalSeedStoppedPayload;
import java.util.HashMap;
import java.util.Map;
/**
* Created by raymo on 29/06/2017.
*/
public enum StompMessageTypes {
//announce
FAILED_TO_ANNOUNCE(FailedToAnnouncePayload.class),
SUCCESSFULLY_ANNOUNCE(SuccessfullyAnnouncePayload.class),
TOO_MANY_ANNOUNCES_FAILED(TooManyAnnouncesFailedPayload.class),
WILL_ANNOUNCE(WillAnnouncePayload.class),
//config
CONFIG_HAS_BEEN_LOADED(ConfigHasBeenLoadedPayload.class),
CONFIG_IS_IN_DIRTY_STATE(ConfigIsInDirtyStatePayload.class),
INVALID_CONFIG(InvalidConfigPayload.class),
LIST_OF_CLIENT_FILES(ListOfClientFilesPayload.class),
// files
TORRENT_FILE_ADDED(TorrentFileAddedPayload.class),
TORRENT_FILE_DELETED(TorrentFileDeletedPayload.class),
FAILED_TO_ADD_TORRENT_FILE(FailedToAddTorrentFilePayload.class),
//global.state
GLOBAL_SEED_STARTED(GlobalSeedStartedPayload.class),
GLOBAL_SEED_STOPPED(GlobalSeedStoppedPayload.class);
private static final Map<Class<? extends MessagePayload>, StompMessageTypes> classToType = new HashMap<>();
private final Class<? extends MessagePayload> clazz;
static {
for (final StompMessageTypes type : StompMessageTypes.values()) {
classToType.put(type.clazz, type);
}
}
StompMessageTypes(final Class<? extends MessagePayload> clazz) {
this.clazz = clazz;
}
static StompMessageTypes typeFor(final MessagePayload payload) {
return typeFor(payload.getClass());
}
static StompMessageTypes typeFor(final Class<? extends MessagePayload> clazz) {
final StompMessageTypes type = classToType.get(clazz);
if (type == null) {
throw new IllegalArgumentException(clazz.getSimpleName() + " is not mapped with a StompMessageType.");
}
return type;
}
}
package org.araymond.joal.web.messages.outgoing;
import org.araymond.joal.web.messages.outgoing.impl.announce.FailedToAnnouncePayload;
import org.araymond.joal.web.messages.outgoing.impl.announce.SuccessfullyAnnouncePayload;
import org.araymond.joal.web.messages.outgoing.impl.announce.TooManyAnnouncesFailedPayload;
import org.araymond.joal.web.messages.outgoing.impl.announce.WillAnnouncePayload;
import org.araymond.joal.web.messages.outgoing.impl.config.ConfigHasBeenLoadedPayload;
import org.araymond.joal.web.messages.outgoing.impl.config.ConfigIsInDirtyStatePayload;
import org.araymond.joal.web.messages.outgoing.impl.config.InvalidConfigPayload;
import org.araymond.joal.web.messages.outgoing.impl.config.ListOfClientFilesPayload;
import org.araymond.joal.web.messages.outgoing.impl.files.FailedToAddTorrentFilePayload;
import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileAddedPayload;
import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileDeletedPayload;
import org.araymond.joal.web.messages.outgoing.impl.global.state.GlobalSeedStartedPayload;
import org.araymond.joal.web.messages.outgoing.impl.global.state.GlobalSeedStoppedPayload;
import org.araymond.joal.web.messages.outgoing.impl.speed.SeedingSpeedHasChangedPayload;
import java.util.HashMap;
import java.util.Map;
/**
* Created by raymo on 29/06/2017.
*/
public enum StompMessageTypes {
//announce
FAILED_TO_ANNOUNCE(FailedToAnnouncePayload.class),
SUCCESSFULLY_ANNOUNCE(SuccessfullyAnnouncePayload.class),
TOO_MANY_ANNOUNCES_FAILED(TooManyAnnouncesFailedPayload.class),
WILL_ANNOUNCE(WillAnnouncePayload.class),
//config
CONFIG_HAS_BEEN_LOADED(ConfigHasBeenLoadedPayload.class),
CONFIG_IS_IN_DIRTY_STATE(ConfigIsInDirtyStatePayload.class),
INVALID_CONFIG(InvalidConfigPayload.class),
LIST_OF_CLIENT_FILES(ListOfClientFilesPayload.class),
// files
TORRENT_FILE_ADDED(TorrentFileAddedPayload.class),
TORRENT_FILE_DELETED(TorrentFileDeletedPayload.class),
FAILED_TO_ADD_TORRENT_FILE(FailedToAddTorrentFilePayload.class),
//global.state
GLOBAL_SEED_STARTED(GlobalSeedStartedPayload.class),
GLOBAL_SEED_STOPPED(GlobalSeedStoppedPayload.class),
// speed
SEEDING_SPEED_HAS_CHANGED(SeedingSpeedHasChangedPayload.class);
private static final Map<Class<? extends MessagePayload>, StompMessageTypes> classToType = new HashMap<>();
private final Class<? extends MessagePayload> clazz;
static {
for (final StompMessageTypes type : StompMessageTypes.values()) {
classToType.put(type.clazz, type);
}
}
StompMessageTypes(final Class<? extends MessagePayload> clazz) {
this.clazz = clazz;
}
static StompMessageTypes typeFor(final MessagePayload payload) {
return typeFor(payload.getClass());
}
static StompMessageTypes typeFor(final Class<? extends MessagePayload> clazz) {
final StompMessageTypes type = classToType.get(clazz);
if (type == null) {
throw new IllegalArgumentException(clazz.getSimpleName() + " is not mapped with a StompMessageType.");
}
return type;
}
}

View file

@ -0,0 +1,41 @@
package org.araymond.joal.web.messages.outgoing.impl.speed;
import org.araymond.joal.core.events.speed.SeedingSpeedsHasChangedEvent;
import org.araymond.joal.core.torrent.torrent.InfoHash;
import org.araymond.joal.web.messages.outgoing.MessagePayload;
import java.util.AbstractMap;
import java.util.List;
import java.util.stream.Collectors;
public class SeedingSpeedHasChangedPayload implements MessagePayload {
private final List<SpeedPayload> speeds;
public SeedingSpeedHasChangedPayload(final SeedingSpeedsHasChangedEvent event) {
this.speeds = event.getSpeeds().entrySet().stream()
.map(entry -> new SpeedPayload(entry.getKey(), String.valueOf(entry.getValue().getBytesPerSeconds())))
.collect(Collectors.toList());
}
public List<SpeedPayload> getSpeeds() {
return speeds;
}
public static final class SpeedPayload {
private final InfoHash infoHash;
private final String bytesPerSeconds;
public SpeedPayload(final InfoHash infoHash, final String bytesPerSeconds) {
this.infoHash = infoHash;
this.bytesPerSeconds = bytesPerSeconds;
}
public InfoHash getInfoHash() {
return infoHash;
}
public String getBytesPerSeconds() {
return bytesPerSeconds;
}
}
}

View file

@ -1,164 +1,168 @@
package org.araymond.joal.web.services;
import org.araymond.joal.core.torrent.torrent.InfoHash;
import org.araymond.joal.web.annotations.ConditionalOnWebUi;
import org.araymond.joal.web.messages.outgoing.MessagePayload;
import org.araymond.joal.web.messages.outgoing.StompMessage;
import org.araymond.joal.web.messages.outgoing.impl.announce.AnnouncePayload;
import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileAddedPayload;
import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileDeletedPayload;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.araymond.joal.web.messages.outgoing.StompMessageTypes.*;
/**
* Created by raymo on 29/06/2017.
*/
@ConditionalOnWebUi
@Service
public class JoalMessageSendingTemplate {
private final SimpMessageSendingOperations messageSendingOperations;
private final List<StompMessage> replayablePayloads;
private final ReentrantReadWriteLock lock;
public JoalMessageSendingTemplate(final SimpMessageSendingOperations messageSendingOperations) {
this.messageSendingOperations = messageSendingOperations;
this.replayablePayloads = new ArrayList<>(30);
lock = new ReentrantReadWriteLock(true);
}
public List<StompMessage> getReplayablePayloads() {
try {
lock.readLock().lock();
return Collections.unmodifiableList(replayablePayloads);
} finally {
lock.readLock().unlock();
}
}
public void convertAndSend(final String s, final MessagePayload payload) throws MessagingException {
final StompMessage stompMessage = StompMessage.wrap(payload);
addToReplayable(stompMessage);
messageSendingOperations.convertAndSend(s, stompMessage);
}
private void addToReplayable(final StompMessage stompMessage) {
try {
lock.writeLock().lock();
switch (stompMessage.getType()) {
case GLOBAL_SEED_STARTED: {
replayablePayloads.removeIf(message -> message.getType() == GLOBAL_SEED_STARTED || message.getType() == GLOBAL_SEED_STOPPED);
replayablePayloads.add(stompMessage);
break;
}
case GLOBAL_SEED_STOPPED: {
replayablePayloads.removeIf(message ->
message.getType() != CONFIG_HAS_BEEN_LOADED
&& message.getType() != CONFIG_IS_IN_DIRTY_STATE
&& message.getType() != LIST_OF_CLIENT_FILES
);
replayablePayloads.add(stompMessage);
break;
}
case CONFIG_HAS_BEEN_LOADED: {
replayablePayloads.removeIf(message -> message.getType() == CONFIG_HAS_BEEN_LOADED && message.getType() == CONFIG_IS_IN_DIRTY_STATE);
replayablePayloads.add(stompMessage);
break;
}
case CONFIG_IS_IN_DIRTY_STATE: {
replayablePayloads.removeIf(message -> message.getType() == CONFIG_IS_IN_DIRTY_STATE);
replayablePayloads.add(stompMessage);
break;
}
case INVALID_CONFIG: {
break;
}
case LIST_OF_CLIENT_FILES: {
replayablePayloads.removeIf(message -> message.getType() == LIST_OF_CLIENT_FILES);
replayablePayloads.add(stompMessage);
break;
}
case TORRENT_FILE_ADDED: {
replayablePayloads.add(stompMessage);
break;
}
case TORRENT_FILE_DELETED: {
replayablePayloads.removeIf(message -> {
if (message.getType() != TORRENT_FILE_ADDED) {
return false;
}
final TorrentFileDeletedPayload newMsg = (TorrentFileDeletedPayload) stompMessage.getPayload();
return ((TorrentFileAddedPayload) message.getPayload()).getInfoHash().equals(newMsg.getInfoHash());
});
break;
}
case FAILED_TO_ADD_TORRENT_FILE: {
break;
}
case WILL_ANNOUNCE: {
final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash();
replayablePayloads.removeIf(message -> {
//noinspection SimplifiableIfStatement
if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) {
return false;
}
return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash);
});
replayablePayloads.add(stompMessage);
break;
}
case SUCCESSFULLY_ANNOUNCE: {
final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash();
replayablePayloads.removeIf(message -> {
//noinspection SimplifiableIfStatement
if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) {
return false;
}
return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash);
});
replayablePayloads.add(stompMessage);
break;
}
case FAILED_TO_ANNOUNCE: {
final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash();
replayablePayloads.removeIf(message -> {
//noinspection SimplifiableIfStatement
if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) {
return false;
}
return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash);
});
replayablePayloads.add(stompMessage);
break;
}
case TOO_MANY_ANNOUNCES_FAILED: {
final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash();
replayablePayloads.removeIf(message -> {
//noinspection SimplifiableIfStatement
if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) {
return false;
}
return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash);
});
break;
}
}
} finally {
lock.writeLock().unlock();
}
}
}
package org.araymond.joal.web.services;
import org.araymond.joal.core.torrent.torrent.InfoHash;
import org.araymond.joal.web.annotations.ConditionalOnWebUi;
import org.araymond.joal.web.messages.outgoing.MessagePayload;
import org.araymond.joal.web.messages.outgoing.StompMessage;
import org.araymond.joal.web.messages.outgoing.impl.announce.AnnouncePayload;
import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileAddedPayload;
import org.araymond.joal.web.messages.outgoing.impl.files.TorrentFileDeletedPayload;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.araymond.joal.web.messages.outgoing.StompMessageTypes.*;
/**
* Created by raymo on 29/06/2017.
*/
@ConditionalOnWebUi
@Service
public class JoalMessageSendingTemplate {
private final SimpMessageSendingOperations messageSendingOperations;
private final List<StompMessage> replayablePayloads;
private final ReentrantReadWriteLock lock;
public JoalMessageSendingTemplate(final SimpMessageSendingOperations messageSendingOperations) {
this.messageSendingOperations = messageSendingOperations;
this.replayablePayloads = new ArrayList<>(30);
lock = new ReentrantReadWriteLock(true);
}
public List<StompMessage> getReplayablePayloads() {
try {
lock.readLock().lock();
return Collections.unmodifiableList(replayablePayloads);
} finally {
lock.readLock().unlock();
}
}
public void convertAndSend(final String s, final MessagePayload payload) throws MessagingException {
final StompMessage stompMessage = StompMessage.wrap(payload);
addToReplayable(stompMessage);
messageSendingOperations.convertAndSend(s, stompMessage);
}
private void addToReplayable(final StompMessage stompMessage) {
try {
lock.writeLock().lock();
switch (stompMessage.getType()) {
case GLOBAL_SEED_STARTED: {
replayablePayloads.removeIf(message -> message.getType() == GLOBAL_SEED_STARTED || message.getType() == GLOBAL_SEED_STOPPED);
replayablePayloads.add(stompMessage);
break;
}
case GLOBAL_SEED_STOPPED: {
replayablePayloads.removeIf(message ->
message.getType() != CONFIG_HAS_BEEN_LOADED
&& message.getType() != CONFIG_IS_IN_DIRTY_STATE
&& message.getType() != LIST_OF_CLIENT_FILES
);
replayablePayloads.add(stompMessage);
break;
}
case CONFIG_HAS_BEEN_LOADED: {
replayablePayloads.removeIf(message -> message.getType() == CONFIG_HAS_BEEN_LOADED && message.getType() == CONFIG_IS_IN_DIRTY_STATE);
replayablePayloads.add(stompMessage);
break;
}
case CONFIG_IS_IN_DIRTY_STATE: {
replayablePayloads.removeIf(message -> message.getType() == CONFIG_IS_IN_DIRTY_STATE);
replayablePayloads.add(stompMessage);
break;
}
case INVALID_CONFIG: {
break;
}
case LIST_OF_CLIENT_FILES: {
replayablePayloads.removeIf(message -> message.getType() == LIST_OF_CLIENT_FILES);
replayablePayloads.add(stompMessage);
break;
}
case TORRENT_FILE_ADDED: {
replayablePayloads.add(stompMessage);
break;
}
case TORRENT_FILE_DELETED: {
replayablePayloads.removeIf(message -> {
if (message.getType() != TORRENT_FILE_ADDED) {
return false;
}
final TorrentFileDeletedPayload newMsg = (TorrentFileDeletedPayload) stompMessage.getPayload();
return ((TorrentFileAddedPayload) message.getPayload()).getInfoHash().equals(newMsg.getInfoHash());
});
break;
}
case FAILED_TO_ADD_TORRENT_FILE: {
break;
}
case WILL_ANNOUNCE: {
final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash();
replayablePayloads.removeIf(message -> {
//noinspection SimplifiableIfStatement
if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) {
return false;
}
return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash);
});
replayablePayloads.add(stompMessage);
break;
}
case SUCCESSFULLY_ANNOUNCE: {
final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash();
replayablePayloads.removeIf(message -> {
//noinspection SimplifiableIfStatement
if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) {
return false;
}
return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash);
});
replayablePayloads.add(stompMessage);
break;
}
case FAILED_TO_ANNOUNCE: {
final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash();
replayablePayloads.removeIf(message -> {
//noinspection SimplifiableIfStatement
if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) {
return false;
}
return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash);
});
replayablePayloads.add(stompMessage);
break;
}
case TOO_MANY_ANNOUNCES_FAILED: {
final InfoHash infoHash = ((AnnouncePayload) stompMessage.getPayload()).getInfoHash();
replayablePayloads.removeIf(message -> {
//noinspection SimplifiableIfStatement
if (!AnnouncePayload.class.isAssignableFrom(message.getPayload().getClass())) {
return false;
}
return ((AnnouncePayload) message.getPayload()).getInfoHash().equals(infoHash);
});
break;
}
case SEEDING_SPEED_HAS_CHANGED: {
this.replayablePayloads.removeIf(message -> message.getType() == SEEDING_SPEED_HAS_CHANGED);
this.replayablePayloads.add(stompMessage);
}
}
} finally {
lock.writeLock().unlock();
}
}
}

View file

@ -0,0 +1,37 @@
package org.araymond.joal.web.services.corelistener;
import org.araymond.joal.core.events.speed.SeedingSpeedsHasChangedEvent;
import org.araymond.joal.web.annotations.ConditionalOnWebUi;
import org.araymond.joal.web.messages.outgoing.impl.speed.SeedingSpeedHasChangedPayload;
import org.araymond.joal.web.services.JoalMessageSendingTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import javax.inject.Inject;
/**
* Created by raymo on 25/06/2017.
*/
@ConditionalOnWebUi
@Service
public class WebSpeedEventListener extends WebEventListener {
private static final Logger logger = LoggerFactory.getLogger(WebSpeedEventListener.class);
@Inject
public WebSpeedEventListener(final JoalMessageSendingTemplate messagingTemplate) {
super(messagingTemplate);
}
@Order(Ordered.LOWEST_PRECEDENCE)
@EventListener
void failedToAnnounce(final SeedingSpeedsHasChangedEvent event) {
logger.debug("Send SeedingSpeedHasChangedPayload to clients.");
this.messagingTemplate.convertAndSend("/speed", new SeedingSpeedHasChangedPayload(event));
}
}