Bir Android geliştiriciyseniz, muhtemelen RxJava’yı duymuşsunuzdur . Android geliştirmede Reaktif Programlamayı etkinleştirmek için en çok tartışılan kitaplıklardan biridir. Mobil programlamanın doğasında bulunan eşzamanlılık / eşzamansız görevleri basitleştirmek için temel çerçeve olarak lanse edildi.

* RxJava,  bir Java VM uygulamasıdır

Neden RxJava’yı Düşünmelisiniz?

Özünde, RxJava geliştirmeyi basitleştirir çünkü iş parçacığı ile ilgili soyutlama seviyesini yükseltir . Yani, bir geliştirici olarak, farklı iş parçacıkları üzerinde gerçekleşmesi gereken işlemlerin nasıl gerçekleştirileceği konusunda çok fazla endişelenmenize gerek yok. Bu özellikle çekicidir çünkü iş parçacığı oluşturmanın doğru yapılması zordur ve doğru şekilde uygulanmazsa, en zor hataların bazılarının hata ayıklaması ve düzeltilmesine neden olabilir.

Elbette bu, konu iş parçacığı oluşturmaya geldiğinde RxJava’nın kurşun geçirmez olduğu anlamına gelmez ve perde arkasında neler olduğunu anlamak yine de önemlidir; ancak, RxJava kesinlikle hayatınızı kolaylaştırabilir.

Örneğin :

Network Call – RxJava vs AsyncTask

Diyelim ki ağ üzerinden veri almak ve bunun sonucunda kullanıcı arayüzünü güncellemek istiyoruz.

Bunu yapmanın AsyncTask yolu :

  1. AsyncTaskbizim Activity/Fragment içinde bir iç alt sınıf oluşturmak
  2. Arka planda ağ işlemini gerçekleştirmek 
  3. İşlemin sonucunu kullanıcı arayüzüne set etmek.
public class NetworkRequestTask extends AsyncTask<Void, Void, User> {

    private final int userId;

    public NetworkRequestTask(int userId) {
        this.userId = userId;
    }

    @Override protected User doInBackground(Void... params) {
        return networkService.getUser(userId);
    }

    @Override protected void onPostExecute(User user) {
        nameTextView.setText(user.getName());
        // ...set other views
    }
}
   
private void onButtonClicked(Button button) {
   new NetworkRequestTask(123).execute()
}

Zararsız görünse de, bu yaklaşımın bazı sorunları ve sınırlamaları vardır.  memory / context sızıntıları NetworkRequestTask, bir iç sınıf olduğundan ve bu nedenle dış sınıfa örtük bir atıfta bulunduğundan kolayca yaratılır .  Ayrıca, ağ aramasından sonra başka bir uzun işlemi zincirlemek istersek ne olur? AsyncTask okunabilirliği önemli ölçüde azaltabilecek iki URL’yi iç içe geçirmemiz gerekir. İç içe geçen bu yapıda erişim zorluğu oluşacaktır.

RxJava 

private Subscription subscription;

private void onButtonClicked(Button button) {
   subscription = networkService.getObservableUser(123)
                      .subscribeOn(Schedulers.io())
                      .observeOn(AndroidSchedulers.mainThread())
                      .subscribe(new Action1<User>() {
                          @Override public void call(User user) {
                              nameTextView.setText(user.getName());
                              // ... set other views
                          }
                      });
}

@Override protected void onDestroy() {
   if (subscription != null && !subscription.isUnsubscribed()) {
       subscription.unsubscribe();
   }
   super.onDestroy();
}

Bu yaklaşım sayesinde, döndürülen Subscription nesneye bir referans bağlamı olduğunda sızıntıları en aza indirebiliriz. Bu Subscription nesne ActivityFragment nesnesine bağlı olduğundan onDestroy() yönetimiyle işlem sonrası kalıcı kaldırma işlemi yapabiliriz.

Dönüş türünü kendimiz belirleyebiliriz.(getObservableUser(…) yani Observable<User>). Subscription üzerinden daha fazla ağ çağrısı yapabilme imkanımız mevcuttur.

Eş Zamanlı İşlemler

RxJava Kavramları

 RxJava  “3 O” temel yapısını temsil etmektedir. Bunlar :

  • Observable
  • Observer
  • Operator 

RxJava dünyasında her şey akış olarak modellenebilir. Bir akış, zamanla öğeler yayar(Observer) ve her emisyon tüketilebilir / gözlemlenebilir (Observable). Operatör çağrılara zincirlenerek daha fazla değiştirilebilir, dönüştürülebilir ve manipüle edilebilir .

Observable (Gözlemlenebilir)

Gözlemlenebilir, RxJava’daki akım soyutlamasıdır. Bir Yineleyiciye benzer , bir sıra verildiğinde, bu öğeleri düzenli bir şekilde yineler ve üretir. Daha sonra bir tüketici, temeldeki sıraya bakılmaksızın bu öğeleri aynı arayüz üzerinden tüketebilir.

Diyelim ki 1, 2, 3 sayılarını bu sırayla yaymak istiyoruz. Bunu yapmak için Observable.create(OnSubscribe<T>)yöntemi kullanabiliriz .

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
   @Override public void call(Subscriber<? super Integer> subscriber) {
       subscriber.onNext(1);
       subscriber.onNext(2);
       subscriber.onNext(3);
       subscriber.onCompleted();
   }
});

Çağırma subscriber.onNext(Integer), akıştaki bir öğeyi yayar ve akış yayımı tamamlandığında subscriber.onCompleted()çağrılır.

Bir Gözlemlenebilir oluşturmaya yönelik bu yaklaşım oldukça ayrıntılıdır. Bu nedenle, hemen hemen her durumda tercih edilmesi gereken pratik gözlenebilir örnekler oluşturmak için uygun yöntemler vardır. Yukarda bulunan gözlemlenebilir oluşturmanın en basit yolu Observable.just(T) belirlediğiniz değişkene göre gözlemlenebilir oluşturabilirsiniz.

Observable.just(1, 2, 3);

Observer (Gözlemci)

Gözlemlenebilir(Observable) akışın bir sonraki bileşeni, ona abone olan Gözlemci’dir(Observer) (veya Gözlemcilerdir).

Gözlemciler aşağıdaki olaylarla bilgi döner.

  • Observer.onNext(T) – akıştan bir öğe yayıldığında çağrılır
  • Observable.onError(Throwable) – akış içinde bir hata oluştuğunda çağrılır
  • Observable.onCompleted() – akış öğe yaymayı bitirdiğinde çağrılır.
Observable<Integer> observable = Observable.just(1, 2, 3);
observable.subscribe(new Observer<Integer>() {
   @Override public void onCompleted() {
       Log.d("Test", "onCompleted()");
   }

   @Override public void onError(Throwable e) {
       Log.d("Test", "onError()");
   }

   @Override public void onNext(Integer integer) {
       Log.d("Test", "onNext():" + integer);
   }
});

Logcat çıktı (Test) :

onNext(): 1
onNext(): 2
onNext(): 3
onNext(): 4
onCompleted()

Bir gözlemlenebilirin(Observable) bir gözlemci(Observer) tarafından oluşan abonelik (Subscription) üzerinde işimiz bittiğinde bellekten yer kazanım sağlamak için Subscription.unsubscribe() metodu çağırılır.

Subscription subscription = someInfiniteObservable.subscribe(new Observer<Integer>() {
   @Override public void onCompleted() {
       
   }

   @Override public void onError(Throwable e) {
     
   }

   @Override public void onNext(Integer integer) {
      
   }
});

// unsubscribe 
subscription.unsubscribe();

Bu metodu Activity#onDestroy() yada Fragment#onDestroy() içinde çağırmak çok anlamlı olacaktır.

Operator (Operatör )

Bir gözlemlenebilir(Observable) tarafından yayılan öğeler, abone (Subscription) olunan Gözlemci(Observer) nesnelerine bildirilmeden önce Operatörler(Operator) aracılığıyla dönüştürülebilir, değiştirilebilir ve filtrelenebilir. İşlevsel programlamada bulunan en yaygın işlemlerden bazıları (map, filtre, küçültme, vb.). Gözlenebilir(Observable) bir akışa da uygulanabilir.

Observable.just(1, 2, 3, 4, 5).map(new Func1<Integer, Integer>() {
   @Override public Integer call(Integer integer) {
       return integer * 3;
   }
}).filter(new Func1<Integer, Boolean>() {
   @Override public Boolean call(Integer integer) {
       return integer % 2 == 0;
   }
}).subscribe(new Observer<Integer>() {
   @Override public void onCompleted() {
      
   }

   @Override public void onError(Throwable e) {
       
   }

   @Override public void onNext(Integer integer) {
       
   }
});

Yukarıdaki kod parçacığı, Gözlemlenebilirden (Observable) her bir emisyonu alır ve her birini 3 ile çarparak sırasıyla 3, 6, 9, 12, 15 akışını üretir. Bu işlem sonrasında 2 ye bölünebilenleri getirir. Yani 2’ye bölüneni filtreler.

REPO :

https://github.com/harunkor/AndroidRxJava2SimpleSample_V1