You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
399 lines
14 KiB
C#
399 lines
14 KiB
C#
using Renci.SshNet.Abstractions;
|
|
using Renci.SshNet.Common;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Globalization;
|
|
using System.Threading;
|
|
|
|
namespace Renci.SshNet.Sftp
|
|
{
|
|
internal class SftpFileReader : ISftpFileReader
|
|
{
|
|
/// <summary>
/// Time, in milliseconds, the read-ahead loop waits for the semaphore before it
/// re-evaluates its loop condition.
/// </summary>
private const int ReadAheadWaitTimeoutInMilliseconds = 1000;

// ---- immutable configuration -------------------------------------------------

/// <summary>
/// The session through which read and close requests are issued.
/// </summary>
private readonly ISftpSession _sftpSession;

/// <summary>
/// The handle of the file that is read; passed to each read request and to the close request.
/// </summary>
private readonly byte[] _handle;

/// <summary>
/// The number of bytes requested for each read-ahead chunk.
/// </summary>
private readonly uint _chunkSize;

/// <summary>
/// The size of the file, when available; otherwise, <c>null</c>.
/// </summary>
private readonly long? _fileSize;

// ---- synchronization ---------------------------------------------------------

/// <summary>
/// Guards <see cref="_queue"/> and is pulsed whenever <see cref="Read()"/> may have to be woken up.
/// </summary>
private readonly object _readLock;

/// <summary>
/// Limits the number of pending reads; obtained by the read-ahead loop before each request.
/// </summary>
private readonly SemaphoreLight _semaphore;

/// <summary>
/// The wait handles that the read-ahead loop waits on before it requests another chunk.
/// </summary>
private readonly WaitHandle[] _waitHandles;

/// <summary>
/// Signaled when the read-ahead loop has ended.
/// </summary>
private readonly ManualResetEvent _readAheadCompleted;

/// <summary>
/// Signaled when the reader is being disposed; <c>null</c> once the reader has been disposed.
/// </summary>
private ManualResetEvent _disposingWaitHandle;

// ---- read-ahead state --------------------------------------------------------

/// <summary>
/// The completed read-ahead chunks, keyed by chunk index.
/// </summary>
private readonly Dictionary<int, BufferedRead> _queue;

/// <summary>
/// The index that is assigned to the next read-ahead request.
/// </summary>
private int _readAheadChunkIndex;

/// <summary>
/// The file offset of the next read-ahead request.
/// </summary>
private ulong _readAheadOffset;

/// <summary>
/// Holds a value indicating whether EOF has already been signaled by the SSH server.
/// </summary>
private bool _endOfFileReceived;

// ---- consumer state ----------------------------------------------------------

/// <summary>
/// The file offset up to which data has been returned by <see cref="Read()"/>.
/// </summary>
private ulong _offset;

/// <summary>
/// The index of the chunk that <see cref="Read()"/> takes from the queue next.
/// </summary>
private int _nextChunkIndex;

/// <summary>
/// Holds a value indicating whether the client has read up to the end of the file.
/// </summary>
private bool _isEndOfFileRead;

/// <summary>
/// The failure that moved the reader into the error state, or <c>null</c> when there is none.
/// </summary>
private Exception _exception;
|
|
|
|
/// <summary>
/// Initializes a new <see cref="SftpFileReader"/> instance for the specified file handle and
/// starts reading ahead.
/// </summary>
/// <param name="handle">The handle of the file to read; it is passed to each read request and to the close request issued on dispose.</param>
/// <param name="sftpSession">The <see cref="ISftpSession"/> through which the read and close requests are issued.</param>
/// <param name="chunkSize">The number of bytes requested for each individual read-ahead chunk.</param>
/// <param name="maxPendingReads">The maximum number of pending reads.</param>
/// <param name="fileSize">The size of the file, if known; otherwise, <c>null</c>.</param>
public SftpFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize)
{
    // what to read, and through which session
    _sftpSession = sftpSession;
    _handle = handle;
    _fileSize = fileSize;
    _chunkSize = chunkSize;

    // bookkeeping shared between Read() and the read-ahead thread
    _semaphore = new SemaphoreLight(maxPendingReads);
    _queue = new Dictionary<int, BufferedRead>(maxPendingReads);
    _readLock = new object();
    _readAheadCompleted = new ManualResetEvent(false);
    _disposingWaitHandle = new ManualResetEvent(false);

    // handles the read-ahead loop waits on: "disposing" and "semaphore available"
    _waitHandles = sftpSession.CreateWaitHandleArray(_disposingWaitHandle, _semaphore.AvailableWaitHandle);

    // everything is in place; kick off the read-ahead loop
    StartReadAhead();
}
|
|
|
|
/// <summary>
/// Returns the next block of data of the file.
/// </summary>
/// <returns>
/// The data that follows the data returned by the previous call; a zero-length array
/// signals that the end of the file has been reached.
/// </returns>
/// <exception cref="ObjectDisposedException">The reader has been disposed.</exception>
/// <exception cref="SshException">
/// The end of the file was already read, or the server unexpectedly signaled EOF while
/// catching up on a short read.
/// </exception>
/// <remarks>
/// When the reader is in the error state, the recorded exception is thrown.
/// </remarks>
public byte[] Read()
{
    if (_disposingWaitHandle == null)
    {
        throw new ObjectDisposedException(GetType().FullName);
    }

    if (_exception != null)
    {
        throw _exception;
    }

    if (_isEndOfFileRead)
    {
        throw new SshException("Attempting to read beyond the end of the file.");
    }

    BufferedRead chunk;

    lock (_readLock)
    {
        // block until the chunk we are due to hand out has arrived, or the reader has failed
        while (true)
        {
            if (_queue.TryGetValue(_nextChunkIndex, out chunk))
            {
                break;
            }

            if (_exception != null)
            {
                break;
            }

            Monitor.Wait(_readLock);
        }

        // a failure takes precedence over any chunk that may be available
        if (_exception != null)
        {
            throw _exception;
        }

        var payload = chunk.Data;
        var isEofChunk = payload.Length == 0;

        if (chunk.Offset == _offset)
        {
            // the chunk starts exactly where the previous one ended
            if (isEofChunk)
            {
                // PERF: the remaining state is deliberately left untouched once EOF is reached
                _isEndOfFileRead = true;
            }
            else
            {
                // consume the chunk and advance to the next one
                _queue.Remove(_nextChunkIndex);
                _offset += (ulong) payload.Length;
                _nextChunkIndex++;
            }

            // free a slot for the read-ahead loop
            _semaphore.Release();

            return payload;
        }

        // There is a gap before this chunk. If the chunk is an EOF and we have already
        // handed out as many bytes as the known file size, we report EOF right away and
        // save a round-trip to the server.
        if (isEofChunk && _fileSize.HasValue && _offset == (ulong) _fileSize.Value)
        {
            // no further reads are allowed
            _isEndOfFileRead = true;
            // free a slot for the read-ahead loop
            _semaphore.Release();
            // zero-length array tells the caller that EOF was reached
            return payload;
        }
    }

    // The server returned fewer bytes than requested for the previous chunk, so the gap
    // is fetched synchronously. Given the shortcut above, we only get here when:
    // - EOF was signaled for a file of which the size could not be obtained, or
    // - fewer bytes than requested were returned.
    //
    // According to the SSH specification, the latter should never happen for normal disk
    // files (but can happen for device files).
    //
    // Important: this request is deliberately issued outside of the read lock to avoid a
    // deadlock.
    //
    // TODO: break loop and interrupt blocking wait in case of exception
    var missingByteCount = chunk.Offset - _offset;
    var catchUp = _sftpSession.RequestRead(_handle, _offset, (uint) missingByteCount);

    if (catchUp.Length != 0)
    {
        // part (or all) of the gap was filled; hand it out and continue from there
        _offset += (uint) catchUp.Length;
        return catchUp;
    }

    // the catch-up read signaled EOF; handle it inside the read lock to avoid an
    // ObjectDisposedException while releasing the semaphore
    lock (_readLock)
    {
        // a zero-length response is only legitimate when the next read-ahead chunk
        // signaled EOF as well
        if (chunk.Data.Length != 0)
        {
            // move the reader to the error state
            _exception = new SshException("Unexpectedly reached end of file.");

            // only release when the current instance has not been disposed yet
            if (_semaphore != null)
            {
                // free a slot for the read-ahead loop
                _semaphore.Release();
            }

            // report the failure to the caller
            throw _exception;
        }

        _isEndOfFileRead = true;

        // only release when the current instance has not been disposed yet
        if (_semaphore != null)
        {
            // free a slot for the read-ahead loop
            _semaphore.Release();
        }

        // zero-length array tells the caller that EOF was reached
        return catchUp;
    }
}
|
|
|
|
/// <summary>
/// Finalizes the current instance; invokes <see cref="Dispose(bool)"/> with <c>false</c>,
/// which releases nothing.
/// </summary>
~SftpFileReader()
{
    Dispose(false);
}

/// <summary>
/// Stops the read-ahead loop, wakes up any blocked <see cref="Read()"/> and closes the file handle.
/// </summary>
/// <remarks>
/// Calls after the first one are ignored, even when closing the handle failed in the first call.
/// </remarks>
public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}

/// <summary>
/// Releases unmanaged and - optionally - managed resources
/// </summary>
/// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
/// <remarks>
/// Any exception raised while closing the file handle is propagated to the caller, but only
/// after the wait handles have been disposed and the instance has been marked as disposed.
/// </remarks>
protected void Dispose(bool disposing)
{
    // already disposed
    if (_disposingWaitHandle == null)
    {
        return;
    }

    // there's nothing to release when we're invoked by the finalizer
    if (!disposing)
    {
        return;
    }

    // record exception to prevent future Read()
    _exception = new ObjectDisposedException(GetType().FullName);

    // signal that we're disposing to interrupt wait in read-ahead
    _disposingWaitHandle.Set();

    // wait until the read-ahead thread has completed
    _readAheadCompleted.WaitOne();

    lock (_readLock)
    {
        // dispose semaphore in read lock to ensure we don't run into an ObjectDisposedException
        // in Read()
        _semaphore.Dispose();
        // awake Read()
        Monitor.PulseAll(_readLock);
    }

    try
    {
        // close the file handle
        var closeAsyncResult = _sftpSession.BeginClose(_handle, null, null);
        _sftpSession.EndClose(closeAsyncResult);
    }
    finally
    {
        // Always complete the cleanup, even when closing the handle failed. Previously a
        // failure left _disposingWaitHandle non-null - and possibly already disposed - which
        // caused a subsequent Dispose() to throw when signaling the disposed wait handle.
        _readAheadCompleted.Dispose();
        _disposingWaitHandle.Dispose();

        // dereference the disposing waithandle to indicate that the current instance is disposed
        _disposingWaitHandle = null;
    }
}
|
|
|
|
/// <summary>
/// Starts the read-ahead loop via <see cref="ThreadAbstraction.ExecuteThread"/>.
/// </summary>
/// <remarks>
/// The loop keeps requesting chunks of <see cref="_chunkSize"/> bytes - one per obtained
/// semaphore - until EOF has been received, a failure has been recorded, or the reader is
/// being disposed. <see cref="_readAheadCompleted"/> is signaled when the loop ends.
/// </remarks>
private void StartReadAhead()
{
    ThreadAbstraction.ExecuteThread(() =>
        {
            while (!(_endOfFileReceived || _exception != null))
            {
                // Wait until there's room for another read-ahead, or until we're told to stop.
                // EOF and failures are not part of this check, as those do not require Read()
                // to be unblocked here (or have already taken care of that).
                if (!ContinueReadAhead())
                {
                    // wake up a Read() that may be waiting for a chunk that will never arrive
                    lock (_readLock)
                    {
                        Monitor.PulseAll(_readLock);
                    }

                    // we're done reading ahead
                    break;
                }

                // Try to obtain the semaphore. This may time out when all semaphores are taken
                // by pending read-aheads, which can happen when the server is slow to respond
                // or when the session is broken.
                var acquired = _semaphore.Wait(ReadAheadWaitTimeoutInMilliseconds);
                if (acquired)
                {
                    // no point in requesting more chunks once EOF was received or a failure
                    // was recorded while we were waiting
                    if (_endOfFileReceived || _exception != null)
                    {
                        break;
                    }

                    // request the next chunk
                    try
                    {
                        _sftpSession.BeginRead(_handle,
                                               _readAheadOffset,
                                               _chunkSize,
                                               ReadCompleted,
                                               new BufferedRead(_readAheadChunkIndex, _readAheadOffset));
                    }
                    catch (Exception ex)
                    {
                        HandleFailure(ex);
                        break;
                    }

                    // position the read-ahead at the chunk that follows
                    _readAheadOffset += _chunkSize;
                    _readAheadChunkIndex++;
                }

                // when the wait timed out, the loop condition is re-evaluated and - if no
                // failure occurred - we wait again
            }

            _readAheadCompleted.Set();
        });
}
|
|
|
|
/// <summary>
/// Waits on the reader's wait handles and returns a value indicating whether the
/// read-ahead loop should be continued.
/// </summary>
/// <returns>
/// <c>true</c> if the read-ahead loop should be continued; otherwise, <c>false</c>.
/// </returns>
/// <remarks>
/// Any exception raised while waiting - including the one raised for an unexpected wait
/// result - is recorded in <see cref="_exception"/>, after which <c>false</c> is returned.
/// </remarks>
private bool ContinueReadAhead()
{
    try
    {
        var signaledIndex = _sftpSession.WaitAny(_waitHandles, _sftpSession.OperationTimeout);

        // disposing
        if (signaledIndex == 0)
        {
            return false;
        }

        // semaphore available
        if (signaledIndex == 1)
        {
            return true;
        }

        throw new NotImplementedException(string.Format(CultureInfo.InvariantCulture, "WaitAny return value '{0}' is not implemented.", signaledIndex));
    }
    catch (Exception ex)
    {
        // move the reader to the error state and stop reading ahead
        _exception = ex;
        return false;
    }
}
|
|
|
|
/// <summary>
/// Callback for a read-ahead request; adds the completed chunk to the queue and wakes up
/// <see cref="Read()"/>.
/// </summary>
/// <param name="result">
/// The <see cref="SftpReadAsyncResult"/> of the completed request; its state is the
/// <see cref="BufferedRead"/> that was supplied when the request was started.
/// </param>
/// <remarks>
/// When the request failed, the failure is passed to <see cref="HandleFailure(Exception)"/>
/// and no chunk is queued.
/// </remarks>
private void ReadCompleted(IAsyncResult result)
{
    var readAsyncResult = (SftpReadAsyncResult) result;

    byte[] data;

    try
    {
        data = readAsyncResult.EndInvoke();
    }
    catch (Exception ex)
    {
        HandleFailure(ex);
        return;
    }

    // a read that completes with a zero-byte result signals EOF
    // but there may be pending reads before that read
    var bufferedRead = (BufferedRead) readAsyncResult.AsyncState;
    bufferedRead.Complete(data);

    lock (_readLock)
    {
        // Flag EOF before the chunk becomes visible to Read(). If the flag were only set
        // after leaving the lock, Read() could consume the EOF chunk and release the
        // semaphore first, allowing the read-ahead loop to issue a superfluous read
        // beyond the end of the file.
        if (data.Length == 0)
        {
            // set a flag to stop read-aheads
            _endOfFileReceived = true;
        }

        // add item to queue
        _queue.Add(bufferedRead.ChunkIndex, bufferedRead);

        // signal that a chunk has been read or EOF has been reached;
        // in both cases, Read() will eventually also unblock the "read-ahead" thread
        Monitor.PulseAll(_readLock);
    }
}
|
|
|
|
/// <summary>
/// Moves the reader to the error state and wakes up both the read-ahead loop and
/// <see cref="Read()"/>.
/// </summary>
/// <param name="cause">The exception to record as the failure of the reader.</param>
private void HandleFailure(Exception cause)
{
    // from here on, Read() and the read-ahead loop observe the failure
    _exception = cause;

    // give back a semaphore so that the read-ahead loop does not keep waiting for one
    _semaphore.Release();

    // wake up a Read() that is waiting for a chunk
    lock (_readLock)
    {
        Monitor.PulseAll(_readLock);
    }
}
|
|
|
|
/// <summary>
/// Describes a single read-ahead request: the index and file offset it was issued for and,
/// once completed, the data that was received.
/// </summary>
internal class BufferedRead
{
    private readonly int _index;
    private readonly ulong _startOffset;
    private byte[] _payload;

    /// <summary>
    /// Initializes a new <see cref="BufferedRead"/> for the specified chunk index and offset.
    /// </summary>
    /// <param name="chunkIndex">The index of the chunk.</param>
    /// <param name="offset">The offset of the chunk.</param>
    public BufferedRead(int chunkIndex, ulong offset)
    {
        _index = chunkIndex;
        _startOffset = offset;
    }

    /// <summary>
    /// Gets the index of the chunk.
    /// </summary>
    public int ChunkIndex
    {
        get { return _index; }
    }

    /// <summary>
    /// Gets the data supplied to <see cref="Complete(byte[])"/>, or <c>null</c> when
    /// <see cref="Complete(byte[])"/> has not been invoked yet.
    /// </summary>
    public byte[] Data
    {
        get { return _payload; }
    }

    /// <summary>
    /// Gets the offset of the chunk.
    /// </summary>
    public ulong Offset
    {
        get { return _startOffset; }
    }

    /// <summary>
    /// Records the data that was received for the chunk.
    /// </summary>
    /// <param name="data">The data that was received.</param>
    public void Complete(byte[] data)
    {
        _payload = data;
    }
}
|
|
}
|
|
}
|