我有一堆表示数据的对象。这些对象可以写入相应的文件。用户可能会要求比以前写入文件的更改更快地进行某些更改。
比如说,我对文件a、文件b和文件c进行更改,然后提交执行。然后,在编写过程中,我对文件a进行更改并将其发布。例如,有3个线程在运行。一旦对a、b和c的第一次更改被执行(写入文件),对a的第一次和第二次更改将几乎同时执行。但是,我希望第二个更改在第一个更改完成后应用。
在rxjava中如何做到这一点?
还有一点。在另一个地方,我想用最新的更改运行操作。一种选择是等待所有任务完成。
是否有合适的rxjava原语/方法能够覆盖这两个用例?
我是rxjava的新手,但我希望这是有意义的。 Subjects
在我看来是相关的,但会有成百上千的文件。
我已经有了自定义的实现 Executor
.
public class OrderingExecutor
implements Executor
{
@Delegate
private final Executor delegate;
private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<>();
public OrderingExecutor(
Executor delegate)
{
this.delegate = delegate;
}
public void execute(
Runnable task,
Object key)
{
Objects.requireNonNull(key);
boolean first;
Runnable wrappedTask;
synchronized (keyedTasks)
{
Queue<Runnable> dependencyQueue = keyedTasks.get(key);
first = (dependencyQueue == null);
if (dependencyQueue == null)
{
dependencyQueue = new LinkedList<>();
keyedTasks.put(key, dependencyQueue);
}
wrappedTask = wrap(task, dependencyQueue, key);
if (!first)
{
dependencyQueue.add(wrappedTask);
}
}
// execute method can block, call it outside synchronize block
if (first)
{
delegate.execute(wrappedTask);
}
}
private Runnable wrap(
Runnable task,
Queue<Runnable> dependencyQueue,
Object key)
{
return new OrderedTask(task, dependencyQueue, key);
}
class OrderedTask
implements Runnable
{
private final Queue<Runnable> dependencyQueue;
private final Runnable task;
private final Object key;
public OrderedTask(
Runnable task,
Queue<Runnable> dependencyQueue,
Object key)
{
this.task = task;
this.dependencyQueue = dependencyQueue;
this.key = key;
}
@Override
public void run()
{
try
{
task.run();
}
finally
{
Runnable nextTask = null;
synchronized (keyedTasks)
{
if (dependencyQueue.isEmpty())
{
keyedTasks.remove(key);
}
else
{
nextTask = dependencyQueue.poll();
}
}
if (nextTask != null)
{
delegate.execute(nextTask);
}
}
}
}
}
也许有什么明智的方法可以把它插入rxjava?
1条答案
按热度按时间8yparm6h1#
现在还不完全清楚您试图在这里实现什么,但是您可以在rxjava之上分层优先级队列。