40
loading...
This website collects cookies to deliver better user experience
import com.rds.demo.api.Message;
/** Minimal {@link Message} implementation; it declares no fields or methods of its own. */
public class MyMessage implements Message {
}
/**
 * Contract for a bus that carries messages of type {@code T} from publishers to observers.
 *
 * @param <T> the type of message carried by the bus
 */
public interface MessageBus<T> {
/**
 * Hands a message to the bus.
 *
 * @param message the message to publish
 */
void publish(T message);
/**
 * Attaches an observer to the bus.
 *
 * @param observer the observer to attach
 */
void subscribe(Observer<T> observer);
}
/**
 * {@link MessageBus} implementation backed by a single {@link PublishSubject}.
 *
 * <p>Every published message is pushed into the subject, and every subscribing observer is
 * attached directly to that same subject.
 *
 * @param <T> the message type carried by the bus
 */
@Component(scope = ServiceScope.SINGLETON, service = MessageBus.class)
public class MessageBusProvider<T extends Message> implements MessageBus<T> {

    /** The subject that relays published messages to the attached observers. */
    private final PublishSubject<T> subject = PublishSubject.create();

    /**
     * Attaches {@code observer} to the underlying subject.
     *
     * @param observer the observer to attach
     */
    @Override
    public void subscribe(Observer<T> observer) {
        this.subject.subscribe(observer);
    }

    /**
     * Pushes {@code message} into the underlying subject.
     *
     * @param message the message to publish
     */
    @Override
    public void publish(T message) {
        this.subject.onNext(message);
    }
}
/**
 * Publishes one message through a freshly constructed bus and checks that the attached
 * observer saw exactly one value, no error, and no completion signal.
 */
@Test
public void publishMessageTest() {
    final MessageBus<Message> messageBus = new MessageBusProvider<>();
    final TestObserver<Message> observer = TestObserver.create();

    messageBus.subscribe(observer);
    messageBus.publish(new TestMessage());

    observer.assertValueCount(1);
    observer.assertNoErrors();
    observer.assertNotComplete();
}
/**
 * Attaches {@code observer} to the subject and delivers its notifications on the
 * computation scheduler.
 *
 * @param observer the observer to attach
 */
@Override
public void subscribe(Observer<T> observer) {
    // observeOn rather than subscribeOn: for a hot subject, subscribeOn only moves the act of
    // subscribing onto the scheduler. Notifications would still run on the publisher's thread,
    // and anything published before the deferred subscription completes would be missed.
    publishSubject
            .observeOn(schedulers.computation())
            .subscribe(observer);
}
/**
 * Supplies the {@link Scheduler} instances used by the application, so that the choice of
 * scheduler can be swapped by providing a different implementation.
 */
public interface SchedulersFactory {
/**
 * @return the scheduler designated for blocking work
 */
Scheduler blocking();
/**
 * @return the scheduler designated for pooled work
 */
Scheduler pooled();
}
/**
 * {@link MessageBus} implementation backed by a single {@link PublishSubject}, delivering
 * notifications to observers on the scheduler returned by {@link SchedulersFactory#pooled()}.
 *
 * @param <T> the message type carried by the bus
 */
@Component(service = MessageBus.class, scope = ServiceScope.SINGLETON)
public class MessageBusProvider<T extends Message> implements MessageBus<T> {

    /** The subject that relays published messages to the attached observers. */
    private final PublishSubject<T> publishSubject = PublishSubject.create();

    /** Source of schedulers; set through the {@code @Reference} field or the constructor below. */
    @Reference
    private SchedulersFactory schedulersFactory;

    /** Creates a provider whose scheduler factory is expected to be injected into the field. */
    public MessageBusProvider() {
    }

    /**
     * Creates a provider with an explicitly supplied scheduler factory.
     *
     * @param schedulersFactory the factory that supplies the scheduler used for delivery
     */
    public MessageBusProvider(SchedulersFactory schedulersFactory) {
        this.schedulersFactory = schedulersFactory;
    }

    /**
     * Pushes {@code message} into the underlying subject.
     *
     * @param message the message to publish
     */
    @Override
    public void publish(T message) {
        publishSubject.onNext(message);
    }

    /**
     * Attaches {@code observer} to the subject and delivers its notifications on the pooled
     * scheduler.
     *
     * @param observer the observer to attach
     */
    @Override
    public void subscribe(Observer<T> observer) {
        // observeOn rather than subscribeOn: for a hot subject, subscribeOn only moves the act
        // of subscribing onto the scheduler. Notifications would still run on the publisher's
        // thread, and anything published before the deferred subscription completes would be
        // missed. observeOn subscribes immediately and shifts delivery to the scheduler.
        publishSubject.observeOn(schedulersFactory.pooled()).subscribe(observer);
    }
}
/**
 * {@link SchedulersFactory} for tests: both scheduler kinds resolve to
 * {@link Schedulers#trampoline()}.
 */
public class TestSchedulersFactory implements SchedulersFactory {

    /** @return the trampoline scheduler */
    @Override
    public Scheduler blocking() {
        return trampoline();
    }

    /** @return the trampoline scheduler */
    @Override
    public Scheduler pooled() {
        return trampoline();
    }

    /** Single place that picks the scheduler handed out by both accessors. */
    private static Scheduler trampoline() {
        return Schedulers.trampoline();
    }
}
// Build the bus with the test scheduler factory passed in through the constructor.
MessageBus<Message> bus = new MessageBusProvider<>(new TestSchedulersFactory());
/**
 * Configuration attributes for the schedulers factory component: one string-valued attribute
 * per scheduler kind, each offering the choices {@code IO} and {@code COMPUTATION}.
 */
public @interface SchedulersFactoryProviderConfiguration {

    /**
     * Scheduler choice for blocking work.
     *
     * @return the configured value; {@code "IO"} when not set
     */
    @AttributeDefinition(
            name = ".blocking",
            description = "Blocking scheduler",
            type = AttributeType.STRING,
            required = false,
            options = {
                    @Option(label = "io", value = "IO"),
                    @Option(label = "computation", value = "COMPUTATION")
            })
    String _blocking() default "IO";

    /**
     * Scheduler choice for pooled work.
     *
     * @return the configured value; {@code "COMPUTATION"} when not set
     */
    @AttributeDefinition(
            name = ".pooled",
            description = "Pooled scheduler",
            type = AttributeType.STRING,
            required = false,
            options = {
                    @Option(label = "io", value = "IO"),
                    @Option(label = "computation", value = "COMPUTATION")
            })
    String _pooled() default "COMPUTATION";
}
/**
 * {@link SchedulersFactory} component whose two schedulers are chosen from its configuration
 * when the component is activated.
 */
@Component(
        service = SchedulersFactory.class,
        name = "com.rds.reactive.provider.schedulers.provider",
        configurationPolicy = ConfigurationPolicy.OPTIONAL)
@Designate(ocd = SchedulersFactoryProviderConfiguration.class)
public class SchedulersFactoryProvider implements SchedulersFactory {

    /** Scheduler resolved from the {@code _blocking} configuration value. */
    private Scheduler blocking;

    /** Scheduler resolved from the {@code _pooled} configuration value. */
    private Scheduler pooled;

    /**
     * Resolves both schedulers from the supplied configuration.
     *
     * @param cfg the component configuration holding the scheduler type names
     */
    @Activate
    public void activate(SchedulersFactoryProviderConfiguration cfg) {
        System.out.println("ACTIVATE");

        // Names that SchedulerType does not recognise resolve to its default constant.
        final SchedulerType blockingType = SchedulerType.get(cfg._blocking());
        this.blocking = blockingType.value();

        final SchedulerType pooledType = SchedulerType.get(cfg._pooled());
        this.pooled = pooledType.value();
    }

    /** @return the scheduler selected for pooled work during activation */
    @Override
    public Scheduler pooled() {
        return this.pooled;
    }

    /** @return the scheduler selected for blocking work during activation */
    @Override
    public Scheduler blocking() {
        return this.blocking;
    }
}
/**
 * Named scheduler choices, each bound to the matching {@link Schedulers} instance and
 * resolvable from its constant name through {@link #get(String)}.
 */
public enum SchedulerType {
    IO(Schedulers.io()),
    COMPUTATION(Schedulers.computation()),
    TRAMPOLINE(Schedulers.trampoline());

    /** Read-only lookup table from constant name to constant. */
    private static final Map<String, SchedulerType> BY_NAME = indexByName();

    private final Scheduler scheduler;

    SchedulerType(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /** @return the scheduler bound to this constant */
    Scheduler value() {
        return this.scheduler;
    }

    /**
     * Resolves a constant by its exact name.
     *
     * @param name the constant name to look up
     * @return the matching constant, or {@link #COMPUTATION} when no constant has that name
     */
    public static SchedulerType get(String name) {
        final SchedulerType match = BY_NAME.get(name);
        return match != null ? match : COMPUTATION;
    }

    /** Builds the unmodifiable name-to-constant table from all declared constants. */
    private static Map<String, SchedulerType> indexByName() {
        final SchedulerType[] all = SchedulerType.values();
        final Map<String, SchedulerType> index = new HashMap<>();
        for (int i = 0; i < all.length; i++) {
            index.put(all[i].name(), all[i]);
        }
        return Collections.unmodifiableMap(index);
    }
}
// Scheduler factory marked for injection via @Reference rather than constructed by hand.
@Reference
private SchedulersFactory schedulersFactory;
-runproperties: \
felix.fileinstall.dir=${.}/config
config/com.rds.reactive.provider.schedulers.provider.cfg
.blocking=TRAMPOLINE
.pooled=TRAMPOLINE
/**
 * Integration test that obtains the {@link MessageBus} service from the bundle context and
 * checks that published messages reach an attached observer.
 */
public class MessageBusTest {

    private final BundleContext context =
            FrameworkUtil.getBundle(MessageBusTest.class).getBundleContext();

    // The tracker stays raw in its type arguments because MessageBus.class is a Class<MessageBus>.
    private ServiceTracker<MessageBus, MessageBus> busTracker;

    /** Typed view of the tracked service, so calls in the tests are type-checked. */
    private MessageBus<Message> bus;

    /**
     * Opens a tracker for the bus service and waits up to 500 ms for it to appear.
     *
     * @throws InterruptedException if the wait for the service is interrupted
     */
    // The tracker hands back a raw MessageBus; narrowing it to MessageBus<Message> is unchecked.
    @SuppressWarnings("unchecked")
    @Before
    public void before() throws InterruptedException {
        busTracker = new ServiceTracker<>(context, MessageBus.class, null);
        busTracker.open();
        bus = busTracker.waitForService(500);
        Assert.assertNotNull(bus);
    }

    /** Closes the tracker opened in {@link #before()}. */
    @After
    public void after() {
        busTracker.close();
    }

    /** A single published message must arrive at the observer without error or completion. */
    @Test
    public void canPublishToBusTest() {
        TestObserver<Message> test = TestObserver.create();
        bus.subscribe(test);

        bus.publish(new TestMessage());

        test.assertValueCount(1);
        test.assertNotComplete();
        test.assertNoErrors();
    }

    /** Ten messages produced by an interval source must all arrive at the observer. */
    @Test
    public void multipleMessagesTest() {
        TestObserver<Message> test = TestObserver.create();
        bus.subscribe(test);

        // It is critical to set the scheduler here; otherwise interval uses the internal
        // computation scheduler.
        Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.trampoline())
                .take(10)
                .map(t -> new TestMessage())
                .subscribe(bus::publish);

        test.assertValueCount(10);
        test.assertNotComplete();
        test.assertNoErrors();
    }
}
Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.trampoline())