在.NET中从SQLite接收DB更新事件

0ve6wy6x  于 2023-10-23  发布在  SQLite
关注(0)|答案(2)|浏览(163)

我最近发现了SQLite的惊人之处,特别是http://sqlite.phxsoftware.com/上SQLite的.NET Package 器。
现在,假设我正在开发将在同一网络上的多台机器上运行的软件。没有什么疯狂的,可能只有5或6台机器。每个软件示例都将访问存储在共享目录中的文件中的SQLite数据库(这是个坏主意吗?如果有,告诉我!).
如果一个应用程序示例更新了数据库文件,是否有方法通知应用程序的每个示例?一个明显的方法是使用FileSystemWatcher类,将整个数据库读入一个数据集,然后...你知道...列举整个事件看看有什么新的...但是,是的,**这看起来很愚蠢,实际上。**有这样一个东西作为SQLite更新的提供者吗?
这是一个有意义的问题吗?当涉及到ADO.NET时,我也是一个新手,所以我可能从完全错误的Angular 来处理这个问题。

41ik7eoe

41ik7eoe1#

在网络上使用SQLite并不是一个好主意。看看SQLite自己对这个here的建议。
客户端-服务器数据库将更加可靠,并且还可以解决通知问题。例如,PostgreSQL有一个通过NOTIFY和LISTEN语句的客户端间信令机制,可以直接从客户端或从函数、存储过程或触发器内部使用。
即使您决定使用SQLite,也不要使用文件监视API。由于文件系统内部的竞争条件,它们在Windows上完全被破坏。从FileSystemWatcher的MSDN条目:
请注意,由于与Windows操作系统的依赖关系,当错过事件或超过缓冲区大小时,FileSystemWatcher不会引发Error事件。
它提供了缓解这种情况的建议,但没有一个提供任何可靠性保证。

k75qkfdt

k75qkfdt2#

虽然没有任何内置机制来挂钩到某种更改事件,但您可以跨多个进程检测SQLite数据库中的更改。
SQLite数据库不应该通过网络共享共享,原因有很多,创建者在这里解释:https://www.sqlite.org/useovernet.html
如果你有一个运行TCP客户端服务器应用程序的服务器,它会接收你的请求并发送数据作为响应,那么效率和一致性要高得多。

轮询文件头

SQLite文件头包含文件更改计数器字段,每当数据库或其表被修改时,该字段就会更新。我们可以在头上使用轮询来检测更改。我不知道为什么这么多人反对轮询,这在低级编程中是完全正常的,如果软件或硬件不提供任何中断,你无论如何都要坚持轮询。

SQLiteHeader结构体

/// <summary>
/// The first 100 bytes of the database file comprise 
/// the database file header. The database file header 
/// is divided into fields as shown by the <see cref="SQLiteHeader"/> struct. 
/// All multibyte fields in the database file header 
/// are stored with the most significant byte first (big-endian).
/// The <see cref="SQLiteHeader"/> already checks for endianess and
/// converts the mulitbyte fields for you.
/// </summary>
[StructLayout(LayoutKind.Explicit)]
unsafe public struct SQLiteHeader
{       
    public static SQLiteHeader FromFile(string fileName, out SQLiteHeaderError error)
    {
        return SQLiteHeaderParser.FromFile(fileName, out error);
    }

    /// <summary>
    /// The header string: "SQLite format 3\000"
    /// </summary>
    [FieldOffset(0)]
    [MarshalAs(UnmanagedType.ByValTStr, SizeConst = 16)]
    public string HeaderString;
    /// <summary>
    /// The database page size in bytes. 
    /// Must be a power of two between 512 and 32768 inclusive, 
    /// or the value 1 representing a page size of 65536.
    /// </summary>
    [FieldOffset(16)]
    public short PageSize;
    /// <summary>
    /// File format write version. 1 for legacy; 2 for WAL.
    /// </summary>
    [FieldOffset(18)]
    public byte WriteVersion;
    /// <summary>
    /// File format read version. 1 for legacy; 2 for WAL.
    /// </summary>
    [FieldOffset(19)]
    public byte ReadVersion;
    /// <summary>
    /// Bytes of unused "reserved" space at the end of each page. Usually 0.
    /// </summary>
    [FieldOffset(20)]
    public byte Reserved;
    /// <summary>
    /// Maximum embedded payload fraction. Must be 64.
    /// </summary>
    [FieldOffset(21)]
    public byte MaxPayloadFract;
    /// <summary>
    /// Minimum embedded payload fraction. Must be 32.
    /// </summary>
    [FieldOffset(22)]
    public byte MinPayloadFract;
    /// <summary>
    /// Leaf payload fraction. Must be 32.
    /// </summary>
    [FieldOffset(23)]
    public byte LeafPayloadFract;
    /// <summary>
    /// File change counter.
    /// </summary>
    [FieldOffset(24)]
    public Int32 FileChangeCounter;
    /// <summary>
    /// Size of the database file in pages. 
    /// The "in-header database size".
    /// </summary>
    [FieldOffset(28)]
    public Int32 InHeaderDatabaseSize;
    /// <summary>
    /// Page number of the first freelist trunk page.
    /// </summary>
    [FieldOffset(32)]
    public Int32 FirstFreeListTrunkPage;
    /// <summary>
    /// Total number of freelist pages.
    /// </summary>
    [FieldOffset(36)]
    public Int32 NumFreeListPages;
    /// <summary>
    /// The schema cookie.
    /// </summary>
    [FieldOffset(40)]
    public Int32 SchemaCookie;
    /// <summary>
    /// The schema format number. 
    /// Supported schema formats are 1, 2, 3, and 4.
    /// </summary>
    [FieldOffset(44)]
    public Int32 SchemaFormatNumber;
    /// <summary>
    /// Default page cache size.
    /// </summary>
    [FieldOffset(48)]
    public Int32 PageCacheSize;
    /// <summary>
    /// The page number of the largest root b-tree 
    /// page when in auto-vacuum or incremental-vacuum modes, 
    /// or zero otherwise.
    /// </summary>
    [FieldOffset(52)]
    public Int32 MaxRootBTreePage;
    /// <summary>
    /// The database text encoding. 
    /// A value of 1 means UTF-8. 
    /// A value of 2 means UTF-16le. 
    /// A value of 3 means UTF-16be.
    /// </summary>
    [FieldOffset(56)]
    public Int32 TextEncoding;
    /// <summary>
    /// The "user version" as read and set 
    /// by the user_version pragma.
    /// </summary>
    [FieldOffset(60)]
    public Int32 UserVersion;
    /// <summary>
    /// True (non-zero) for incremental-vacuum mode. 
    /// False (zero) otherwise.
    /// </summary>
    [FieldOffset(64)]
    public Int32 IncrementalVacuumMode;
    /// <summary>
    /// The "Application ID" set by 
    /// PRAGMA application_id.
    /// </summary>
    [FieldOffset(68)]
    public Int32 ApplicationId;
    /// <summary>
    /// Reserved for expansion. Must be zero.
    /// </summary>
    [FieldOffset(72)]
    [MarshalAs(UnmanagedType.ByValTStr, SizeConst = 20)]
    public string ReservedExpansion;
    /// <summary>
    /// The version-valid-for number.
    /// </summary>
    [FieldOffset(92)]
    public Int32 VersionValidFor;
    /// <summary>
    /// SQLITE_VERSION_NUMBER
    /// </summary>
    [FieldOffset(96)]
    public Int32 SQLiteVersionNumber;        
}

SQLiteHeaderParser类

internal static class SQLiteHeaderParser
{
    internal static readonly byte[] MagicHeaderBytes = new byte[] {
        0x53, 0x51, 0x4c, 0x69,
        0x74, 0x65, 0x20, 0x66,
        0x6f, 0x72, 0x6d, 0x61,
        0x74, 0x20, 0x33, 0x00
    };

    private static void ToLittleEndian(byte[] buffer, int offset, int length)
    {
        switch (length)
        {
            case 2:
                {
                    ushort num = BitConverter.ToUInt16(buffer, offset);
                    num = (ushort)(
                        ((num & 0xff) >> 8) |
                         (num << 8)
                    );
                    var bytes = BitConverter.GetBytes(num);
                    bytes.CopyTo(buffer, offset);
                    break;
                }
            case 4:
                {
                    uint num = BitConverter.ToUInt32(buffer, offset);
                    num = (uint)(
                        ((num & 0xff000000) >> 24) |
                        ((num & 0x00ff0000) >> 8) |
                        ((num & 0x0000ff00) << 8) |
                         (num << 24)
                    );
                    var bytes = BitConverter.GetBytes(num);
                    bytes.CopyTo(buffer, offset);
                    break;
                }
        }
    }

    private static int GetOffset(string name)
    {
        return Marshal.OffsetOf<SQLiteHeader>(name).ToInt32();
    }        

    private static void SetEndianess(byte[] buffer)
    {
        if (!(BitConverter.IsLittleEndian)) {
            return;
        }

        SQLiteHeader h = new SQLiteHeader();

        ToLittleEndian(buffer, GetOffset(nameof(h.PageSize)),               Marshal.SizeOf(h.PageSize));
        ToLittleEndian(buffer, GetOffset(nameof(h.FileChangeCounter)),      Marshal.SizeOf(h.FileChangeCounter));
        ToLittleEndian(buffer, GetOffset(nameof(h.InHeaderDatabaseSize)),   Marshal.SizeOf(h.InHeaderDatabaseSize));
        ToLittleEndian(buffer, GetOffset(nameof(h.FirstFreeListTrunkPage)), Marshal.SizeOf(h.FirstFreeListTrunkPage));
        ToLittleEndian(buffer, GetOffset(nameof(h.NumFreeListPages)),       Marshal.SizeOf(h.NumFreeListPages));
        ToLittleEndian(buffer, GetOffset(nameof(h.SchemaCookie)),           Marshal.SizeOf(h.SchemaCookie));
        ToLittleEndian(buffer, GetOffset(nameof(h.SchemaFormatNumber)),     Marshal.SizeOf(h.SchemaFormatNumber));
        ToLittleEndian(buffer, GetOffset(nameof(h.PageCacheSize)),          Marshal.SizeOf(h.PageCacheSize));
        ToLittleEndian(buffer, GetOffset(nameof(h.MaxRootBTreePage)),       Marshal.SizeOf(h.MaxRootBTreePage));
        ToLittleEndian(buffer, GetOffset(nameof(h.TextEncoding)),           Marshal.SizeOf(h.TextEncoding));
        ToLittleEndian(buffer, GetOffset(nameof(h.PageCacheSize)),          Marshal.SizeOf(h.PageCacheSize));
        ToLittleEndian(buffer, GetOffset(nameof(h.UserVersion)),            Marshal.SizeOf(h.UserVersion));
        ToLittleEndian(buffer, GetOffset(nameof(h.IncrementalVacuumMode)),  Marshal.SizeOf(h.IncrementalVacuumMode));
        ToLittleEndian(buffer, GetOffset(nameof(h.ApplicationId)),          Marshal.SizeOf(h.ApplicationId));
        ToLittleEndian(buffer, GetOffset(nameof(h.VersionValidFor)),        Marshal.SizeOf(h.VersionValidFor));
        ToLittleEndian(buffer, GetOffset(nameof(h.SQLiteVersionNumber)),    Marshal.SizeOf(h.SQLiteVersionNumber));
    }

    private static byte[] PrepareHeader(BinaryReader reader, out SQLiteHeaderError error)
    {
        int hdrSize = Marshal.SizeOf<SQLiteHeader>();
        byte[] buffer = reader.ReadBytes(hdrSize);

        var loadedMagicHeader = buffer.Take(MagicHeaderBytes.Length).ToArray();
        if (!(loadedMagicHeader.SequenceEqual(MagicHeaderBytes))) {
            System.Diagnostics.Debug.WriteLine("magic number miss match -> invalid header");
            error = SQLiteHeaderError.InvalidMagicNumber;
            return new byte[0];
        }

        SetEndianess(buffer);
        error = SQLiteHeaderError.Success;
        return buffer;
    }

    private static SQLiteHeader BufferToHeader(byte[] buffer)
    {
        GCHandle gchHeader = GCHandle.Alloc(buffer, GCHandleType.Pinned);
        SQLiteHeader hdr = Marshal.PtrToStructure<SQLiteHeader>(gchHeader.AddrOfPinnedObject());
        gchHeader.Free();

        return hdr;
    }

    public static SQLiteHeader FromFile(string filePath, out SQLiteHeaderError error)
    {
        if (!(File.Exists(filePath))) {
            System.Diagnostics.Debug.WriteLine("invalid file path");
            error = SQLiteHeaderError.FileNotFound;
            return default(SQLiteHeader);
        }

        int hdrSize = Marshal.SizeOf<SQLiteHeader>();

        var fInfo = new FileInfo(filePath);
        if (fInfo.Length < hdrSize) {
            System.Diagnostics.Debug.WriteLine("not enough bytes");
            error = SQLiteHeaderError.InvalidHeaderSize;
            return default(SQLiteHeader);
        }

        byte[] buffer;
        try
        {
            using (var stream = new FileStream(
                filePath, 
                FileMode.Open, 
                FileAccess.Read, 
                FileShare.ReadWrite, hdrSize))
            {
                using (var reader = new BinaryReader(stream))
                {
                    buffer = PrepareHeader(reader, out error);
                    if (buffer.Length < 1) {                   
                        return default(SQLiteHeader);
                    }
                }
            }                
        }
        catch (Exception ex)
        {
            System.Diagnostics.Debug.WriteLine(ex.Message);
            error = SQLiteHeaderError.Undefined;
            return default(SQLiteHeader);
        }

        SQLiteHeader header = BufferToHeader(buffer);
        return header;
    }
}

SQLiteHeaderError枚举

[Flags]
public enum SQLiteHeaderError
{
    Success             = 0,
    FileNotFound        = 1 << 1,
    InvalidHeaderSize   = 1 << 2,
    InvalidMagicNumber  = 1 << 3,
    Undefined           = 1 << 8
}

SQLiteChangeMonitor类

public class SQLiteChangeMonitor : IDisposable
{
    private string          _sqliteFile;
    private SQLiteHeader    _sqliteHeader;

    private Timer _pollingTimer;

    private int _fileChangeCounter = -1;

    private static CancellationTokenSource  _cancelTokenSource;
    private static CancellationToken        _cancelToken;       

    private event EventHandler _onChangeDetected;
    public event EventHandler ChangeDetected
    {
        add {
            _onChangeDetected += value;
        }
        remove { 
            _onChangeDetected -= value; 
        }
    }

    //
    // Check for any changes in the header and restart timer.
    //
    private void TimerCallback(object state)
    {
        AutoResetEvent autoEvent = (AutoResetEvent)state;
        if (autoEvent == null) {
            return;
        }
        CheckForChanges();
        autoEvent.Set();
    }

    //
    // Check if the file change counter of the header has changed
    // and fire event to signal change.
    //
    private void CheckForChanges()
    {
        if (_cancelToken.IsCancellationRequested) {
            return;
        }

        try
        {
            SQLiteHeaderError err;
            _sqliteHeader = SQLiteHeader.FromFile(_sqliteFile, out err);
            int fcc = _sqliteHeader.FileChangeCounter;
            if (fcc != _fileChangeCounter)
            {
                _fileChangeCounter = fcc;
                _onChangeDetected?.Invoke(this, EventArgs.Empty);
            }
        }
        catch (Exception ex) 
        {
            // Implement your error handling here.
            System.Diagnostics.Debug.WriteLine(ex.Message);
        }
    }   

    //
    // Start the System.Threading.Timer
    //
    private void StartPolling(int timeout)
    {
        AutoResetEvent autoResetEvent = new AutoResetEvent(false);

        _pollingTimer = new Timer(TimerCallback, autoResetEvent, 0, timeout);
        autoResetEvent.WaitOne();
    }

    //
    // Get current change counter from file header to 
    // avoid missfire when starting the polling.
    //
    private void SetInitalChangeCounter()
    {
        SQLiteHeaderError err;
        _sqliteHeader = SQLiteHeader.FromFile(_sqliteFile, out err);
        _fileChangeCounter = _sqliteHeader.FileChangeCounter;
    }

    /// <summary>
    /// Starts polling for database changes.
    /// </summary>
    /// <param name="pollingInterval">
    /// The polling interval in milliseconds</param>
    public void Start(int pollingInterval = 1000)
    {
        SetInitalChangeCounter();
        StartPolling(pollingInterval);
    }

    /// <summary>
    /// Stops polling for changes.
    /// </summary>
    public void Stop()
    {
        _cancelTokenSource?.Cancel();
    }
    
    
    //
    // Set up cancellation token to end polling.
    //
    private void InitToken()
    {
        _cancelTokenSource = new CancellationTokenSource();
        _cancelToken = _cancelTokenSource.Token;
    }

    public SQLiteChangeMonitor(string databaseFile)
    {
        if (string.IsNullOrEmpty(databaseFile)) {
            throw new ArgumentNullException(
                nameof(databaseFile), 
                "Database file path cannot be null or empty."
            );
        }

        if (!(File.Exists(databaseFile))) {
            throw new FileNotFoundException(
                "Unable to locate file.", 
                databaseFile
            );
        }

        _sqliteFile = databaseFile;

        InitToken();
    }

    ~SQLiteChangeMonitor() { Dispose(false); }
    public void Dispose() { Dispose(true); }
    private void Dispose(bool disposing)
    {
        if (disposing)
        {
            GC.SuppressFinalize(this);
        }
        
        _cancelTokenSource?.Cancel();            
        _pollingTimer?.Dispose();
    }
}

高频轮询

SQLiteChangeMonitor类使用1000ms(1s)的默认轮询间隔。对于高频率的轮询间隔,我建议使用以下答案中描述的模式:https://stackoverflow.com/a/23341005/22191764

缺点

这并不提供任何关于修改了什么的信息,但是轮询头文件然后更新数据库视图,然后总是对数据库运行查询并最终更新视图仍然要有效得多。

相关问题