मैंने एक फ्लक्स बनाया है जो लगातार एक नया इंटरगर मान उत्पन्न करता है। मेरे पास एक ग्राहक अपने स्वयं के धागे में चल रहा है (.publishOn(single()) )। कोई फर्क नहीं पड़ता कि मैं किस रणनीति का उपयोग करता हूं (नवीनतम या कोई अन्य), मुझे हमेशा एक ही परिणाम मिलता है:

*** Received 1 with thread single-1
>>> Generated 1 with thread main
>>> Generated 2 with thread main
>>> Generated 3 with thread main
>>> Generated 4 with thread main
>>> Generated 5 with thread main
>>> Generated 6 with thread main
>>> Generated 7 with thread main
>>> Generated 8 with thread main
>>> Generated 9 with thread main
*** Received 2 with thread single-1
*** Received 3 with thread single-1
*** Received 4 with thread single-1
*** Received 5 with thread single-1
*** Received 6 with thread single-1
*** Received 7 with thread single-1

जो मैंने समझा, latest सेट करके, मुझे केवल अंतिम पूर्णांक प्राप्त करना चाहिए .. कुछ पूर्णांक छोड़ दिए जाने चाहिए थे?

@Test
    @DisplayName("test")
    public void workingFlux() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Flux<Integer> IntGenerator = Flux.create(e -> {
            AtomicInteger iteration = new AtomicInteger(1);
            while (iteration.intValue() < 10) {
                int value = iteration.getAndIncrement();
                e.next(value);
                if (value < 10) {
                    System.out.println(">>> Generated " + value + " with thread " + Thread.currentThread().getName());
                }
            }
        }, FluxSink.OverflowStrategy.DROP);

        IntGenerator.publishOn(single())
                .subscribe(new Subscriber<Integer>() {
                    private Subscription s;

                    @Override
                    public void onSubscribe(final Subscription subscription) {
                        s = subscription;
                        s.request(1);
                    }

                    @Override
                    public void onNext(final Integer integer) {
                        System.out.println("*** Received " + integer + " with thread " + Thread.currentThread().getName());
                        try {
                            Thread.sleep(10000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        s.request(1);
                    }

                    @Override
                    public void onError(final Throwable throwable) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
Flux<Integer> IntGenerator = Flux.create(e -> {
            AtomicInteger iteration = new AtomicInteger(1);
            while (iteration.intValue() < 10) {
                int value = iteration.getAndIncrement();
                e.next(value);
                if (value < 10) {
                    System.out.println(">>> Generated " + value + " with thread " + Thread.currentThread().getName());
                }
            }
        }, FluxSink.OverflowStrategy.LATEST);

        IntGenerator.publishOn(single())
                .subscribe(new Subscriber<Integer>() {
                    private Subscription s;

                    @Override
                    public void onSubscribe(final Subscription subscription) {
                        System.out.println("Subscribed");
                        s = subscription;
                        s.request(1);
                    }

                    @Override
                    public void onNext(final Integer integer) {
                        System.out.println("*** Received " + integer + " with thread " + Thread.currentThread().getName());
                        try {
                            Thread.sleep(10000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        s.request(1);
                    }

                    @Override
                    public void onError(final Throwable throwable) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

        latch.await(120L, TimeUnit.SECONDS);
        latch.await(120L, TimeUnit.SECONDS);
    }
0
Stéphane Traumat 11 जिंदा 2020, 01:42
आपके पास क्या सवाल है? और/या आप किस परिणाम की अपेक्षा करते हैं? कृपया अपने प्रश्न को संपादित करें स्रोत कोड को न्यूनतम प्रतिलिपि प्रस्तुत करने योग्य उदाहरण, जिसे दूसरों द्वारा संकलित और परीक्षण किया जा सकता है।
 – 
Progman
12 जिंदा 2020, 00:40

1 उत्तर

PublishOn में डिफ़ॉल्ट रूप से 32 आइटम क्षमता वाला एक आंतरिक बफर है, इसलिए आप पर्याप्त आइटम नहीं बना रहे हैं।

0
Stéphane Traumat 14 जिंदा 2020, 11:03