Usage and code examples of the rx.Observable.using() method

Reposted by x33g5p2x on 2022-01-25, category: Other

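This article collects code examples of the rx.Observable.using() method in Java and shows how Observable.using() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they serve as practical references. Details of the Observable.using() method:
Package path: rx.Observable
Class name: Observable
Method name: using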

Introduction to Observable.using

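Constructs an Observable that creates a dependent resource object, which is disposed of on unsubscription.

Scheduler: using does not operate by default on a particular Scheduler.

As the examples below show, the method takes a resource factory (Func0), an Observable factory that receives the resource (Func1), and a dispose action (Action1). An optional fourth boolean argument, disposeEagerly, controls whether the resource is disposed of just before the terminal event is emitted (true) or after it (false).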
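
The resource handling that using performs can be modelled without RxJava. The following is a plain-JDK sketch, not the library's implementation: the class UsingLifecycleSketch, its using helper, and the trace method are hypothetical names, and a List stands in for the Observable returned by the observable factory. It only illustrates when the dispose action runs relative to the completion signal for the two values of disposeEagerly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-JDK model of the lifecycle that using() manages.
// All names here are hypothetical; this is not RxJava code.
class UsingLifecycleSketch {

    // Creates the resource, emits the items derived from it, then signals completion.
    // The resource is disposed exactly once: just before the completion signal when
    // disposeEagerly is true, otherwise after it (or after a failure).
    static <R, T> void using(Supplier<R> resourceFactory,
                             Function<R, List<T>> itemsFactory,
                             Consumer<R> disposeAction,
                             boolean disposeEagerly,
                             Consumer<T> onNext,
                             Runnable onCompleted) {
        R resource = resourceFactory.get();
        boolean disposed = false;
        try {
            for (T item : itemsFactory.apply(resource)) {
                onNext.accept(item);
            }
            if (disposeEagerly) {
                disposed = true;
                disposeAction.accept(resource);
            }
            onCompleted.run();
        } finally {
            if (!disposed) {
                disposeAction.accept(resource);
            }
        }
    }

    // Records the order of lifecycle events for one run.
    static List<String> trace(boolean disposeEagerly) {
        List<String> events = new ArrayList<>();
        UsingLifecycleSketch.<String, String>using(
                () -> { events.add("create"); return "a,b"; },   // resource factory
                resource -> Arrays.asList(resource.split(",")),  // items derived from the resource
                resource -> events.add("dispose"),               // dispose action
                disposeEagerly,
                item -> events.add("next:" + item),
                () -> events.add("completed"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(true));  // [create, next:a, next:b, dispose, completed]
        System.out.println(trace(false)); // [create, next:a, next:b, completed, dispose]
    }
}
```

In this model the only difference between the two modes is whether "dispose" is recorded before or after "completed"; most of the examples below pass true so that the resource is already closed when downstream consumers see the terminal event.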

Code examples

Source: com.github.davidmoten/rxjava-extras

public <R> Observable<R> observable(
    Func1<? super T, ? extends Observable<? extends R>> observableFactory) {
  return Observable.using(resourceFactory, observableFactory, disposeAction, disposeEagerly);
}

Source: davidmoten/rxjava-extras

public static Observable<String> from(final Func0<Reader> readerFactory) {
  Func1<Reader, Observable<String>> observableFactory = new Func1<Reader, Observable<String>>() {
    @Override
    public Observable<String> call(Reader reader) {
      return from(reader);
    }
  };
  return Observable.using(readerFactory, observableFactory, DisposeActionHolder.INSTANCE,
      true);
}

Source: davidmoten/rxjava-extras

public static Observable<byte[]> from(final File file, final int size) {
  Func0<InputStream> resourceFactory = new Func0<InputStream>() {
    @Override
    public InputStream call() {
      try {
        return new BufferedInputStream(new FileInputStream(file), size);
      } catch (FileNotFoundException e) {
        throw new RuntimeException(e);
      }
    }
  };
  Func1<InputStream, Observable<byte[]>> observableFactory = new Func1<InputStream, Observable<byte[]>>() {
    @Override
    public Observable<byte[]> call(InputStream is) {
      return from(is, size);
    }
  };
  return Observable.using(resourceFactory, observableFactory, InputStreamCloseHolder.INSTANCE, true);
}

Source: davidmoten/rxjava-extras

public <T> Observable<T> read(final Class<T> cls, final File file, final int bufferSize) {
  Func0<Input> resourceFactory = new Func0<Input>() {
    @Override
    public Input call() {
      try {
        return new Input(new FileInputStream(file), bufferSize);
      } catch (FileNotFoundException e) {
        throw new RuntimeException(e);
      }
    }
  };
  Func1<Input, Observable<? extends T>> observableFactory = new Func1<Input, Observable<? extends T>>() {
    @Override
    public Observable<? extends T> call(final Input input) {
      return read(cls, input, bufferSize);
    }
  };
  Action1<Input> disposeAction = new Action1<Input>() {
    @Override
    public void call(Input input) {
      input.close();
    }
  };
  return Observable.using(resourceFactory, observableFactory, disposeAction, true);
}

Source: com.github.davidmoten/rxjava-extras

public static Observable<ZippedEntry> unzip(final File file) {
  Func0<ZipInputStream> resourceFactory = new Func0<ZipInputStream>() {
    @Override
    public ZipInputStream call() {
      try {
        return new ZipInputStream(new FileInputStream(file));
      } catch (FileNotFoundException e) {
        throw new RuntimeException(e);
      }
    }
  };
  Func1<ZipInputStream, Observable<ZippedEntry>> observableFactory = ZipHolder.OBSERVABLE_FACTORY;
  Action1<ZipInputStream> disposeAction = ZipHolder.DISPOSER;
  return Observable.using(resourceFactory, observableFactory, disposeAction);
}

Source: au.gov.amsa.risky/ais

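public static Observable<String> nmeaFrom(final File file) {
  return Observable.using(
      // resource factory: open the file
      Checked.f0(() -> new FileInputStream(file)),
      // observable factory: build the stream from the open input stream
      is -> nmeaFrom(is),
      // dispose action: close the stream, ignoring failures
      is -> {
        try {
          is.close();
        } catch (IOException e) {
          // don't care
        }
      },
      // disposeEagerly
      true);
}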

Source: au.gov.amsa.risky/ais

public static Observable<String> nmeaFromGzip(final File file) {
  Func0<Reader> resourceFactory = () -> {
    try {
      return new InputStreamReader(new GZIPInputStream(new FileInputStream(file)), UTF8);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  };
  Func1<Reader, Observable<String>> observableFactory = reader -> Strings
      .split(Strings.from(reader), "\n");
  Action1<Reader> disposeAction = reader -> {
    try {
      reader.close();
    } catch (IOException e) {
      // ignore
    }
  };
  return Observable.using(resourceFactory, observableFactory, disposeAction, true);
}

Source: au.gov.amsa.risky/ihs-reader

public static Observable<Map<String, String>> extractMaps(File file, String parentElementName) {
  Func0<InputStream> resourceFactory = Checked.f0(() -> new FileInputStream(file));
  Func1<InputStream, Observable<Map<String, String>>> observableFactory = is -> Observable
      .just(is).lift(new OperatorIhsReader(parentElementName));
  Action1<InputStream> disposeAction = Checked.a1(is -> is.close());
  return Observable.using(resourceFactory, observableFactory, disposeAction);
}

Source: com.github.davidmoten/rxjava-extras

public static Observable<Observable<byte[]>> create(
    final Func0<? extends ServerSocket> serverSocketFactory, final int timeoutMs,
    final int bufferSize, Action0 preAcceptAction, int acceptTimeoutMs,
    Func1<? super Socket, Boolean> acceptSocket) {
  Func1<ServerSocket, Observable<Observable<byte[]>>> observableFactory = createObservableFactory(
      timeoutMs, bufferSize, preAcceptAction, acceptSocket);
  return Observable.<Observable<byte[]>, ServerSocket> using( //
      createServerSocketFactory(serverSocketFactory, acceptTimeoutMs), //
      observableFactory, //
      new Action1<ServerSocket>() {
        @Override
        public void call(ServerSocket ss) {
          try {
            ss.close();
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }, true);
}

Source: davidmoten/rxjava-extras

private static Observable<byte[]> createSocketObservable(final Socket socket, long timeoutMs,
    final int bufferSize) {
  setTimeout(socket, timeoutMs);
  return Observable.using( //
      Checked.f0(new F0<InputStream>() {
        @Override
        public InputStream call() throws Exception {
          return socket.getInputStream();
        }
      }), //
      new Func1<InputStream, Observable<byte[]>>() {
        @Override
        public Observable<byte[]> call(InputStream is) {
          return Bytes.from(is, bufferSize);
        }
      }, //
      Actions.close(), //
      true);
}

Source: au.gov.amsa.risky/streams

public static Observable<String> strings(Func0<Socket> socketCreator, final Charset charset) {
  Preconditions.checkNotNull(socketCreator);
  Preconditions.checkNotNull(charset);
  return Observable
      // create a stream from a socket and dispose of socket
      // appropriately
      .using(socketCreator, socketObservableFactory(charset), socketDisposer(), true);
}
