socketchannel.write()尝试写入大缓冲区时抛出outofmemoryerror

brtdzjyr  于 2021-07-09  发布在  Java
关注(0)|答案(1)|浏览(424)

我的代码在运行以下行时抛出outofmemoryerror:

int numBytes = socketChannel.write(_send_buffer);

哪里 socketChannel 是java.nio.channels.socketchannel的示例
以及 _send_buffer 是java.nio.bytebuffer的示例
代码通过非阻塞选择器写入操作到达这一点,当 _send_buffer 它很大。我对代码没有任何问题 _send_buffer 小于20mb,但当尝试使用更大的缓冲区(例如>100mb)进行测试时失败。
根据java.nio.channels.socketchannel.write()的文档:
尝试将最多r个字节写入通道,其中r是调用此方法时缓冲区中剩余的字节数,即src.remaining()。假设写入长度为n的字节序列,其中0<=n<=r。这个字节序列将从索引p开始从缓冲区传输,其中p是调用这个方法时缓冲区的位置;最后写入的字节的索引将是p+n-1。返回时,缓冲区的位置将等于p+n;它的极限不会改变。除非另有规定,否则只有在写入所有r请求的字节后,写入操作才会返回。某些类型的通道,根据它们的状态,可能只写入部分字节,也可能根本不写入。例如,非阻塞模式下的套接字通道不能写入比套接字输出缓冲区中可用字节更多的字节。
我的通道应该设置为非阻塞的,所以我认为写操作应该只尝试写入套接字输出缓冲区的容量。因为我之前没有指定这个,所以我尝试通过setoption方法和sou sndbuf选项将它设置为1024字节。即:

socketChannel.setOption(SO_SNDBUF, 1024);

尽管我仍然在摆脱记忆中的错误。以下是完整的错误消息:

2021-04-22 11:52:44.260 11591-11733/jp.oist.abcvlib.serverLearning I/.serverLearnin: Clamp target GC heap from 195MB to 192MB
2021-04-22 11:52:44.260 11591-11733/jp.oist.abcvlib.serverLearning I/.serverLearnin: Alloc concurrent copying GC freed 2508(64KB) AllocSpace objects, 0(0B) LOS objects, 10% free, 171MB/192MB, paused 27us total 12.714ms
2021-04-22 11:52:44.261 11591-11733/jp.oist.abcvlib.serverLearning W/.serverLearnin: Throwing OutOfMemoryError "Failed to allocate a 49915610 byte allocation with 21279560 free bytes and 20MB until OOM, target footprint 201326592, growth limit 201326592" (VmSize 5585608 kB)
2021-04-22 11:52:44.261 11591-11733/jp.oist.abcvlib.serverLearning I/.serverLearnin: Starting a blocking GC Alloc
2021-04-22 11:52:44.261 11591-11733/jp.oist.abcvlib.serverLearning I/.serverLearnin: Starting a blocking GC Alloc

现在我可以内联调试并在写行停止,并且没有崩溃,所以我相信处理的内存需求没有问题 _send_buffer 但是当试图写的时候,后台的一些东西正在创建另一个分配,这个分配太多了。
也许我想的不对,需要限制我的生活 _send_buffer 我认为应该有一种方法来限制write命令所做的分配no?或者至少可以通过某种方式为我的应用程序分配更多的android内存。我用的是一个像素3a,根据规格它应该有4gb的内存。现在我意识到这必须与系统的其他部分共享,但这是一个基本的测试设备(没有安装游戏、个人应用程序等),所以我假设我应该可以访问相当大一部分4gb。由于我的增长极限是201326592(根据上面的logcat),所以我觉得奇怪的是,我的增长极限是规范内存的0.2/4.0=5%。
任何关于我的方法中的一个基本缺陷的正确方向的提示,或者避免outofmemory错误的建议都将不胜感激!

编辑1:

根据注解的请求添加一些代码上下文。注意这不是一个可运行的示例,因为代码库非常大,而且由于公司的策略,我不允许共享所有代码。请注意 _send_buffer is与socketchannel本身的sendbuffer无关(即getsendbuffersize引用的内容,它只是一个bytebuffer,在通过通道发送之前,我使用它将所有内容捆绑在一起。因为我不能共享所有与生成 _send_buffer 请注意,它是一个bytebuffer,可以非常大(>100mb)。如果这是一个根本的问题,那么请指出这一点和原因。
因此,考虑到上面的内容,nio相关的代码粘贴在下面。注意,这是非常原型的alpha代码,所以我为注解和日志语句的过载道歉。

socketconnectionmanager.java文件

(基本上是一个负责选择器的runnable)
注意 sendMsgToServer 方法被重写(没有修改)并从主android活动(未显示)调用。这个 byte[] episode arg是被 Package 成 ByteBufferSocketMessage.java (下一节)以后会放进 _send_buffer 中的示例 write 方法 SocketMessage.java .

package jp.oist.abcvlib.util;

import android.util.Log;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Set;

import static java.net.StandardSocketOptions.SO_SNDBUF;

public class SocketConnectionManager implements Runnable{

    private SocketChannel sc;
    private Selector selector;
    private SocketListener socketListener;
    private final String TAG = "SocketConnectionManager";
    private SocketMessage socketMessage;
    private final String serverIp;
    private final int serverPort;

    public SocketConnectionManager(SocketListener socketListener, String serverIp, int serverPort){
        this.socketListener = socketListener;
        this.serverIp = serverIp;
        this.serverPort = serverPort;
    }

    @Override
    public void run() {
        try {
            selector = Selector.open();
            start_connection(serverIp, serverPort);
            do {
                int eventCount = selector.select(0);
                Set<SelectionKey> events = selector.selectedKeys(); // events is int representing how many keys have changed state
                if (eventCount != 0){
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    for (SelectionKey selectedKey : selectedKeys){
                        try{
                            SocketMessage socketMessage = (SocketMessage) selectedKey.attachment();
                            socketMessage.process_events(selectedKey);
                        }catch (ClassCastException e){
                            Log.e(TAG,"Error", e);
                            Log.e(TAG, "selectedKey attachment not a SocketMessage type");
                        }
                    }
                }
            } while (selector.isOpen()); //todo remember to close the selector somewhere

        } catch (IOException e) {
            Log.e(TAG,"Error", e);
        }
    }

    private void start_connection(String serverIp, int serverPort){
        try {
            InetSocketAddress inetSocketAddress = new InetSocketAddress(serverIp, serverPort);
            sc = SocketChannel.open();
            sc.configureBlocking(false);
            sc.setOption(SO_SNDBUF, 1024);
            socketMessage = new SocketMessage(socketListener, sc, selector);

            Log.v(TAG, "registering with selector to connect");
            int ops = SelectionKey.OP_CONNECT;
            sc.register(selector, ops, socketMessage);

            Log.d(TAG, "Initializing connection with " + inetSocketAddress);
            boolean connected = sc.connect(inetSocketAddress);
            Log.v(TAG, "socketChannel.isConnected ? : " + sc.isConnected());

        } catch (IOException | ClosedSelectorException | IllegalBlockingModeException
                | CancelledKeyException | IllegalArgumentException e) {
            Log.e(TAG, "Initial socket connect and registration:", e);
        }
    }

    public void sendMsgToServer(byte[] episode){
        boolean writeSuccess = socketMessage.addEpisodeToWriteBuffer(episode);
    }

    /**
     * Should be called prior to exiting app to ensure zombie threads don't remain in memory.
     */
    public void close(){
        try {
            Log.v(TAG, "Closing connection: " + sc.getRemoteAddress());
            selector.close();
            sc.close();
        } catch (IOException e) {
            Log.e(TAG,"Error", e);
        }
    }
}

socketmessage.java文件

这从这里给出的示例python代码中得到了很大的启发,特别是 libclient.py 以及 app-client.py . 这是因为服务器运行的是python代码,而客户机运行的是java。因此,如果您想了解为什么事情是这样的,请参考realpythonsocket教程。我基本上使用app-server.py作为代码的模板,并为客户机翻译(经过修改)为java。

package jp.oist.abcvlib.util;

import android.util.Log;

import org.json.JSONException;
import org.json.JSONObject;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.util.Vector;

public class SocketMessage {

    private final SocketChannel sc;
    private final Selector selector;
    private final ByteBuffer _recv_buffer;
    private ByteBuffer _send_buffer;
    private int _jsonheader_len = 0;
    private JSONObject jsonHeaderRead; // Will tell Java at which points in msgContent each model lies (e.g. model1 is from 0 to 1018, model2 is from 1019 to 2034, etc.)
    private byte[] jsonHeaderBytes;
    private ByteBuffer msgContent; // Should contain ALL model files. Parse to individual files after reading
    private final Vector<ByteBuffer> writeBufferVector = new Vector<>(); // List of episodes
    private final String TAG = "SocketConnectionManager";
    private JSONObject jsonHeaderWrite;
    private boolean msgReadComplete = false;
    private SocketListener socketListener;
    private long socketWriteTimeStart;
    private long socketReadTimeStart;

    public SocketMessage(SocketListener socketListener, SocketChannel sc, Selector selector){
        this.socketListener = socketListener;
        this.sc = sc;
        this.selector = selector;
        this._recv_buffer = ByteBuffer.allocate(1024);
        this._send_buffer = ByteBuffer.allocate(1024);
    }

    public void process_events(SelectionKey selectionKey){
        SocketChannel sc = (SocketChannel) selectionKey.channel();
//        Log.i(TAG, "process_events");
        try{
            if (selectionKey.isConnectable()){
                sc.finishConnect();
                Log.d(TAG, "Finished connecting to " + ((SocketChannel) selectionKey.channel()).getRemoteAddress());
                Log.v(TAG, "socketChannel.isConnected ? : " + sc.isConnected());

            }
            if (selectionKey.isWritable()){
//                Log.i(TAG, "write event");
                write(selectionKey);
            }
            if (selectionKey.isReadable()){
//                Log.i(TAG, "read event");
                read(selectionKey);
//                int ops = SelectionKey.OP_WRITE;
//                sc.register(selectionKey.selector(), ops, selectionKey.attachment());
            }

        } catch (ClassCastException | IOException | JSONException e){
            Log.e(TAG,"Error", e);
        }
    }

    private void read(SelectionKey selectionKey) throws IOException, JSONException {

        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

        while(!msgReadComplete){
            // At this point the _recv_buffer should have been cleared (pointer 0 limit=cap, no mark)
            int bitsRead = socketChannel.read(_recv_buffer);

            if (bitsRead > 0 || _recv_buffer.position() > 0){
                if (bitsRead > 0){
//                    Log.v(TAG, "Read " + bitsRead + " bytes from " + socketChannel.getRemoteAddress());
                }

                // If you have not determined the length of the header via the 2 byte short protoheader,
                // try to determine it, though there is no gaurantee it will have enough bytes. So it may
                // pass through this if statement multiple times. Only after it has been read will
                // _jsonheader_len have a non-zero length;
                if (this._jsonheader_len == 0){
                    socketReadTimeStart = System.nanoTime();
                    process_protoheader();
                }
                // _jsonheader_len will only be larger than 0 if set properly (finished being set).
                // jsonHeaderRead will be null until the buffer gathering it has filled and converted it to
                // a JSONobject.
                else if (this.jsonHeaderRead == null){
                    process_jsonheader();
                }
                else if (!msgReadComplete){
                    process_msgContent(selectionKey);
                } else {
                    Log.e(TAG, "bitsRead but don't know what to do with them");
                }
            }
        }
    }

    private void write(SelectionKey selectionKey) throws IOException, JSONException {

        if (!writeBufferVector.isEmpty()){
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

            Log.v(TAG, "writeBufferVector contains data");

            if (jsonHeaderWrite == null){
                int numBytesToWrite = writeBufferVector.get(0).limit();

                // Create JSONHeader containing length of episode in Bytes
                Log.v(TAG, "generating jsonheader");
                jsonHeaderWrite = generate_jsonheader(numBytesToWrite);
                byte[] jsonBytes = jsonHeaderWrite.toString().getBytes(StandardCharsets.UTF_8);

                // Encode length of JSONHeader to first two bytes and write to socketChannel
                int jsonLength = jsonBytes.length;

                // Add up length of protoHeader, JSONheader and episode bytes
                int totalNumBytesToWrite = Integer.BYTES + jsonLength + numBytesToWrite;

                // Create new buffer that compiles protoHeader, JsonHeader, and Episode
                _send_buffer = ByteBuffer.allocate(totalNumBytesToWrite);

                Log.v(TAG, "Assembling _send_buffer");
                // Assemble all bytes and flip to prepare to read
                _send_buffer.putInt(jsonLength);
                _send_buffer.put(jsonBytes);
                _send_buffer.put(writeBufferVector.get(0));
                _send_buffer.flip();

                Log.d(TAG, "Writing to server ...");

                // Write Bytes to socketChannel //todo shouldn't be while as should be non-blocking
                if (_send_buffer.remaining() > 0){
                    int numBytes = socketChannel.write(_send_buffer); // todo memory dump error here!
                    int percentDone = (int) Math.ceil((((double) _send_buffer.limit() - (double) _send_buffer.remaining())
                            / (double) _send_buffer.limit()) * 100);
                    int total = _send_buffer.limit() / 1000000;
//                    Log.d(TAG, "Sent " + percentDone + "% of " + total + "Mb to " + socketChannel.getRemoteAddress());
                }
            } else{
                // Write Bytes to socketChannel
                if (_send_buffer.remaining() > 0){
                    socketChannel.write(_send_buffer);
                }
            }
            if (_send_buffer.remaining() == 0){
                int total = _send_buffer.limit() / 1000000;
                double timeTaken = (System.nanoTime() - socketWriteTimeStart) * 10e-10;
                DecimalFormat df = new DecimalFormat();
                df.setMaximumFractionDigits(2);
                Log.i(TAG, "Sent " + total + "Mb in " + df.format(timeTaken) + "s");
                // Remove episode from buffer so as to not write it again.
                writeBufferVector.remove(0);
                // Clear sending buffer
                _send_buffer.clear();
                // make null so as to catch the initial if statement to write a new one.
                jsonHeaderWrite = null;

                // Set socket to read now that writing has finished.
                Log.d(TAG, "Reading from server ...");
                int ops = SelectionKey.OP_READ;
                sc.register(selectionKey.selector(), ops, selectionKey.attachment());
            }

        }
    }

    private JSONObject generate_jsonheader(int numBytesToWrite) throws JSONException {
        JSONObject jsonHeader = new JSONObject();

        jsonHeader.put("byteorder", ByteOrder.nativeOrder().toString());
        jsonHeader.put("content-length", numBytesToWrite);
        jsonHeader.put("content-type", "flatbuffer"); // todo Change to flatbuffer later
        jsonHeader.put("content-encoding", "flatbuffer"); //Change to flatbuffer later
        return jsonHeader;
    }

    /**
     * recv_buffer may contain 0, 1, or several bytes. If it has more than hdrlen, then process
     * the first two bytes to obtain the length of the jsonheader. Else exit this function and
     * read from the buffer again until it fills past length hdrlen.
     */
    private void process_protoheader() {
        Log.v(TAG, "processing protoheader");
        int hdrlen = 2;
        if (_recv_buffer.position() >= hdrlen){
            _recv_buffer.flip(); //pos at 0 and limit set to bitsRead
            _jsonheader_len = _recv_buffer.getShort(); // Read 2 bytes converts to short and move pos to 2
            // allocate new ByteBuffer to store full jsonheader
            jsonHeaderBytes = new byte[_jsonheader_len];

            _recv_buffer.compact();

            Log.v(TAG, "finished processing protoheader");
        }
    }

    /**
     *  As with the process_protoheader we will check if _recv_buffer contains enough bytes to
     *  generate the jsonHeader objects, and if not, leave it alone and read more from socket.
     */
    private void process_jsonheader() throws JSONException {

        Log.v(TAG, "processing jsonheader");

        // If you have enough bytes in the _recv_buffer to write out the jsonHeader
        if (_jsonheader_len - _recv_buffer.position() < 0){
            _recv_buffer.flip();
            _recv_buffer.get(jsonHeaderBytes);
            // jsonheaderBuffer should now be full and ready to convert to a JSONobject
            jsonHeaderRead = new JSONObject(new String(jsonHeaderBytes));
            Log.d(TAG, "JSONheader from server: " + jsonHeaderRead.toString());

            try{
                int msgLength = (int) jsonHeaderRead.get("content-length");
                msgContent = ByteBuffer.allocate(msgLength);
            }catch (JSONException e) {
                Log.e(TAG, "Couldn't get content-length from jsonHeader sent from server", e);
            }
        }
        // Else return to selector and read more bytes into the _recv_buffer

        // If there are any bytes left over (part of the msg) then move them to the front of the buffer
        // to prepare for another read from the socket
        _recv_buffer.compact();
    }

    /**
     * Here a bit different as it may take multiple full _recv_buffers to fill the msgContent.
     * So check if msgContent.remaining is larger than 0 and if so, dump everything from _recv_buffer to it
     * @param selectionKey : Used to reference the instance and selector
     * @throws ClosedChannelException :
     */
    private void process_msgContent(SelectionKey selectionKey) throws IOException {

        if (msgContent.remaining() > 0){
            _recv_buffer.flip(); //pos at 0 and limit set to bitsRead set ready to read
            msgContent.put(_recv_buffer);
            _recv_buffer.clear();
        }

        if (msgContent.remaining() == 0){
            // msgContent should now be full and ready to convert to a various model files.
            socketListener.onServerReadSuccess(jsonHeaderRead, msgContent);

            // Clear for next round of communication
            _recv_buffer.clear();
            _jsonheader_len = 0;
            jsonHeaderRead = null;
            msgContent.clear();

            int totalBytes = msgContent.capacity() / 1000000;
            double timeTaken = (System.nanoTime() - socketReadTimeStart) * 10e-10;
            DecimalFormat df = new DecimalFormat();
            df.setMaximumFractionDigits(2);
            Log.i(TAG, "Entire message containing " + totalBytes + "Mb recv'd in " + df.format(timeTaken) + "s");

            msgReadComplete = true;

            // Set socket to write now that reading has finished.
            int ops = SelectionKey.OP_WRITE;
            sc.register(selectionKey.selector(), ops, selectionKey.attachment());
        }
    }

    //todo should send this to the mainactivity listener so it can be customized/overridden
    private void onNewMessageFromServer(){
        // Take info from JSONheader to parse msgContent into individual model files

        // After parsing all models notify MainActivity that models have been updated
    }

    // todo should be able deal with ByteBuffer from FlatBuffer rather than byte[]
    public boolean addEpisodeToWriteBuffer(byte[] episode){
        boolean success = false;
        try{
            ByteBuffer bb = ByteBuffer.wrap(episode);
            success = writeBufferVector.add(bb);
            Log.v(TAG, "Added data to writeBuffer");
            int ops = SelectionKey.OP_WRITE;
            socketWriteTimeStart = System.nanoTime();
            sc.register(selector, ops, this);
            // I want this to trigger the selector that this channel is writeReady.
        } catch (NullPointerException | ClosedChannelException e){
            Log.e(TAG,"Error", e);
            Log.e(TAG, "SocketConnectionManager.data not initialized yet");
        }
        return success;
    }
}
brvekthn

brvekthn1#

在android文档中偶然发现了这个问题,它回答了为什么我会得到outofmemoryerror这个问题。
为了维护一个功能性的多任务环境,android对每个应用程序的堆大小设置了一个硬限制。确切的堆大小限制取决于设备的可用ram总量。如果应用程序已达到堆容量并尝试分配更多内存,则可能会收到outofmemoryerror。
在某些情况下,您可能需要查询系统以确定当前设备上有多少堆空间,例如,确定有多少数据可以安全地保存在缓存中。您可以通过调用getmemoryclass()在系统中查询此图。此方法返回一个整数,指示应用程序堆可用的兆字节数。
在运行activitymanager.getmemoryclass方法之后,我看到像素3a的硬限制是192mb。当我试图分配刚刚超过200 mb时,我达到了这个限制。
我还检查了activitymanager.getlargememoryclass,看到了512MB的硬限制。因此,我可以设置我的应用程序有一个“大堆”,但尽管有4gb的内存,我有一个512MB的硬性限制,我需要解决。
除非其他人知道如何解决这个问题,否则我必须写一些逻辑来分段编写 episode 如果它超过某个点,则归档,然后通过通道分段发送。我想这会让事情慢一点,所以如果有人有一个答案可以避免这一点,或者告诉我,如果做得好,为什么这不会让事情慢下来,那么我很乐意给你答案。只是把这个作为一个答案,因为它确实回答了我原来的问题,但并不令人满意。

相关问题