本文整理了 Java 中 rx.Observable.using() 方法的一些代码示例,展示了 Observable.using() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Observable.using() 方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:using
[英]Constructs an Observable that creates a dependent resource object.
Scheduler: using does not operate by default on a particular Scheduler.
[中]构造一个可观察对象,创建一个依赖的资源对象。
调度程序(Scheduler):默认情况下,using 不在任何特定的调度程序上运行。
代码示例来源:origin: com.github.davidmoten/rxjava-extras
/**
 * Builds an Observable whose resource lifecycle is handled by {@code Observable.using}.
 *
 * <p>The call is forwarded with the {@code resourceFactory}, {@code disposeAction} and
 * {@code disposeEagerly} values in scope (presumably fields of the enclosing class;
 * their declarations are not shown here).
 *
 * @param observableFactory maps the created resource to the Observable to subscribe to
 * @param <R> element type of the returned Observable
 * @return the Observable returned by {@code Observable.using}
 */
public <R> Observable<R> observable(
        Func1<? super T, ? extends Observable<? extends R>> observableFactory) {
    final Observable<R> resourceBound = Observable.using(resourceFactory, observableFactory,
            disposeAction, disposeEagerly);
    return resourceBound;
}
代码示例来源:origin: davidmoten/rxjava-extras
/**
 * Creates an Observable of strings from a Reader supplied by the given factory.
 *
 * <p>The reader is obtained from {@code readerFactory}, handed to the
 * {@code from(Reader)} overload, and released through
 * {@code DisposeActionHolder.INSTANCE}. The trailing {@code true} is the
 * fourth argument of {@code Observable.using} (the eager-dispose flag in RxJava 1.x).
 *
 * @param readerFactory creates the reader to consume
 * @return the Observable returned by {@code Observable.using}
 */
public static Observable<String> from(final Func0<Reader> readerFactory) {
    return Observable.using(readerFactory, new Func1<Reader, Observable<String>>() {
        @Override
        public Observable<String> call(Reader source) {
            // Resolves to the enclosing class's from(Reader) overload.
            return from(source);
        }
    }, DisposeActionHolder.INSTANCE, true);
}
代码示例来源:origin: com.github.davidmoten/rxjava-extras
/**
 * Wraps a lazily created Reader in an Observable of strings.
 *
 * <p>{@code Observable.using} receives the reader factory, a function that
 * delegates to the {@code from(Reader)} overload, the shared
 * {@code DisposeActionHolder.INSTANCE} dispose action, and {@code true} as its
 * fourth argument (the eager-dispose flag in RxJava 1.x).
 *
 * @param readerFactory supplies the reader
 * @return the Observable returned by {@code Observable.using}
 */
public static Observable<String> from(final Func0<Reader> readerFactory) {
    final Func1<Reader, Observable<String>> readStrings =
            new Func1<Reader, Observable<String>>() {
                @Override
                public Observable<String> call(Reader in) {
                    return from(in);
                }
            };
    return Observable.using(readerFactory, readStrings, DisposeActionHolder.INSTANCE, true);
}
代码示例来源:origin: davidmoten/rxjava-extras
/**
 * Delegates to {@code Observable.using} with the caller's observable factory.
 *
 * <p>{@code resourceFactory}, {@code disposeAction} and {@code disposeEagerly} are
 * taken from the enclosing scope (presumably instance fields; not visible in this
 * excerpt).
 *
 * @param observableFactory turns the resource into the Observable to emit from
 * @param <R> element type of the result
 * @return the Observable returned by {@code Observable.using}
 */
public <R> Observable<R> observable(
        Func1<? super T, ? extends Observable<? extends R>> observableFactory) {
    final Observable<R> stream = Observable.using(
            resourceFactory,
            observableFactory,
            disposeAction,
            disposeEagerly);
    return stream;
}
代码示例来源:origin: davidmoten/rxjava-extras
/**
 * Creates an Observable of byte arrays backed by a file.
 *
 * <p>The resource factory opens the file through a {@code BufferedInputStream}
 * whose buffer size is {@code size}; the same {@code size} is forwarded to the
 * {@code from(InputStream, int)} overload. The stream is released through
 * {@code InputStreamCloseHolder.INSTANCE}, and {@code true} is passed as the
 * fourth argument of {@code Observable.using} (the eager-dispose flag in RxJava 1.x).
 *
 * <p>A missing file surfaces as a {@code RuntimeException} wrapping the
 * {@code FileNotFoundException}, thrown from the resource factory.
 *
 * @param file file to read
 * @param size buffer size, also forwarded to {@code from(InputStream, int)}
 * @return the Observable returned by {@code Observable.using}
 */
public static Observable<byte[]> from(final File file, final int size) {
    final Func1<InputStream, Observable<byte[]>> toChunks =
            new Func1<InputStream, Observable<byte[]>>() {
                @Override
                public Observable<byte[]> call(InputStream stream) {
                    return from(stream, size);
                }
            };
    final Func0<InputStream> openFile = new Func0<InputStream>() {
        @Override
        public InputStream call() {
            final InputStream buffered;
            try {
                buffered = new BufferedInputStream(new FileInputStream(file), size);
            } catch (FileNotFoundException notFound) {
                throw new RuntimeException(notFound);
            }
            return buffered;
        }
    };
    return Observable.using(openFile, toChunks, InputStreamCloseHolder.INSTANCE, true);
}
代码示例来源:origin: davidmoten/rxjava-extras
/**
 * Reads objects of the given class from a file as an Observable.
 *
 * <p>An {@code Input} is created over a {@code FileInputStream} with the given
 * buffer size, passed to the {@code read(Class, Input, int)} overload, and closed
 * by the dispose action. {@code true} is passed as the fourth argument of
 * {@code Observable.using} (the eager-dispose flag in RxJava 1.x).
 *
 * <p>A missing file surfaces as a {@code RuntimeException} wrapping the
 * {@code FileNotFoundException}, thrown from the resource factory.
 *
 * @param cls class of the emitted items
 * @param file file to read from
 * @param bufferSize buffer size for the {@code Input}, also forwarded to the overload
 * @param <T> item type
 * @return the Observable returned by {@code Observable.using}
 */
public <T> Observable<T> read(final Class<T> cls, final File file, final int bufferSize) {
    final Action1<Input> closeInput = new Action1<Input>() {
        @Override
        public void call(Input in) {
            in.close();
        }
    };
    final Func1<Input, Observable<? extends T>> deserialize =
            new Func1<Input, Observable<? extends T>>() {
                @Override
                public Observable<? extends T> call(final Input in) {
                    return read(cls, in, bufferSize);
                }
            };
    final Func0<Input> openInput = new Func0<Input>() {
        @Override
        public Input call() {
            try {
                return new Input(new FileInputStream(file), bufferSize);
            } catch (FileNotFoundException notFound) {
                throw new RuntimeException(notFound);
            }
        }
    };
    return Observable.using(openInput, deserialize, closeInput, true);
}
代码示例来源:origin: com.github.davidmoten/rxjava-extras
/**
 * Exposes a file as an Observable of byte arrays.
 *
 * <p>On use, the file is opened as a {@code BufferedInputStream} with a buffer of
 * {@code size} bytes and handed to the {@code from(InputStream, int)} overload
 * with the same {@code size}. Disposal goes through
 * {@code InputStreamCloseHolder.INSTANCE}; the final {@code true} is the fourth
 * argument of {@code Observable.using} (the eager-dispose flag in RxJava 1.x).
 *
 * <p>If the file does not exist, the resource factory throws a
 * {@code RuntimeException} whose cause is the {@code FileNotFoundException}.
 *
 * @param file file to read
 * @param size buffer size, also forwarded to {@code from(InputStream, int)}
 * @return the Observable returned by {@code Observable.using}
 */
public static Observable<byte[]> from(final File file, final int size) {
    final Func0<InputStream> streamSupplier = new Func0<InputStream>() {
        @Override
        public InputStream call() {
            try {
                final FileInputStream raw = new FileInputStream(file);
                return new BufferedInputStream(raw, size);
            } catch (FileNotFoundException missing) {
                throw new RuntimeException(missing);
            }
        }
    };
    return Observable.using(streamSupplier, new Func1<InputStream, Observable<byte[]>>() {
        @Override
        public Observable<byte[]> call(InputStream in) {
            return from(in, size);
        }
    }, InputStreamCloseHolder.INSTANCE, true);
}
代码示例来源:origin: davidmoten/rxjava-extras
return Observable.using(resourceFactory, observableFactory, disposeAction, true);
代码示例来源:origin: com.github.davidmoten/rxjava-extras
/**
 * Creates an Observable of the entries of a zip file.
 *
 * <p>The resource is a {@code ZipInputStream} over the file; the Observable and
 * the dispose action come from {@code ZipHolder.OBSERVABLE_FACTORY} and
 * {@code ZipHolder.DISPOSER}. The three-argument {@code Observable.using}
 * overload is used, so no eager-dispose flag is passed.
 *
 * <p>A missing file surfaces as a {@code RuntimeException} wrapping the
 * {@code FileNotFoundException}, thrown from the resource factory.
 *
 * @param file zip file to read
 * @return the Observable returned by {@code Observable.using}
 */
public static Observable<ZippedEntry> unzip(final File file) {
    final Action1<ZipInputStream> closeZip = ZipHolder.DISPOSER;
    final Func1<ZipInputStream, Observable<ZippedEntry>> toEntries = ZipHolder.OBSERVABLE_FACTORY;
    final Func0<ZipInputStream> openZip = new Func0<ZipInputStream>() {
        @Override
        public ZipInputStream call() {
            final FileInputStream raw;
            try {
                raw = new FileInputStream(file);
            } catch (FileNotFoundException notFound) {
                throw new RuntimeException(notFound);
            }
            return new ZipInputStream(raw);
        }
    };
    return Observable.using(openZip, toEntries, closeZip);
}
代码示例来源:origin: com.github.davidmoten/rxjava-extras
/**
 * Emits objects of type {@code cls} read from the given file.
 *
 * <p>The resource factory wraps a {@code FileInputStream} in an {@code Input}
 * with the given buffer size; the observable factory delegates to the
 * {@code read(Class, Input, int)} overload; the dispose action closes the
 * {@code Input}. The final {@code true} is the fourth argument of
 * {@code Observable.using} (the eager-dispose flag in RxJava 1.x).
 *
 * <p>If the file does not exist, the resource factory throws a
 * {@code RuntimeException} whose cause is the {@code FileNotFoundException}.
 *
 * @param cls class of the emitted items
 * @param file file to read from
 * @param bufferSize buffer size for the {@code Input}, also forwarded to the overload
 * @param <T> item type
 * @return the Observable returned by {@code Observable.using}
 */
public <T> Observable<T> read(final Class<T> cls, final File file, final int bufferSize) {
    final Func0<Input> inputSupplier = new Func0<Input>() {
        @Override
        public Input call() {
            final FileInputStream raw;
            try {
                raw = new FileInputStream(file);
            } catch (FileNotFoundException missing) {
                throw new RuntimeException(missing);
            }
            return new Input(raw, bufferSize);
        }
    };
    final Func1<Input, Observable<? extends T>> itemsOf =
            new Func1<Input, Observable<? extends T>>() {
                @Override
                public Observable<? extends T> call(final Input source) {
                    return read(cls, source, bufferSize);
                }
            };
    return Observable.using(inputSupplier, itemsOf, new Action1<Input>() {
        @Override
        public void call(Input source) {
            source.close();
        }
    }, true);
}
代码示例来源:origin: com.github.davidmoten/rxjava-extras
return Observable.using(resourceFactory, observableFactory, disposeAction, true);
代码示例来源:origin: davidmoten/rxjava-extras
/**
 * Exposes the entries of a zip file as an Observable.
 *
 * <p>A {@code ZipInputStream} over the file is the managed resource.
 * {@code ZipHolder.OBSERVABLE_FACTORY} produces the Observable and
 * {@code ZipHolder.DISPOSER} releases the stream. This uses the three-argument
 * {@code Observable.using} overload (no eager-dispose flag is passed).
 *
 * <p>If the file does not exist, the resource factory throws a
 * {@code RuntimeException} whose cause is the {@code FileNotFoundException}.
 *
 * @param file zip file to read
 * @return the Observable returned by {@code Observable.using}
 */
public static Observable<ZippedEntry> unzip(final File file) {
    final Func1<ZipInputStream, Observable<ZippedEntry>> entryFactory =
            ZipHolder.OBSERVABLE_FACTORY;
    final Action1<ZipInputStream> disposer = ZipHolder.DISPOSER;
    final Func0<ZipInputStream> zipSupplier = new Func0<ZipInputStream>() {
        @Override
        public ZipInputStream call() {
            try {
                final FileInputStream raw = new FileInputStream(file);
                return new ZipInputStream(raw);
            } catch (FileNotFoundException missing) {
                throw new RuntimeException(missing);
            }
        }
    };
    return Observable.using(zipSupplier, entryFactory, disposer);
}
代码示例来源:origin: au.gov.amsa.risky/ais
/**
 * Creates an Observable of strings from a file.
 *
 * <p>A {@code FileInputStream} over the file is created through
 * {@code Checked.f0}, passed to the stream-based {@code nmeaFrom} overload, and
 * closed by the dispose action. The final {@code true} is the fourth argument of
 * {@code Observable.using} (the eager-dispose flag in RxJava 1.x).
 *
 * @param file file to read
 * @return the Observable returned by {@code Observable.using}
 */
public static Observable<String> nmeaFrom(final File file) {
    return Observable.using(
            // resource: the file's input stream
            Checked.f0(() -> new FileInputStream(file)),
            // observable: delegate to the stream-based overload
            in -> nmeaFrom(in),
            // dispose: best-effort close
            in -> {
                try {
                    in.close();
                } catch (IOException ignored) {
                    // don't care
                }
            },
            true);
}
代码示例来源:origin: au.gov.amsa.risky/ais
/**
 * Creates an Observable of the newline-separated strings in a gzip-compressed file.
 *
 * <p>The resource factory opens the file, wraps it in a {@code GZIPInputStream}
 * and decodes it with {@code UTF8}. The observable factory applies
 * {@code Strings.split(Strings.from(reader), "\n")}. The dispose action closes the
 * reader on a best-effort basis. The final {@code true} is the fourth argument of
 * {@code Observable.using} (the eager-dispose flag in RxJava 1.x).
 *
 * <p>If the file cannot be opened, or its gzip header cannot be read, the
 * resource factory throws a {@code RuntimeException} wrapping the
 * {@code IOException}. In that case the underlying file stream is closed before
 * the exception propagates, so no file handle is leaked.
 *
 * @param file gzip-compressed file to read
 * @return the Observable returned by {@code Observable.using}
 */
public static Observable<String> nmeaFromGzip(final File file) {
    Func0<Reader> resourceFactory = () -> {
        FileInputStream fileStream = null;
        try {
            fileStream = new FileInputStream(file);
            return new InputStreamReader(new GZIPInputStream(fileStream), UTF8);
        } catch (IOException e) {
            // GZIPInputStream reads the gzip header in its constructor and throws on
            // a non-gzip or truncated file. At that point the file is already open
            // and nothing else holds a reference to it, so close it here.
            if (fileStream != null) {
                try {
                    fileStream.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            throw new RuntimeException(e);
        }
    };
    Func1<Reader, Observable<String>> observableFactory = reader -> Strings
            .split(Strings.from(reader), "\n");
    Action1<Reader> disposeAction = reader -> {
        try {
            reader.close();
        } catch (IOException ignored) {
            // Best-effort close: a failure here is deliberately ignored.
        }
    };
    return Observable.using(resourceFactory, observableFactory, disposeAction, true);
}
代码示例来源:origin: au.gov.amsa.risky/ihs-reader
/**
 * Creates an Observable of string-to-string maps extracted from a file.
 *
 * <p>A {@code FileInputStream} over the file is the managed resource. The
 * Observable is {@code Observable.just(stream)} lifted with
 * {@code new OperatorIhsReader(parentElementName)}. The stream is closed by the
 * dispose action. This uses the three-argument {@code Observable.using} overload
 * (no eager-dispose flag is passed).
 *
 * @param file file to read
 * @param parentElementName passed to the {@code OperatorIhsReader} constructor
 * @return the Observable returned by {@code Observable.using}
 */
public static Observable<Map<String, String>> extractMaps(File file, String parentElementName) {
    Action1<InputStream> closeStream = Checked.a1(stream -> stream.close());
    Func1<InputStream, Observable<Map<String, String>>> toMaps = stream -> {
        return Observable.just(stream).lift(new OperatorIhsReader(parentElementName));
    };
    Func0<InputStream> openStream = Checked.f0(() -> new FileInputStream(file));
    return Observable.using(openStream, toMaps, closeStream);
}
代码示例来源:origin: com.github.davidmoten/rxjava-extras
/**
 * Builds an Observable of byte-array Observables backed by a server socket.
 *
 * <p>The server socket factory comes from
 * {@code createServerSocketFactory(serverSocketFactory, acceptTimeoutMs)} and the
 * observable factory from
 * {@code createObservableFactory(timeoutMs, bufferSize, preAcceptAction, acceptSocket)}.
 * The dispose action closes the server socket and rethrows a close failure as a
 * {@code RuntimeException}. The final {@code true} is the fourth argument of
 * {@code Observable.using} (the eager-dispose flag in RxJava 1.x).
 *
 * @param serverSocketFactory forwarded to {@code createServerSocketFactory}
 * @param timeoutMs forwarded to {@code createObservableFactory}
 * @param bufferSize forwarded to {@code createObservableFactory}
 * @param preAcceptAction forwarded to {@code createObservableFactory}
 * @param acceptTimeoutMs forwarded to {@code createServerSocketFactory}
 * @param acceptSocket forwarded to {@code createObservableFactory}
 * @return the Observable returned by {@code Observable.using}
 */
public static Observable<Observable<byte[]>> create(
        final Func0<? extends ServerSocket> serverSocketFactory, final int timeoutMs,
        final int bufferSize, Action0 preAcceptAction, int acceptTimeoutMs,
        Func1<? super Socket, Boolean> acceptSocket) {
    final Func1<ServerSocket, Observable<Observable<byte[]>>> connections =
            createObservableFactory(timeoutMs, bufferSize, preAcceptAction, acceptSocket);
    final Action1<ServerSocket> closeServerSocket = new Action1<ServerSocket>() {
        @Override
        public void call(ServerSocket server) {
            try {
                server.close();
            } catch (IOException closeFailure) {
                throw new RuntimeException(closeFailure);
            }
        }
    };
    return Observable.<Observable<byte[]>, ServerSocket> using(
            createServerSocketFactory(serverSocketFactory, acceptTimeoutMs),
            connections,
            closeServerSocket,
            true);
}
代码示例来源:origin: davidmoten/rxjava-extras
/**
 * Creates an Observable of byte-array Observables whose resource is a server socket.
 *
 * <p>{@code createObservableFactory(timeoutMs, bufferSize, preAcceptAction, acceptSocket)}
 * supplies the observable factory.
 * {@code createServerSocketFactory(serverSocketFactory, acceptTimeoutMs)} supplies
 * the resource factory. On disposal the server socket is closed, and an
 * {@code IOException} from {@code close()} is rethrown wrapped in a
 * {@code RuntimeException}. The final {@code true} is the fourth argument of
 * {@code Observable.using} (the eager-dispose flag in RxJava 1.x).
 *
 * @param serverSocketFactory forwarded to {@code createServerSocketFactory}
 * @param timeoutMs forwarded to {@code createObservableFactory}
 * @param bufferSize forwarded to {@code createObservableFactory}
 * @param preAcceptAction forwarded to {@code createObservableFactory}
 * @param acceptTimeoutMs forwarded to {@code createServerSocketFactory}
 * @param acceptSocket forwarded to {@code createObservableFactory}
 * @return the Observable returned by {@code Observable.using}
 */
public static Observable<Observable<byte[]>> create(
        final Func0<? extends ServerSocket> serverSocketFactory, final int timeoutMs,
        final int bufferSize, Action0 preAcceptAction, int acceptTimeoutMs,
        Func1<? super Socket, Boolean> acceptSocket) {
    final Func1<ServerSocket, Observable<Observable<byte[]>>> socketStreams =
            createObservableFactory(timeoutMs, bufferSize, preAcceptAction, acceptSocket);
    final Action1<ServerSocket> disposer = new Action1<ServerSocket>() {
        @Override
        public void call(ServerSocket listening) {
            try {
                listening.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    };
    return Observable.<Observable<byte[]>, ServerSocket> using(
            createServerSocketFactory(serverSocketFactory, acceptTimeoutMs),
            socketStreams, disposer, true);
}
代码示例来源:origin: davidmoten/rxjava-extras
/**
 * Creates an Observable of byte arrays read from a socket's input stream.
 *
 * <p>{@code setTimeout(socket, timeoutMs)} is called immediately, before the
 * Observable is built. The managed resource is {@code socket.getInputStream()},
 * obtained through {@code Checked.f0}. The Observable comes from
 * {@code Bytes.from(stream, bufferSize)}, and disposal uses
 * {@code Actions.close()}. The final {@code true} is the fourth argument of
 * {@code Observable.using} (the eager-dispose flag in RxJava 1.x).
 *
 * @param socket socket to read from
 * @param timeoutMs forwarded to {@code setTimeout}
 * @param bufferSize forwarded to {@code Bytes.from}
 * @return the Observable returned by {@code Observable.using}
 */
private static Observable<byte[]> createSocketObservable(final Socket socket, long timeoutMs,
        final int bufferSize) {
    setTimeout(socket, timeoutMs);
    final Func0<InputStream> openStream = Checked.f0(new F0<InputStream>() {
        @Override
        public InputStream call() throws Exception {
            return socket.getInputStream();
        }
    });
    final Func1<InputStream, Observable<byte[]>> toBytes =
            new Func1<InputStream, Observable<byte[]>>() {
                @Override
                public Observable<byte[]> call(InputStream stream) {
                    return Bytes.from(stream, bufferSize);
                }
            };
    return Observable.using(openStream, toBytes, Actions.close(), true);
}
代码示例来源:origin: com.github.davidmoten/rxjava-extras
/**
 * Exposes the bytes arriving on a socket as an Observable of byte arrays.
 *
 * <p>The timeout is applied first through {@code setTimeout(socket, timeoutMs)}.
 * {@code Observable.using} then manages the socket's input stream (obtained
 * through {@code Checked.f0}), maps it with {@code Bytes.from(in, bufferSize)},
 * and disposes of it with {@code Actions.close()}. The final {@code true} is the
 * fourth argument of {@code Observable.using} (the eager-dispose flag in RxJava 1.x).
 *
 * @param socket socket to read from
 * @param timeoutMs forwarded to {@code setTimeout}
 * @param bufferSize forwarded to {@code Bytes.from}
 * @return the Observable returned by {@code Observable.using}
 */
private static Observable<byte[]> createSocketObservable(final Socket socket, long timeoutMs,
        final int bufferSize) {
    setTimeout(socket, timeoutMs);
    final Func0<InputStream> streamSupplier = Checked.f0(new F0<InputStream>() {
        @Override
        public InputStream call() throws Exception {
            final InputStream in = socket.getInputStream();
            return in;
        }
    });
    return Observable.using(
            streamSupplier,
            new Func1<InputStream, Observable<byte[]>>() {
                @Override
                public Observable<byte[]> call(InputStream in) {
                    return Bytes.from(in, bufferSize);
                }
            },
            Actions.close(),
            true);
}
代码示例来源:origin: au.gov.amsa.risky/streams
/**
 * Creates an Observable of strings read from a socket.
 *
 * <p>Both arguments are null-checked with {@code Preconditions.checkNotNull},
 * {@code socketCreator} first. {@code Observable.using} then receives the socket
 * creator, {@code socketObservableFactory(charset)}, {@code socketDisposer()}, and
 * {@code true} as its fourth argument (the eager-dispose flag in RxJava 1.x).
 *
 * @param socketCreator supplies the socket used as the managed resource
 * @param charset forwarded to {@code socketObservableFactory}
 * @return the Observable returned by {@code Observable.using}
 */
public static Observable<String> strings(Func0<Socket> socketCreator, final Charset charset) {
    Preconditions.checkNotNull(socketCreator);
    Preconditions.checkNotNull(charset);
    // The socket is the managed resource: it backs the stream and is handed to
    // socketDisposer() for release.
    return Observable.using(
            socketCreator,
            socketObservableFactory(charset),
            socketDisposer(),
            true);
}
内容来源于网络,如有侵权,请联系作者删除!