鍍金池/ 教程/ Android/ Incrementally Agerifying legacy code
              Custom observables
              Compiled functions
              Reactive programming
              Reservoirs and parallelism
              Incrementally Agerifying legacy code
              Observables and updatables
              Compiled repositories
              Repositories

              Incrementally Agerifying legacy code

              Agera引入的代碼風格也許適合從零開始的新建app項目。這篇包括一些提示,來幫助想在遺留代碼中使用Agera的開發者,如此往下做。

              Upgrading legacy observer pattern

              --升級原有觀察者模式

              觀察者模式有很多種實現方式,但不是所有的都可以通過簡單的放入遷入到Agera中。下面是一個例子:演示將一個監聽(listenable)類添加到Observable接口的一種方法。

              MyListenable類可以增加(addListener)和刪除(removeListener)Listener,作為額外完整的演示,它繼承了SomeBaseClass。

              該實例使用UpdateDispatcher來解決Java的單繼承約束,使用一個內部類(Bridge)來做橋接, 保持其完整的原始API,同時也使Agera可見。

              public final class MyListenable extends SomeBaseClass implements Observable {
              
                private final UpdateDispatcher updateDispatcher;
              
                public MyListenable() {
                  // Original constructor code here...
                  updateDispatcher = Observables.updateDispatcher(new Bridge());
                }
              
                // Original class body here... including:
                public void addListener(Listener listener) { … }
                public void removeListener(Listener listener) { … }
              
                @Override
                public void addUpdatable(Updatable updatable) {
                  updateDispatcher.addUpdatable(updatable);
                }
              
                @Override
                public void removeUpdatable(Updatable updatable) {
                  updateDispatcher.removeUpdatable(updatable);
                }
              
                private final class Bridge implements ActivationHandler, Listener {
                  @Override
                  public void observableActivated(UpdateDispatcher caller) {
                    addListener(this);
                  }
              
                  @Override
                  public void observableDeactivated(UpdateDispatcher caller) {
                    removeListener(this);
                  }
              
                  @Override
                  public void onEvent() { // Listener implementation
                    updateDispatcher.update();
                  }
                }
              }

              Exposing synchronous operations as repositories

              --揭秘repository的同步操作

              Java本質是一種同步語言,如:在Java中最低級別的操作都是同步方法。 當操作可能會花一些時間才能返回(耗時操作),這種方法通常稱為阻塞方法,而且禁止開發者在主線程(UI Thread)調用。

              假設app的UI需要從阻塞的方法獲得數據。Agera可以很容易的通過后臺線程完成調用,然后UI可以在主線程中接收數據。首先,這個阻塞操作需要封裝成一個Agera操作,像這樣:

              public class NetworkCallingSupplier implements Supplier<Result<ResponseBlob>> {
                private final RequestBlob request = …;
              
                @Override
                public Result<ResponseBlob> get() {
                  try {
                     ResponseBlob blob = networkStack.execute(request); // blocking call
                     return Result.success(blob);
                  } catch (Throwable e) {
                     return Result.failure(e);
                  }
                }
              }
              
              Supplier<Result<ResponseBlob>> networkCall = new NetworkCallingSupplier();
              
              Repository<Result<ResponseBlob>> responseRepository =
                  Repositories.repositoryWithInitialValue(Result.<ResponseBlob>absent())
                      .observe() // no event source; works on activation
                      .onUpdatesPerLoop() // but this line is still needed to compile
                      .goTo(networkingExecutor)
                      .thenGetFrom(networkCall)
                      .compile();

              上面的代碼段假定了,在Repository.compile()之前這個請求是已知且永遠不變的。

              這個很容易升級成為一個動態請求,甚至在repository同樣的激活周期期間。

              要可以修改請求,簡單的方式是使用MutableRepository。 此外,為了在第一次請求為完成之前就可以提供數據,可以在Result中一個提供初始值:absent()。

              MutableRepository這種用法類似于是一個可變的變量(可為null),故命名為requestVariable。

              // MutableRepository<RequestBlob> requestVariable =
              //     mutableRepository(firstRequest);
              // OR:
              MutableRepository<Result<RequestBlob>> requestVariable =
                  mutableRepository(Result.<RequestBlob>absent());

              然后, 不是在supplier中封裝阻塞方法,使用function實現動態請求:

              public class NetworkCallingFunction
                  implements Function<RequestBlob, Result<ResponseBlob>> {
                @Override
                public Result<ResponseBlob> apply(RequestBlob request) {
                  try {
                     ResponseBlob blob = networkStack.execute(request);
                     return Result.success(blob);
                  } catch (Throwable e) {
                     return Result.failure(e);
                  }
                }
              }
              
              Function<RequestBlob, Result<ResponseBlob>> networkCallingFunction =
                  new NetworkCallingFunction();

              升級后的repository可以像這樣compiled:

              Result<ResponseBlob> noResponse = Result.absent();
              Function<Throwable, Result<ResponseBlob>> withNoResponse =
                  Functions.staticFunction(noResponse);
              Repository<Result<ResponseBlob>> responseRepository =
                  Repositories.repositoryWithInitialValue(noResponse)
                      .observe(requestVariable)
                      .onUpdatesPerLoop()
                      // .getFrom(requestVariable) if it does not supply Result, OR:
                      .attemptGetFrom(requestVariable).orEnd(withNoResponse)
                      .goTo(networkingExecutor)
                      .thenTransform(networkCallingFunction)
                      .compile();

              這部分代碼段還演示了一點:通過給操作一個特殊的名字,讓repository的編譯表達式更易讀。

              Wrapping asynchronous calls in repositories

              --repository的異步調用封裝

              現在的很多Library都有異步API和內置的線程,但是客戶端不能控制或禁用。

              app中有這樣的Library的話,引入Agera將是一個具有挑戰性的工作。 一個直接的辦法就是找到Library中同步選擇的點,采用[[如上段所述|Incrementally-Agerifying-legacy-code#exposing-synchronous-operations-as-repositories]]方法。

              另一個方式(反模式):切換后臺線程執行異步調用并等待結果,然后同步拿結果。上面方式不可行時,這一節討論一個合適的解決方法。

              異步調用的一個循環模式是請求-響應 結構。下面的示例假定這樣結構:未完成的工作可以被取消,但是不指定回調的線程。

              interface AsyncOperator<P, R> {
                Cancellable request(P param, Callback<R> callback);
              }
              
              interface Callback<R> {
                void onResponse(R response); // Can be called from any thread
              }
              
              interface Cancellable {
                void cancel();
              }

              下面repository例子,使用AsyncOperator提供數據, 完成響應式請求(一個抽象的supplier類)。

              這段代碼假定AsyncOperator已經有足夠的緩存,因此重復的請求不會影響性能。

              public class AsyncOperatorRepository<P, R> extends BaseObservable
                  implements Repository<Result<R>>, Callback<R> {
              
                private final AsyncOperator<P, R> asyncOperator;
                private final Supplier<P> paramSupplier;
              
                private Result<R> result;
                private Cancellable cancellable;
              
                public AsyncOperatorRepository(AsyncOperator<P, R> asyncOperator,
                    Supplier<P> paramSupplier) {
                  this.asyncOperator = asyncOperator;
                  this.paramSupplier = paramSupplier;
                  this.result = Result.absent();
                }
              
                @Override
                protected synchronized void observableActivated() {
                  cancellable = asyncOperator.request(paramSupplier.get(), this);
                }
              
                @Override
                protected synchronized void observableDeactivated() {
                  if (cancellable != null) {
                    cancellable.cancel();
                    cancellable = null;
                  }
                }
              
                @Override
                public synchronized void onResponse(R response) {
                  cancellable = null;
                  result = Result.absentIfNull(response);
                  dispatchUpdate();
                }
              
                @Override
                public synchronized Result<R> get() {
                  return result;
                }
              }

              這個類可以很容易地升級到可以修改請求參數,而這個過程就類似于前面的討論:讓repository提供請求參數,并讓AsyncOperatorRepository觀察請求參數變化。

              在激活期間,觀察請求參數的變化,取消任何正在進行的請求,并發出新的請求,如下所示:

              public class AsyncOperatorRepository<P, R> extends BaseObservable
                  implements Repository<Result<R>>, Callback<R>, Updatable {
              
                private final AsyncOperator<P, R> asyncOperator;
                private final Repository<P> paramRepository;
              
                private Result<R> result;
                private Cancellable cancellable;
              
                public AsyncOperatorRepository(AsyncOperator<P, R> asyncOperator,
                    Repository<P> paramRepository) {
                  this.asyncOperator = asyncOperator;
                  this.paramRepository = paramRepository;
                  this.result = Result.absent();
                }
              
                @Override
                protected void observableActivated() {
                  paramRepository.addUpdatable(this);
                  update();
                }
              
                @Override
                protected synchronized void observableDeactivated() {
                  paramRepository.removeUpdatable(this);
                  cancelOngoingRequestLocked();
                }
              
                @Override
                public synchronized void update() {
                  cancelOngoingRequestLocked();
                  // Adapt accordingly if paramRepository supplies a Result.
                  cancellable = asyncOperator.request(paramRepository.get(), this);
                }
              
                private void cancelOngoingRequestLocked() {
                  if (cancellable != null) {
                    cancellable.cancel();
                    cancellable = null;
                  }
                }
              
                @Override
                public synchronized void onResponse(R response) {
                  cancellable = null;
                  result = Result.absentIfNull(response);
                  dispatchUpdate();
                }
              
                // Similar process for fallible requests (typically with an
                // onError(Throwable) callback): wrap the failure in a Result and
                // dispatchUpdate().
              
                @Override
                public synchronized Result<R> get() {
                  return result;
                }
              }
              在线资源天堂www