Reactive programming with RxAndroid in Kotlin: An introduction



Update Note: This tutorial has been updated to Kotlin 1.3, Android 28 (Pie) and Android Studio 3.3.2 by Kyle Jablonski. This tutorial has previously been updated to Kotlin, Android 26 (Oreo) and Android Studio 3.0 Beta 5 by Irina Galata. The original tutorial was written by Artem Kholodnyi.

Reactive programming is not just another API. It is a whole new programming paradigm that deals with data streams and propagation of change. You can read the full definition of reactive programming, but you will learn more about being reactive below.

RxJava is a reactive implementation that brings this concept to the Android platform. Android applications are the perfect place to start exploring the reactive world. It's even easier with RxAndroid, a library that wraps asynchronous UI events to be more RxJava-like.

Don't be afraid: I bet you already know the basic concepts of reactive programming, even if you're not yet aware of it!

Note: This tutorial requires good knowledge of Android and Kotlin. To get up to speed, check out our Android Development Tutorials first and return to this tutorial when you are ready.

In this RxAndroid Reactive Programming Tutorial, you will learn how to do the following:

  • Grasp the concepts of reactive programming.
  • Define an Observable.
  • Turn asynchronous events such as button clicks and text field content changes into observable constructs.
  • Transform and filter observables.
  • Make use of Rx threading.
  • Combine multiple observables into one stream.
  • Turn all your observables into Flowable constructs.
  • Use RxJava's Maybe to add a favorite feature to the app.

I hope you're not lactose intolerant, because you're going to build a cheese-finding app as you learn how to use RxJava! :]

Getting Started

Download the CheeseFinder starter project and open it in Android Studio 3.3.2 or later.

You will be working in both CheeseActivity.kt and CheeseAdapter.kt. The CheeseActivity class extends BaseSearchActivity; take some time to explore BaseSearchActivity and check out the following ready-to-use features:

  • showProgress(): A function to display a progress bar...
  • hideProgress(): ... and a function to hide it.
  • showResult(result: List<Cheese>): A function to display a list of cheeses.
  • cheeseSearchEngine: A field that is an instance of CheeseSearchEngine. It has a search function that you call when you want to search for cheeses. It accepts a text search query and returns a list of matching cheeses.

Build and run the project on your Android device or emulator. You should see a gloriously empty search screen:

Of course, it won't stay that way forever; you'll soon start adding reactive functionality to the app. Before you create your first observable, treat yourself to a bit of theory.

What is reactive programming?

In imperative programming, an expression is evaluated once and the value assigned to a variable:

var x = 2
var y = 3
var z = x * y // z is 6

x = 10
// z is still 6

On the other hand, reactive programming is all about responding to value changes.

You have probably done some reactive programming already, even if you didn't realize it at the time.

  • Defining cell values in spreadsheets is similar to defining variables in imperative programming.
  • Defining cell expressions in spreadsheets is similar to defining and operating on observables in reactive programming.

Take the following spreadsheet that implements the example above:

The spreadsheet assigns cell B1 a value of 2, cell B2 a value of 3 and a third cell, B3, an expression that multiplies the value of B1 by the value of B2. When the value of either of the components referenced in the expression changes, the change is observed and the expression is automatically re-evaluated in B3:

The idea of reactive programming, to put it simply, is to have components that form a larger picture and that can be observed, and to have your program listen to and consume the changes when they happen.
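The spreadsheet analogy can be sketched in a few lines of RxJava. This is only an illustration, not part of the sample project: it assumes RxJava 2 is on the classpath, and it uses BehaviorSubject and Observable.combineLatest (neither is covered in this tutorial) to stand in for cells B1, B2 and B3. The function name spreadsheetProducts is made up for the example.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.subjects.BehaviorSubject

// Returns every value that "cell B3" takes while B1 and B2 change.
fun spreadsheetProducts(): List<Int> {
    val b1 = BehaviorSubject.createDefault(2) // cell B1
    val b2 = BehaviorSubject.createDefault(3) // cell B2
    val seen = mutableListOf<Int>()

    // B3 = B1 * B2, re-evaluated whenever either input changes.
    val b3 = Observable.combineLatest(b1, b2, BiFunction<Int, Int, Int> { x, y -> x * y })
    val subscription = b3.subscribe { seen += it }

    b1.onNext(10) // changing B1 pushes a new product downstream
    subscription.dispose()
    return seen
}

fun main() {
    println(spreadsheetProducts()) // [6, 30]
}
```

Unlike the imperative snippet, where z stays at 6, the product here is recomputed as soon as the first input changes.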

The Difference Between RxJava and RxKotlin

As you probably know, it's possible to use Java libraries in Kotlin projects thanks to Kotlin's language compatibility with Java. If that's the case, then why was RxKotlin created in the first place? RxKotlin is a Kotlin wrapper around RxJava, which also provides plenty of useful extension functions for reactive programming. Effectively, RxKotlin makes working with RxJava no less reactive, but much more Kotlin-y.

In this article, we'll focus on using RxJava, since it's critical to understand the core concepts of this approach. However, everything you learn also applies to RxKotlin.

Note: Take a look at the build.gradle file and the project's dependencies in particular. Apart from the UI libraries, it contains the RxKotlin and RxAndroid packages. We don't need to specify RxJava here explicitly since RxKotlin already contains it.

RxJava Observable Contract

RxJava makes use of the Observer pattern.

In the Observer pattern, you have objects that implement two key RxJava types: Observable and Observer. When an Observable changes state, all Observer objects subscribed to it are notified. Among the methods of Observable is subscribe(), which an Observer calls to begin the subscription.

From that point, the Observer interface has three methods that the Observable calls as needed:

  • onNext(T value) provides a new item of type T to the Observer.
  • onComplete() notifies the Observer that the Observable has finished sending items.
  • onError(Throwable e) notifies the Observer that the Observable has experienced an error.

As a rule, a well behaved observable emits zero or more elements that can be followed by either completion or error.
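To see the contract in action, here is a minimal sketch that records every callback an Observer receives. It assumes RxJava 2, where the Observer interface also declares onSubscribe() in addition to the three methods listed above. The function name observedEvents and the cheese names are made up for the example.

```kotlin
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

// Records every callback the Observer receives, in order.
fun observedEvents(): List<String> {
    val events = mutableListOf<String>()
    val observer = object : Observer<String> {
        override fun onSubscribe(d: Disposable) { events += "onSubscribe" }
        override fun onNext(value: String) { events += "onNext($value)" }
        override fun onError(e: Throwable) { events += "onError" }
        override fun onComplete() { events += "onComplete" }
    }
    // Observable.just emits its arguments and then completes.
    Observable.just("Brie", "Gouda").subscribe(observer)
    return events
}

fun main() {
    println(observedEvents()) // [onSubscribe, onNext(Brie), onNext(Gouda), onComplete]
}
```

The two items arrive first and completion is the final signal, which is exactly the "zero or more items followed by completion or error" shape.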

That sounds complicated, but some marble diagrams may clear things up.

[Marble diagram: network request]

The circle represents an item that has been emitted from the observable and the black block represents a completion or error. Take, for example, a network request observable.

A mouse movement observable would emit mouse coordinates but would never complete:

[Marble diagram: mouse coordinates]

Here you can see multiple items that have been emitted, but no block showing that the mouse observable has completed or raised an error.

No more items can be emitted after an observable has completed. Here's an example of a misbehaving observable that violates the Observable contract:

[Marble diagram: misbehaving stream]

That's a bad, bad observable because it violates the Observable contract by emitting an item after it signaled completion.

How to create an observable

There are many libraries that help you create observable data from almost any type of event. Sometimes you just need to roll your own. Also, it's a great way to learn about Observable pattern and reactive programming!

You'll create an Observable using Observable.create(). Here is its signature:

Observable<T> create(ObservableOnSubscribe<T> source)

That's nice and concise, but what does it mean? What is the "source"? To understand that signature, you need to know what an ObservableOnSubscribe is. It's an interface with this contract:

public interface ObservableOnSubscribe<T> {
  void subscribe(ObservableEmitter<T> e) throws Exception;
}

Like an episode of a J.J. Abrams show such as "Lost" or "Westworld", that answers some questions while inevitably asking more. So the "source" you need to create your Observable must expose subscribe(), which in turn requires whatever is calling it to provide an "emitter" as a parameter. What, then, is an emitter?

RxJava's Emitter interface is similar to the Observer one:

public interface Emitter<T> {
  void onNext(T value);
  void onError(Throwable error);
  void onComplete();
}

An ObservableEmitter, specifically, also provides a means to cancel the subscription.

To visualize this whole situation, think of a water faucet regulating the flow of water. The water pipes are like an Observable, willing to deliver a flow of water if you have a means of tapping into it. You construct a faucet that can turn on and off, which is like an ObservableEmitter, and connect it to the water pipes in Observable.create(). The outcome is a nice fancy faucet. And of course, the faucet is reactive, since once you close it, the stream of water (data) is no longer active. :]
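Before wiring anything to the UI, it may help to see the smallest possible use of Observable.create(). The sketch below assumes RxJava 2 and is independent of the sample project; the function name cheeseNames and the emitted strings are made up for illustration.

```kotlin
import io.reactivex.Observable

// A hand-rolled observable: the lambda plays the role of the ObservableOnSubscribe "source".
fun cheeseNames(): Observable<String> = Observable.create<String> { emitter ->
    emitter.onNext("Cheddar")
    emitter.onNext("Feta")
    emitter.onComplete() // per the contract, nothing may be emitted after this point
}

fun main() {
    // Nothing runs until someone subscribes; each subscriber triggers the lambda again.
    cheeseNames().subscribe { println(it) }
}
```

The lambda receives the emitter, which is the faucet handle from the analogy: the code inside decides when items flow and when the stream ends.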

An example will make the situation less abstract and clearer. It's time to create your very first observable!

Observe Button Clicks

Add the following code into the CheeseActivity class:

// 1
private fun createButtonClickObservable(): Observable<String> {
  // 2
  return Observable.create { emitter ->
    // 3
    searchButton.setOnClickListener {
      // 4
      emitter.onNext(queryEditText.text.toString())
    }

    // 5
    emitter.setCancellable {
      // 6
      searchButton.setOnClickListener(null)
    }
  }
}

Your imports should look as follows after entering the above code:

import io.reactivex.Observable
import kotlinx.android.synthetic.main.activity_cheeses.*

You've imported the correct Observable class and you're using the Kotlin Android Extensions to get references to view objects.

Here's what's going on in the code above:

  1. You declare a function that returns an observable that will emit strings.
  2. You create an observable with Observable.create() and supply it with a new ObservableOnSubscribe.
  3. Set up an OnClickListener on searchButton.
  4. When the click event happens, call onNext on the emitter and pass it the current text value of queryEditText.
  5. Keeping references can cause memory leaks in Java or Kotlin. It's a useful habit to remove listeners as soon as they are no longer needed. But what do you call when you are creating your own Observable? For that very reason, ObservableEmitter has setCancellable(). Override cancel(), and your implementation will be called when the Observable is disposed, such as when the Observable is completed or all Observers have unsubscribed from it.
  6. For OnClickListener, the code that removes the listener is setOnClickListener(null).

Now that you've defined your Observable, you need to set up the subscription to it. Before doing so, you need to learn about one more interface, Consumer. It's a simple way to accept values coming in from an emitter.

public interface Consumer<T> {
  void accept(T t) throws Exception;
}

This interface is handy when you want to set up a simple subscription to an Observable.

Observable offers several versions of subscribe(), all with different parameters. For example, you could pass a full Observer if you like, but then you'd need to implement all the necessary methods.

If all you need out of your subscription is for the observer to respond to values sent to onNext(), you can use the version of subscribe() that takes a single Consumer (the parameter is even named onNext, to make the connection clear).

You'll do exactly that when you subscribe in your activity's onStart(). Add the following code to CheeseActivity.kt:

override fun onStart() {
  super.onStart()
  // 1
  val searchTextObservable = createButtonClickObservable()

  searchTextObservable
    // 2
    .subscribe { query ->
      // 3
      showResult(cheeseSearchEngine.search(query))
    }
}

Here is an explanation of each step:

  1. First, create an observable by calling the method you just wrote.
  2. Subscribe to the observable with subscribe(), and supply a simple Consumer.
  3. Finally, perform the search and show the results.

Build and run the app. Enter some letters and tap the Search button. After a simulated delay (see CheeseSearchEngine), you should see a list of cheeses that match your request:

Sounds great! :]

RxJava Threading Model

You've had your first taste of reactive programming. There is one problem, though: the UI freezes for a few seconds when the search button is tapped.

You might also notice the following line in Android Monitor:

> 08-24 14:36:34.554 3500-3500/com.raywenderlich.cheesefinder I/Choreographer: Skipped 119 frames! The application may be doing too much work on its main thread.

This happens because search is executed on the main thread. If search were to perform a network request, Android would crash the app with a NetworkOnMainThreadException. It's time to fix that.

A popular myth about RxJava is that it is multi-threaded by default, similar to AsyncTask . But unless otherwise stated, RxJava does all the work in the same thread it was called from.

You can change this behavior with the subscribeOn and observeOn operators.

subscribeOn is supposed to be called only once in the chain of operators. If it's not, the first call wins. subscribeOn specifies the thread on which the observable will be subscribed (i.e. created). If you use observables that emit events from an Android View, you need to make sure the subscription is done on the Android UI thread.

On the other hand, it's okay to call observeOn as many times as you want in the chain. observeOn specifies the thread on which the next operators in the chain will be executed. For example:

myObservable // observable will be subscribed on the i/o thread
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .map { /* this will be called on the main thread... */ }
  .doOnNext { /* ...and everything below until the next observeOn */ }
  .observeOn(Schedulers.io())
  .subscribe { /* this will be called on the i/o thread */ }

The most useful schedulers are:

  • Schedulers.io(): Suitable for I/O-bound work such as network requests or disk operations.
  • Schedulers.computation(): Works best with computational tasks like event loops and processing callbacks.
  • AndroidSchedulers.mainThread(): Executes the next operators on the UI thread.
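To check for yourself which thread each stage runs on, you can log thread names. The sketch below is meant for a plain JVM rather than the app, so it swaps AndroidSchedulers.mainThread() for Schedulers.computation(); that substitution, and the function name threadsUsed, are assumptions made only for this example.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers

// Returns the thread name seen at subscription time and the one seen after observeOn.
fun threadsUsed(): Pair<String, String> {
    return Observable.fromCallable { Thread.currentThread().name } // runs where subscribeOn says
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .map { upstream -> upstream to Thread.currentThread().name } // runs where observeOn says
        .blockingFirst() // block only for the sake of the demo
}

fun main() {
    val (subscribedOn, observedOn) = threadsUsed()
    println("source ran on $subscribedOn, map ran on $observedOn")
}
```

The two names differ because subscribeOn moves the source onto an I/O thread while observeOn moves everything after it onto a computation thread.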

Map Operator

The map operator applies a function to each item emitted by an observable and returns another observable that emits the results of those function calls.

If you have an observable called numbers that emits the following:

[Marble diagram: map-0]

And if you apply map as follows:

numbers.map { number -> number * number }

The result would be the following:

[Marble diagram: map-1]
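Here is the same squaring idea as a small runnable sketch. It assumes RxJava 2; the input values 1, 2 and 3 and the function name squares are chosen just for illustration.

```kotlin
import io.reactivex.Observable

// Squares each emitted number and collects the results into a list.
fun squares(): List<Int> =
    Observable.just(1, 2, 3)
        .map { number -> number * number }
        .toList()       // gathers all items into a single list once the source completes
        .blockingGet()  // block only for the sake of the demo

fun main() {
    println(squares()) // [1, 4, 9]
}
```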

That's a handy way to iterate over multiple items with little code. Let's put it to use!

Change onStart in the CheeseActivity class to look like this:

override fun onStart() {
  super.onStart()

  val searchTextObservable = createButtonClickObservable()

  searchTextObservable
    // 1
    .subscribeOn(AndroidSchedulers.mainThread())
    // 2
    .observeOn(Schedulers.io())
    // 3
    .map { cheeseSearchEngine.search(it) }
    // 4
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
      showResult(it)
    }
}

Going over the code above:

  1. First, specify that code down the chain should start on the main thread instead of on the I/O thread. In Android, all code that works with View should execute on the main thread.
  2. Specify that the next operator should be called on the I/O thread.
  3. For each search query, return a list of results.
  4. Finally, make sure that the results are passed to the list on the main thread.

Build and run your project. Now, the user interface should be responsive even when a search is in progress.

Show progress bar with doOnNext

It's time to show the progress bar!

For that you'll need the doOnNext operator. doOnNext takes a Consumer and allows you to do something each time an item is emitted by the observable.

In the same CheeseActivity class, modify onStart() to the following:

override fun onStart() {
  super.onStart()

  val searchTextObservable = createButtonClickObservable()

  searchTextObservable
    // 1
    .observeOn(AndroidSchedulers.mainThread())
    // 2
    .doOnNext { showProgress() }
    .observeOn(Schedulers.io())
    .map { cheeseSearchEngine.search(it) }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
      // 3
      hideProgress()
      showResult(it)
    }
}

Take each numbered comment in turn:

  1. Ensure that the next operator in the chain will be run on the main thread.
  2. Add the doOnNext operator so that showProgress() will be called every time a new item is emitted.
  3. Don't forget to call hideProgress() when you are just about to display a result.

Build and run your project. You should see that the progress bar appears when you start the search:

Observe text changes

What if you want to perform the search automatically as the user types some text, just like Google?

First, you must subscribe to TextView text changes. Add the following function to the class CheeseActivity :

// 1
private fun createTextChangeObservable(): Observable<String> {
  // 2
  val textChangeObservable = Observable.create<String> { emitter ->
    // 3
    val textWatcher = object : TextWatcher {

      override fun afterTextChanged(s: Editable?) = Unit

      override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) = Unit

      // 4
      override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) {
        s?.toString()?.let { emitter.onNext(it) }
      }
    }

    // 5
    queryEditText.addTextChangedListener(textWatcher)

    // 6
    emitter.setCancellable {
      queryEditText.removeTextChangedListener(textWatcher)
    }
  }

  // 7
  return textChangeObservable
}

Here's the play-by-play of each step above:

  1. Declare a function that will return an observable for text changes.
  2. Create textChangeObservable with create(), which takes an ObservableOnSubscribe.
  3. When an observer makes a subscription, the first thing to do is to create a TextWatcher.
  4. You aren't interested in beforeTextChanged() and afterTextChanged(). When the user types and onTextChanged() triggers, you pass the new text value to an observer.
  5. Add the watcher to your TextView by calling addTextChangedListener().
  6. Don't forget to remove your watcher. To do this, call emitter.setCancellable() and override cancel() to call removeTextChangedListener().
  7. Finally, return the created observable.

To see this observable in action, replace the declaration of searchTextObservable in onStart() of CheeseActivity as follows:

val searchTextObservable = createTextChangeObservable()

Build and run your app. You should see the search kick off when you start typing text in the TextView:

Filter Queries by Length

It doesn't make sense to search for queries as short as a single letter. To fix this, let's introduce the powerful filter operator. filter passes only those items that satisfy a particular condition. filter takes in a Predicate, which is an interface that defines the test that input of a given type needs to pass, with a boolean result. In this case, the Predicate takes a String and returns true if the string's length is two or more characters.

Replace return textChangeObservable in createTextChangeObservable() with the following code:

return textChangeObservable.filter { it.length >= 2 }

Everything will work exactly the same, except that text queries shorter than two characters won't get sent down the chain.
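The effect of the length filter can be checked in isolation with a short sketch. It assumes RxJava 2 and replaces the text-change observable with a fixed list of typed strings; the function name searchableQueries is hypothetical.

```kotlin
import io.reactivex.Observable

// Keeps only the queries that are long enough to be worth searching for.
fun searchableQueries(typed: List<String>): List<String> =
    Observable.fromIterable(typed)
        .filter { it.length >= 2 } // the same predicate as in the tutorial
        .toList()
        .blockingGet()             // block only for the sake of the demo

fun main() {
    println(searchableQueries(listOf("c", "ch", "che"))) // [ch, che]
}
```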

Run the app; the search should kick off only once you have typed a second character.

Debounce Operator

You don't want to send a new request to the server every time the query changes by one symbol.

debounce is one of those operators that shows the real power of the reactive paradigm. Much like the filter operator, debounce filters items emitted by the observable. However, the decision on whether an item should be filtered out is based not on what the item is, but on when the item was emitted.

debounce waits for a specified amount of time after each item emission for another item. If no item happens to be emitted during this wait, the last item is finally emitted:

[Marble diagram: debounce]

In createTextChangeObservable(), add the debounce operator just below the filter so that the return statement looks like the following code:

return textChangeObservable
  .filter { it.length >= 2 }
  .debounce(1000, TimeUnit.MILLISECONDS) // add this line

Run the app. You will notice that the search only begins when you stop making quick changes:

debounce waits for 1000 milliseconds before emitting the latest query text.
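Timing behavior is easier to reason about with a virtual clock. The sketch below assumes RxJava 2 and uses its TestScheduler and a PublishSubject (neither is used in the app itself) to simulate a user typing quickly and then pausing; the function name debouncedQueries is made up for the example.

```kotlin
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Simulates three quick keystrokes followed by a pause and returns what debounce lets through.
fun debouncedQueries(): List<String> {
    val scheduler = TestScheduler()             // virtual clock, so no real waiting
    val typed = PublishSubject.create<String>() // stands in for the text-change stream
    val received = mutableListOf<String>()

    val subscription = typed
        .debounce(1000, TimeUnit.MILLISECONDS, scheduler)
        .subscribe { received += it }

    typed.onNext("ch")
    scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS)  // still typing
    typed.onNext("che")
    scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS)  // still typing
    typed.onNext("ched")
    scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS) // a full second of silence

    subscription.dispose()
    return received
}

fun main() {
    println(debouncedQueries()) // [ched]
}
```

Only the last value survives, because each earlier keystroke was followed by another one within the one-second window.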

Merge Operator

You started by creating an observable that reacted to button clicks and then implemented an observable that reacts to text field changes. But how do you react to both?

There are a lot of operators to combine observables. merge takes items from two or more observables and puts them into a single observable:

[Marble diagram: merge]

Change the beginning of onStart() to the following:

val buttonClickStream = createButtonClickObservable()
val textChangeStream = createTextChangeObservable()

val searchTextObservable = Observable.merge<String>(buttonClickStream, textChangeStream)

Run your app. Play with the text field and the search button; the search will kick off either when you finish typing two or more symbols or when you simply press the Search button.
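If you'd like to watch merge interleave two sources without the UI, the following sketch may help. It assumes RxJava 2 and uses two PublishSubject instances as stand-ins for the button and text streams; the names mergedQueries, buttonClicks and textChanges are hypothetical.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

// Feeds two separate sources and returns the items in the order the merged stream delivered them.
fun mergedQueries(): List<String> {
    val buttonClicks = PublishSubject.create<String>() // stands in for the button stream
    val textChanges = PublishSubject.create<String>()  // stands in for the text stream
    val received = mutableListOf<String>()

    val subscription = Observable.merge(buttonClicks, textChanges)
        .subscribe { received += it }

    textChanges.onNext("br")
    buttonClicks.onNext("brie")
    textChanges.onNext("brieo")

    subscription.dispose()
    return received
}

fun main() {
    println(mergedQueries()) // [br, brie, brieo]
}
```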

Flowable

With the release of RxJava2, the framework has been totally redesigned from the ground up to solve some problems that were not addressed in the original library. One really important topic addressed in the update is the idea of backpressure.

Backpressure is the concept that an observable is emitting items faster than the consumer can handle them. Consider the example of the Twitter firehose, which is constantly emitting tweets as they are added to the Twitter platform. If you were to use observables, which buffer items until there is no more memory available, your app would crash and consuming the firehose API would not be possible using them. Flowables take this into consideration and let you specify a BackpressureStrategy to tell the flowable how you want the consumer to handle items emitted faster than they can be consumed.

Backpressure Strategies:

  • BUFFER - Handles items in the same way as RxJava 1, but you can also add a buffer size.
  • DROP - Drops any objects that the consumer cannot handle.
  • ERROR - Throws an error when the downstream cannot continue.
  • LATEST - Only keeps the latest item emitted by onNext, overwriting the previous value.
  • MISSING - No buffering or dropping during onNext events.
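A strategy is easiest to understand when the consumer is deliberately slow. The sketch below assumes RxJava 2 and uses its TestSubscriber (via Flowable.test) to request only three items from a source that emits ten; the function name deliveredWithDrop is made up for the example.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Observable

// Emits ten numbers to a consumer that only ever asks for three.
fun deliveredWithDrop(): List<Int> =
    Observable.range(1, 10)
        .toFlowable(BackpressureStrategy.DROP) // items the consumer has not requested are discarded
        .test(3L)                              // TestSubscriber that requests exactly three items
        .values()                              // the items that actually reached the subscriber

fun main() {
    println(deliveredWithDrop()) // [1, 2, 3]
}
```

With DROP, the remaining seven items are simply discarded. With BackpressureStrategy.BUFFER they would instead be held in memory until the consumer asked for more.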

Turning Observables Into Flowables

Time to turn the observables above into flowables using this new knowledge of backpressure. First, consider the observables you added to your app. You have one observable that emits items when a button is clicked and another that emits items from keyboard input. With these two in mind, you can imagine that in the first case you can use the LATEST strategy and in the second you can use BUFFER.

Open CheeseActivity.kt and change your observable objects to the following:

val buttonClickStream = createButtonClickObservable()
  .toFlowable(BackpressureStrategy.LATEST) // 1

val textChangeStream = createTextChangeObservable()
  .toFlowable(BackpressureStrategy.BUFFER) // 2

  1. Convert the button click stream into a flowable using the LATEST BackpressureStrategy.
  2. Convert the text input change stream into a flowable using the BUFFER BackpressureStrategy.

Finally, change the merge operator to use Flowable as well:

val searchTextFlowable = Flowable.merge<String>(buttonClickStream, textChangeStream)

Change the call to use the new searchTextFlowable value, instead of the previous Observable :

searchTextFlowable
  // 1
  .observeOn(AndroidSchedulers.mainThread())
  // 2
  .doOnNext { showProgress() }
  .observeOn(Schedulers.io())
  .map { cheeseSearchEngine.search(it) }
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe {
    // 3
    hideProgress()
    showResult(it)
  }

Re-run the application and you should see a working app with none of the pitfalls of observables.

Maybe

A Maybe is a computation that emits either a single value, no value or an error. They are good for things like database updates and deletes.
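The three possible outcomes of a Maybe can be seen side by side in a small sketch. It assumes RxJava 2 and is independent of the app code; the function name describe and the sample values are hypothetical.

```kotlin
import io.reactivex.Maybe

// Reports which of the three possible Maybe outcomes occurred.
fun describe(source: Maybe<String>): String {
    var outcome = ""
    source.subscribe(
        { value -> outcome = "success: $value" },         // onSuccess: exactly one value
        { error -> outcome = "error: ${error.message}" }, // onError
        { outcome = "completed without a value" }         // onComplete: no value at all
    )
    return outcome
}

fun main() {
    println(describe(Maybe.just("Brie")))                           // success: Brie
    println(describe(Maybe.empty()))                                // completed without a value
    println(describe(Maybe.error(IllegalStateException("boom"))))   // error: boom
}
```

Exactly one of the three callbacks fires for any given subscription, which is what distinguishes a Maybe from an Observable.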

Open the CheeseAdapter class and add the following code in onBindViewHolder():

// 1
Maybe.create<Boolean> { emitter ->
  emitter.setCancellable {
    holder.itemView.imageFavorite.setOnClickListener(null)
  }

  holder.itemView.imageFavorite.setOnClickListener {
    emitter.onSuccess((it as CheckableImageView).isChecked) // 2
  }
}.toFlowable().onBackpressureLatest() // 3
  .observeOn(Schedulers.io())
  .map { isChecked ->
    cheese.favorite = if (!isChecked) 1 else 0
    val database = CheeseDatabase.getInstance(holder.itemView.context).cheeseDao()
    database.favoriteCheese(cheese) // 4
    cheese.favorite // 5
  }
  .subscribeOn(AndroidSchedulers.mainThread())
  .subscribe {
    holder.itemView.imageFavorite.isChecked = it == 1 // 6
  }

  1. Create the Maybe from the click action.
  2. Emit the checked state on success.
  3. Turn the Maybe into a Flowable.
  4. Perform the update on the Cheeses table.
  5. Return the result of the operation.
  6. Use the result from the emission to change the outline to a filled-in heart.

Note: It would probably be better to use Maybe in the context of a delete operation, but for the purposes of this example you can favorite a cheese.

RxJava2 & Null

Null is no longer supported in RxJava2. Supplying null will result in a NullPointerException immediately or in a downstream signal. You can read all about this change here.

RxJava and Activity/Fragment lifecycle

Remember those setCancellable methods you set up? They won’t fire until the observable is unsubscribed.

The Observable.subscribe() call returns a Disposable. Disposable is an interface that has two methods:

public interface Disposable {
  void dispose();  // ends a subscription
  boolean isDisposed(); // returns true if resource is disposed (unsubscribed)
}

Add the following property to CheeseActivity:

private lateinit var disposable: Disposable

In onStart(), set the returned value of subscribe() to disposable with the following code (only the first line changes):

disposable = searchTextObservable // change this line
      .observeOn(AndroidSchedulers.mainThread())
      .doOnNext { showProgress() }
      .observeOn(Schedulers.io())
      .map { cheeseSearchEngine.search(it) }
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe {
        hideProgress()
        showResult(it)
}

Since you subscribed to the observable in onStart(), onStop() would be a perfect place to unsubscribe.

Add the following code to CheeseActivity.kt:

override fun onStop() {
  super.onStop()
  if (!disposable.isDisposed) {
    disposable.dispose()
  }
}

And that’s it! Build and run the app. You won’t “observe” any changes yourself, but now the app is successfully avoiding RxJava memory leaks. :]
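If an activity ends up with more than one subscription, RxJava 2's CompositeDisposable can hold them all and release them together. The sketch below is an optional variation, not what the sample project does; it also shows that disposing is what triggers the setCancellable callback. The function name cancellableRunsOnClear is made up for the example.

```kotlin
import io.reactivex.Observable
import io.reactivex.disposables.CompositeDisposable

// Returns true if the cancellable registered in create() ran when the subscription was disposed.
fun cancellableRunsOnClear(): Boolean {
    var listenerRemoved = false
    val source = Observable.create<String> { emitter ->
        emitter.setCancellable { listenerRemoved = true } // stands in for removing a view listener
    }

    val disposables = CompositeDisposable()
    disposables.add(source.subscribe()) // collect each subscription as it is created
    disposables.clear()                 // dispose them all at once, e.g. from onStop()
    return listenerRemoved
}

fun main() {
    println(cancellableRunsOnClear()) // true
}
```

Calling clear() rather than dispose() leaves the container reusable, so new subscriptions can be added again the next time onStart() runs.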

Where to Go From Here?

You can download the final project from this tutorial here. If you want to challenge yourself a bit more you can swap out this implementation of RxJava and replace it with Room’s RxJava support which you can find more about here.

You’ve learned a lot in this tutorial. But that’s only a glimpse of the RxJava world. For example, there is RxBinding, a library that includes most of the Android View APIs. Using this library, you can create a click observable by just calling RxView.clicks(viewVariable).

To learn more about RxJava refer to the ReactiveX documentation.

If you have any comments or questions, don’t hesitate to join the discussion below!

